Как сделать динамическое создание очередей для каждого процесса в многопроцессорной обработке Python

Я хочу динамически создать несколько Process es, где каждый экземпляр имеет очередь для входящих сообщений от других экземпляров, и каждый экземпляр также может создавать новые экземпляры. Таким образом, мы получаем сеть процессов, которые все отправляют друг другу. Каждому экземпляру разрешено отправлять друг другу.

Приведенный ниже код будет делать то, что я хочу: он использует Manager.dict () для хранения очередей, обеспечивая распространение обновлений и Lock ( ) для защиты доступа с записью к очередям. Однако при добавлении новой очереди он выдает «RuntimeError: объекты очереди должны разделяться между процессами только посредством наследования» .

Проблема в том, что при запуске мы не знаем, сколько очередей в конечном итоге потребуется, поэтому мы должны создавать их динамически. Но поскольку мы не можем совместно использовать очереди, кроме как во время строительства, я не знаю, как это сделать.

Я знаю, что одной из возможностей было бы сделать очереди глобальной переменной вместо управляемой. передано в __ init __ : проблема, насколько я понимаю, в том, что дополнения к переменной queues не будут распространяться на другие процессы.

EDIT Я работаю над эволюционными алгоритмами. Советники - это разновидность техники машинного обучения. EA моделирует «популяцию», которая развивается путем выживания наиболее приспособленных, кроссовера и мутации. В параллельных EA, как и здесь, у нас также есть миграция между популяциями, соответствующая межпроцессному взаимодействию. Острова также могут порождать новые острова, поэтому нам нужен способ отправки сообщений между динамически создаваемыми процессами.

import random, time
from multiprocessing import Process, Queue, Lock, Manager, current_process
try:
    from queue import Empty as EmptyQueueException
except ImportError:
    from Queue import Empty as EmptyQueueException

class MyProcess(Process):
    def __init__(self, queues, lock):
        super(MyProcess, self).__init__(target=lambda x: self.run(x),
                                     args=tuple())
        self.queues = queues
        self.lock = lock
        # acquire lock and add a new queue for this process
        with self.lock:
            self.id = len(list(self.queues.keys()))
            self.queues[self.id] = Queue()

    def run(self):
        while len(list(self.queues.keys())) < 10:

            # make a new process
            new = MyProcess(self.lock)
            new.start()

            # send a message to a random process
            dest_key = random.choice(list(self.queues.keys()))
            dest = self.queues[dest_key]
            dest.put("hello to %s from %s" % (dest_key, self.id))

            # receive messages
            message = True
            while message:
                try:
                    message = self.queues[self.id].get(False) # don't block
                    print("%s received: %s" % (self.id, message))
                except EmptyQueueException:
                    break

            # what queues does this process know about?
            print("%d: I know of %s" %
                  (self.id, " ".join([str(id) for id in self.queues.keys()])))

            time.sleep(1)

if __name__ == "__main__":
    # Construct MyProcess with a Manager.dict for storing the queues
    # and a lock to protect write access. Start.
    MyProcess(Manager().dict(), Lock()).start()
8
задан jmmcd 16 August 2011 в 20:59
поделиться