Как сразу выпустить потоки, ожидающие на BlockingQueue

Рассмотрите a BlockingQueue и несколько потоков, ожидающих на poll(long, TimeUnit) (возможно также на на take()).

Теперь очередь пуста, и она желаема, чтобы уведомить потоки ожидания, что они могут прекратить ожидать. Ожидаемое поведение состоит в том, чтобы иметь также null возвращенный или заявленное InterruptedException брошенный.

Object.notify() не будет работать на LinkedBlockingQueue поскольку потоки ожидают на внутренней блокировке.

Какой-либо простой путь?

9
задан Joel Shemtov 22 July 2010 в 07:56
поделиться

3 ответа

Javadoc для BlockQueue предлагает хороший способ:

Блокирующая очередь по своей сути не является поддержка любого вида «закрытия» или Операция «завершение работы», указывающая, что больше элементы добавляться не будут. Потребности и использование таких функций, как правило, зависит от реализации. Например Обычная тактика заключается в том, чтобы производители вставить специальный конец потока или яд интерпретируемые объекты соответственно, при приеме потребителями.

13
ответ дан 4 December 2019 в 11:03
поделиться

Обычным способом является прерывание потоков, но для этого, конечно, требуется, чтобы они правильно обрабатывали прерывания.

Это означает перехват и обработку InterruptedException в методах блокировки, а в противном случае - регулярно проверять (и действовать) флаг interrupted .

В API или спецификации языка нет ничего, что связывало бы прерывание с какой-либо конкретной семантикой отмены, но на практике использование прерывания для чего-либо, кроме отмены, хрупко и трудно поддерживать в более крупных приложениях. [...]

Обычно наиболее разумным способом отмены является прерывание.

Говорится Параллелизм Java на практике в разделе 7.1.1. Пример правильной обработки прерывания от того же (это поток-производитель, а не потребитель, но эта разница незначительна в текущем контексте):

class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /*  Allow thread to exit  */
        }
    }
    public void cancel() { interrupt(); }
}

Альтернативным решением может быть установка параметра тайм-аута для poll достаточно низким, чтобы поток регулярно просыпался и мог достаточно быстро замечать прерывания. Тем не менее, я считаю, что всегда полезно обрабатывать InterruptedException явно в соответствии с вашей конкретной политикой отмены потока.

5
ответ дан 4 December 2019 в 11:03
поделиться

Я бы сказал, что в вашей конструкции что-то не так. Потоки, потребляющие из BlockingQueue, не должны прерываться таким образом. Если они должны делать что-то еще с регулярным интервалом (например, проверять состояние переменной), одновременно потребляя из очереди, то вы должны использовать метод poll() с соответствующим таймаутом, чтобы эти два действия могли чередоваться.

1
ответ дан 4 December 2019 в 11:03
поделиться
Другие вопросы по тегам:

Похожие вопросы: