Я использую Python 2.6 и многопроцессорный модуль для многопоточности. Теперь я хотел бы иметь синхронизируемый dict (где единственная атомарная операция, в которой я действительно нуждаюсь, + = оператор на значении).
Я должен перенести dict с multiprocessing.sharedctypes.synchronized () вызов? Или иначе способ пойти?
Похоже, что здесь много предложений, сделанных на уровне "кресла", и нет работающих примеров. Ни один из перечисленных здесь ответов даже не предлагает использовать многопроцессорную обработку, и это немного разочаровывает и настораживает. Как любители python мы должны поддерживать наши встроенные библиотеки, и хотя параллельная обработка и синхронизация никогда не будут тривиальным делом, я считаю, что их можно сделать тривиальными при правильном проектировании. Это становится чрезвычайно важным в современных многоядерных архитектурах и не может быть достаточно подчеркнуто! Тем не менее, я далеко не удовлетворен библиотекой многопроцессорной обработки, поскольку она все еще находится в стадии становления и имеет довольно много подводных камней, ошибок и ориентирована на функциональное программирование (которое я ненавижу). В настоящее время я все еще предпочитаю модуль Pyro (который намного опередил свое время) мультипроцессингу из-за серьезного ограничения мультипроцессинга в том, что он не может делиться вновь созданными объектами во время работы сервера. Класс-метод "register" объектов менеджера будет фактически регистрировать объект только ДО запуска менеджера (или его сервера). Достаточно болтовни, еще код:
from multiprocessing.managers import SyncManager
class MyManager(SyncManager):
pass
syncdict = {}
def get_dict():
return syncdict
if __name__ == "__main__":
MyManager.register("syncdict", get_dict)
manager = MyManager(("127.0.0.1", 5000), authkey="password")
manager.start()
raw_input("Press any key to kill server".center(50, "-"))
manager.shutdown()
В приведенном выше примере кода Server.py использует SyncManager мультипроцессинга, который может предоставлять синхронизированные общие объекты. Этот код не будет работать в интерпретаторе, потому что библиотека multiprocessing довольно требовательна к тому, как найти "callable" для каждого зарегистрированного объекта. Запуск Server.py запустит настроенный SyncManager, который разделяет словарь syncdict для использования несколькими процессами и может быть подключен к клиентам либо на той же машине, либо, если запущен на IP-адресе, отличном от loopback, на других машинах. В данном случае сервер запущен на loopback (127.0.0.1) на порту 5000. Использование параметра authkey позволяет использовать безопасные соединения при работе с syncdict. При нажатии любой клавиши менеджер выключается.
from multiprocessing.managers import SyncManager
import sys, time
class MyManager(SyncManager):
pass
MyManager.register("syncdict")
if __name__ == "__main__":
manager = MyManager(("127.0.0.1", 5000), authkey="password")
manager.connect()
syncdict = manager.syncdict()
print "dict = %s" % (dir(syncdict))
key = raw_input("Enter key to update: ")
inc = float(raw_input("Enter increment: "))
sleep = float(raw_input("Enter sleep time (sec): "))
try:
#if the key doesn't exist create it
if not syncdict.has_key(key):
syncdict.update([(key, 0)])
#increment key value every sleep seconds
#then print syncdict
while True:
syncdict.update([(key, syncdict.get(key) + inc)])
time.sleep(sleep)
print "%s" % (syncdict)
except KeyboardInterrupt:
print "Killed client"
Клиент также должен создать настроенный SyncManager, зарегистрировав "syncdict", на этот раз без передачи вызываемого параметра для получения общей дикты. Затем он использует настроенный SycnManager для подключения, используя IP-адрес loopback (127.0.0.1) на порту 5000 и authkey, устанавливающий безопасное соединение с менеджером, запущенным в Server.py. Он получает общий dict syncdict, вызывая зарегистрированный callable на менеджере. Он запрашивает у пользователя следующие данные:
Затем клиент проверяет, существует ли ключ. Если нет, он создает ключ на syncdict. Затем клиент входит в "бесконечный" цикл, где он обновляет значение ключа на инкремент, спит указанное количество времени и печатает syncdict, повторяя этот процесс до тех пор, пока не произойдет прерывание клавиатуры (Ctrl+C).
Надеюсь, вам понравился этот довольно подробный и немного трудоемкий ответ, как и мне. У меня были большие проблемы с пониманием того, почему я испытывал такие трудности с модулем мультипроцессинга, в то время как Pyro делает это с легкостью, и теперь, благодаря этому ответу, я попал в точку. Я надеюсь, что это будет полезно для сообщества python в том, как улучшить модуль мультипроцессинга, так как я считаю, что у него большие перспективы, но в его зачаточном состоянии не хватает того, что возможно. Несмотря на описанные раздражающие проблемы, я думаю, что это все еще вполне жизнеспособная альтернатива и довольно простая. Вы также можете использовать SyncManager.dict() и передать его в Processes в качестве аргумента, как показано в документации, и это, вероятно, было бы еще более простым решением, в зависимости от ваших требований, но мне это кажется неестественным.
Я бы выделил отдельный процесс для поддержания "общей дикты": просто используйте, например, xmlrpclib, чтобы сделать этот небольшой объем кода доступным для других процессов, раскрывая через xmlrpclib, например. функцию, принимающую key, increment
для выполнения инкремента, и функцию, принимающую только key
и возвращающую значение, с семантическими деталями (есть ли значение по умолчанию для отсутствующих ключей, и т.д., и т.п.) в зависимости от потребностей вашего приложения.
Затем вы можете использовать любой подход для реализации выделенного процесса shared-dict: от однопоточного сервера с простым dict в памяти до простой sqlite DB и т.д. и т.п. Я предлагаю вам начать с кода "настолько простого, насколько это возможно" (в зависимости от того, нужен ли вам постоянный shared dict, или постоянство вам не нужно), а затем измерять и оптимизировать по мере необходимости.
В ответ на соответствующее решение проблемы одновременной записи. Я провел очень быстрое исследование и обнаружил, что эта статья предлагает решение с блокировкой / семафором. ( http://effbot.org/zone/thread-synchronization.htm )
Хотя пример не является конкретным для словаря, я почти уверен, что вы могли бы написать оболочку на основе классов объект, который поможет вам работать со словарями на основе этой идеи.
Если бы у меня было требование реализовать что-то подобное в потокобезопасном режиме, я бы, вероятно, использовал решение Python Semaphore. (Предполагая, что моя более ранняя техника слияния не сработает.) Я считаю, что семафоры обычно снижают эффективность потоков из-за их блокирующей природы.
С сайта:
Семафор - это более продвинутый механизм блокировки. Семафор имеет внутренний счетчик, а не флаг блокировки, и он блокируется только в том случае, если более заданного числа потоков пытались удержать семафор. В зависимости от того, как инициализируется семафор, это позволяет нескольким потокам одновременно обращаться к одному и тому же разделу кода.
semaphore = threading.BoundedSemaphore()
semaphore.acquire() # decrements the counter
... access the shared resource; work with dictionary, add item or whatever.
semaphore.release() # increments the counter
Есть ли причина, по которой словарь должен быть опубликован в первую очередь?Не могли бы вы заставить каждый поток поддерживать свой собственный экземпляр словаря и либо объединять в конце обработки потока, либо периодически использовать обратный вызов для объединения копий отдельных словарей потоков вместе?
Я не знаю точно, что вы делаете, поэтому держитесь в моем, чтобы мой письменный план не работал дословно. То, что я предлагаю, больше похож на высокоуровневую дизайнерскую идею.