The runtime of this code on my laptop for a 4.2 GB input file is 48 seconds. The input file is tab-delimited, with each value appearing in quotes. Each record ends with a newline, e.g. '"val1"\t"val2"\t"val3"\t..."valn"\n'
I tried using multiprocessing with 10 workers: one to queue up the lines, eight to parse individual lines and fill an output queue, and one to reduce the output queue into the defaultdict shown below. That version took 300 seconds to run, more than 6 times longer than the following:
from collections import defaultdict

def get_users(log):
    users = defaultdict(int)
    f = open(log)
    # Read header line
    h = f.readline().strip().replace('"', '').split('\t')
    ix_profile = h.index('profile.type')
    ix_user = h.index('profile.id')
    # If either ix_* points at the last field, that field's value in each
    # data line will include the trailing newline. That's fine for now.
    for (i, line) in enumerate(f):
        if i % 1000000 == 0: print("Line %d" % i)  # progress notification
        l = line.split('\t')
        if l[ix_profile] != '"7"':  # "7" indicates a bad value
            # use list slicing to remove quotes
            users[l[ix_user][1:-1]] += 1
    f.close()
    return users
I've checked that I'm not I/O-bound by removing everything but the print statement from the for loop. That stripped-down code ran in 9 seconds, which I'll consider a lower bound for how fast this code can run.
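For reference, the read-only baseline can be sketched roughly as follows. This is a minimal sketch, assuming Python 3; the helper name `time_read_only` is hypothetical and is not part of the code above. It skips the header, iterates over the remaining lines with only the progress print in the loop, and reports the line count and elapsed time.

```python
import time

def time_read_only(path):
    """Iterate over every data line without parsing it.

    Returns (line_count, elapsed_seconds). Hypothetical helper used only
    to estimate the I/O floor for the parsing loop.
    """
    start = time.perf_counter()
    i = -1  # stays at -1 if the file has no data lines
    with open(path) as f:
        f.readline()  # skip the header line, as get_users does
        for i, _line in enumerate(f):
            if i % 1000000 == 0:
                print("Line %d" % i)  # progress notification only
    return i + 1, time.perf_counter() - start
```

Calling `time_read_only(log)` on the same file as `get_users(log)` gives the two numbers to compare: the gap between them is the time spent on splitting and dictionary updates rather than on reading.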
I have a lot of these 5 GB files to process, so even a pretty small improvement in runtime (I know, I can remove the print!) will help. The machine I am running on has 4 cores, so I can't help but wonder if there's a way to get the multithread/multiprocess code to run faster than the code above.
UPDATE:
I rewrote the multiprocessing code as follows:
from multiprocessing import Pool, cpu_count
from collections import defaultdict

def parse(line, ix_profile=10, ix_user=9):
    """ix_profile and ix_user predetermined; hard-coding for expedience."""
    l = line.split('\t')
    if l[ix_profile] != '"7"':
        return l[ix_user][1:-1]

def get_users_mp():
    f = open('20110201.txt')
    h = f.readline()  # remove header line
    pool = Pool(processes=cpu_count())
    # Send lines to the workers in chunks of 100
    result_iter = pool.imap_unordered(parse, f, 100)
    users = defaultdict(int)
    for r in result_iter:
        if r is not None:
            users[r] += 1
    # Release the worker processes and the file handle
    pool.close()
    pool.join()
    f.close()
    return users
It runs in 26 seconds, a 1.85x speedup. Not bad, but with 4 cores, not as much as I had hoped for.
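To make the per-line rule concrete, here is a small single-process check on made-up data. It is only a sketch, assuming Python 3: the sample lines and the column positions (`ix_user=1`, `ix_profile=2`) are invented for illustration and do not match the real file, and plain `map` stands in for `pool.imap_unordered` so the example runs without spawning workers.

```python
from collections import defaultdict

def parse(line, ix_profile=2, ix_user=1):
    """Return the unquoted user id, or None when the profile type is "7".

    Column positions are hypothetical defaults chosen for the sample data.
    """
    fields = line.split('\t')
    if fields[ix_profile] != '"7"':
        return fields[ix_user][1:-1]  # slice off the surrounding quotes
    return None

# Made-up records in the same quoted, tab-delimited layout
sample = [
    '"x"\t"u1"\t"3"\t"tail"\n',
    '"x"\t"u2"\t"7"\t"tail"\n',  # "7" marks a bad value, so it is skipped
    '"x"\t"u1"\t"5"\t"tail"\n',
]

users = defaultdict(int)
for r in map(parse, sample):  # stand-in for pool.imap_unordered(parse, f, 100)
    if r is not None:
        users[r] += 1

print(dict(users))  # {'u1': 2}
```

Every line makes a round trip to a worker and back, and all counting still happens in the parent process, which is the part of the design this check mirrors.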