I'm using the following code to monitor file access from a running job.
When the job is stopped, my code receives a SIGINT.
As this job is very intensive, the I/O is buffered and I can't make those writes unbuffered, yet I want a precise log.
So I tried to catch SIGINT and flush the file before shutting down my script, but I end up with:
RuntimeError: reentrant call inside <_io.BufferedWriter name=
As I understand from several articles I read, write/print/flush cannot be used reliably inside a signal handler, as these calls are not reentrant.
My question is: how can I ensure that my file is written properly before shutting down the script?
Here's a simpler version of my script:
import signal
import sys
import os
import time
from time import strftime
import inotify.adapters

separator = ';'
jump = '\n'

logfile_pointer = open("path/to/log/file", 'w')

# Try to close everything nicely
def signal_handler(signal, frame):
    logfile_pointer.flush()
    logfile_pointer.close()
    sys.exit(0)

# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)

eventHandler = inotify.adapters.InotifyTrees(["/folder/one", "/folder/two"])

for event in eventHandler.event_gen():
    if event is not None:
        (_, type_names, path, filename) = event
        try:
            timestamp = '%.2f' % (time.time())
            filepath = path + '/' + filename
            logfile_pointer.write("{}{}{}{}{}{}{}{}".format(timestamp, separator, filepath, separator, type_names[0], separator, os.path.getsize(filepath), jump))
        except os.error:
            pass
The typical approach here is to have the signal handler set a flag, and return without exiting. The main loop checks the flag and when it’s set, cleans up and exits.
In this particular instance, this means you need to have the event producer yield regularly; with PyInotify you can do this by setting a short timeout. This would end up looking like:
[...]

exit_requested = False

def signal_handler(signal, frame):
    # Perhaps check which signal was received...
    global exit_requested
    exit_requested = True

[...]

for event in eventHandler.event_gen(timeout_s=1):
    if exit_requested:
        # Clean up and exit
        ...
    if event:
        ...
When event_gen yields None because it timed out, inotify events which occur before the next call to event_gen are queued, not lost: inotify events are only consumed when they are read from the inotify file descriptor, and the event handler here keeps that descriptor open.
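Fleshed out against the script in the question, the whole thing might look like this (a sketch only, keeping the question's variable names and log format; the log path and watched folders are placeholders):

import os
import time
import signal
import inotify.adapters

separator = ';'
jump = '\n'

exit_requested = False

def signal_handler(signum, frame):
    # Only set a flag here: no I/O happens inside the handler,
    # so the reentrant-call error cannot occur.
    global exit_requested
    exit_requested = True

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)

logfile_pointer = open("path/to/log/file", 'w')  # placeholder path
eventHandler = inotify.adapters.InotifyTrees(["/folder/one", "/folder/two"])

while not exit_requested:
    # With a timeout, event_gen() yields None while idle and returns after a
    # quiet period, so the flag is re-checked regularly; the outer while
    # simply restarts the generator.
    for event in eventHandler.event_gen(timeout_s=1):
        if exit_requested:
            break
        if event is None:
            continue
        (_, type_names, path, filename) = event
        try:
            timestamp = '%.2f' % (time.time())
            filepath = path + '/' + filename
            logfile_pointer.write("{}{}{}{}{}{}{}{}".format(timestamp, separator, filepath, separator, type_names[0], separator, os.path.getsize(filepath), jump))
        except os.error:
            pass

# Back in the normal flow, outside the signal handler, flushing and
# closing the buffered file is safe.
logfile_pointer.flush()
logfile_pointer.close()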
I had several issues to solve, one of them being how to stop my script cleanly, since Python's threading model has a few quirks. Here's my solution:
First, define a thread class that will be the inotify watcher:
import os
import sys
import time
import signal
import argparse
import inotify.adapters
from time import strftime
from threading import Thread
from argparse import RawTextHelpFormatter


class EventMonitor(Thread):
    separator = ';'
    jump = '\n'

    def __init__(self, folders, logfile, buffer_size=-1):
        Thread.__init__(self)
        self.stop = False
        self.alive = True
        self.line_count = 0
        # buffer_size=-1 keeps the system default buffering for the log file
        self.logfile = open(logfile, 'w', buffering=buffer_size)
        self.eventHandler = inotify.adapters.InotifyTrees(folders)

    def run(self):
        # Main loop: keep consuming events until stopped() is called.
        while not self.stop:
            for event in self.eventHandler.event_gen(timeout_s=3):
                try:
                    if event is not None:
                        (_, type_names, path, filename) = event
                        timestamp = '%.2f' % (time.time())
                        filepath = path + '/' + filename
                        self.logfile.write("{}{}{}{}{}{}{}{}".format(timestamp, self.separator, filepath, self.separator, type_names[0], self.separator, os.path.getsize(filepath), self.jump))
                except os.error:
                    pass
        # One last, shorter pass to drain events that arrived while stopping.
        for event in self.eventHandler.event_gen(timeout_s=1):
            try:
                if event is not None:
                    (_, type_names, path, filename) = event
                    timestamp = '%.2f' % (time.time())
                    filepath = path + '/' + filename
                    self.logfile.write("{}{}{}{}{}{}{}{}".format(timestamp, self.separator, filepath, self.separator, type_names[0], self.separator, os.path.getsize(filepath), self.jump))
            except os.error:
                pass
        self.logfile.flush()
        self.logfile.close()
        self.alive = False

    def stopped(self):
        if not self.stop:
            self.stop = True
        else:
            print("Event Monitoring is already disabled")

    def isAlive(self):
        return self.alive
Then, in my main script:
import os
import sys
import time
import signal
import argparse
import traceback
from time import strftime
from CPUMonitor import CPUMonitor
from EventMonitor import EventMonitor
from argparse import RawTextHelpFormatter

# Define arguments
parser = argparse.ArgumentParser(description='attache spies on the folders given in argument and generates a CSV log file containing a list of events on files. The file is formatted like this:\ntimestamp;fullpath;event;size\n123456897.25;/path/file;IN_OPEN;0\n123456899.25;/path/file;IN_CLOSE;1234\n.....\nFor more info about inotify events => `man inotify`', formatter_class=RawTextHelpFormatter)
parser.add_argument("-l", "--log-folder", type=str, help="Destination folder for the logs. If no value is given, /tmp is used.", default='/tmp')
parser.add_argument("-e", "--event", help="enable file event watch", action="store_true")
parser.add_argument('folders', metavar='folderpath', type=str, help='a list of folder paths to spy on; if -e is not set this will be ignored.', nargs='*', default=[os.getcwd()])
args = parser.parse_args()

# Thread handles, defined up front so the signal handler can test them safely
CPU_thread = None
Event_thread = None

# Try to close everything nicely
def signal_handler(signal, frame):
    if CPU_thread is not None:
        CPU_thread.stopped()
    if Event_thread is not None:
        Event_thread.stopped()
    print('Kill signal received.{}CPU and Event monitoring stopped.{}'.format(jump, jump))
    sys.exit(0)

# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)

try:
    # Define variables
    separator = ';'
    jump = '\n'
    logDest = ''
    go = True
    Event_logfile = None
    Event_logfile_debug = None
    jobname = ''
    check_message = ''

    if not os.path.isdir(args.log_folder):
        go = False
        check_message = check_message + "/!\\ Log folder {} is not a directory. Monitoring won't start{}".format(args.log_folder, jump)
    elif not os.access(args.log_folder, os.W_OK | os.X_OK):
        go = False
        check_message = check_message + "/!\\ Log folder {} is not writable. Monitoring won't start{}".format(args.log_folder, jump)
    else:
        check_message = check_message + "Log folder is a proper directory and can be RW. {}".format(jump)

    if not go:
        print(check_message)
        sys.exit(-2)

    if go:
        event_logfile = args.log_folder + '/Event_' + os.environ['JOB_ID'] + '_' + strftime("%Y-%m-%d_%H:%M:%S") + '-log.txt'
        print('Event logfile: {}{}'.format(event_logfile, jump))
        print('Start monitoring of the events on: {} {}'.format(args.folders, jump))
        Event_thread = EventMonitor(args.folders, event_logfile)
        Event_thread.start()
    else:
        print("Error detected, monitoring hasn't started{}".format(jump))
        sys.exit(-4)

    while Event_thread is not None and Event_thread.isAlive():
        time.sleep(5)

    if Event_thread is not None:
        Event_thread.join()
except Exception as error:
    traceback.print_exc()
    print(str(error))
    sys.exit(-5)
In the thread, as long as it has not been stopped, it looks for events and writes them to the file.
When stopped() is called, the loop times out after 3 seconds without an event; then I run the event loop one last time with a shorter timeout of 1 second. Once all remaining events are processed, the thread stops and isAlive() returns False.
In the main program, when SIGINT or SIGHUP is received, it asks the thread to stop, and the Python script only exits once the thread has stopped properly.
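Condensed, the shutdown handshake looks like this (a minimal sketch reusing the EventMonitor class above; the watched folders and log path are placeholders):

import time
import signal
from EventMonitor import EventMonitor

# Placeholder folders and log path, for illustration only.
monitor = EventMonitor(["/folder/one", "/folder/two"], "/tmp/events.csv")
monitor.start()

def signal_handler(signum, frame):
    # Only flip the flag; the thread flushes and closes the file itself.
    monitor.stopped()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)

# Wait until run() has drained the last events, flushed and closed the log.
while monitor.isAlive():
    time.sleep(1)
monitor.join()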
This code works both in Python 2.7.15 and in 3.6.7 and above; however, keep in mind that this is a simplified version of my code, so it might not work as is and might need some adjustment.
PS: thanks to Stephen's answer, which helped me a lot.