I'm using the amqp.node library to integrate RabbitMQ into my system.
In the consumer, however, I want to process just one message at a time: handle it, acknowledge it, and only then consume the next message from the queue.
The current code is:
// Consumer
open.then(function(conn) {
  var ok = conn.createChannel();
  ok = ok.then(function(ch) {
    ch.assertQueue(q);
    ch.consume(q, function(msg) {
      if (msg !== null) {
        othermodule.processMessage(msg, function(error, response){
          console.log(msg.content.toString());
          ch.ack(msg);
        });
      }
    });
  });
  return ok;
}).then(null, console.warn);
ch.consume delivers every message in the queue at once, and the module function called here (othermodule.processMessage) is asynchronous, so it has not finished by the time the next message arrives.
I want to wait for the othermodule function to finish before consuming the next message in the queue.
In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case first deliveries will happen when new messages are enqueued.
RabbitMQ has a plugin that provides a consistent hash exchange. Using that exchange, with one consumer per queue, we can preserve message order across multiple consumers. The hash exchange distributes routing keys among queues, rather than individual messages among queues. This means all messages with the same routing key will go to the same queue.
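As a rough sketch of how that setup could look with amqplib's promise API: the function below declares an exchange of type x-consistent-hash and binds each queue to it. It assumes the rabbitmq_consistent_hash_exchange plugin is enabled on the broker and that ch is an already-open channel; the function name and the exchange and queue names are made up for illustration.

```javascript
// Sketch only: assumes the rabbitmq_consistent_hash_exchange plugin is enabled
// and that `ch` is an open amqplib channel (promise API).
// `setupHashedQueues` and its arguments are hypothetical names.
async function setupHashedQueues(ch, exchange, queueNames) {
  await ch.assertExchange(exchange, 'x-consistent-hash', { durable: true });
  for (const name of queueNames) {
    await ch.assertQueue(name, { durable: true });
    // For this exchange type the binding key is a weight (a number written
    // as a string), not a routing pattern. Equal weights share keys evenly.
    await ch.bindQueue(name, exchange, '1');
  }
}
```

Publishers then send to the exchange with the value that must stay ordered (for example an account id) as the routing key, and each queue gets exactly one consumer.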
As of 2018, I believe RabbitMQ offers an option for exactly this:
https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html
ch.prefetch(1);
In order to defeat that we can use the prefetch method with the value of 1. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
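Applied to the code in the question, a minimal sketch could look like the following. It sets prefetch(1) and only acknowledges inside the processing callback, so the broker holds back the next message until the current one is done. processMessage stands in for othermodule.processMessage from the question; requeueing on error is my assumption, not something the question specifies.

```javascript
// Sketch: consume one message at a time on an amqplib channel `ch`.
// `startSequentialConsumer` is a hypothetical helper name.
function startSequentialConsumer(ch, queue, processMessage) {
  // Allow at most one unacknowledged message on this channel.
  ch.prefetch(1);
  return ch.consume(queue, function (msg) {
    if (msg === null) return; // the consumer was cancelled by the broker
    processMessage(msg, function (error) {
      if (error) {
        // Assumption: put the message back on the queue when processing fails.
        ch.nack(msg, false, true);
      } else {
        // Acknowledging is what lets the broker deliver the next message.
        ch.ack(msg);
      }
    });
  });
}
```

It would be called from the channel callback after ch.assertQueue(q), for example startSequentialConsumer(ch, q, othermodule.processMessage). Note that a message which always fails would be redelivered forever with requeue set to true, so a real consumer may prefer to reject it or route it to a dead-letter queue.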
Follow the example here:
https://www.npmjs.com/package/amqplib
// Consumer
function consumer(conn) {
  var ok = conn.createChannel(on_open);
  function on_open(err, ch) {
    if (err != null) bail(err);
    ch.assertQueue(q);
    
    // IMPORTANT: deliver at most one unacknowledged message at a time
    ch.prefetch(1);
    ch.consume(q, function(msg) {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  }
}
Refs: http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch
You need to set a prefetch value as shown in this example:
https://github.com/squaremo/amqp.node/blob/master/examples/tutorials/rpc_server.js#L22