Ok, playing around with the .Net 4.0 Parellel Extensions in System.Threading.Tasks. I'm finding what seems like weird behaivor, but I assume I'm jsut doing something wrong. I have an interface and a couple implementing clases, they're simple for this.
interface IParallelPipe
{
void Process(ref BlockingCollection<Stream> stream, long stageId);
}
class A:IParallelPipe
{
public void Process(ref BlockingCollection<Stream> stream, long stageId)
{
//do stuff
}
}
class B:IParallelPipe
{
public void Process(ref BlockingCollection<Stream> stream, long stageId)
{
//do stuff
}
}
I then have my class that starts things off on these. This is where the problem arises. I essentially get information about what implementing class to invoke from a type passed in and then call a factory to instantiate it and then I create a task with it and start it up. Shown here:
BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();
foreach (Stage s in pipeline.Stages)
{
IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
Task.Factory.StartNew(() => p.Process(ref bcs, s.id));
}
In each run of this in my sample, pipeline.Stages contains two elements, one that gets instantiated as class A and the other as class B. This is fine, I see it in te debugger as p coming away with the two different types. However, class B never gets called, instead I get two invocations of the A.Process(...) method. Both contain the stageId for the that was passed in (ie. the two invocations have different stageIds).
Now, if I take and separate things out a bit, just for testing I can get things to work by doing something like this:
BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();
A a = null;
B b = null;
foreach (Stage s in pipeline.Stages)
{
IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
if(p is A)
a = p;
else
b = p;
}
Task.Factory.StartNew(() => a.Process(ref bcs, idThatINeed));
Task.Factory.StartNew(() => b.Process(ref bcs, idThatINeed));
This invokes the appropriate class!
Any thoughts???
The behaviour you're describing seems odd to me - I'd expect the right instances to be used, but potentially with the wrong stage ID - the old foreach variable capture problem. The variable s is being captured, and by the time the task factory evaluates the closure, the value of s has changed.
This is definitely a problem in your code, but it doesn't explain why you're seeing a problem. Just to check, you really are declaring p within the loop, and not outside it? If you were declaring p outside the loop, that would explain everything.
Here's the fix for the capture problem though:
BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();
foreach (Stage s in pipeline.Stages)
{
Stage copy = s;
IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
Task.Factory.StartNew(() => p.Process(ref bcs, copy.id));
}
Note that we're just taking a copy inside the loop, and capturing that copy, to get a different "instance" of the variable each time.
Alternatively, instead of capturing the stage, we could just capture the ID as that's all we need:
BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();
foreach (Stage s in pipeline.Stages)
{
long id = s.id;
IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
Task.Factory.StartNew(() => p.Process(ref bcs, id));
}
If that doesn't help, could you post a short but complete program which demonstrates the problem? That would make it a lot easier to track down.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With