
Long-running MSMQ message processing. Limiting worker threads count

The environment

  • The host application is a Windows Service (.NET 3.5);
  • There's no possibility to switch to .NET 4+ framework yet.

The process

  • Processing one message might take up to 5 minutes.
  • Messages are received using an async pattern (ReceiveCompleted event subscription):

    private void OnMessageReceived(object source, ReceiveCompletedEventArgs eventArgs) {

        var queue = (MessageQueue)source;
        Message m = null;

        try {
            // Complete the asynchronous receive to obtain the message.
            m = queue.EndReceive(eventArgs.AsyncResult);

            var processor = new Thread(() => {
                try {
                    // 1. process the message
                    // 2. send a feedback to m's response queue
                }
                catch(Exception ex) {
                    Logger.Log.Error(ex);
                    // 1. send a feedback to m's response queue
                }
            });
            processor.Start();
        }
        catch(Exception ex) {
            Logger.Log.Error(ex);
        }

        // Start waiting for the next message.
        queue.BeginReceive();
    }
    

The problem

Spawning a separate worker thread for every message should be limited somehow, I guess.

Let's say I want to process messages using between 1 and 5 worker threads (5 being the maximum). If all available workers are busy, then either:

  • processing of the current message is skipped and feedback is sent to the response queue (the message is lost); or
  • the message is sent back to the queue and feedback is sent (processing is postponed).

Does the "if all available workers are busy" part mean that I must implement something similar to a thread pool?

I've added the system.reactive tag to the question because I've seen some code where Rx buffering is used. I don't quite understand whether it applies here. Can I use Rx in my case?

lexeme asked Jan 17 '26 22:01


1 Answer

If your service is hosted by IIS, IIS might prevent this, but you could try using ThreadPool, and in particular its SetMaxThreads method. Note that you can't set the limits lower than the number of processors on the machine, and that the limit is application-wide (the remarks in the documentation note that this may have adverse implications for libraries your application is using). Depending on what's hosting your service, the host might also prevent you from changing the thread pool limits this way.
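As a rough sketch of the SetMaxThreads route (the names PoolLimits and TryCapWorkers are made up for illustration), something like this reads the current limits and lowers only the worker-thread limit. SetMaxThreads returns false when the runtime rejects the value, for example when it is below the processor count:

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of any library.
public static class PoolLimits
{
    // Tries to cap the shared CLR thread pool at maxWorkers worker threads.
    // Returns false if the runtime rejects the requested value.
    public static bool TryCapWorkers(int maxWorkers)
    {
        int currentWorkers, currentIoThreads;
        ThreadPool.GetMaxThreads(out currentWorkers, out currentIoThreads);

        // Keep the I/O completion thread limit as it was.
        return ThreadPool.SetMaxThreads(maxWorkers, currentIoThreads);
    }
}
```

You would then hand each message to ThreadPool.QueueUserWorkItem instead of creating a new Thread. Keep in mind that the pool queues extra work items rather than rejecting them, so this caps concurrency but does not by itself tell you that all workers are busy.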

You could also try to roll your own: declare a private static Queue<Thread> threadPool;, statically initialize it with your desired number of threads, and do things like:

var processor = threadPool.Dequeue();
...
// There's no easy thread-safe way to check for a free thread in .NET 3.5.
// If you could use a ConcurrentQueue<Thread>, you could first check whether
// anything is available in the queue and only then dequeue.
//queue.BeginReceive(); // call this from the end of your thread execution now

Your threads could Enqueue themselves back into the threadPool queue when they're done processing, and then call BeginReceive() on the MSMQ queue object so that receiving continues (pass the queue to the thread as documented here: https://msdn.microsoft.com/en-us/library/ts553s52(v=vs.85).aspx).
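One caveat with pooling Thread objects is that a Thread that has finished cannot be started again, so the pooled threads would have to be long-lived loops. A simpler variation on the same idea, sketched here as an alternative rather than as the approach described above, is to count free slots with a Semaphore (available in .NET 3.5) and create a fresh thread only when a slot is free. WorkerLimiter and TryRun are hypothetical names:

```csharp
using System;
using System.Threading;

// Hypothetical helper: allows at most maxWorkers threads to run at once.
public sealed class WorkerLimiter
{
    private readonly Semaphore slots;

    public WorkerLimiter(int maxWorkers)
    {
        // All slots start out free.
        slots = new Semaphore(maxWorkers, maxWorkers);
    }

    // Starts work on a new thread if a slot is free.
    // Returns false immediately (without blocking) when all workers are busy.
    public bool TryRun(ThreadStart work)
    {
        if (!slots.WaitOne(0))
            return false;

        try
        {
            var thread = new Thread(() =>
            {
                try { work(); }
                finally { slots.Release(); } // free the slot even if work throws
            });
            thread.IsBackground = true;
            thread.Start();
            return true;
        }
        catch
        {
            // The thread never started, so give the slot back.
            slots.Release();
            throw;
        }
    }
}
```

In OnMessageReceived you could then call something like limiter.TryRun(() => Process(m)), where Process stands in for your own handler, and when it returns false either send the "busy" feedback or put the message back on the queue.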

And if you're interested in how you could implement a ConcurrentQueue in .NET 3.5, you could always take a peek at the Reference Source - but personally I'd rather bite the bullet on upgrading to the latest .NET version.
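If all you need is the "check and take in one step" behaviour, a plain Queue<T> behind a lock is probably enough on .NET 3.5. This is only a minimal stand-in with a made-up name (LockedQueue), not a port of the real ConcurrentQueue, which is lock-free:

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for ConcurrentQueue<T>: a Queue<T> guarded by a lock.
public sealed class LockedQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly object gate = new object();

    public void Enqueue(T item)
    {
        lock (gate) { items.Enqueue(item); }
    }

    // Checks for an item and removes it under the same lock.
    // Returns false when the queue is empty.
    public bool TryDequeue(out T item)
    {
        lock (gate)
        {
            if (items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = items.Dequeue();
            return true;
        }
    }
}
```

Because the emptiness check and the Dequeue happen under one lock, two receivers cannot both take the last item.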

Dan Field answered Jan 19 '26 16:01


