
Python logging with multithreading + multiprocessing

Please take the time to read the full question to understand the exact issue. Thank you.

I have a runner/driver program that listens to a Kafka topic and dispatches tasks using a ThreadPoolExecutor whenever a new message is received on the topic (as shown below):



consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
                                 bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
                                 value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                                 enable_auto_commit=False,
                                 auto_offset_reset='latest',
                                 max_poll_records=1,
                                 max_poll_interval_ms=300000)


with ThreadPoolExecutor(max_workers=10) as executor:
    futures = []
    for message in consumer:
        futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))

There is a bunch of code in between, but it is not important here, so I have skipped it.

Now, SOME_FUNCTION is imported from another Python script (in fact, there is a hierarchy of imports that happens in later stages). What is important is that at some point in these scripts I call the multiprocessing Pool, because I need to do parallel processing on data (SIMD - single instruction, multiple data), and I use the apply_async function to do so:

for loop_message_chunk in loop_message_chunks:
    res_list.append(self.pool.apply_async(self.one_matching.match, args=(hash_set, loop_message_chunk, fields)))

Now, I have 2 versions of the runner/driver program :

  1. Kafka based ( the one shown above )

    • This version spawns threads that start multiprocessing

    Listen To Kafka -> Start A Thread -> Start Multiprocessing

  2. REST based ( using flask to achieve same task with a REST call )

    • This version does not start any threads and calls multiprocessing right away

    Listen to REST endpoint -> Start Multiprocessing

Why 2 runner/driver scripts, you ask? This microservice will be used by multiple teams: some want a synchronous, REST-based interface, while others want a real-time, asynchronous, Kafka-based system.

When I log from the parallelized function (self.one_matching.match in the example above), it works when called through the REST version but not when called through the Kafka version (basically, when multiprocessing is kicked off by a thread, logging does not work).

Also notice that only the logging from the parallelized function fails. The rest of the scripts in the hierarchy, from the runner down to the script that calls apply_async, including scripts called from within the thread, log successfully.

Other details :

  • I configure the loggers using a YAML file
  • I configure the logger in the runner script itself, for either the Kafka or the REST version
  • I call logging.getLogger in every other script invoked after the runner script, to get specific loggers that log to different files

Logger config (values replaced with generic ones, since I cannot share the exact names):

version: 1
formatters:
  simple:
    format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
  custom1:
    format: '%(asctime)s | %(filename)s :: %(message)s'
  time-message:
    format: '%(asctime)s | %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: simple
    stream: ext://sys.stdout
  handler1:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile1.log
  handler2:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: custom1
    level: INFO
    filename: logs/logfile2.log
  handler3:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile3.log
  handler4:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile4.log
  handler5:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile5.log
loggers:
  logger1:
    level: DEBUG
    handlers: [console, handler1]
    propagate: no
  logger2:
    level: DEBUG
    handlers: [console, handler5]
    propagate: no
  logger3:
    level: INFO
    handlers: [handler2]
    propagate: no
  logger4:
    level: DEBUG
    handlers: [console, handler3]
    propagate: no
  logger5:
    level: DEBUG
    handlers: [console, handler4]
    propagate: no
  kafka:
    level: WARNING
    handlers: [console]
    propagate: no
root:
  level: INFO
  handlers: [console]
  propagate: no
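A config in this shape is typically loaded once in the runner with logging.config.dictConfig. A minimal self-contained sketch with a pared-down config (generic names, inlined as a dict here instead of loading the YAML file):

```python
import logging
import logging.config

config = {
    "version": 1,
    "formatters": {
        "simple": {
            "format": "%(asctime)s | %(name)s | %(levelname)s :: %(message)s"
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "DEBUG",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # note: the key must be spelled "propagate"; dictConfig silently
        # ignores unknown keys such as "propogate"
        "logger1": {"level": "DEBUG", "handlers": ["console"],
                    "propagate": False},
    },
    "root": {"level": "INFO", "handlers": ["console"]},
}

logging.config.dictConfig(config)
logging.getLogger("logger1").debug("configured via dictConfig")
```

Note that dictConfig only configures handlers in the process that calls it; child processes started with "spawn" begin with a fresh, unconfigured logging tree.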
Nirmik asked May 09 '26 19:05
1 Answer

Possible answer: get rid of the threads and use asyncio instead.

Example pseudocode structure (cobbled together from these examples):


# pseudocode example structure: probably has bugs...
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

from aiokafka import AIOKafkaConsumer

async def SOME_FUNCTION_CO(executor, loop_message_chunks, hash_set, fields):
    # submit each chunk to the process pool; executor.submit returns immediately
    res_list = []
    for loop_message_chunk in loop_message_chunks:
        res_list.append(executor.submit(one_matching.match, hash_set,
                                        loop_message_chunk, fields))
    # call concurrent.futures.wait on res_list later, and cancel unneeded
    # futures (regarding one of your prior questions)
    return res_list
    

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()

    # Global executor:
    # I would also suggest using a "spawn" context unless you really need the
    # performance of "fork".
    ctx = multiprocessing.get_context("spawn")
    tasks = []  # similar to futures in your example (Task subclasses
                # asyncio.Future, which is similar to concurrent.futures.Future)
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        try:
            # Consume messages
            async for msg in consumer:
                tasks.append(asyncio.create_task(
                    SOME_FUNCTION_CO(executor, msg.value, hash_set, fields)))
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

if __name__ == "__main__":
    asyncio.run(consume())

I keep going back and forth on how I think I should represent SOME_FUNCTION in this example, but the key point is that in the loop over msg in consumer, you are scheduling the tasks to complete eventually. If any of these tasks takes a long time, it could block the main loop (which is also running the async for msg in consumer line). Instead, any task that could take a long time should quickly return a future of some type, so you can simply access the result once it's ready.
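To illustrate that last point: a future from a ProcessPoolExecutor can be awaited without blocking the event loop via loop.run_in_executor. This is a small standalone sketch under assumed names (slow_square is hypothetical), not code from the answer above:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def slow_square(n):
    # stands in for any CPU-bound task handed to a worker process
    return n * n

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        # run_in_executor wraps each concurrent.futures.Future as an
        # awaitable, so the event loop stays responsive while workers run
        results = await asyncio.gather(
            *(loop.run_in_executor(executor, slow_square, n) for n in range(4))
        )
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 1, 4, 9]
```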

Aaron answered May 11 '26 10:05