Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

celery one broker multiple queues and workers

I have a python file called tasks.py in which I am defining 4 single tasks. I would like to configure celery in order to use 4 queues because each queue would have a different number of workers assigned. I was reading I should use route_task property but I tried several options and not a success.

I was following this doc celery route_tasks docs

My goal would be run 4 workers, one for each task, and don't mix tasks from different workers in different queues. It's possible? It's a good approach?

If I am doing something wrong I would be happy to change my code to make it work

Here is my config so far

tasks.py

app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('queueA',    routing_key='tasks.task_1'),
    Queue('queueB',    routing_key='tasks.task_2'),
    Queue('queueC',    routing_key='tasks.task_3'),
    Queue('queueD',    routing_key='tasks.task_4')
)


@app.task
def task_1():
    print "Task of level 1"


@app.task
def task_2():
    print "Task of level 2"


@app.task
def task_3():
    print "Task of level 3"


@app.task
def task_4():
    print "Task of level 4"

Run celery one worker for each queue

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
celery -A tasks worker --loglevel=debug -Q queueC --logfile=celery-C.log -n W3&
celery -A tasks worker --loglevel=debug -Q queueD --logfile=celery-D.log -n W4&
like image 819
cralfaro Avatar asked Sep 07 '25 20:09

cralfaro


1 Answers

There is no need to get into complex routing for submitting tasks into different queues. Define your tasks as usual.

from celery import celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def task_1():
    print "Task of level 1"


@app.task
def task_2():
    print "Task of level 2"

Now while queuing the tasks, put the tasks in proper queue. Here is an example on how to do it.

In [12]: from tasks import *

In [14]: result = task_1.apply_async(queue='queueA')

In [15]: result = task_2.apply_async(queue='queueB')

This will put the task_1 in queue named queueA and task_2 in queueB.

Now you can just start your workers to consume them.

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
like image 68
Pandikunta Anand Reddy Avatar answered Sep 10 '25 03:09

Pandikunta Anand Reddy