I like the simplicity of the Parallel.For and Parallel.ForEach methods in the TPL. I was wondering if there was a way to take advantage of something similar, or even of the slightly more advanced Tasks.
Below is a typical usage of the SqlDataReader. Is it possible, and if so how, to replace the while loop below with something in the TPL? Because the reader can't provide a fixed number of iterations, Parallel.For is not possible, which I gather leaves dealing with Tasks. I was hoping someone may have tackled this already and worked out some do's and don'ts with ADO.NET.
using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
    conn.Open();
    SqlDataReader reader = comm.ExecuteReader();
    if (reader.HasRows)
    {
        while (reader.Read())
        {
            // Do something with Reader
        }
    }
}
The Task Parallel Library (TPL) is a set of public types and APIs in the System.Threading and System.Threading.Tasks namespaces. The purpose of the TPL is to make developers more productive by simplifying the process of adding parallelism and concurrency to applications.
The SqlDataReader reads one row at a time from the results of a SqlCommand. It is read-only, which means rows can be read but not edited. It is also forward-only, which means you cannot go back to a previous row.
Compared to the classic threading model in .NET, the Task Parallel Library minimizes the complexity of using threads and provides an abstraction through a set of APIs that help developers focus more on the application program instead of on how the threads will be provisioned.
ADO.NET SqlDataReader class. This class reads a forward-only stream of rows from a SQL Server database. It has no public constructor, so instances come from SqlCommand.ExecuteReader rather than from direct construction or subclassing. It inherits from DbDataReader and implements the IDisposable interface.
You're going to have difficulty replacing that while loop directly. SqlDataReader is not a thread-safe class, so you cannot use it directly from multiple threads.
That being said, you could potentially process the data you read using the TPL. There are a few options here. The easiest might be to write an iterator method returning IEnumerable<T> that reads from the reader on a single thread and yields a class or struct containing a copy of each row's data. You could then use PLINQ or a Parallel.ForEach statement to process that data in parallel:
public IEnumerable<MyDataClass> ReadData()
{
    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    {
        conn.Open();
        // Dispose the reader as well; Read() simply returns false when there are no rows.
        using (SqlDataReader reader = comm.ExecuteReader())
        {
            while (reader.Read())
            {
                // Copy the row into a new object before yielding it.
                yield return new MyDataClass(... data from reader ...);
            }
        }
    }
}
Once you have that method, you can process this directly, via PLINQ or TPL:
Parallel.ForEach(this.ReadData(), data =>
{
    // Use the data here...
});
Or:
this.ReadData().AsParallel().ForAll(data => 
{
    // Use the data here...
});
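To try the pattern above without a database, here is a minimal sketch that swaps the SqlDataReader for an in-memory DataTable reader. The table layout, the two-property MyDataClass, and the ReadDataDemo helper names are all assumptions made up for this illustration; only the shape (sequential read and copy, parallel processing) comes from the answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

// Hypothetical row type: an immutable copy of one row.
public sealed class MyDataClass
{
    public int Id { get; }
    public string Name { get; }
    public MyDataClass(int id, string name) { Id = id; Name = name; }
}

public static class ReadDataDemo
{
    // Builds a small in-memory table so the sketch runs without SQL Server.
    public static DataTable BuildTable(int rows)
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Name", typeof(string));
        for (int i = 1; i <= rows; i++)
        {
            table.Rows.Add(i, "row" + i);
        }
        return table;
    }

    // Reads rows one at a time and copies each into a new object before yielding it.
    public static IEnumerable<MyDataClass> ReadData(DataTable table)
    {
        using (IDataReader reader = table.CreateDataReader())
        {
            while (reader.Read())
            {
                yield return new MyDataClass(reader.GetInt32(0), reader.GetString(1));
            }
        }
    }

    // Processes the copied rows in parallel and returns the sum of their ids.
    public static int SumIdsInParallel(int rows)
    {
        var seen = new ConcurrentBag<int>();
        Parallel.ForEach(ReadData(BuildTable(rows)), data => seen.Add(data.Id));

        int sum = 0;
        foreach (int id in seen)
        {
            sum += id;
        }
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(SumIdsInParallel(100)); // prints 5050
    }
}
```

The parallel bodies only ever touch MyDataClass instances, never the reader itself, which is the point of copying inside the iterator.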
You're almost there. Wrap the code you posted in a function with this signature:
IEnumerable<IDataRecord> MyQuery()
and then replace your // Do something with Reader code with this:
yield return reader;
Now you have something that works in a single thread. Unfortunately, as you read through the query results it returns a reference to the same object each time, and the object just mutates itself for each iteration. This means that if you try to run it in parallel you'll get some really odd results, as parallel reads mutate the object used in different threads. You need code to take a copy of the record to send to your parallel loop.
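As a rough illustration of the difference, the sketch below uses an in-memory DataTable reader in place of SqlDataReader (an assumption, so it runs without a database). The RecordCopyDemo class and its method names are hypothetical. One iterator yields the reader itself; the other copies each row with IDataRecord.GetValues into a fresh array.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;

public static class RecordCopyDemo
{
    // Three rows with a single int column, standing in for a query result.
    public static DataTable BuildTable()
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Rows.Add(1);
        table.Rows.Add(2);
        table.Rows.Add(3);
        return table;
    }

    // Yields the reader itself: every element is the same mutable object.
    public static IEnumerable<IDataRecord> SameObject(DataTable table)
    {
        using (IDataReader reader = table.CreateDataReader())
        {
            while (reader.Read())
            {
                yield return reader;
            }
        }
    }

    // Yields a new array per row, so each element is an independent snapshot.
    public static IEnumerable<object[]> Copies(DataTable table)
    {
        using (IDataReader reader = table.CreateDataReader())
        {
            while (reader.Read())
            {
                var values = new object[reader.FieldCount];
                reader.GetValues(values);
                yield return values;
            }
        }
    }

    public static void Main()
    {
        DataTable table = BuildTable();

        List<IDataRecord> same = SameObject(table).ToList();
        // All three entries point at one reader instance.
        Console.WriteLine(same.All(r => ReferenceEquals(r, same[0]))); // prints True

        int[] ids = Copies(table).Select(v => (int)v[0]).ToArray();
        Console.WriteLine(string.Join(",", ids)); // prints 1,2,3
    }
}
```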
At this point, though, what I like to do is skip the extra copy of the record and go straight to a strongly-typed class. More than that, I like to use a generic method to do it:
IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    {
        addParameters(cmd.Parameters);
        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                yield return factory(rdr);
            }
        }
    }
}
Assuming your factory methods create a copy as expected, this code should be safe to use in a Parallel.ForEach loop. Calling the method would look something like this (assuming an Employee class with a static factory method named "Create"):
var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => {
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       });
Parallel.ForEach(UnderPaid, e => e.GiveRaise());
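For reference, an Employee class that fits that call might look roughly like the sketch below. Everything about it is an assumption: the column names (Id, Name, AnnualSalary), their types, and the 5% raise are invented for illustration. The only requirement taken from the code above is that Create matches Func<IDataRecord, Employee> and copies the values out of the record.

```csharp
using System;
using System.Data;

public sealed class Employee
{
    public int Id { get; }
    public string Name { get; }
    public decimal AnnualSalary { get; private set; }

    private Employee(int id, string name, decimal annualSalary)
    {
        Id = id;
        Name = name;
        AnnualSalary = annualSalary;
    }

    // Copies the current row's values out of the record (column names are hypothetical).
    public static Employee Create(IDataRecord record)
    {
        return new Employee(
            record.GetInt32(record.GetOrdinal("Id")),
            record.GetString(record.GetOrdinal("Name")),
            record.GetDecimal(record.GetOrdinal("AnnualSalary")));
    }

    // Hypothetical raise policy: 5%.
    public void GiveRaise()
    {
        AnnualSalary *= 1.05m;
    }
}

public static class EmployeeDemo
{
    public static void Main()
    {
        // In-memory stand-in for the Employee table.
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Name", typeof(string));
        table.Columns.Add("AnnualSalary", typeof(decimal));
        table.Rows.Add(1, "Ada", 50000m);

        using (IDataReader reader = table.CreateDataReader())
        {
            reader.Read();
            Employee e = Employee.Create(reader);
            e.GiveRaise();
            Console.WriteLine(e.Name + " " + (e.AnnualSalary == 52500m)); // prints Ada True
        }
    }
}
```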
Important Update:
I'm not as confident in this code as I once was. A separate thread could still mutate the reader while another thread is in the process of making its copy. I could put a lock around that, but I'm also concerned that another thread could advance the reader after the original thread has called Read() but before it begins to make the copy. Therefore, the critical section here consists of the entire while loop... and at this point, you're back to single-threaded again. I expect there is a way to modify this code to work as expected for multi-threaded scenarios, but it will need more study.
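One possible direction, offered as a sketch rather than a tested fix: give the reader to exactly one producer task that calls Read() and copies each row, and hand the copies to worker tasks through a BlockingCollection<T>. The workers then never touch the reader at all. The SingleReaderPipeline name, the single int column, the bounded capacity of 100, and the summing workload are all assumptions for the example, and an in-memory DataTable reader stands in for SqlDataReader.

```csharp
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

public static class SingleReaderPipeline
{
    // One producer owns the reader; workers only ever see copied values.
    public static long SumInParallel(IDataReader reader, int workerCount)
    {
        long total = 0;
        using (var queue = new BlockingCollection<int>(boundedCapacity: 100))
        {
            Task producer = Task.Run(() =>
            {
                try
                {
                    // Read and copy on a single thread, in order.
                    while (reader.Read())
                    {
                        queue.Add(reader.GetInt32(0));
                    }
                }
                finally
                {
                    // Always signal completion so workers do not wait forever.
                    queue.CompleteAdding();
                }
            });

            var workers = new Task[workerCount];
            for (int i = 0; i < workerCount; i++)
            {
                workers[i] = Task.Run(() =>
                {
                    foreach (int value in queue.GetConsumingEnumerable())
                    {
                        Interlocked.Add(ref total, value);
                    }
                });
            }

            Task.WaitAll(workers);
            producer.Wait(); // surfaces any exception thrown while reading
        }
        return total;
    }

    public static void Main()
    {
        var table = new DataTable();
        table.Columns.Add("Value", typeof(int));
        for (int i = 1; i <= 1000; i++)
        {
            table.Rows.Add(i);
        }

        using (IDataReader reader = table.CreateDataReader())
        {
            Console.WriteLine(SumInParallel(reader, 4)); // prints 500500
        }
    }
}
```

Reading stays sequential, as the update concludes it must, but the per-row work runs concurrently. Whether that helps depends on the per-row work being expensive relative to the read.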