Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What type of IProducerConsumerCollection<T> to use for my task?

Tags:

c#

I have exactly 100 Sensors each "measuring" own data. I have exactly one DataSender which should send information from "sensors". The most recent information should be sent.

Bandwidth of the channel may be less than data produced by 100 sensors. In this case some data can be skipped - but we should be "roughly fair". For example, we could skip every second measurement from each sensor.

I don't know how often each sensor generates data, but in general they generate data pretty often.

After my other posts:

  • how to create singleton which always running in separate thread?
  • Modified Producer/Consumer example, any problems with it?

I have decided that I have classical Producer/Consumer problem, with:

  • 100 Producers, and
  • 1 Consumer

I've been suggested to use BlockingCollection for this. The only problem with BlockingCollection - once you have added item, you cannot replace it. But in my application, if sensor produces a new value, and previous value was not processed by the Consumer, the value should be replaced.

Should I use use a ConcurentDictionary or ConcurentBag for that task?

Conceptually, all I need is an array of 100 elements.

Sensor #33 should replace it's value into array[33]:

| Sensor | Value |
|--------|-------|
|      1 |       |
|      2 |       |
|      3 |       |
/......../......./
|     32 |       |
|     33 | 101.9 |
|     34 |       |
/......../......./
|     98 |       |
|     99 |       |
|    100 |       |

Consumer should take value from array[33] and if not null, then send it and set array[33] to null. Consumer should react on any not null values in array asap.

like image 245
Oleg Vazhnev Avatar asked Jan 27 '26 16:01

Oleg Vazhnev


1 Answers

I think you should implement your own IProducerConsumerCollection<T>. That's why it's an interface: so that you could easily make your own.

You could do it using Dictionary<K,V> and Queue<T> to make sure receiving the data is fair, i.e. if you have just one device that produces data very fast, you won't send data just from this one.

public class DeviceDataQueue<TDevice, TData>
    : IProducerConsumerCollection<Tuple<TDevice, TData>>
{
    private readonly object m_lockObject = new object();
    private readonly Dictionary<TDevice, TData> m_data
        = new Dictionary<TDevice, TData>();
    private readonly Queue<TDevice> m_queue = new Queue<TDevice>();

    //some obviously implemented methods elided, just make sure they are thread-safe

    public int Count { get { return m_queue.Count; } }

    public object SyncRoot { get { return m_lockObject; } }

    public bool IsSynchronized { get { return true; } }

    public bool TryAdd(Tuple<TDevice, TData> item)
    {
        var device = item.Item1;
        var data = item.Item2;

        lock (m_lockObject)
        {
            if (!m_data.ContainsKey(device))
                m_queue.Enqueue(device);

            m_data[device] = data;
        }

        return true;
    }

    public bool TryTake(out Tuple<TDevice, TData> item)
    {
        lock (m_lockObject)
        {
            if (m_queue.Count == 0)
            {
                item = null;
                return false;
            }

            var device = m_queue.Dequeue();
            var data = m_data[device];
            m_data.Remove(device);
            item = Tuple.Create(device, data);
            return true;
        }
    }
}

When used along these lines:

Queue = new BlockingCollection<Tuple<IDevice, Data>>(
    new DeviceDataQueue<IDevice, Data>());

Device1 = new Device(1, TimeSpan.FromSeconds(3), Queue);
Device2 = new Device(2, TimeSpan.FromSeconds(5), Queue);

while (true)
{
    var tuple = Queue.Take();
    var device = tuple.Item1;
    var data = tuple.Item2;

    Console.WriteLine("{0}: Device {1} produced data at {2}.",
        DateTime.Now, device.Id, data.Created);

    Thread.Sleep(TimeSpan.FromSeconds(2));
}

it produces the following output:

30.4.2011 20:40:43: Device 1 produced data at 30.4.2011 20:40:43.
30.4.2011 20:40:45: Device 2 produced data at 30.4.2011 20:40:44.
30.4.2011 20:40:47: Device 1 produced data at 30.4.2011 20:40:47.
30.4.2011 20:40:49: Device 2 produced data at 30.4.2011 20:40:49.
30.4.2011 20:40:51: Device 1 produced data at 30.4.2011 20:40:51.
30.4.2011 20:40:54: Device 2 produced data at 30.4.2011 20:40:54.
like image 148
svick Avatar answered Jan 30 '26 04:01

svick



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!