Запись в файл с многопроцессорной обработкой

У меня следующая проблема в python.

Мне нужно параллельно провести некоторые вычисления, результаты которых нужно записать последовательно в файл. Итак, я создал функцию, которая получает multiprocessing.Queue и дескриптор файла, выполняет вычисления и выводит результат в файл:

import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation   

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
while True:
    try:
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
    except:
        break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
       p.start()
    for p in processes:
       p.join()
    fh.close()

Но файл оказывается пустым после запуска сценария. Я попытался изменить функцию worker () на:

def work(queue, filename):
while True:
    try:
        fh = open(filename, "a")
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
        fh.close()
    except:
        break

и передать имя файла в качестве параметра. Тогда все работает так, как я задумал. Когда я пытаюсь сделать то же самое последовательно, без многопроцессорности, это также работает нормально.

Почему не сработало в первой версии? Я не вижу проблемы.

Также: могу ли я гарантировать, что два процесса не будут пытаться записать файл одновременно?


РЕДАКТИРОВАТЬ:

Спасибо. Я понял. Это рабочая версия:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
            queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join ()
    for p in calcProc:
        p.join()
    writProc.join ()
12
задан Ryan M 25 September 2014 в 18:23
поделиться