Given the numerous new ways of performing asynchronous operations in C# (TPL, Parallel Extensions, Async CTP, Reactive Extensions), I was wondering what the simplest way would be to parallelize the fetching and processing portions of the following:
foreach (string url in urls)
{
    var file = FetchFile(url);
    ProcessFile(file);
}
The proviso is that, whilst files can be fetched at any time, ProcessFile can only handle one file at a time and should be called sequentially.
In short, what is the simplest way to get FetchFile and ProcessFile to behave in a pipelined way, i.e. to run concurrently?
Here's the Rx way. This extension transforms a stream of URIs into a stream of response streams:
public static IObservable<Stream> RequestToStream(this IObservable<string> source,
                                                  TimeSpan timeout)
{
    return
        // Create one WebRequest per incoming URI
        from wc in source.Select(WebRequest.Create)
        // Issue each request asynchronously; a request that times out or fails
        // yields an empty sequence instead of an error
        from s in Observable
            .FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
                                           wc.EndGetResponse)()
            .Timeout(timeout, Observable.Empty<WebResponse>())
            .Catch(Observable.Empty<WebResponse>())
        select s.GetResponseStream();
}
Usage:
// Absolute URIs with forward slashes: inside a C# string "\f" is a form-feed escape
new[] { "http://myuri.net/file1.dat", "http://myuri.net/file2.dat" }
    .ToObservable()
    .RequestToStream(TimeSpan.FromSeconds(5))
    .Do(stream => ProcessStream(stream))
    .Subscribe();
Edit: oops, I hadn't noticed the requirement that file writing be serialized. That part can be done with .Concat, which is essentially an Rx queue (another one is .Zip).
Let's add a StreamToFile extension:
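public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
{
    // Defer delays File.Create and the copy until subscription, so Concat can start
    // each write only after the previous one has completed.
    // AsyncRead and WriteTo are helper extensions that are not defined in this answer.
    return Observable.Defer(() =>
        source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
}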
Now the web requests can run in parallel while the file writes that come from them are serialized:
new[] { "http://myuri.net/file1.dat", "http://myuri.net/file2.dat" }
    .ToObservable()
    .RequestToStream(TimeSpan.FromSeconds(5))                           // requests run in parallel
    .Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat")) // 0.dat, 1.dat, ...
    .Select(x => x.StreamToFile())
    .Concat()                                                           // one write at a time
    .Subscribe();
Given the constraint on ProcessFile, I would say you should fetch the data asynchronously using TPL and then enqueue a token which references the preloaded data. You can then have a background thread that pulls items off the queue and hands them to ProcessFile one by one. This is the producer/consumer pattern.
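A minimal sketch of this idea, assuming .NET 4.5 or later (for Task.Run): each Task<T> plays the role of the token that references the preloaded data, and a list of those tasks, kept in input order, acts as the queue. TaskPipeline, Run and the fetch/process delegates are hypothetical names standing in for the question's FetchFile and ProcessFile; here the calling thread does the consuming rather than a dedicated background thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TaskPipeline
{
    // Hypothetical helper: `fetch` and `process` stand in for FetchFile and ProcessFile.
    public static void Run<TIn, TFile>(IEnumerable<TIn> inputs,
                                       Func<TIn, TFile> fetch,
                                       Action<TFile> process)
    {
        // Start every fetch on the thread pool right away. Each Task<TFile> is the
        // "token" that references data which is (or soon will be) preloaded.
        List<Task<TFile>> queue = inputs
            .Select(input => Task.Run(() => fetch(input)))
            .ToList();

        // Consume in input order: wait only for the next task, then process it.
        // Later fetches keep running while `process` works on the current file.
        foreach (Task<TFile> task in queue)
        {
            TFile file = task.GetAwaiter().GetResult(); // rethrows a fetch error unwrapped
            process(file); // called on this thread only, one file at a time
        }
    }
}
```

Usage would be along the lines of TaskPipeline.Run(urls, FetchFile, ProcessFile). Files are handed to ProcessFile in the original URL order, but because every fetch starts up front there is no limit on how many fetched files are held in memory at once.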
For the queue, take a look at BlockingCollection<T>, which provides a thread-safe queue and, when created with a bounded capacity, also has the nice effect of throttling the workload.
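A sketch of the producer/consumer version with BlockingCollection<T>, again under stated assumptions: QueuePipeline and its delegates are hypothetical, Parallel.ForEach is used as the producer side, and boundedCapacity defaults to an arbitrary 4. Once that many fetched files are waiting, Add blocks, which is the throttling described above. Only the single consumer task calls process, so calls never overlap; note that files reach it in completion order rather than URL order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

public static class QueuePipeline
{
    // Hypothetical helper: fetch in parallel, process one item at a time on a single consumer.
    public static void Run<TIn, TFile>(IEnumerable<TIn> inputs,
                                       Func<TIn, TFile> fetch,
                                       Action<TFile> process,
                                       int boundedCapacity = 4)
    {
        using (var queue = new BlockingCollection<TFile>(boundedCapacity))
        {
            Exception failure = null;

            // Single consumer: the only caller of `process`, so calls never overlap.
            Task consumer = Task.Run(() =>
            {
                foreach (TFile file in queue.GetConsumingEnumerable())
                {
                    if (failure != null)
                        continue; // keep draining so producers blocked in Add are released
                    try
                    {
                        process(file);
                    }
                    catch (Exception ex)
                    {
                        failure = ex; // remember the first error, rethrow it at the end
                    }
                }
            });

            try
            {
                // Producers: Add blocks while `boundedCapacity` items are already waiting.
                Parallel.ForEach(inputs, input => queue.Add(fetch(input)));
            }
            finally
            {
                queue.CompleteAdding(); // lets the consumer's foreach finish
                consumer.Wait();
            }

            if (failure != null)
                ExceptionDispatchInfo.Capture(failure).Throw();
        }
    }
}
```

If process throws, the consumer keeps draining (without processing) so that producers stuck in Add cannot deadlock, and the first exception is rethrown once everything has finished. The trade-off against the task-list approach is bounded memory in exchange for losing the original ordering.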