Как породить параллельные дочерние процессы на многопроцессорной системе?

У меня есть сценарий Python, который я хочу использовать в качестве контроллера к другому сценарию Python. У меня есть сервер с 64 процессорами, поэтому хотят породить до 64 дочерних процессов этого второго сценария Python. Дочерний сценарий называют:

$ python create_graphs.py --name=NAME

где ИМЯ - что-то как XYZ, ABC, Нью-Йоркский университет и т.д.

В моем родительском сценарии контроллера я получаю переменную имени из списка:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

Таким образом, мой вопрос, что лучший способ состоит в том, чтобы породить от этих процессов как дети? Я хочу ограничить число детей к 64 за один раз, так должен отследить состояние (если дочерний процесс закончился или не), таким образом, я могу эффективно поддерживать целое поколение в рабочем состоянии.

Я изучил использование пакета подпроцесса, но отклонил его, потому что это только порождает одного ребенка за один раз. Я наконец нашел многопроцессорный пакет, но я признаюсь, что был поражен целыми потоками по сравнению с документацией подпроцессов.

Прямо сейчас, мое использование сценария subprocess.call только порождать одного ребенка за один раз и похоже на это:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

Я действительно хочу, чтобы это породило 64 ребенка за один раз. В других stackoverflow вопросах я видел, что люди использовали Очередь, но она походит, который создает хит производительности?

42
задан tshepang 13 August 2012 в 18:30
поделиться

4 ответа

То, что вам нужно, это класс пула процессов в многопроцессорной обработке.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

А вот пример вычисления, чтобы облегчить понимание. Следующее разделит 10000 задач на N процессов, где N - количество ЦП. Обратите внимание, что я передаю None как количество процессов. Это заставит класс Pool использовать cpu_count для количества процессов ( ссылка )

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results
61
ответ дан 26 November 2019 в 23:51
поделиться

Я бы определенно использовал многопроцессорность вместо того, чтобы использовать собственное решение с помощью подпроцесса.

1
ответ дан 26 November 2019 в 23:51
поделиться

Я не думаю, что вам нужна очередь, если вы не собираетесь получать данные из приложений (что, если вам действительно нужны данные, я думаю, что в любом случае будет проще добавить их в базу данных)

но попробуйте это для размера:

поместите все содержимое вашего скрипта create_graphs.py в функцию под названием «create_graphs»

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

Я знаю, что это приведет к на 1 потоку меньше, чем процессоров, что, вероятно, хорошо , он оставляет процессор для управления потоками, дисковым вводом-выводом и другими вещами, происходящими на компьютере. Если вы решите, что хотите использовать последнее ядро, просто добавьте к нему одно

edit : Я думаю, что я мог неверно истолковать назначение my_list. Вам не нужен my_list , чтобы отслеживать потоки вообще (поскольку на все они ссылаются элементы в списке thread ). Но это прекрасный способ подачи входных данных процессов - или даже лучше: использовать функцию генератора;)

Назначение my_list и потоков

my_list хранит данные которые вам нужно обработать в вашей функции
потоки - это просто список текущих запущенных потоков

, цикл while выполняет две вещи: запускает новые потоки для обработки данных и проверяет, выполняются ли какие-либо потоки .

Итак, пока у вас есть либо (а) больше данных для обработки, либо (б) потоки, которые не завершены ... вы хотите, чтобы программа продолжала работу. После того, как оба списка станут пустыми, они будут вычислены как False , и цикл while завершится

1
ответ дан 26 November 2019 в 23:51
поделиться

Вот решение, которое я придумал, основываясь на комментариях Нади и Джима. Я не уверен, что это лучший способ, но он работает. Первоначальный вызываемый дочерний скрипт должен быть скриптом оболочки, потому что мне нужно использовать некоторые сторонние приложения, включая Matlab. Так что мне пришлось извлечь это из Python и закодировать на bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

Кажется ли это разумным решением? Я попытался использовать формат цикла while Джима, но мой скрипт ничего не вернул. Я не уверен, почему это так. Вот результат, когда я запускаю скрипт с циклом «while» Джима, заменяющим цикл «for»:

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

Когда я запускаю его с циклом «for», я получаю кое-что более значимое:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

Итак, это работает, и Я счастлив. Однако я до сих пор не понимаю, почему я не могу использовать цикл стиля «while» Джима вместо цикла «for», который я использую.

2
ответ дан 26 November 2019 в 23:51
поделиться
Другие вопросы по тегам:

Похожие вопросы: