I am trying to run a simple workflow with Celery, following the Celery documentation. I am using a chain to run the tasks sequentially, with the following workflow:
Extract a file, tokenize it into sentences, and load (write) a JSON dump of the document's sentence tokens into another (new) file. Repeat the workflow for each file in a folder.
Here is my code:
Folder structure:
celery-pipeline/
├── celeryapp.py
├── celeryconfig.py
├── data/
├── output/
└── tasks.py
celeryapp.py
from celery import Celery
# Shared Celery application; tasks.py imports it with `from celeryapp import app`.
# NOTE(review): no main name is passed to Celery(), so tasks defined in a module
# that is executed as a script get `__main__.`-prefixed names — confirm the
# producer and the worker agree on task names.
app = Celery()
# Read settings from the `celeryconfig` module (celeryconfig.py) by import name.
app.config_from_object('celeryconfig')
celeryconfig.py
# Celery settings module, loaded by celeryapp.py via app.config_from_object('celeryconfig').
# Modules imported by the worker at startup so their tasks are registered.
imports = ('tasks',)
# Message broker: Redis database 0 on localhost.
broker_url =  'redis://localhost:6379/0'
# Result backend: PostgreSQL through Celery's database ("db+") backend.
# NOTE(review): the "[email protected]" part looks like it was rewritten by an
# e-mail-obfuscation filter when this snippet was copied; restore the real
# password@host before use. Consider keeping credentials out of source control.
result_backend = 'db+postgresql://celery_user:[email protected]:5432/celery_db'
# Store results (the __main__ block waits on them with .get()) and record STARTED.
task_ignore_result = False
task_track_started = True
# Queue used for tasks that do not specify one.
task_default_queue = 'default'
# Default task rate limit: '20/s' = 20 executions per second.
task_default_rate_limit = '20/s'
# Hard time limit per task, in seconds (7200 s = 2 hours).
task_time_limit = 7200
# Allow the worker pool to be restarted through the pool_restart remote-control command.
worker_pool_restarts = True
tasks.py
import os
import json
import spacy
import logging
from datetime import datetime, timedelta
from celeryapp import app
# spaCy English pipeline, loaded once at module import time (not per task call)
# and used by transform_tokenize_doc to split documents into sentences.
sp = spacy.load('en_core_web_sm')
@app.task(bind=True)
def extract(self, filename):
    """Read a file from the ``data`` directory and return its contents.

    Args:
        filename: Name of a file inside ``<current working directory>/data``.

    Returns:
        The whole file contents as a string. In a chain this value becomes
        the first positional argument of the next task.
    """
    file_path = os.path.join(os.getcwd(), 'data', filename)
    # Context manager closes the handle as soon as the read finishes instead
    # of leaving it open until the file object is garbage-collected.
    with open(file_path) as fh:
        doc = fh.read()
    print('Extract called')
    return doc
@app.task(bind=True)
def transform_tokenize_doc(self, doc: str):
    """Split *doc* into sentences with the module-level spaCy pipeline ``sp``.

    Returns:
        A list holding the whitespace-stripped text of every sentence, in
        document order.
    """
    parsed = sp(doc)
    return [str(sentence).strip() for sentence in parsed.sents]
@app.task(bind=True)
def load(self, sentences, filename):
    """Write *sentences* as a JSON array to ``<cwd>/output/<filename>``.

    In a Celery chain the parent task's return value is passed as the FIRST
    positional argument, ahead of any arguments bound in the signature.
    The pipeline binds only the filename, so the worker calls
    ``load(sentences, filename)``; the parameters are ordered to match.

    Args:
        sentences: JSON-serialisable result of the previous task (the
            sentence list produced by transform_tokenize_doc).
        filename: Name of the output file inside ``<cwd>/output``.
    """
    out_dir = os.path.join(os.getcwd(), 'output')
    os.makedirs(out_dir, exist_ok=True)
    # 'w' (not 'a+'): re-running the pipeline replaces the file; appending a
    # second JSON document would leave the file unparseable as JSON.
    with open(os.path.join(out_dir, filename), 'w') as file:
        file.write(json.dumps(sentences, indent=4))
if __name__ == '__main__':
    # Run as a script, this file's tasks are registered under `__main__.<func>`
    # (celeryapp creates Celery() without a main name), while a worker started
    # with `-A tasks` knows them as `tasks.<func>`. Build the chain from the
    # worker-side names so the worker recognises every task it receives.
    data_dir = os.path.join(os.getcwd(), 'data')
    # os.listdir order is arbitrary: sort so "the first 10 files" is
    # deterministic, and skip anything that is not a regular file.
    filenames = sorted(
        name for name in os.listdir(data_dir)
        if os.path.isfile(os.path.join(data_dir, name))
    )[:10]
    results = []
    for filename in filenames:
        print(f'filename is {filename}')
        etl = (
            app.signature('tasks.extract', args=(filename,))
            | app.signature('tasks.transform_tokenize_doc')
            | app.signature('tasks.load', args=(filename,))
        )
        results.append(etl.apply_async())
    # Wait for the final result of every chain.
    for result in results:
        result.get()
When I run `celery -A tasks worker --loglevel=info` from the root folder (celery-pipeline/), I get the following error:
Traceback (most recent call last):
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 41, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'control'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/ubuntu/Documents/projects/celery-venv/bin/celery", line 8, in <module>
    sys.exit(main())
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/__main__.py", line 15, in main
    sys.exit(_main())
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bin/celery.py", line 213, in main
    return celery(auto_envvar_prefix="CELERY")
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bin/base.py", line 132, in caller
    return f(ctx, *args, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bin/worker.py", line 326, in worker
    **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/worker.py", line 99, in __init__
    self.setup_instance(**self.prepare_args(**kwargs))
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/worker.py", line 139, in setup_instance
    self.blueprint.apply(self, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 211, in apply
    step.include(parent)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 379, in include
    inc, ret = self._should_include(parent)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 335, in _should_include
    return True, self.create(parent)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/components.py", line 238, in create
    prefetch_multiplier=w.prefetch_multiplier,
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 331, in instantiate
    return instantiate(name, *args, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/utils/imports.py", line 44, in instantiate
    return symbol_by_name(name)(*args, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 212, in __init__
    self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 205, in apply
    step = S(parent, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/consumer/control.py", line 25, in __init__
    self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/pidbox.py", line 28, in __init__
    self.node = c.app.control.mailbox.Node(
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 43, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/app/base.py", line 1230, in control
    return instantiate(self.control_cls, app=self)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/utils/imports.py", line 44, in instantiate
    return symbol_by_name(name)(*args, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/utils/imports.py", line 56, in symbol_by_name
    module = imp(module_name, package=package, **kwargs)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/app/control.py", line 9, in <module>
    from kombu.matcher import match
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/matcher.py", line 132, in <module>
    for ep, args in entrypoints('kombu.matchers'):
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/utils/compat.py", line 93, in entrypoints
    for ep in importlib_metadata.entry_points().get(namespace, [])
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/importlib_metadata/__init__.py", line 865, in entry_points
    return SelectableGroups.load(eps).select(**params)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/importlib_metadata/__init__.py", line 340, in load
    ordered = sorted(eps, key=by_group)
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/importlib_metadata/__init__.py", line 863, in <genexpr>
    dist.entry_points for dist in unique(distributions())
  File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/importlib_metadata/_itertools.py", line 16, in unique_everseen
    k = key(element)
AttributeError: 'PathDistribution' object has no attribute 'name'
I searched Stack Overflow for this error but could not find much insight into it. I would appreciate any hints.
The problem is probably related to importlib-metadata. Try pinning it to an earlier version in your venv, e.g. `pip install "importlib-metadata<3.4.0"`. In a similar case, importlib-metadata<3.4.0 worked for me.
The next release of spacy (v3.0.6) should fix this problem (at least if it's only related to spacy) by removing importlib-metadata as a requirement.
If you like our work, you can donate to us via PayPal or buy us a coffee so we can maintain and grow. Thank you!
Donate Us With