Я пытаюсь выполнить 100 прогонов модели на своем 8-процессорном 64-битном компьютере с Windows 7. Я хотел бы запустить 7 экземпляров модели одновременно, чтобы уменьшить общее время выполнения (примерно 9,5 мин на запуск модели). Я просмотрел несколько потоков, относящихся к многопроцессорному модулю Python, но все еще чего-то не хватает.
Использование модуля multiprocessing
Как создавать параллельные дочерние процессы в многопроцессорной системе?
Многопроцессорная очередь Python
Мой процесс:
У меня есть 100 различных наборов параметров, которые я хотел бы запустить через SEAWAT/MODFLOW, чтобы сравнить результаты. Я предварительно создал входные файлы модели для каждого запуска модели и сохранил их в своих собственных каталогах. Что я хотел бы сделать, так это запустить 7 моделей одновременно, пока все реализации не будут завершены. Нет необходимости в обмене данными между процессами или отображении результатов. До сих пор мне удавалось создавать модели только последовательно:
import os,subprocess
import multiprocessing as mp
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
if f.endswith('.npy'):
files.append(f)
## def work(cmd):
## return subprocess.call(cmd, shell=False)
def run(f,def_param=ws):
real = f.split('_')[2].split('.')[0]
print 'Realization %s' % real
mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
exe = seawatV4x64
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
os.system( exe + swt_nam )
if __name__ == '__main__':
p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
tasks = range(len(files))
results = []
for f in files:
r = p.map_async(run(f), tasks, callback=results.append)
Я изменил if __name__ == 'main':
на следующее в надежде, что это исправит недостаток параллелизма, который, как я чувствую, передается в приведенном выше сценарии с помощью цикла for
.Однако модель даже не запускается (ошибки Python нет):
if __name__ == '__main__':
p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
p.map_async(run,((files[f],) for f in range(len(files))))
Мы будем очень признательны за любую помощь!
РЕДАКТИРОВАТЬ 26.03.2012 13:31 EST
Использование метода «Ручной пул» в @J.F. Ответ Себастьяна ниже. Я получаю параллельное выполнение моего внешнего .exe. Реализации модели вызываются партиями по 8 за раз, но не ожидает завершения этих 8 прогонов перед вызовом следующей партии и т. д.:
from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
def run(f,ws):
real = f.split('_')[-1].split('.')[0]
print('Realization %s' % real)
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
subprocess.check_call([seawatV4x64, swt_nam])
def worker(queue):
"""Process files from the queue."""
for args in iter(queue.get, None):
try:
run(*args)
except Exception as e: # catch exceptions to avoid exiting the
# thread prematurely
print('%r failed: %s' % (args, e,), file=sys.stderr)
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
if __name__ == '__main__':
mp.freeze_support() # optional if the program is not frozen
main()
Нет доступной трассировки ошибок. Функция run()
выполняет свои обязанности при вызове одного файла реализации модели, как и в случае нескольких файлов. Единственная разница заключается в том, что с несколькими файлами он вызывается len(files)
раз, хотя каждый из экземпляров немедленно закрывается, и разрешено завершить только один запуск модели, после чего скрипт корректно завершает работу (код выхода 0). ).
Добавление некоторых операторов печати в main()
показывает некоторую информацию об активных счетчиках потоков, а также о статусе потоков (обратите внимание, что это тест только для 8 файлов реализации, чтобы сделать снимок экрана более управляемым). , теоретически все 8 файлов должны запускаться одновременно, однако поведение сохраняется там, где они появляются и немедленно умирают, кроме одного):
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\test')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
print('Active Count a',threading.activeCount())
for _ in threads:
print(_)
q.put_nowait(None) # signal no more files
for t in threads:
print(t)
t.join() # wait for completion
print('Active Count b',threading.activeCount())
**Строка, которая читает " D:\\Data\\Users...
" — это информация об ошибке, выдаваемая, когда я вручную останавливаю выполнение модели до ее завершения. Как только я останавливаю работу модели, сообщаются оставшиеся строки состояния потока, и скрипт завершает работу.
РЕДАКТИРОВАТЬ 26.03.2012 16:24 EST
SEAWAT позволяет параллельное выполнение, как я делал это в прошлом, создавая экземпляры вручную с помощью iPython и запуская их из каждой папки с файлами моделей. На этот раз я запускаю все запуски модели из одного места, а именно из каталога, в котором находится мой скрипт. Похоже, что виновником может быть способ, которым SEAWAT сохраняет часть выходных данных. При запуске SEAWAT сразу же создаются файлы, относящиеся к запуску модели. Один из этих файлов сохраняется не в каталоге, в котором находится реализация модели, а в верхнем каталоге, где находится скрипт. Это препятствует тому, чтобы любые последующие потоки сохраняли одно и то же имя файла в том же месте (что они все хотят сделать, поскольку эти имена файлов являются общими и неспецифичными для каждой реализации). Окна SEAWAT не оставались открытыми достаточно долго, чтобы я мог прочитать или даже увидеть сообщение об ошибке, я понял это только тогда, когда вернулся и попытался запустить код с помощью iPython, который напрямую отображает распечатку из SEAWAT вместо открытия окна. новое окно для запуска программы.
Я принимаю @J.F. Ответ Себастьяна, поскольку вполне вероятно, что как только я решу эту проблему с исполняемой моделью, предоставленный им потоковый код приведет меня туда, где мне нужно быть.
ОКОНЧАТЕЛЬНЫЙ КОД
В subprocess.check_call добавлен аргумент cwd для запуска каждого экземпляра SEAWAT в своем собственном каталоге. Очень ключевой.
from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading
def run(f,ws):
real = f.split('_')[-1].split('.')[0]
print('Realization %s' % real)
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
cwd = ws + r'\reals\real%s\ss' % real
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)
def worker(queue):
"""Process files from the queue."""
for args in iter(queue.get, None):
try:
run(*args)
except Exception as e: # catch exceptions to avoid exiting the
# thread prematurely
print('%r failed: %s' % (args, e,), file=sys.stderr)
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
if __name__ == '__main__':
mp.freeze_support() # optional if the program is not frozen
main()