Использование модуля Python Multiprocessing для выполнения одновременных и отдельных прогонов модели SEAWAT/MODFLOW

Я пытаюсь выполнить 100 прогонов модели на своем 8-процессорном 64-битном компьютере с Windows 7. Я хотел бы запустить 7 экземпляров модели одновременно, чтобы уменьшить общее время выполнения (примерно 9,5 мин на запуск модели). Я просмотрел несколько потоков, относящихся к многопроцессорному модулю Python, но все еще чего-то не хватает.

Использование модуля multiprocessing

Как создавать параллельные дочерние процессы в многопроцессорной системе?

Многопроцессорная очередь Python

Мой процесс:

У меня есть 100 различных наборов параметров, которые я хотел бы запустить через SEAWAT/MODFLOW, чтобы сравнить результаты. Я предварительно создал входные файлы модели для каждого запуска модели и сохранил их в своих собственных каталогах. Что я хотел бы сделать, так это запустить 7 моделей одновременно, пока все реализации не будут завершены. Нет необходимости в обмене данными между процессами или отображении результатов. До сих пор мне удавалось создавать модели только последовательно:

import os,subprocess
import multiprocessing as mp

ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
    if f.endswith('.npy'):
        files.append(f)

## def work(cmd):
##     return subprocess.call(cmd, shell=False)

def run(f,def_param=ws):
    real = f.split('_')[2].split('.')[0]
    print 'Realization %s' % real

    mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
    mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
    seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '

    exe = seawatV4x64
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real

    os.system( exe + swt_nam )


if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    tasks = range(len(files))
    results = []
    for f in files:
        r = p.map_async(run(f), tasks, callback=results.append)

Я изменил if __name__ == 'main':на следующее в надежде, что это исправит недостаток параллелизма, который, как я чувствую, передается в приведенном выше сценарии с помощью цикла for.Однако модель даже не запускается (ошибки Python нет):

if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
    p.map_async(run,((files[f],) for f in range(len(files))))

Мы будем очень признательны за любую помощь!

РЕДАКТИРОВАТЬ 26.03.2012 13:31 EST

Использование метода «Ручной пул» в @J.F. Ответ Себастьяна ниже. Я получаю параллельное выполнение моего внешнего .exe. Реализации модели вызываются партиями по 8 за раз, но не ожидает завершения этих 8 прогонов перед вызовом следующей партии и т. д.:

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam])

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()

    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':

    mp.freeze_support() # optional if the program is not frozen
    main()

Нет доступной трассировки ошибок. Функция run()выполняет свои обязанности при вызове одного файла реализации модели, как и в случае нескольких файлов. Единственная разница заключается в том, что с несколькими файлами он вызывается len(files)раз, хотя каждый из экземпляров немедленно закрывается, и разрешено завершить только один запуск модели, после чего скрипт корректно завершает работу (код выхода 0). ).

Добавление некоторых операторов печати в main()показывает некоторую информацию об активных счетчиках потоков, а также о статусе потоков (обратите внимание, что это тест только для 8 файлов реализации, чтобы сделать снимок экрана более управляемым). , теоретически все 8 файлов должны запускаться одновременно, однако поведение сохраняется там, где они появляются и немедленно умирают, кроме одного):

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\test')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    print('Active Count a',threading.activeCount())
    for _ in threads:
        print(_)
        q.put_nowait(None) # signal no more files
    for t in threads: 
        print(t)
        t.join() # wait for completion
    print('Active Count b',threading.activeCount())

screenshot

**Строка, которая читает " D:\\Data\\Users... " — это информация об ошибке, выдаваемая, когда я вручную останавливаю выполнение модели до ее завершения. Как только я останавливаю работу модели, сообщаются оставшиеся строки состояния потока, и скрипт завершает работу.

РЕДАКТИРОВАТЬ 26.03.2012 16:24 EST

SEAWAT позволяет параллельное выполнение, как я делал это в прошлом, создавая экземпляры вручную с помощью iPython и запуская их из каждой папки с файлами моделей. На этот раз я запускаю все запуски модели из одного места, а именно из каталога, в котором находится мой скрипт. Похоже, что виновником может быть способ, которым SEAWAT сохраняет часть выходных данных. При запуске SEAWAT сразу же создаются файлы, относящиеся к запуску модели. Один из этих файлов сохраняется не в каталоге, в котором находится реализация модели, а в верхнем каталоге, где находится скрипт. Это препятствует тому, чтобы любые последующие потоки сохраняли одно и то же имя файла в том же месте (что они все хотят сделать, поскольку эти имена файлов являются общими и неспецифичными для каждой реализации). Окна SEAWAT не оставались открытыми достаточно долго, чтобы я мог прочитать или даже увидеть сообщение об ошибке, я понял это только тогда, когда вернулся и попытался запустить код с помощью iPython, который напрямую отображает распечатку из SEAWAT вместо открытия окна. новое окно для запуска программы.

Я принимаю @J.F. Ответ Себастьяна, поскольку вполне вероятно, что как только я решу эту проблему с исполняемой моделью, предоставленный им потоковый код приведет меня туда, где мне нужно быть.

ОКОНЧАТЕЛЬНЫЙ КОД

В subprocess.check_call добавлен аргумент cwd для запуска каждого экземпляра SEAWAT в своем собственном каталоге. Очень ключевой.

from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading

def run(f,ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    cwd = ws + r'\reals\real%s\ss' % real
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e: # catch exceptions to avoid exiting the
                               # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
    for t in threads:
        t.daemon = True # threads die if the program dies
        t.start()
    for _ in threads: q.put_nowait(None) # signal no more files
    for t in threads: t.join() # wait for completion

if __name__ == '__main__':
    mp.freeze_support() # optional if the program is not frozen
    main()

17
задан 14 revs 23 May 2017 в 11:46
поделиться