Python threading.Event () - Обеспечение пробуждения всех ожидающих потоков на event.set ()

У меня есть несколько потоков, которые ждут события, выполняют какое-то действие, а затем снова ждут события. Другой поток вызовет событие, когда это будет уместно.

Я не могу найти способ гарантировать, что каждый ожидающий поток срабатывает ровно один раз после установки события. В настоящее время у меня есть триггерный поток, установил его, поспал немного, затем очистил К сожалению, это приводит к тому, что ожидающие потоки захватывают заданное событие много раз или вообще ничего.

Я не могу просто заставить инициирующий поток порождать потоки ответов, чтобы запускать их один раз, потому что они являются ответами на запросы, сделанные из других источников.

Короче говоря: в Python, как поток может установить событие и убедиться, что каждый ожидающий поток воздействует на событие ровно один раз до его очистки?

Обновление:

I ' мы пытались настроить его, используя блокировку и очередь, но это не работает. Вот что у меня есть:

# Globals - used to synch threads
waitingOnEvent = Queue.Queue
MainEvent = threading.Event()
MainEvent.clear()    # Not sure this is necessary, but figured I'd be safe
mainLock = threading.Lock()

def waitCall():
    mainLock.acquire()
    waitingOnEvent.put("waiting")
    mainLock.release()
    MainEvent.wait()
    waitingOnEvent.get(False)
    waitingOnEvent.task_done()
    #do stuff
    return

def triggerCall():
    mainLock.acquire()
    itemsinq = waitingOnEvent.qsize()
    MainEvent.set()
    waitingOnEvent.join()
    MainEvent.clear()
    mainLock.release()
    return

В первый раз itemsinq правильно отображает, сколько вызовов ожидает, но только первый ожидающий поток, который совершит вызов, пройдет через них. С этого момента itemsinq всегда равен 1, и ожидающие потоки сменяются; каждый раз, когда происходит триггерный вызов, происходит следующее.

Обновление 2 Похоже, что только один из потоков event.wait () пробуждается , и все же queue.join () работает. Это говорит мне о том, что один ожидающий поток просыпается, захватывает из очереди и вызывает task_done (), а тот единственный get () / task_done () каким-то образом очищает очередь и разрешает join (). Затем триггерный поток завершает join (), очищает событие и, таким образом, предотвращает прохождение других ожидающих потоков. Почему очередь регистрируется как пустая / завершенная только после одного вызова get / task_done?

Кажется, что просыпается только один, даже если я закомментирую queue.get () и queue.task_done () и повесю триггер, поэтому он не может очистить событие.

10
задан Jason Orendorff 10 August 2010 в 15:55
поделиться

4 ответа

Вам не нужно событие, и вам не нужны и блокировка, и очередь. Все, что вам нужно, это очередь.

Вызов queue.put , чтобы отбросить сообщение, не дожидаясь его доставки или обработки.

Вызов queue.get в рабочем потоке, чтобы дождаться прибытия сообщения.

import threading
import Queue

active_queues = []

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.mailbox = Queue.Queue()
        active_queues.append(self.mailbox)

    def run(self):
        while True:
            data = self.mailbox.get()
            if data == 'shutdown':
                print self, 'shutting down'
                return
            print self, 'received a message:', data

    def stop(self):
        active_queues.remove(self.mailbox)
        self.mailbox.put("shutdown")
        self.join()


def broadcast_event(data):
    for q in active_queues:
        q.put(data)

t1 = Worker()
t2 = Worker()
t1.start()
t2.start()
broadcast_event("first event")
broadcast_event("second event")
broadcast_event("shutdown")

t1.stop()
t2.stop()

Сообщения не обязательно должны быть строками; они могут быть любым объектом Python.

11
ответ дан 3 December 2019 в 21:19
поделиться

Если вам нужны дискретные, атомарные события, которые могут последовательно обрабатываться каждым потоком, сделайте так, как предлагали krs1 и bot403, и используйте очередь. Класс Python Queue синхронизирован - вам не нужно беспокоиться о блокировке, чтобы использовать его.

Если, однако, ваши потребности проще (событие сообщает вам, что у вас есть данные, доступные для чтения и т. Д.), Вы можете подписаться / зарегистрировать свои потоки в качестве наблюдателей объекта, ответственного за запуск событий. Этот объект будет поддерживать список объектов наблюдателя threading.Event . Затем с помощью триггера он может вызвать set () для всех объектов threading.Event в списке.

3
ответ дан 3 December 2019 в 21:19
поделиться

Я не программист на Python, но если событие может быть обработано только один раз, возможно, вам нужно переключиться в очередь сообщений с соответствующей блокировкой, чтобы, когда один поток просыпается и получает сообщение о событии, он обрабатывает его и удаляет это из очереди, поэтому его нет, если другие потоки просыпаются и смотрят в очередь.

1
ответ дан 3 December 2019 в 21:19
поделиться

Одно из решений, которое я использовал в прошлом, - это класс Queue для межпотоковой связи. Он является потокобезопасным и может использоваться для упрощения взаимодействия между потоками при использовании как многопроцессорной, так и поточной библиотеки. У вас могут быть дочерние потоки, ожидающие чего-то для ввода в очередь, а затем обрабатывать новую запись. Класс Queue также имеет метод get (), который принимает удобный блокирующий аргумент.

2
ответ дан 3 December 2019 в 21:19
поделиться
Другие вопросы по тегам:

Похожие вопросы: