
How to use ray with celery tasks?

Tags:

airflow

ray

Is there a way to use Ray with Celery, and which executor should I use? I tried a toy implementation and ran into the following error:

[2020-05-25 00:21:07,473: ERROR/ForkPoolWorker-5] Task mytasks.add[4a2f6fba-4f1b-4a77-95a0-0ee3c488a927] raised unexpected: AttributeError("'LoggingProxy' object has no attribute 'fileno'",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/celery/app/trace.py", line 641, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/akshay/Desktop/mytasks.py", line 15, in add
    ray.init()
  File "/home/akshay/.local/lib/python3.5/site-packages/ray/worker.py", line 1453, in init
    driver_id=driver_id)
  File "/home/akshay/.local/lib/python3.5/site-packages/ray/worker.py", line 1716, in connect
    faulthandler.enable(all_threads=False)
AttributeError: 'LoggingProxy' object has no attribute 'fileno'

The code is here:

from celery import Celery, signals
import logging
import ray

celery = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost')


@celery.task
def add(value):
    ray.init()
    to = remote_chain_function.remote(value)
    return to


@ray.remote
def remote_chain_function(value):
    return value + 1
noobwithskills asked Dec 01 '25 10:12


1 Answer

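As the traceback shows, ray.init() calls faulthandler.enable(), which writes to sys.stderr by default and needs its fileno(). A Celery worker redirects stdout and stderr to a LoggingProxy by default, and that object has no fileno(), hence the AttributeError: 'LoggingProxy' object has no attribute 'fileno'. You can get rid of the error by turning that redirection off: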

CELERY_WORKER_REDIRECT_STDOUTS = False
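
The uppercase name above is the form Celery reads when settings are loaded with a CELERY namespace (for example from a Django settings module). The question configures the app directly in Python, so a minimal sketch of the equivalent, assuming the lowercase setting name worker_redirect_stdouts from the linked documentation and the same Redis URLs as in the question, might look like this:

```python
from celery import Celery

# Same broker and backend URLs as in the question.
celery = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost')

# Assumption: set the option on the app object instead of in a settings module.
# With redirection off, sys.stdout and sys.stderr in the worker are no longer
# replaced by LoggingProxy objects.
celery.conf.worker_redirect_stdouts = False
```

The setting has to be in place before the worker starts, so it belongs next to the app definition and not inside the task.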

https://docs.celeryproject.org/en/stable/userguide/configuration.html?highlight=REDIRECT_STDOUTS#std-setting-worker_redirect_stdouts
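
To see why the redirection matters, here is a small standard-library sketch that reproduces the same kind of failure without Celery or Ray. FakeLoggingProxy and try_enable are hypothetical names made up for this illustration; the class only imitates the one relevant property of Celery's LoggingProxy, namely that it can be written to but has no fileno().

```python
import faulthandler
import tempfile


class FakeLoggingProxy:
    """Hypothetical stand-in for Celery's LoggingProxy: writable, but no fileno()."""

    def write(self, data):
        return len(data)

    def flush(self):
        pass


def try_enable(stream):
    """Try to point faulthandler at `stream` and report what happened."""
    try:
        faulthandler.enable(file=stream, all_threads=False)
    except AttributeError as exc:
        return "AttributeError: {}".format(exc)
    # Undo the change so the demo leaves no handler installed.
    faulthandler.disable()
    return "ok"


# A proxy object without fileno() fails the same way as in the traceback.
print(try_enable(FakeLoggingProxy()))

# A real file has a file descriptor, so enabling succeeds.
with tempfile.TemporaryFile() as real_file:
    print(try_enable(real_file))
```

faulthandler writes crash dumps straight to a file descriptor, which is why a Python-level proxy object is not enough and a real stream is required.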

keyvanm answered Dec 05 '25 00:12


