многопроцессорность Python :некоторые функции не возвращаются после завершения (слишком большой материал в очереди)

Я использую многопроцессорный процесс и очередь. Я запускаю несколько функций параллельно, и большинство из них ведут себя хорошо :они завершают работу, их вывод направляется в их очередь, и они отображаются как.is _живые ()== False. Но по какой-то причине пара функций не работает. Они всегда показывают.is _жив ()== True, даже после того, как последняя строка в функции (завершает оператор печати, говорящий «Готово» ). Это происходит независимо от того, какой набор функций я запускаю, даже если он всего один. Если не работать параллельно, функции ведут себя нормально и возвращаются нормально. Какая вещь может быть проблемой?

Вот общая функция, которую я использую для управления заданиями. Все, что я не показываю, это функции, которые я ему передаю. Они длинные, часто используют matplotlib, иногда запускают какие-то команды оболочки, но я не могу понять, что общего у неудачных.

def  runFunctionsInParallel(listOf_FuncAndArgLists):
    """
    Take a list of lists like [function, arg1, arg2,...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.   
    """
    from multiprocessing import Process, Queue

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
        print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
        que.put(fff(*theArgs)) #we're putting return value into queue
        print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
        # We get this far even for "bad" functions
        return

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs: job.start() # Launch them all
    import time
    from math import sqrt
    n=1
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
        n+=1
        time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
        print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
        print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
        print('---------------------------------------------------\n')
    # I never get to the following line when one of the "bad" functions is running.
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
    # And now, collect all the outputs:
    return([queue.get() for queue in queues])
24
задан Oblivious Sage 27 January 2014 в 21:08
поделиться