Многопроцессорная обработка Python 2.6. Очередь, совместимая с потоками?

Вы можете применять любую функцию, какую захотите. Некоторые из них уже реализованы для вас (например, mean, sum, но также first и last):

df.resample('D').first()
#             values
# datetime          
# 2018-05-08     0.1

Но вы можете просто применить любую функцию, какую захотите. будет передана вся группа для работы, как groupby.

Например, последний раз перед 2 часами ночи (при условии, что кадр данных уже отсортирован по индексу):

import datetime

def last_before_2_am(group):
    before_2_am = group[group.index.time < datetime.time(2, 0, 0)]
    return before_2_am.iloc[-1]

df.resample('D').apply(last_before_2_am)
#             values
# datetime          
# 2018-05-08     0.5
7
задан user43233 5 December 2008 в 00:59
поделиться

4 ответа

Спасибо за быстрый ответ. Я передаю многопроцессорную обработку. Экземпляры очереди как аргументы каждому Процессу, как Вы иллюстрируете. Отказ, кажется, происходит в потоках. Я создаю их путем разделения на подклассы поточной обработки. Поток и передача очереди к 'init' методу каждого экземпляра потока. Это, кажется, принятый способ передать в Очередях для поточной обработки подклассов. Моя единственная мысль это, что многопроцессорные Очереди не могут быть совместимы с потоками (хотя они, предположительно, ориентированы на многопотоковое исполнение).

-1
ответ дан 7 December 2019 в 10:09
поделиться

Необходимо передать Объекты очереди как аргументы цели.

Пример из документации многопроцессорной обработки:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

 if __name__ == '__main__':
     q = Queue()
     p = Process(target=f, args=(q,))
     p.start()
     print q.get()    # prints "[42, None, 'hello']"
     p.join()

Очереди являются потоком и обрабатывают безопасный.

2
ответ дан 7 December 2019 в 10:09
поделиться

Я еще не экспериментировал с многопроцессорной обработкой в 2,6, но я играл много с pyprocessing (как это назвали в 2,5).

Я вижу, что Вы ищете много процессов с каждым порождением ряд потоков соответственно.

Так как Вы используете многопроцессорный модуль, я предложу использование много процесс и не много подход потока, Вы поразите меньше проблем как мертвые блокировки и т.д.

Создайте объект очереди. http://pyprocessing.berlios.de/doc/queue-objects.html

Для создания много среды процесса используют пул: http://pyprocessing.berlios.de/doc/pool-objects.html, который справится с рабочими процессами для Вас. Вы можете затем подать заявку асинхронный/синхронный к рабочим и можете также добавить обратный вызов для каждого рабочего при необходимости. Но помните, перезванивают, общий блок кода, и он должен сразу возвратиться (как упомянуто в документации)

Некоторая дополнительная информация: При необходимости создайте менеджера http://pyprocessing.berlios.de/doc/manager-objects.html для управления доступом к объекту очереди. Необходимо будет сделать объект очереди совместно использованным для этого. Но преимущество состоит в том, что, когда-то совместно использованный и справился, можно получить доступ к этой общей очереди на всем протяжении сети путем создания объектов прокси. Это позволит Вам назвать методы централизованного общего объекта очереди как (по-видимому) собственные методы для любого сетевого узла.

вот пример кода из документации

Возможно выполнить сервер менеджера на одной машине и сделать, чтобы клиенты использовали его от других машин (предполагающий, что включенные брандмауэры позволяют его). Выполнение следующих команд создает сервер для общей очереди, которую могут использовать удаленные клиенты:

>>> from processing.managers import BaseManager, CreatorMethod
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy')
...
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
>>> m.serve_forever()

Один клиент может получить доступ к серверу следующим образом:

>>> from processing.managers import BaseManager, CreatorMethod
>>> class QueueManager(BaseManager):
...     get_proxy = CreatorMethod(typeid='get_proxy')
...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()
>>> queue.put('hello')

Если Вы настаиваете на безопасном потоковом материале, PEP371 (многопроцессорная обработка) ссылается на этот http://code.google.com/p/python-safethread/

4
ответ дан 7 December 2019 в 10:09
поделиться

Вы можете столкнуться с этой ошибкой:

http://bugs.python.org/issue4660

1
ответ дан 7 December 2019 в 10:09
поделиться
Другие вопросы по тегам:

Похожие вопросы: