Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to make sure a Celery task was enqueued in Pytest?

I have an API endpoint to register new user. The "welcome email" will enqueue and do this task async. I have 2 unit tests to check:

  1. Api does save user's information to DB OK
  2. The Celery task does send email with right content+template

I want to add 3rd unit test to ensure "The endpoint has to enqueue email-sending after saving user form to DB"

I try with celery.AsyncResult but it ask me to run the worker. For further, even if the worker is ready, we still can't verify the task was enqueued or not because the ambiguous PENDING state:

  1. Task exists in queue but not execute yet: PENDING
  2. Task doesn't exist in queue: PENDING

Does anyone face this problem? How do I solve it?

like image 754
Davuz Avatar asked Dec 06 '25 08:12

Davuz


1 Answers

Common way to solve this problem in testing environments is to use the task_always_eager configuration setting, which basically instructs Celery to run the task like a regular function. Instead of the AsyncResult, Celery will make an object of the EagerResult type that behaves the same, but has completely different execution logic.

like image 165
DejanLekic Avatar answered Dec 07 '25 20:12

DejanLekic



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!