Блокирующая очередь и многопоточный потребитель, как узнать, когда остановиться

У меня есть однопоточный производитель, который создает некоторые объекты задач, которые затем добавляются в ArrayBlockingQueue (который имеет фиксированный размер).

Я также запускаю многопоточного потребителя. Он создается как фиксированный пул потоков (Executors.newFixedThreadPool(threadCount);). Затем я отправляю несколько инстансов ConsumerWorker в этот пул потоков, каждый ConsumerWorker имеет референс к вышеупомянутому экземпляру ArrayBlockingQueue.

Каждый такой Worker будет выполнять take() на очереди и решать задачу.

Моя проблема заключается в том, как лучше всего сделать так, чтобы рабочий знал, когда больше не будет никакой работы. Другими словами, как мне сообщить рабочим, что производитель закончил добавление в очередь, и с этого момента каждый рабочий должен остановиться, когда увидит, что очередь пуста.

Сейчас у меня есть установка, в которой мой Producer инициализируется с обратным вызовом, который срабатывает, когда он заканчивает свою работу (добавление материала в очередь). Я также веду список всех ConsumerWorkers, которые я создал и отправил в ThreadPool. Когда обратный вызов Producer Callback сообщает мне, что производитель закончил, я могу сообщить об этом каждому из рабочих. В этот момент они должны просто продолжать проверять, не пуста ли очередь, и когда она становится пустой, они должны остановиться, что позволит мне изящно закрыть пул потоков ExecutorService. Это выглядит примерно так

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

Проблема здесь в том, что у меня есть очевидное состояние гонки, когда иногда производитель завершает работу, сигнализирует об этом, а ConsumerWorkers останавливаются ДО того, как потребляют все, что есть в очереди.

Мой вопрос в том, как лучше всего синхронизировать это, чтобы все работало нормально? Должен ли я синхронизировать всю часть, где он проверяет, запущен ли производитель, и если очередь пуста, и взять что-то из очереди в одном блоке (на объекте очереди)? Должен ли я просто синхронизировать обновление булева isRunning на экземпляре ConsumerWorker? Любые другие предложения?

ОБНОВЛЕНО, вот рабочий вариант, который я в итоге использовал:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

Это было вдохновлено ответом JohnVint'а ниже, с небольшими изменениями.

=== Обновление в связи с комментарием @vendhan.

Спасибо за ваше замечание. Вы правы, первый фрагмент кода в этом вопросе имеет (среди прочих проблем) ту, где while(isRunning || !inputQueue.isEmpty()) не имеет смысла.

В моей фактической окончательной реализации этого, я делаю что-то, что ближе к вашему предложению о замене "||" (или) на "&&" (и), в том смысле, что каждый рабочий (потребитель) теперь только проверяет, является ли элемент, который он получил из списка, ядовитой пилюлей, и если да, то останавливается (поэтому теоретически мы можем сказать, что рабочий должен быть запущен И очередь не должна быть пустой).

66
задан Shivan Dragon 25 April 2013 в 19:07
поделиться