Ok here is an overview of what's going on:
    M <-- Message with unique id of 1234
    |
    +-Start Queue
    |
    |
    | <-- Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues
Q1  Q2  Q3
\   |   / <-- start of the problem is here
 \  |  / 
  \ | /
   \|/
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 
So I have an exchange that pushes to multiple queues. Each queue has a task, and only once all of the tasks are completed can Queue 4 start.
A message with a unique id of 1234 gets sent to the exchange, and the exchange routes it to all the task queues (Q1, Q2, Q3, etc.). When all the tasks for message id 1234 have completed, run Q4 for message id 1234.
How can I implement this?
I'm using Symfony2, RabbitMQBundle and RabbitMQ 3.x.
UPDATE #1
Ok I think this is what I'm looking for:
RPC with parallel processing. But how do I set the correlation id to my unique id so that I can group the messages, and how do I also identify which queue each one belongs to?
You need to implement the Aggregator pattern: http://www.eaipatterns.com/Aggregator.html. The RabbitMQBundle for Symfony doesn't support it, so you would have to use the underlying php-amqplib.
A normal consumer callback from the bundle receives an AMQPMessage. From there you can access the channel and manually publish to whichever exchange comes next in your "pipes and filters" implementation.
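To make the Aggregator idea concrete, here is a minimal, broker-free sketch of the bookkeeping it needs. It is written in Python only to keep it short and runnable, and the same logic would sit inside the php-amqplib consumer callback. The `Aggregator` class, the `task_finished` method and the `on_complete` hook are hypothetical names, not part of RabbitMQBundle or php-amqplib. The hook stands in for "publish message 1234 to the exchange that feeds Q4". The sketch also assumes each worker reports the message's unique id (for example as the correlation id) together with its own task name.

```python
from typing import Callable, Dict, Set


class Aggregator:
    """Tracks which tasks have finished for each message id (hypothetical helper)."""

    def __init__(self, expected_tasks: Set[str], on_complete: Callable[[str], None]):
        # Names of the tasks that must all finish, e.g. {"Q1", "Q2", "Q3"}.
        self.expected_tasks = frozenset(expected_tasks)
        # Stand-in for "publish to the Q4 exchange"; an assumption of this sketch.
        self.on_complete = on_complete
        # message id -> set of task names already reported as done.
        # Kept in memory here; with several consumer processes this state
        # would have to live in a shared store instead.
        self._done: Dict[str, Set[str]] = {}

    def task_finished(self, message_id: str, task: str) -> bool:
        """Record one finished task; return True if this released the message."""
        if task not in self.expected_tasks:
            raise ValueError(f"unknown task: {task}")
        done = self._done.setdefault(message_id, set())
        done.add(task)  # a set makes redelivered reports harmless
        if done == self.expected_tasks:
            del self._done[message_id]  # forget the group once it is complete
            self.on_complete(message_id)
            return True
        return False


released = []  # collects the ids that would be forwarded to Q4
agg = Aggregator({"Q1", "Q2", "Q3"}, released.append)
agg.task_finished("1234", "Q1")
agg.task_finished("1234", "Q2")
agg.task_finished("5678", "Q1")  # a different message, tracked separately
agg.task_finished("1234", "Q3")  # last task for 1234, so it is released
```

Grouping by message id is what the correlation id is for, and the task name answers the "which queue" part. Where the completion state is stored (memory, a database, a cache) is a design choice this sketch leaves open.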