I have two services, Manager and Collector.
- Manager subscribes to the queue COLLECTED_USER with routingKey user.collected and invokes a UserCollected handler.
- Collector subscribes to the queue COLLECT_USER with routingKey user.collect and invokes a CollectUser handler.

There can be multiple collectors, so I have set exclusive to false (see below for code).
There are also other services that listen for events like user.created, user.updated, and user.deleted. In addition, there are services that listen for more general events like #.created, user.#, and so on.
So I am using a topic exchange.
| exchange | type | routingKey | queueName |
| -------- | ----- | -------------- | ------------- |
| MY_APP | topic | user.collect | COLLECT_USER |
| MY_APP | topic | user.collected | COLLECTED_USER |
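To check what these two bindings should do on their own, topic matching can be sketched in a few lines. This is a simplified illustration of the AMQP topic rules (`*` matches exactly one word, `#` matches zero or more words), not RabbitMQ's actual implementation; `topicMatches` and `queuesFor` are hypothetical helper names.

```javascript
// Sketch of AMQP topic matching: words are separated by '.',
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.')
  const k = routingKey.split('.')

  const match = (i, j) => {
    if (i === p.length) return j === k.length
    if (p[i] === '#') {
      // '#' either absorbs nothing (advance the pattern)
      // or absorbs one more word (advance the key)
      return match(i + 1, j) || (j < k.length && match(i, j + 1))
    }
    if (j === k.length) return false
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1)
  }

  return match(0, 0)
}

// The two bindings from the table above
const bindings = [
  { queueName: 'COLLECT_USER', routingKey: 'user.collect' },
  { queueName: 'COLLECTED_USER', routingKey: 'user.collected' },
]

// Names of the queues a message with this routing key would be routed to
function queuesFor(routingKey) {
  return bindings
    .filter((b) => topicMatches(b.routingKey, routingKey))
    .map((b) => b.queueName)
}

console.log(queuesFor('user.collect'))
console.log(queuesFor('user.collected'))
```

With only these two bindings, a user.collect message matches the COLLECT_USER binding and nothing else, so any other queue receiving it points to an extra binding on the broker rather than to the routing keys themselves.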
Expected behaviour:

1. Manager publishes a message with routingKey user.collect
2. Collector gets the user.collect message and invokes a CollectUser handler
3. The CollectUser handler does work, then publishes a message with routingKey user.collected
4. Manager gets the user.collected message and invokes the UserCollected handler

Actual behaviour:

1. Manager publishes a message with routingKey user.collect (correct)
2. Collector gets the user.collect message and invokes a CollectUser handler (correct)
3. Manager gets the user.collect message and invokes the UserCollected handler with the wrong data. (wrong)
4. The CollectUser handler does work, then publishes a message with routingKey user.collected (correct)
5. Manager gets the user.collected message and invokes the UserCollected handler (correct)

Why does the Manager get the user.collect message, given:

- the Manager is only subscribed to the COLLECTED_USER queue, not the COLLECT_USER queue, and
- the Collector, which is subscribed to the COLLECT_USER queue, has already handled the message.

I create the subscribers and publishers as follows (trimmed for relevance).
Subscriber: given the AMQP url and the params exchange, type, routingKey, queueName and handler
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })
const result = await channel.assertQueue(queueName, { exclusive: false })
await channel.bindQueue(result.queue, exchange, routingKey)
await channel.prefetch(1)
await channel.consume(result.queue, handler)
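When a consumer appears to receive the wrong message, the delivery metadata shows which exchange and routing key it actually arrived with. The following is a minimal diagnostic sketch, assuming the amqplib message shape (`msg.fields.exchange`, `msg.fields.routingKey`); `withRoutingCheck` is a hypothetical helper, and the exact comparison only makes sense for bindings with a literal key such as user.collected, not for patterns like user.#.

```javascript
// Hypothetical helper: wraps a consumer handler and logs any delivery whose
// routing key differs from the one this queue is expected to be bound with.
// It never swallows a message; the wrapped handler is always called.
function withRoutingCheck(expectedRoutingKey, handler, log = console.warn) {
  return (msg) => {
    // amqplib passes null when the broker cancels the consumer
    if (msg !== null && msg.fields.routingKey !== expectedRoutingKey) {
      const { exchange, routingKey } = msg.fields
      log(`unexpected delivery: exchange=${exchange} routingKey=${routingKey}`)
    }
    return handler(msg)
  }
}
```

It would be used as `channel.consume(result.queue, withRoutingCheck(routingKey, handler))`. A logged user.collect delivery on the COLLECTED_USER consumer would confirm that the broker is routing it there, rather than the handler being wired to the wrong queue.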
Publisher setup: given the AMQP url and the params exchange and type
const connection = await amqp.connect(url)
const channel = await connection.createChannel()
await channel.assertExchange(exchange, type, { durable: true })
Publishing: given the channel and the params exchange, routingKey, and message
await channel.publish(exchange, routingKey, message)
This question is a follow-on from "RabbitMQ — Why are my Routing Keys being ignored when using topic exchange".
I finally worked out what my problem was: a dirty exchange. While experimenting, I'd inadvertently added an exchange that was routing messages to the wrong queue, and this was causing my confusion.
To fix it, I fired up the RabbitMQ admin GUI, deleted all of the queues, and let my code create the ones it needed. There was no issue with the code as outlined above.
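The same cleanup can be scripted instead of done through the admin GUI. This is a rough sketch, assuming an open amqplib channel; `resetQueues` is a hypothetical helper. Note that deleting a queue also discards any messages still sitting in it.

```javascript
// Hypothetical helper: delete the named queues so that any leftover bindings
// from earlier experiments are removed together with them.
async function resetQueues(channel, queueNames) {
  for (const queueName of queueNames) {
    // Deleting a queue also removes every binding attached to it
    await channel.deleteQueue(queueName)
  }
}
```

For example, `await resetQueues(channel, ['COLLECT_USER', 'COLLECTED_USER'])` before the subscribers start lets assertQueue and bindQueue recreate only the bindings in the code. If a single stray binding is known, amqplib's `channel.unbindQueue(queue, exchange, pattern)` removes just that binding without dropping the queue.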