Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Correct Go/RabbitMQ way to "pop" one message off the queue?

The first question I have is a design question really. This is my first time writing a service that uses a Queue and I am also new to Go. I am trying to determine whether I should write my worker in such a way that it simply pops one message off the queue, processes it, and the dies off. With things like Kubernetes this seems fairly trivial.

Or should I have a long-lived worker constantly waiting for new messages but that is relaunched if it dies (by a bug or accident)?

The reason I ask this question is that in order to implement the former it feels a little more "hacked up" because I have to write the following using the common go AMQP library from streadway/amqp (read the comments):

// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {
    msgs, err := v.Channel.Consume(
        v.QueueName, // queue
        v.ConsmerID, // consumer
        true,        // auto-ack
        false,       // exclusive
        false,       // no-local
        false,       // no-wait
        nil,         // args
    )
    if err != nil {
        return nil, err
    }

    // We have to use for .. range because Consume returns
    // "<-chan Delivery" but if we only want ONE message popped off
    // we return on the first one
    for data := range msgs {
        return data.Body, nil
    }

    // We should never get this far...
    return nil, errors.New("Something went wrong")
}

Furthermore what is <-chan Delivery in this case? It seems like some sort of "stream" or object that you can plug into. Is there a way to not have to write a for-loop for these data types?

EDIT: I have also discovered that it appears that this code will dequeue the ENTIRE queue even though it only does a for-loop iteration once (as show in the code above). I am not sure why this happens either?

Relevant links to code:

  • https://github.com/streadway/amqp/blob/master/channel.go#L1049
  • https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/worker.go
  • https://github.com/fatih/amqp-examples/blob/master/consumer.go#L132
like image 692
the_real_one Avatar asked Oct 23 '25 00:10

the_real_one


1 Answers

To simply take a single object from a <-chan Delivery, don't use a range loop, but the channel operator <-:

data := <- msgs
return data.Body, nil

As to why your entire queue is emptied as soon as you fetch one message: This is most likely due to the Consumer prefetch. When consuming messages, the client will actually not pop them from the broker one-by-one, but in batches of configurable size (if I recall correctly, around the order of 32 or 64 messages by default). As soon as the broker has published this batch of messages to your consumer, they'll be in your msgs channel; and if you don't read from that channel any more after getting the first message, the rest of them will be gone (at least, with auto-ack enabled -- otherwise, they'll be requeued after the channel is closed).

To only fetch one message at a time, use the channel's QoS function (with the first parameter being the prefetch count):

err := v.Channel.Qos(1, 0, false)
like image 133
helmbert Avatar answered Oct 25 '25 20:10

helmbert



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!