How to enforce logger format during Celery task execution?

I have a service that uses the Python logging module to log debug messages.

my_service.py:

import logging

logger = logging.getLogger(__name__)

class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')

Then I use this service from a Celery task.

tasks.py:

from celery import shared_task

from my_service import SomeService

some_service = SomeService()

@shared_task
def synchronize_stuff():
    stuff = some_service.synchronize()

The worker then outputs logs like this:

worker_1     | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1     | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1     | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] Processing 35
worker_1     | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] Item 35 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] Processing 36
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Item 36 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Processing 49
worker_1     | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] Processing 50
worker_1     | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] Processing 51
worker_1     | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None

This is good enough for debugging, but I'd like to include the task name and UUID in these logs. That can be achieved with the Celery task logger:

my_service.py:

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')

This does exactly what I want in terms of logging:

worker_1     | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1     | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1     | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 35
worker_1     | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 35 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 36
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 36 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 49
worker_1     | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 50
worker_1     | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 51
worker_1     | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None

But I have two problems with this:

  1. I don't want to use the Celery logger inside the service. The service can be used even in environments where Celery is not installed at all (in which case it's fine that the task name and UUID are not included in the logs).

  2. Logs from external libraries executed during the same task do not use the same logger, and therefore do not include the task name and UUID.

Which leads me to this question: Is it possible to specify (force) a logger at the task level (in tasks.py) that will be used no matter how I log in my service or how external libraries log? Something like this would be fine:

tasks.py:

@shared_task
def synchronize_stuff():
    logging.enforce_logger(get_task_logger(__name__))
    stuff = some_service.synchronize()
    logging.restore_logger()
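
For illustration, I imagine such a helper could be sketched as a context manager that temporarily swaps the root handlers' formatters for Celery's TaskFormatter. This is only a sketch of the idea, not an existing logging API; enforce_task_format and the format string are made up:

import logging
from contextlib import contextmanager

from celery.app.log import TaskFormatter

TASK_FMT = "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s"

@contextmanager
def enforce_task_format():
    # Swap the formatter on every root handler for Celery's
    # TaskFormatter, which injects task_name/task_id into each
    # record it formats, then restore the originals afterwards.
    root = logging.getLogger()
    originals = [(handler, handler.formatter) for handler in root.handlers]
    for handler in root.handlers:
        handler.setFormatter(TaskFormatter(TASK_FMT))
    try:
        yield
    finally:
        for handler, formatter in originals:
            handler.setFormatter(formatter)

The task would then wrap the service call:

@shared_task
def synchronize_stuff():
    with enforce_task_format():
        some_service.synchronize()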

It might also be worth noting that I use Django in this project.

Thanks!

Asked by Martin Janeček

1 Answer

Martin Janeček's proposed solution is the only one I found that works for me:

from logging import Filter

from celery.signals import setup_logging

class CeleryTaskFilter(Filter):
    def filter(self, record):
        # Only pass records emitted from worker pool processes
        # (e.g. ForkPoolWorker-1); MainProcess records are excluded.
        return record.processName.find("Worker") != -1

celery_log_config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "celeryTask": {
            # celery.app.log.TaskFormatter looks up the currently
            # executing task for every record it formats and injects
            # task_name/task_id, regardless of which logger emitted
            # the record, so third-party logs get tagged too.
            "()": "celery.app.log.TaskFormatter",
            "fmt": "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]:%(module)s:%(funcName)s: %(message)s",
        },
    },
    "filters": {
        "celeryTask": {
            "()": CeleryTaskFilter,
        },
    },
    "handlers": {
        "console": {
            "level": "INFO",
            "class": "logging.StreamHandler",
            "formatter": "celeryTask",
            "filters": ["celeryTask"],
        },
    },
    "loggers": {
        # Attach the handler to the root logger so every logger in
        # the process (services, external libraries) goes through it.
        "": {
            "handlers": ["console"],
            "level": "DEBUG",
            "propagate": False,
        }
    },
}

Then I just make sure this is set up when Celery initializes:

from logging.config import dictConfig

# Connecting to the setup_logging signal also stops Celery from
# applying its own logging configuration on top of this one.
# (Renamed from setup_logging so the handler doesn't shadow the
# imported signal.)
@setup_logging.connect
def configure_logging(**_kwargs):
    dictConfig(celery_log_config)

Note that I also added a second handler filtered on record.processName == 'MainProcess', formatted with DEFAULT_PROCESS_LOG_FMT, otherwise those logs are lost. Those are the only logs I can get from within my Celery container, but I could easily imagine other use cases also needing a catch-all filter of record.processName != 'MainProcess' and record.processName.find('Worker') == -1.
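
For concreteness, here is a minimal sketch of what that second handler could look like, extending the config above. The names MainProcessFilter, celeryProcess, and consoleMain are illustrative, and the format string is Celery's default process-level format written out literally rather than imported:

class MainProcessFilter(Filter):
    def filter(self, record):
        # Pass only records emitted by the worker's main process.
        return record.processName == "MainProcess"

celery_log_config["formatters"]["celeryProcess"] = {
    # No task context exists outside a running task, so this format
    # omits task_name/task_id.
    "format": "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s",
}
celery_log_config["filters"]["mainProcess"] = {"()": MainProcessFilter}
celery_log_config["handlers"]["consoleMain"] = {
    "level": "INFO",
    "class": "logging.StreamHandler",
    "formatter": "celeryProcess",
    "filters": ["mainProcess"],
}
celery_log_config["loggers"][""]["handlers"].append("consoleMain")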

Answered by Martin Lafrance

