Building on the script from this answer, I have the following scenario: A folder containing 2500 large text files (~ 55Mb each), all tab delimited. Web logs, basically.
I need to md5 hash the second 'column' in each row of each file, saving the modified files elsewhere. The source files are on a mechanical disk and the destination files are on an SSD.
The script processes the first 25 (or so) files really quickly. It then slows WAY down. Based on the first 25 files, it should complete all of them in 2 minutes or so. However, based on the performance after that, it will take 15 minutes (or so) to complete them all.
It's running on a server with 32 Gb of RAM and task manager rarely shows over 6 Gb in use. I have it set to launch 6 processes, but the CPU usage on the cores is low, rarely going over 15%.
Why is this slowing down? Read/write issues to the disk? Garbage collector? Bad code? Any ideas about how to speed it up?
Here's the script
import os
import multiprocessing
from multiprocessing import Process
import threading
import hashlib
class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, fileset, filedirectory):
        threading.Thread.__init__(self)
        self.files_to_process = fileset
        self.filedir          = filedirectory
    def run(self):
        for current_file in self.files_to_process:
            # Open the current file as read only
            active_file_name = self.filedir + "/" + current_file
            output_file_name = "D:/hashed_data/" + "hashed_" + current_file
            active_file = open(active_file_name, "r")
            output_file = open(output_file_name, "ab+")
            for line in active_file:
                # Load the line, hash the username, save the line
                lineList = line.split("\t")
                if not lineList[1] == "-":
                    lineList[1] = hashlib.md5(lineList[1]).hexdigest()
                lineOut = '\t'.join(lineList)
                output_file.write(lineOut)
            # Always close files after you open them
            active_file.close()
            output_file.close()
            print "\nCompleted " + current_file
class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads, fileset, filedirectory):
        mythreads = []
        for tid in range(numThreads):
            th = ThreadRunner(fileset, filedirectory)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()
class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads, filedirectory):
        myprocs = []
        prunner = ProcessRunner()
        # Store the file names from that directory in a list that we can iterate
        file_names = os.listdir(filedirectory)
        file_sets = []
        for i in range(numProcesses):
            file_sets.append([])
        for index, name in enumerate(file_names):
            num = index % numProcesses
            file_sets[num].append(name)
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads, file_sets[pid], filedirectory)) 
            myprocs.append(pr) 
        for i in myprocs:
            i.start()
        for i in myprocs:
            i.join()
if __name__ == '__main__':    
    file_directory = "E:/original_data"
    processes = 6
    threads   = 1
    extractor = ParallelExtractor()
    extractor.runInParallel(numProcesses=processes, numThreads=threads, filedirectory=file_directory)
Hashing is a relatively simple task, and modern CPUs are very fast, compared to the speed of spinning disks. A quick-and-dirty benchmark on a i7 shows that it can hash about 450 MB/s using MD5, or 290 MB/s using SHA-1. Comparatively, spinning disk have a typical (sequencial raw read) speed of about 70-150 MB/s. This means that, even ignoring the overhead of the filesystem and eventual disk seeks, the CPU can hash a file about 3x faster than the disk can read it.
The performance boost you get on processing the first files probably happens because the first files are cached in memory by the operating system, so no disk I/O happens. This can be confirmed by either:
Now, since the performance bottleneck for hashing files is the disk, performing the hashing in multiple processes or threads is useless, because they'll all use the same disk. As @Max Noel mentioned, it can actually lower performance, because you'll be reading several files in parallel, so your disk will have to seek between the files. The performance will also vary depending on the I/O scheduler of the operating system you're using, as he mentioned.
Now, if you're still generating data, you have some possible solutions:
These solutions, however, are useless if all you want to do is hash those 2500 files and you already have them on a single disk. Reading them from the disk to other disks and then performing the hashing is slower, since you'll be reading the files twice, and you can hash as fast as you can read them.
Finally, based on @yaccz 's idea, I guess you could have avoided the trouble of writing a program to perform the hashing if you had installed cygwin binaries of find, xargs and md5sum. 
Why do things simple when one can make them complicated?
mount the drives via smbfs or whatnot on linux host and run
#! /bin/sh
SRC="" # FIXME
DST="" # FIXME
convert_line() {
    new_line=`echo $i | cut -f 1 -d "\t"`
    f2=`echo $i | cut -f 2 -d "\t"`
    frest=`echo $i | cut -f 1,2 --complement -d "\t"`
    if [ ! "x${f2}" = "-" ] ; then
        f2=`echo "${f2}" | md5sum | head -c-1`
        # might wanna throw in some memoization
    fi
    echo "${new_line}\t$f2\t${frest}"
}
convert_file() {
    for i in `cat $1`; do
        convert_line "${i}" >> $DST/hashed-$1
    done
}
for i in $SRC/*; do
    convert_file $i
done
not tested. might need polishing some rough edges.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With