I've been using Storm lately which contains a concept called fields grouping (afaict unrelated to the group() concept in Celery), where messages with a certain key will always be routed to the same worker.
Just to get a clearer definition of what I mean, here it is from the Storm wiki.
Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
For example, reading from a word list, I would like to route words starting with a, b, c to one worker process, d, e, f to another, etc.
A reason for wanting this is that I would like a single process to be responsible for database reading/writing for a given subset of the data, so that there are no race conditions between processes.
I'm trying to work out the best way to achieve this within Celery.
My best solution so far is to use a queue for each "group" (e.g. letters.a, letters.d), and ensure that the number of worker processes exactly matches the number of queues. The downside is that each worker would have to run only one process, and I would have to handle scenarios such as workers dying or workers being added/removed.
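To make the queue-per-group idea concrete, here is a rough sketch of how a word could be mapped to a queue name. The letters.a / letters.d names come from the description above; the function name queue_for_word and the letters.other fallback queue are made up for illustration.

```python
import string

GROUP_SIZE = 3  # a, b, c share a queue; d, e, f share the next; and so on


def queue_for_word(word):
    """Return the queue name for the group that owns this word's first letter."""
    first = word[:1].lower()
    # Empty strings and non-ASCII-letter prefixes go to a hypothetical catch-all queue.
    if len(first) != 1 or first not in string.ascii_lowercase:
        return "letters.other"
    offset = ord(first) - ord("a")
    # Round down to the first letter of the group, e.g. 'e' -> 'd'.
    group_start = (offset // GROUP_SIZE) * GROUP_SIZE
    return "letters." + chr(ord("a") + group_start)
```

The task would then be sent with something like my_task.apply_async(args=[word], queue=queue_for_word(word)), with exactly one single-process worker consuming from each queue.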
I'm new to Celery so please correct me if the concepts I refer to are incorrect.
There's a bit of glue involved, but here is the concept:
There is a way to send tasks directly to specific workers by using CELERY_WORKER_DIRECT (named worker_direct in Celery 4 and later). Setting it to True creates a dedicated queue for each worker.
I periodically determine the active workers (hosts) by using celery.current_app.control.inspect().ping(), e.g.:
>>> import celery
>>> hosts = sorted(celery.current_app.control.inspect().ping().keys())
>>> hosts
['host5', 'host6']
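When I need to route by a key, I hash the value and take it modulo the number of workers. This distributes the tasks evenly and keeps the same key going to the same worker, as long as the set of workers does not change. Note that Python 3 randomizes the built-in hash() of strings per process unless PYTHONHASHSEED is fixed, so a stable hash (e.g. from hashlib) is needed if several processes send tasks. e.g.: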
>>> host_id = hash('hello') % len(hosts)
>>> host_id
1
>>> host = hosts[host_id]
>>> host
'host6'
Then when executing the task, I simply specify the exchange and routing key like so:
my_task.apply_async(exchange='C.dq', routing_key=host)
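Putting the steps above together, a minimal sketch might look like the following. It swaps the built-in hash() for a SHA-256 digest so that every producer process picks the same worker for the same key. The helper names pick_host and send_to_owner are hypothetical, and hosts is assumed to be the list of worker names obtained from ping() as shown earlier.

```python
import hashlib


def pick_host(key, hosts):
    """Map a key to one host; the result is stable across processes and restarts."""
    if not hosts:
        raise ValueError("no active workers to route to")
    ordered = sorted(hosts)  # same order regardless of how the hosts were listed
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(ordered)
    return ordered[index]


def send_to_owner(task, key, hosts, *args):
    """Send the task to the worker that owns this key (hypothetical helper)."""
    host = pick_host(key, hosts)
    # Assumes the direct-queue setting is enabled, so each worker is
    # reachable through the 'C.dq' exchange with its name as routing key.
    return task.apply_async(args=args, exchange="C.dq", routing_key=host)
```

Because the index depends on len(hosts), adding or removing a worker remaps most keys to a different worker, so any per-key ownership has to tolerate that.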
There are a few downsides:
If a worker goes down between a ping() and an apply_async, the message will be sent to a route that doesn't exist. A fix for this is to catch the timeout, reassert the available hosts, rehash, and resend.
The point of Celery is that you don't need to manage individual workers.
If you need tasks to take ownership of data, then the tasks should take ownership at the beginning of their run.
If you want to manage workers individually, Celery is probably the wrong choice. You should probably write the workers yourself and just use a message queue (or maybe Storm). Use the right tool for the job.