Как синхронизировать Python dict с многопроцессорной обработкой

Я использую Python 2.6 и многопроцессорный модуль для многопоточности. Теперь я хотел бы иметь синхронизируемый dict (где единственная атомарная операция, в которой я действительно нуждаюсь, + = оператор на значении).

Я должен перенести dict с multiprocessing.sharedctypes.synchronized () вызов? Или иначе способ пойти?

24
задан Peter Smit 30 March 2010 в 14:26
поделиться

4 ответа

Вступление

Похоже, что здесь много предложений, сделанных на уровне "кресла", и нет работающих примеров. Ни один из перечисленных здесь ответов даже не предлагает использовать многопроцессорную обработку, и это немного разочаровывает и настораживает. Как любители python мы должны поддерживать наши встроенные библиотеки, и хотя параллельная обработка и синхронизация никогда не будут тривиальным делом, я считаю, что их можно сделать тривиальными при правильном проектировании. Это становится чрезвычайно важным в современных многоядерных архитектурах и не может быть достаточно подчеркнуто! Тем не менее, я далеко не удовлетворен библиотекой многопроцессорной обработки, поскольку она все еще находится в стадии становления и имеет довольно много подводных камней, ошибок и ориентирована на функциональное программирование (которое я ненавижу). В настоящее время я все еще предпочитаю модуль Pyro (который намного опередил свое время) мультипроцессингу из-за серьезного ограничения мультипроцессинга в том, что он не может делиться вновь созданными объектами во время работы сервера. Класс-метод "register" объектов менеджера будет фактически регистрировать объект только ДО запуска менеджера (или его сервера). Достаточно болтовни, еще код:

Server.py

from multiprocessing.managers import SyncManager


class MyManager(SyncManager):
    pass


syncdict = {}
def get_dict():
    return syncdict

if __name__ == "__main__":
    MyManager.register("syncdict", get_dict)
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.start()
    raw_input("Press any key to kill server".center(50, "-"))
    manager.shutdown()

В приведенном выше примере кода Server.py использует SyncManager мультипроцессинга, который может предоставлять синхронизированные общие объекты. Этот код не будет работать в интерпретаторе, потому что библиотека multiprocessing довольно требовательна к тому, как найти "callable" для каждого зарегистрированного объекта. Запуск Server.py запустит настроенный SyncManager, который разделяет словарь syncdict для использования несколькими процессами и может быть подключен к клиентам либо на той же машине, либо, если запущен на IP-адресе, отличном от loopback, на других машинах. В данном случае сервер запущен на loopback (127.0.0.1) на порту 5000. Использование параметра authkey позволяет использовать безопасные соединения при работе с syncdict. При нажатии любой клавиши менеджер выключается.

Client.py

from multiprocessing.managers import SyncManager
import sys, time

class MyManager(SyncManager):
    pass

MyManager.register("syncdict")

if __name__ == "__main__":
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.connect()
    syncdict = manager.syncdict()

    print "dict = %s" % (dir(syncdict))
    key = raw_input("Enter key to update: ")
    inc = float(raw_input("Enter increment: "))
    sleep = float(raw_input("Enter sleep time (sec): "))

    try:
         #if the key doesn't exist create it
         if not syncdict.has_key(key):
             syncdict.update([(key, 0)])
         #increment key value every sleep seconds
         #then print syncdict
         while True:
              syncdict.update([(key, syncdict.get(key) + inc)])
              time.sleep(sleep)
              print "%s" % (syncdict)
    except KeyboardInterrupt:
         print "Killed client"

Клиент также должен создать настроенный SyncManager, зарегистрировав "syncdict", на этот раз без передачи вызываемого параметра для получения общей дикты. Затем он использует настроенный SycnManager для подключения, используя IP-адрес loopback (127.0.0.1) на порту 5000 и authkey, устанавливающий безопасное соединение с менеджером, запущенным в Server.py. Он получает общий dict syncdict, вызывая зарегистрированный callable на менеджере. Он запрашивает у пользователя следующие данные:

  1. Ключ в syncdict для работы
  2. Сумма для увеличения значения, к которому обращается ключ, каждый цикл
  3. Количество времени сна за цикл в секундах

Затем клиент проверяет, существует ли ключ. Если нет, он создает ключ на syncdict. Затем клиент входит в "бесконечный" цикл, где он обновляет значение ключа на инкремент, спит указанное количество времени и печатает syncdict, повторяя этот процесс до тех пор, пока не произойдет прерывание клавиатуры (Ctrl+C).

Досадные проблемы

  1. Методы register менеджера ДОЛЖНЫ быть вызваны до запуска менеджера, иначе вы получите исключения, даже если вызов dir менеджера покажет, что у него действительно есть метод, который был зарегистрирован.
  2. Все манипуляции с dict должны выполняться с помощью методов, а не присвоения dict (syncdict["blast"] = 2 будет неудачным из-за того, как мультипроцессинг разделяет пользовательские объекты)
  3. Использование метода dict SyncManager могло бы облегчить раздражающую проблему #2, за исключением того, что раздражающая проблема #1 препятствует регистрации и совместному использованию прокси, возвращаемого SyncManager.dict(). (SyncManager.dict() может быть вызван только ПОСЛЕ запуска менеджера, а register будет работать только ДО запуска менеджера, поэтому SyncManager. dict() полезен только при функциональном программировании и передаче прокси в Processes в качестве аргумента, как это делается в примерах doc)
  4. Серверу и клиенту обоим нужно регистрироваться, хотя интуитивно кажется, что клиент сможет разобраться с этим после подключения к менеджеру (Пожалуйста, добавьте это в список пожеланий разработчиков многопроцессорных систем)

Заключение

Надеюсь, вам понравился этот довольно подробный и немного трудоемкий ответ, как и мне. У меня были большие проблемы с пониманием того, почему я испытывал такие трудности с модулем мультипроцессинга, в то время как Pyro делает это с легкостью, и теперь, благодаря этому ответу, я попал в точку. Я надеюсь, что это будет полезно для сообщества python в том, как улучшить модуль мультипроцессинга, так как я считаю, что у него большие перспективы, но в его зачаточном состоянии не хватает того, что возможно. Несмотря на описанные раздражающие проблемы, я думаю, что это все еще вполне жизнеспособная альтернатива и довольно простая. Вы также можете использовать SyncManager.dict() и передать его в Processes в качестве аргумента, как показано в документации, и это, вероятно, было бы еще более простым решением, в зависимости от ваших требований, но мне это кажется неестественным.

55
ответ дан 28 November 2019 в 22:36
поделиться

Я бы выделил отдельный процесс для поддержания "общей дикты": просто используйте, например, xmlrpclib, чтобы сделать этот небольшой объем кода доступным для других процессов, раскрывая через xmlrpclib, например. функцию, принимающую key, increment для выполнения инкремента, и функцию, принимающую только key и возвращающую значение, с семантическими деталями (есть ли значение по умолчанию для отсутствующих ключей, и т.д., и т.п.) в зависимости от потребностей вашего приложения.

Затем вы можете использовать любой подход для реализации выделенного процесса shared-dict: от однопоточного сервера с простым dict в памяти до простой sqlite DB и т.д. и т.п. Я предлагаю вам начать с кода "настолько простого, насколько это возможно" (в зависимости от того, нужен ли вам постоянный shared dict, или постоянство вам не нужно), а затем измерять и оптимизировать по мере необходимости.

4
ответ дан 28 November 2019 в 22:36
поделиться

В ответ на соответствующее решение проблемы одновременной записи. Я провел очень быстрое исследование и обнаружил, что эта статья предлагает решение с блокировкой / семафором. ( http://effbot.org/zone/thread-synchronization.htm )

Хотя пример не является конкретным для словаря, я почти уверен, что вы могли бы написать оболочку на основе классов объект, который поможет вам работать со словарями на основе этой идеи.

Если бы у меня было требование реализовать что-то подобное в потокобезопасном режиме, я бы, вероятно, использовал решение Python Semaphore. (Предполагая, что моя более ранняя техника слияния не сработает.) Я считаю, что семафоры обычно снижают эффективность потоков из-за их блокирующей природы.

С сайта:

Семафор - это более продвинутый механизм блокировки. Семафор имеет внутренний счетчик, а не флаг блокировки, и он блокируется только в том случае, если более заданного числа потоков пытались удержать семафор. В зависимости от того, как инициализируется семафор, это позволяет нескольким потокам одновременно обращаться к одному и тому же разделу кода.

semaphore = threading.BoundedSemaphore()
semaphore.acquire() # decrements the counter
... access the shared resource; work with dictionary, add item or whatever.
semaphore.release() # increments the counter
4
ответ дан 28 November 2019 в 22:36
поделиться

Есть ли причина, по которой словарь должен быть опубликован в первую очередь?Не могли бы вы заставить каждый поток поддерживать свой собственный экземпляр словаря и либо объединять в конце обработки потока, либо периодически использовать обратный вызов для объединения копий отдельных словарей потоков вместе?

Я не знаю точно, что вы делаете, поэтому держитесь в моем, чтобы мой письменный план не работал дословно. То, что я предлагаю, больше похож на высокоуровневую дизайнерскую идею.

3
ответ дан 28 November 2019 в 22:36
поделиться
Другие вопросы по тегам:

Похожие вопросы: