У меня есть несколько потоков, которые ждут события, выполняют какое-то действие, а затем снова ждут события. Другой поток вызовет событие, когда это будет уместно.
Я не могу найти способ гарантировать, что каждый ожидающий поток срабатывает ровно один раз после установки события. В настоящее время у меня есть триггерный поток, установил его, поспал немного, затем очистил К сожалению, это приводит к тому, что ожидающие потоки захватывают заданное событие много раз или вообще ничего.
Я не могу просто заставить инициирующий поток порождать потоки ответов, чтобы запускать их один раз, потому что они являются ответами на запросы, сделанные из других источников.
Короче говоря: в Python, как поток может установить событие и убедиться, что каждый ожидающий поток воздействует на событие ровно один раз до его очистки?
Обновление:
I ' мы пытались настроить его, используя блокировку и очередь, но это не работает. Вот что у меня есть:
# Globals - used to synch threads
waitingOnEvent = Queue.Queue
MainEvent = threading.Event()
MainEvent.clear() # Not sure this is necessary, but figured I'd be safe
mainLock = threading.Lock()
def waitCall():
mainLock.acquire()
waitingOnEvent.put("waiting")
mainLock.release()
MainEvent.wait()
waitingOnEvent.get(False)
waitingOnEvent.task_done()
#do stuff
return
def triggerCall():
mainLock.acquire()
itemsinq = waitingOnEvent.qsize()
MainEvent.set()
waitingOnEvent.join()
MainEvent.clear()
mainLock.release()
return
В первый раз itemsinq правильно отображает, сколько вызовов ожидает, но только первый ожидающий поток, который совершит вызов, пройдет через них. С этого момента itemsinq всегда равен 1, и ожидающие потоки сменяются; каждый раз, когда происходит триггерный вызов, происходит следующее.
Обновление 2
Похоже, что только один из потоков event.wait () пробуждается , и все же queue.join () работает. Это говорит мне о том, что один ожидающий поток просыпается, захватывает из очереди и вызывает task_done (), а тот единственный get () / task_done () каким-то образом очищает очередь и разрешает join (). Затем триггерный поток завершает join (), очищает событие и, таким образом, предотвращает прохождение других ожидающих потоков. Почему очередь регистрируется как пустая / завершенная только после одного вызова get / task_done?
Кажется, что просыпается только один, даже если я закомментирую queue.get () и queue.task_done () и повесю триггер, поэтому он не может очистить событие.
Вам не нужно событие, и вам не нужны и блокировка, и очередь. Все, что вам нужно, это очередь.
Вызов queue.put
, чтобы отбросить сообщение, не дожидаясь его доставки или обработки.
Вызов queue.get
в рабочем потоке, чтобы дождаться прибытия сообщения.
import threading
import Queue
active_queues = []
class Worker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.mailbox = Queue.Queue()
active_queues.append(self.mailbox)
def run(self):
while True:
data = self.mailbox.get()
if data == 'shutdown':
print self, 'shutting down'
return
print self, 'received a message:', data
def stop(self):
active_queues.remove(self.mailbox)
self.mailbox.put("shutdown")
self.join()
def broadcast_event(data):
for q in active_queues:
q.put(data)
t1 = Worker()
t2 = Worker()
t1.start()
t2.start()
broadcast_event("first event")
broadcast_event("second event")
broadcast_event("shutdown")
t1.stop()
t2.stop()
Сообщения не обязательно должны быть строками; они могут быть любым объектом Python.
Если вам нужны дискретные, атомарные события, которые могут последовательно обрабатываться каждым потоком, сделайте так, как предлагали krs1 и bot403, и используйте очередь. Класс Python Queue
синхронизирован - вам не нужно беспокоиться о блокировке, чтобы использовать его.
Если, однако, ваши потребности проще (событие сообщает вам, что у вас есть данные, доступные для чтения и т. Д.), Вы можете подписаться / зарегистрировать свои потоки в качестве наблюдателей объекта, ответственного за запуск событий. Этот объект будет поддерживать список объектов наблюдателя threading.Event
. Затем с помощью триггера он может вызвать set () для всех объектов threading.Event
в списке.
Я не программист на Python, но если событие может быть обработано только один раз, возможно, вам нужно переключиться в очередь сообщений с соответствующей блокировкой, чтобы, когда один поток просыпается и получает сообщение о событии, он обрабатывает его и удаляет это из очереди, поэтому его нет, если другие потоки просыпаются и смотрят в очередь.
Одно из решений, которое я использовал в прошлом, - это класс Queue для межпотоковой связи. Он является потокобезопасным и может использоваться для упрощения взаимодействия между потоками при использовании как многопроцессорной, так и поточной библиотеки. У вас могут быть дочерние потоки, ожидающие чего-то для ввода в очередь, а затем обрабатывать новую запись. Класс Queue также имеет метод get (), который принимает удобный блокирующий аргумент.