очереди заданий производителя/потребителя

Я борюсь с лучшим способом реализовать мой конвейер обработки.

Мои производители подают работу к BlockingQueue. На потребительской стороне я опрашиваю очередь, обертываю то, что я вхожу в Выполнимую задачу и отправляю его ExecutorService.

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

Это не идеально; ExecutorService имеет свою собственную очередь, конечно, поэтому что действительно происходит, то, что я всегда полностью истощаю свою очередь заданий и заполняю очередь задачи, которую медленно завершает порожняя тара как задачи.

Я понимаю, что мог поставить задачи в очередь в конце производителя, но я действительно скорее не сделаю этого - мне нравится косвенность/изоляция моей очереди заданий, являющейся немыми строками; это действительно не бизнес производителя, что собирается произойти с ними. Принуждение производителя поставить в очередь Выполнимые или Вызываемые повреждения абстракция, по моему скромному мнению.

Но я действительно хочу, чтобы общая очередь заданий представила текущее состояние обработки. Я хочу быть в состоянии заблокировать производителей, если потребители не поддерживают на высоком уровне.

Я хотел бы использовать Исполнителей, но я чувствую, что борюсь с их дизайном. Я могу частично выпить Kool-ade, или я должен проглотить его? Я являюсь заблуждающимся в сопротивлении задачам организации очередей? (Я подозреваю, что мог создать ThreadPoolExecutor для использования очереди с 1 задачей и переопределения, которое это, выполняют метод для блокирования, а не reject-on-queue-full, но это чувствует себя грубым.)

Предложения?

10
задан Jolly Roger 10 February 2010 в 00:27
поделиться

2 ответа

Я хочу, чтобы общая рабочая очередь представляла текущее состояние обработки .

Попробуйте использовать общую BlockingQueue и создать пул рабочих потоков, удаляющих рабочие элементы из очереди.

Я хочу иметь возможность блокировать производителей, если потребители не успевают.

И ArrayBlockingQueue , и LinkedBlockingQueue поддерживают ограниченные очереди, так что они будут блокироваться при размещении при заполнении. Использование методов блокировки put () гарантирует, что производители будут заблокированы, если очередь заполнена.

Вот грубое начало. Вы можете настроить количество рабочих процессов и размер очереди:

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
19
ответ дан 3 December 2019 в 19:32
поделиться

Вы можете заставить своего потребителя выполнить Runnable :: run напрямую вместо запуска нового потока. Совместите это с очередью блокировки с максимальным размером, и я думаю, вы получите то, что хотите. Ваш потребитель становится работником, который выполняет задачи в оперативном режиме на основе рабочих элементов в очереди. Они будут удалять товары из очереди только тогда, когда они их обрабатывают, поэтому ваш производитель, когда ваши потребители перестанут потреблять.

0
ответ дан 3 December 2019 в 19:32
поделиться
Другие вопросы по тегам:

Похожие вопросы: