Сломанный канал при использовании менеджеров многопроцессорной обработки Python (BaseManager / SyncManager) обмениваться очередями с удаленными машинами

В прошлом месяце у нас была постоянная проблема с многопроцессорным пакетом Python 2.6.x, когда мы ' Мы пытались использовать его для разделения очереди между несколькими компьютерами (linux). Я задал этот вопрос непосредственно Джесси Ноллеру, так как мы еще не нашли ничего, что объясняло бы проблему в StackOverflow, документах Python, исходном коде или где-либо еще в сети.

Наша команда инженеров не смогла решить этот вопрос, и мы задали этот вопрос довольно многим людям в группах пользователей Python безрезультатно. Я надеялся, что кто-то может пролить некоторую проницательность, поскольку я чувствую, что мы делаем что-то неправильно, но слишком близки к проблеме, чтобы понять, что это такое.

Вот симптом:

Traceback (most recent call last):
  File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue
    return queue, queue.get(block=False)
  File "<string>", line 2, in get
  File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

(я показываю, где наш код вызывает queue.get () для объекта общей очереди, размещенного менеджером, который расширяет SyncManger).

Что ' Особенность этой проблемы заключается в том, что если мы подключаемся к этой общей очереди на одной машине (назовем этот компьютер A ), даже из множества параллельных процессов, мы никогда не столкнемся с проблемой. Только когда мы подключаемся к очереди (опять же, используя класс, который расширяет многопроцессорность SyncManager и в настоящее время не добавляет дополнительных функций) с других машин (назовем эти машины B и C ) и запускаем большое количество элементов в очередь и из нее одновременно с проблемой.

Как будто многопроцессорный пакет python обрабатывает локальные соединения (даже если они все еще используют один и тот же менеджер). метод соединения connect ()), который работает с машины A , но когда удаленные соединения выполняются одновременно хотя бы с одной из машин B или C , мы получаем ошибку сломанной трубы.

Во всех чтениях, которые моя команда сделала, мы думали, что проблема была связана с блокировкой. Мы подумали, что, возможно, нам не следует использовать Queue.Queue , а вместо этого multiprocessing.Queue , но мы переключились, и проблема осталась (мы также заметили, что собственная общая очередь SyncManager является примером Queue.Queue).

Мы стараемся изо всех сил отладить проблему, поскольку ее трудно воспроизвести, но это происходит довольно часто (много раз в день, если мы вставляем и .get () получаем много элементов из очереди).

Метод, который мы создали get_from_queue , пытается повторить попытку получения элемента из очереди ~ 10 раз с рандомизированными интервалами ожидания, но кажется, что если один раз произойдет сбой, он потерпит неудачу все десять раз (что приводит меня полагать, что .register () и .connect () для менеджера, возможно, не дают другого сокет-соединения с сервером, но я не мог подтвердить это, либо читая документы, либо просматривая внутренний исходный код Python).

Может ли кто-нибудь рассказать о том, куда мы можем посмотреть или как мы можем отследить, что на самом деле происходит?

Как мы можем начать новое соединение в случае разрыва трубы, используя multiprocessing.BaseManager или multiprocessing. SyncManager ?

Как мы можем предотвратить сломанный канал в первую очередь?

14
задан David 6 September 2010 в 06:33
поделиться