Вы можете применять любую функцию, какую захотите. Некоторые из них уже реализованы для вас (например, mean
, sum
, но также first
и last
):
df.resample('D').first()
# values
# datetime
# 2018-05-08 0.1
Но вы можете просто применить любую функцию, какую захотите. будет передана вся группа для работы, как groupby
.
Например, последний раз перед 2 часами ночи (при условии, что кадр данных уже отсортирован по индексу):
import datetime
def last_before_2_am(group):
before_2_am = group[group.index.time < datetime.time(2, 0, 0)]
return before_2_am.iloc[-1]
df.resample('D').apply(last_before_2_am)
# values
# datetime
# 2018-05-08 0.5
Спасибо за быстрый ответ. Я передаю многопроцессорную обработку. Экземпляры очереди как аргументы каждому Процессу, как Вы иллюстрируете. Отказ, кажется, происходит в потоках. Я создаю их путем разделения на подклассы поточной обработки. Поток и передача очереди к 'init' методу каждого экземпляра потока. Это, кажется, принятый способ передать в Очередях для поточной обработки подклассов. Моя единственная мысль это, что многопроцессорные Очереди не могут быть совместимы с потоками (хотя они, предположительно, ориентированы на многопотоковое исполнение).
Необходимо передать Объекты очереди как аргументы цели.
Пример из документации многопроцессорной обработки:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
Очереди являются потоком и обрабатывают безопасный.
Я еще не экспериментировал с многопроцессорной обработкой в 2,6, но я играл много с pyprocessing (как это назвали в 2,5).
Я вижу, что Вы ищете много процессов с каждым порождением ряд потоков соответственно.
Так как Вы используете многопроцессорный модуль, я предложу использование много процесс и не много подход потока, Вы поразите меньше проблем как мертвые блокировки и т.д.
Создайте объект очереди. http://pyprocessing.berlios.de/doc/queue-objects.html
Для создания много среды процесса используют пул: http://pyprocessing.berlios.de/doc/pool-objects.html, который справится с рабочими процессами для Вас. Вы можете затем подать заявку асинхронный/синхронный к рабочим и можете также добавить обратный вызов для каждого рабочего при необходимости. Но помните, перезванивают, общий блок кода, и он должен сразу возвратиться (как упомянуто в документации)
Некоторая дополнительная информация: При необходимости создайте менеджера http://pyprocessing.berlios.de/doc/manager-objects.html для управления доступом к объекту очереди. Необходимо будет сделать объект очереди совместно использованным для этого. Но преимущество состоит в том, что, когда-то совместно использованный и справился, можно получить доступ к этой общей очереди на всем протяжении сети путем создания объектов прокси. Это позволит Вам назвать методы централизованного общего объекта очереди как (по-видимому) собственные методы для любого сетевого узла.
вот пример кода из документации
Возможно выполнить сервер менеджера на одной машине и сделать, чтобы клиенты использовали его от других машин (предполагающий, что включенные брандмауэры позволяют его). Выполнение следующих команд создает сервер для общей очереди, которую могут использовать удаленные клиенты:
>>> from processing.managers import BaseManager, CreatorMethod
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager):
... get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy')
...
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none')
>>> m.serve_forever()
Один клиент может получить доступ к серверу следующим образом:
>>> from processing.managers import BaseManager, CreatorMethod
>>> class QueueManager(BaseManager):
... get_proxy = CreatorMethod(typeid='get_proxy')
...
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none')
>>> queue = m.get_proxy()
>>> queue.put('hello')
Если Вы настаиваете на безопасном потоковом материале, PEP371 (многопроцессорная обработка) ссылается на этот http://code.google.com/p/python-safethread/