Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python sys.excepthook working only on main process but not on subprocesses

I have an app with some subprocesses running, and I have successfully set up sys.excepthook exception handling for the main process. Now I want to set the same hook on the subprocesses. I expected it to be as simple as copying the exact lines of code I used in the main process, but it didn't work.

Next is my code:

    class Consumer(multiprocessing.Process):
        """Worker process that installs its own excepthook and then fails."""

        def __init__(self, codec_status_queue, logger_queue):
            """Keep references to the two queues shared with the parent."""
            super(Consumer, self).__init__()
            self.logger_queue = logger_queue
            self.codec_status_queue = codec_status_queue

        def run(self):
            """Install the uncaught-exception hook, then divide by zero."""
            # Hook up the handler for uncaught exceptions in this process.
            handler = UncaughtErrorHandler(self.logger_queue)
            sys.excepthook = handler.error_handler

            # Raises ZeroDivisionError so the hook can be exercised.
            1/0


    class UncaughtErrorHandler(object):
        """Log an uncaught exception, stop any child processes, then exit.

        ``error_handler`` takes the ``(type, value, traceback)`` arguments
        that ``sys.excepthook`` is called with.
        """

        def __init__(self, logger_queue, child_processes=None):
            """Store the logging queue and the optional processes to stop.

            logger_queue -- queue that receives ``(level, message)`` tuples.
            child_processes -- optional collection of processes that are
                terminated when an error is handled.
            """
            self.logger_queue = logger_queue
            self.child_processes = child_processes

        def error_handler(self, type, value, trace_back):
            """Report the exception on the logger queue and exit the process.

            Raises SystemExit (via ``sys.exit``) once the message is queued
            and any child processes have been terminated.
            """
            trace_formatted = "".join(traceback.format_tb(trace_back))
            # Exceptions have no ``message`` attribute on Python 3, so fall
            # back to the exception object itself (formatted with ``%s``).
            message = getattr(value, 'message', value)
            exception_message = "Unhandled Exception:\n Type: %s\n Value: %s\n Line: %s\n Traceback:\n %s" % (type, message, trace_back.tb_lineno, trace_formatted)
            # Use the queue stored on the instance rather than relying on a
            # module-level ``logger_queue`` global happening to exist.
            self.logger_queue.put((LoggerThread.CRITICAL, exception_message))

            if self.child_processes:
                self.stop_children()

            # Stopping this process
            sys.exit()

        def stop_children(self):
            """Terminate every child process, logging each termination."""
            num_children = len(self.child_processes)
            self.logger_queue.put((LoggerThread.DEBUG, "Terminating child processes (%s)" % num_children))
            for process in self.child_processes:
                log_message = "Terminating %s with PID %s" % (process.name, process.pid)
                self.logger_queue.put((LoggerThread.DEBUG, log_message))
                process.terminate()


    # Script entry point (Python 2 syntax): build the consumers, install the
    # parent's excepthook, then start every consumer as a daemon process.
    if __name__ == '__main__':
        ...
        # NOTE(review): logger_queue is used below but is not created in the
        # visible code; presumably it is set up in the elided section above.
        # Create processes and communication queues
        codec_status_queue = multiprocessing.Queue()

        # Two consumers per reported CPU.
        num_consumers = multiprocessing.cpu_count() * 2
        print 'Creating %d consumers' % num_consumers
        consumers = [ Consumer(codec_status_queue, logger_queue)
                      for i in xrange(num_consumers) ]

        # Set default unhandled exceptions handler; it receives the consumer
        # list so it can terminate them when the main process fails.
        uncaughtErrorHandler = UncaughtErrorHandler(logger_queue, consumers)
        sys.excepthook = uncaughtErrorHandler.error_handler

        # Start processes
        for consumer in consumers:
            consumer.daemon = True
            consumer.start()

If I put the 1/0 in the __main__ part, the UncaughtErrorHandler catches the exception, but when the 1/0 is placed as shown above, it doesn't.

Maybe someone can tell me what I am doing wrong?

like image 976
Storo Avatar asked Sep 18 '25 08:09

Storo


1 Answers

The following code was written for Python 3.x but can be adapted to work with Python 2.x instead. It provides an alternative solution to overriding sys.excepthook in child processes. A simple fix involves catching all exceptions and handing the data from sys.exc_info over to the exception handler. The main process could use a similar pattern for exceptions, but it retains the original design from your program. The example shown below should be a full working demonstration you can play around with and adapt to your needs.

#! /usr/bin/env python3
import logging
import multiprocessing
import queue
import sys
import threading
import time
import traceback


def main():
    """Demonstrate exception handling and logging in several processes.

    Starts the logging thread, creates two consumers per CPU, installs the
    parent's ``sys.excepthook`` and then waits for every consumer to exit
    before asking the logging thread to shut down.
    """
    logger_queue = multiprocessing.Queue()
    logger_thread = LoggerThread(logger_queue)
    logger_thread.start()
    try:
        # Create processes and communication queues
        codec_status_queue = multiprocessing.Queue()
        num_consumers = multiprocessing.cpu_count() * 2
        print('Creating {} consumers'.format(num_consumers))
        consumers = [Consumer(codec_status_queue, logger_queue)
                     for _ in range(num_consumers)]
        # Set default unhandled exceptions handler
        uncaught_error_handler = UncaughtErrorHandler(logger_queue, consumers)
        sys.excepthook = uncaught_error_handler.error_handler
        # Start processes
        for consumer in consumers:
            consumer.start()
        # Wait for the consumers themselves instead of sleeping for a fixed
        # time: a fixed delay can expire before slow-starting children have
        # reported their errors. The logger thread keeps draining
        # logger_queue meanwhile, so the children can flush and exit.
        for consumer in consumers:
            consumer.join()
    finally:
        logger_thread.shutdown()


def get_message(value):
    """Retrieve an exception's error message and return it.

    The lookup order is: a ``message`` attribute if one exists, then the
    first entry of a non-empty ``args``, and finally ``str(value)`` so the
    function always returns something explicit instead of falling off the
    end with ``None``.
    """
    if hasattr(value, 'message'):
        return value.message
    if getattr(value, 'args', None):
        return value.args[0]
    # No message attribute and no arguments (e.g. ``ValueError()``).
    return str(value)


class LoggerThread(threading.Thread):

    """Handle logging messages coming from various sources via a queue."""

    CRITICAL = logging.CRITICAL
    DEBUG = logging.DEBUG

    def __init__(self, logger_queue):
        """Initialize an instance of the LoggerThread class.

        logger_queue -- queue yielding ``(level, message)`` tuples; its
            ``get`` must raise ``queue.Empty`` on timeout.
        """
        super().__init__()
        self.logger_queue = logger_queue
        self.mutex = threading.Lock()
        self.running = False
        # The shutdown request is kept in an Event so it cannot be lost when
        # shutdown() is called before run() has set ``running`` to True.
        self._shutdown_requested = threading.Event()

    def run(self):
        """Process messages coming through the queue until shutdown.

        The queue is always drained at least once, and the thread only exits
        after a ``get`` has timed out following the shutdown request.
        """
        self.running = True
        try:
            while True:
                try:
                    while True:
                        self.handle_message(*self.logger_queue.get(True, 0.1))
                except queue.Empty:
                    pass
                # Also honour code that clears ``running`` directly.
                if self._shutdown_requested.is_set() or not self.running:
                    break
        finally:
            self.running = False

    def handle_message(self, level, message):
        """Show the message while ensuring a guaranteed order on screen."""
        with self.mutex:
            print('Level:', level)
            print('Message:', message)
            print('=' * 80, flush=True)

    def shutdown(self):
        """Signal the thread to exit once it runs out of messages."""
        self._shutdown_requested.set()
        self.running = False


class Consumer(multiprocessing.Process):

    """Simulate a consumer process that handles data from a queue."""

    def __init__(self, codec_status_queue, logger_queue):
        """Initialize an instance of the Consumer class.

        Both queues are stored on the instance and the process is marked as
        a daemon.
        """
        super().__init__()
        self.codec_status_queue = codec_status_queue
        self.logger_queue = logger_queue
        self.daemon = True

    def run(self):
        """Begin working as a consumer while handling any exceptions.

        Every exception except SystemExit is handed to an
        UncaughtErrorHandler. SystemExit is re-raised untouched because a
        deliberate ``sys.exit()`` is not an unhandled error, and
        ``sys.excepthook`` is never invoked for it either.
        """
        try:
            self.do_consumer_work()
        except SystemExit:
            raise
        except BaseException:
            # The handler is only needed once something has gone wrong.
            uncaught_error_handler = UncaughtErrorHandler(self.logger_queue)
            uncaught_error_handler.error_handler(*sys.exc_info())

    def do_consumer_work(self):
        """Pretend to be doing the work of a consumer."""
        # Raises ZeroDivisionError, so the print below is never reached.
        junk = 1 / 0
        print('Process', self.ident, 'calculated', junk)


class UncaughtErrorHandler:

    """Organize error handling to automatically terminate child processes."""

    def __init__(self, logger_queue, child_processes=None):
        """Initialize an instance of the UncaughtErrorHandler class.

        logger_queue -- queue that receives ``(level, message)`` tuples.
        child_processes -- optional collection of processes to terminate
            when an error is handled.
        """
        self.logger_queue = logger_queue
        self.child_processes = child_processes

    def error_handler(self, kind, value, trace_back):
        """Record errors as they happen and terminate the process tree.

        Takes the ``(type, value, traceback)`` arguments that
        ``sys.excepthook`` is called with. Raises SystemExit with status 1
        so the process does not report success after an unhandled error.
        """
        trace_formatted = ''.join(traceback.format_tb(trace_back))
        # A missing traceback must not break the error handler itself.
        line_number = None if trace_back is None else trace_back.tb_lineno
        exception_message = ('Unhandled Exception:\n'
                             '    Type: {}\n'
                             '    Value: {}\n'
                             '    Line: {}\n'
                             '    Traceback:\n{}').format(
            kind, get_message(value), line_number, trace_formatted)
        self.logger_queue.put((LoggerThread.CRITICAL, exception_message))
        if self.child_processes:
            self.stop_children()
        # Stopping this process with a failure status
        sys.exit(1)

    def stop_children(self):
        """Terminate all children associated with this error handler."""
        num_children = len(self.child_processes)
        log_message = 'Terminating child processes({})'.format(num_children)
        self.logger_queue.put((LoggerThread.DEBUG, log_message))
        for process in self.child_processes:
            if process.pid is None:
                # Never started, so there is nothing to terminate; calling
                # terminate() here would raise inside the error handler.
                continue
            log_message = 'Terminating {} with PID {}'.format(
                process.name, process.pid)
            self.logger_queue.put((LoggerThread.DEBUG, log_message))
            process.terminate()


# Run the demonstration only when this file is executed as a script.
if __name__ == '__main__':
    main()
like image 120
Noctis Skytower Avatar answered Sep 19 '25 22:09

Noctis Skytower