Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Sending bytes in a string to Redis in order to using it as parameter for Celery's task

I want to send bytes in a string to Celery's task using Redis as broker, but I'm receiving error shown below:

[2017-06-17 21:27:13,826] ERROR in app: Exception on /endpoint_method [POST]
Traceback (most recent call last):
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/Users/developer/my_project/application.py", line 23, in endpoint_method
    task = my_task.execute.delay(request.data)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/celery/app/task.py", line 412, in delay
    return self.apply_async(args, kwargs)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/celery/app/task.py", line 535, in apply_async
    **options
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/celery/app/base.py", line 737, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 558, in send_task_message
    **properties
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/kombu/messaging.py", line 169, in publish
    compression, headers)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/kombu/messaging.py", line 252, in _prepare
    body) = dumps(body, serializer=serializer)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/kombu/serialization.py", line 221, in dumps
    payload = encoder(data)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/kombu/serialization.py", line 221, in dumps
    payload = encoder(data)
  File "/Users/developer/my_project/venv/lib/python2.7/site-packages/kombu/utils/json.py", line 72, in dumps
    **dict(default_kwargs, **kwargs))
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/json/__init__.py", line 250, in dumps
    sort_keys=sort_keys, **kw).encode(obj)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/json/encoder.py", line 207, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/json/encoder.py", line 270, in iterencode
    return _iterencode(o, 0)
EncodeError: 'utf8' codec can't decode byte 0x80 in position 5: invalid start byte

Shortened (mostly it's up to 1 mln characters string) example data which I want to send to:

'RIFF$\x80\r\x00WAVEfmt \x10\x00\x00\x00\x01\x00\x02\x00D\xac\x00\x00\x10\xb1\x02\x00\x04\x00\x10\x00data\x00\x80\r\x00z\xefz\xef\xd5\xec\xd5\xec\xc1\xee\xc1\xee\xe6\xf3\xe6\xf3\xb4\xfa\xb4\xfa\x92\x03\x92\x03\xab\x0e\xab\x0e\xf9\x18\xf9\x18D\x1eD\x1ev\x1dv\x1d@\x19@\x19\x86\x13\x86\x13w\nw\n\xf0\xfd\xf0\xfd\xf9\xf2\xf9\xf2\xd0\xed\xd0\xedK\xedK\xed{\xed{\xed%\xee%\xeeP\xf3P\xf3\xeb\xfc\xeb\xfc!\x05!\x05\xa4\x08\xa4\x08\xe5\t\xe5\t\x84\x0b\x84\x0bF\x0bF\x0b\xfb\x04\xfb\x040\xfa0\xfa\x86\xf1\x86\xf1T\xeeT'

Part of the code responsible for calling Celery's task:

task = my_task.execute.delay(request.data)

This is how tasks/my_task.py file looks like:

from __future__ import absolute_import, unicode_literals
from my_module.celery import app

@app.task
def execute(request_data):
    return <some operations on request_data>

and celery.py:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery(
    'my_module',
    broker='redis://127.0.0.1:6379/1',
    backend='redis://127.0.0.1:6379/2',
    include=[
        'my_module.tasks.my_task'
    ]
)

app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

Maybe is there some clever way to encode that string before sending it to Redis and decode it in while running the task?

like image 394
dawlib Avatar asked Oct 21 '25 15:10

dawlib


1 Answers

Finally, I came up with a solution using binascii methods binascii.b2a_base64(data) and binascii.a2b_base64(string). This is what I had to change to make this working:

task = my_task.execute.delay(binascii.b2a_base64(request.data))

and in tasks/my_task.py I had to convert data back to be able to make proper operations on it:

binascii.a2b_base64(request_data)
like image 101
dawlib Avatar answered Oct 23 '25 07:10

dawlib



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!