In Celery, I want to get the status of all tasks with a specific task name. For that, I tried the code below.
import celery.events.state
# Celery status instance.
stat = celery.events.state.State()
# task_by_type will return list of tasks.
query = stat.tasks_by_type("my_task_name")
# Print tasks.
print(query)
However, this code returns an empty list.
Celery needs a result backend to store the state of your tasks if you want to track them. There are two main operation models for the result backend: RPC (like RabbitMQ/QPid) or a database. Both have their pros and cons, and you should check the documentation to pick the right one for your application.
Now open another terminal, change into the examples/queue-based-distribution/ directory, and run celery -A celery-task-queue status. status is the basic monitoring command: it reports the state of the workers, so its output lists the status of each node and the number of nodes online.
The "shared_task" decorator lets you create Celery tasks for reusable apps, because it doesn't need the Celery app instance. It is also an easier way to define a task, since you don't need to import the Celery app instance.
To pass arguments to a task with apply_async(), wrap them in a list and pass that list as the first argument, i.e. apply_async([arg1, arg2, arg3]). See the documentation for more details and examples. Alternatively, use delay(), which takes the arguments directly.
celery.events.state.State is a data structure used to keep track of the state of Celery workers and tasks. Calling State() gives you an empty state object with no data, which is why tasks_by_type returns nothing.
To get a state that actually contains tasks, use app.events.Receiver (stream processing) or celery.events.snapshot (batch processing) to capture events into it.
Sample Code:
from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-failed': announce_failed_tasks,
            '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)