I have an object, model_test_api, which is created as follows.
import test_script
model_test_api = test_script.TestAPI(api)
The api argument needed to construct TestAPI is also an object.
import model_api
api = model_api.Model_API()
I tried the following two ways of passing model_test_api to a Celery task.
1. another_function.apply_async(model_test_api)
2. another_function.apply_async(lambda: test_script.TestAPI(api))
However, both of them gave me the error kombu.exceptions.EncodeError: Object of type function is not JSON serializable. I then tried packing model_test_api into a JSON string.
import json
json_test_api = json.dumps(model_test_api.__dict__)
another_function.apply_async(json_test_api)
That gives me another error: TypeError: Object of type Model_API is not JSON serializable. Does anyone have an idea of what I should do? Thanks in advance.
I'll cover a few things before getting stuck into the answer. Celery is a distributed task queue for Python. A message broker, normally Redis or RabbitMQ, holds a queue of tasks that need processing. Workers watch the queue, and when something is added to it, a worker on the network with available processing resources picks the task up and performs the processing, after which the task is marked as completed.
So when you run another_function.apply_async, the task called another_function is added to the queue, along with any parameter values that are passed in. This means that all parameters need to be serialised so that they can be stored in the queue. Celery cannot put in-memory Python objects on a queue, because a completely different machine may perform the processing, and that machine will not have the object in its RAM.
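The two errors from the question can be reproduced without Celery at all, since they come from the JSON encoding step. The sketch below uses stand-in classes for Model_API and TestAPI (the real ones are not shown in the question, and the base_url attribute is purely hypothetical) and checks which values json.dumps will accept.

```python
import json


class Model_API:
    """Stand-in for model_api.Model_API (assumption: the real class is not shown)."""

    def __init__(self):
        self.base_url = "http://localhost:8000"  # hypothetical attribute


class TestAPI:
    """Stand-in for test_script.TestAPI (assumption: it just stores the api)."""

    def __init__(self, api):
        self.api = api


def is_json_serialisable(value):
    """Return True if json.dumps accepts the value, False if it raises TypeError."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


model_test_api = TestAPI(Model_API())

print(is_json_serialisable(model_test_api))           # False: custom object
print(is_json_serialisable(lambda: model_test_api))   # False: function
print(is_json_serialisable(model_test_api.__dict__))  # False: dict holds a Model_API
print(is_json_serialisable({"base_url": model_test_api.api.base_url}))  # True: plain data
```

Only the last value, a dict of plain strings, would survive the trip through the queue.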
Here is an example of the recommended implementation. Say you have a model called CIA_Agent_Model, and it has three attributes: a primary key and some text fields. You have a Celery task called another_function and you want to pass in a CIA_Agent_Model instance. You would call the task using another_function.delay(agent_model_instance.id), where the parameter being passed in is the primary key of the object rather than the object itself. Then, inside the Celery task, you fetch the object from your database again using that ID. Alternatively, you can serialise the model instance (see https://www.django-rest-framework.org/api-guide/serializers/) and pass the result of the serialisation into another_function.delay(model_serialisation_result).
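As a rough sketch of the "pass the primary key" pattern, the example below replaces the database with an in-memory dict and calls the task body directly, so that it runs without Celery or Django installed. The AGENTS table, the get_agent helper and the field names are all hypothetical; in a real project another_function would be registered as a Celery task and invoked with another_function.delay(agent_model_instance.id).

```python
# Hypothetical stand-in for the database table behind CIA_Agent_Model.
AGENTS = {
    1: {"id": 1, "name": "Jane Doe", "notes": "field agent"},
}


def get_agent(agent_id):
    """Look the record up by primary key (a real project would query its ORM here)."""
    return AGENTS[agent_id]


def another_function(agent_id):
    """Task body: receives only the primary key and re-fetches the object itself."""
    agent = get_agent(agent_id)
    return f"processed agent {agent['name']}"


# With Celery this call would be another_function.delay(1); an int is trivially
# JSON serialisable, so nothing needs custom encoding.
result = another_function(1)
print(result)  # processed agent Jane Doe
```

A side benefit of this design is that the worker always reads the current state of the record, rather than a copy that may have gone stale while the task sat in the queue.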
For your case, you can write a custom function that converts the object returned by test_script.TestAPI(api) into a JSON-serialisable dict, and then pass that dict as a parameter when calling the Celery task.
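One possible shape for such a conversion function is sketched below. The classes are stand-ins, and the base_url and timeout settings, along with the serialise_test_api and rebuild_test_api helpers, are hypothetical names chosen for illustration; the idea is only that the task receives plain data and rebuilds the objects on the worker.

```python
import json


class Model_API:
    """Stand-in for model_api.Model_API; the constructor arguments are hypothetical."""

    def __init__(self, base_url="http://localhost:8000", timeout=30):
        self.base_url = base_url
        self.timeout = timeout


class TestAPI:
    """Stand-in for test_script.TestAPI."""

    def __init__(self, api):
        self.api = api


def serialise_test_api(test_api):
    """Reduce the object to the plain values needed to rebuild it."""
    return {"base_url": test_api.api.base_url, "timeout": test_api.api.timeout}


def rebuild_test_api(payload):
    """Recreate the TestAPI object from the plain dict, e.g. inside the worker."""
    return TestAPI(Model_API(**payload))


def another_function(payload):
    """Task body: in a real project this would be registered as a Celery task."""
    model_test_api = rebuild_test_api(payload)
    return model_test_api.api.base_url


payload = serialise_test_api(TestAPI(Model_API()))

# Simulate the trip through the queue: encode to JSON and decode again.
received = json.loads(json.dumps(payload))
print(another_function(received))  # http://localhost:8000
```

If Model_API() really takes no arguments, as in the question, the task may not need any payload at all: it can simply build Model_API() and TestAPI(api) itself when it starts.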