У меня есть однопоточный производитель, который создает некоторые объекты задач, которые затем добавляются в ArrayBlockingQueue
(который имеет фиксированный размер).
Я также запускаю многопоточного потребителя. Он создается как фиксированный пул потоков (Executors.newFixedThreadPool(threadCount);
). Затем я отправляю несколько инстансов ConsumerWorker в этот пул потоков, каждый ConsumerWorker имеет референс к вышеупомянутому экземпляру ArrayBlockingQueue.
Каждый такой Worker будет выполнять take()
на очереди и решать задачу.
Моя проблема заключается в том, как лучше всего сделать так, чтобы рабочий знал, когда больше не будет никакой работы. Другими словами, как мне сообщить рабочим, что производитель закончил добавление в очередь, и с этого момента каждый рабочий должен остановиться, когда увидит, что очередь пуста.
Сейчас у меня есть установка, в которой мой Producer инициализируется с обратным вызовом, который срабатывает, когда он заканчивает свою работу (добавление материала в очередь). Я также веду список всех ConsumerWorkers, которые я создал и отправил в ThreadPool. Когда обратный вызов Producer Callback сообщает мне, что производитель закончил, я могу сообщить об этом каждому из рабочих. В этот момент они должны просто продолжать проверять, не пуста ли очередь, и когда она становится пустой, они должны остановиться, что позволит мне изящно закрыть пул потоков ExecutorService. Это выглядит примерно так
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
Проблема здесь в том, что у меня есть очевидное состояние гонки, когда иногда производитель завершает работу, сигнализирует об этом, а ConsumerWorkers останавливаются ДО того, как потребляют все, что есть в очереди.
Мой вопрос в том, как лучше всего синхронизировать это, чтобы все работало нормально? Должен ли я синхронизировать всю часть, где он проверяет, запущен ли производитель, и если очередь пуста, и взять что-то из очереди в одном блоке (на объекте очереди)? Должен ли я просто синхронизировать обновление булева isRunning
на экземпляре ConsumerWorker? Любые другие предложения?
ОБНОВЛЕНО, вот рабочий вариант, который я в итоге использовал:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Это было вдохновлено ответом JohnVint'а ниже, с небольшими изменениями.
=== Обновление в связи с комментарием @vendhan.
Спасибо за ваше замечание. Вы правы, первый фрагмент кода в этом вопросе имеет (среди прочих проблем) ту, где while(isRunning || !inputQueue.isEmpty())
не имеет смысла.
В моей фактической окончательной реализации этого, я делаю что-то, что ближе к вашему предложению о замене "||" (или) на "&&" (и), в том смысле, что каждый рабочий (потребитель) теперь только проверяет, является ли элемент, который он получил из списка, ядовитой пилюлей, и если да, то останавливается (поэтому теоретически мы можем сказать, что рабочий должен быть запущен И очередь не должна быть пустой).