ExecutorService, стандартный способ постараться не определять задачу для очереди, становящейся слишком полной

Я использую ExecutorService для простоты параллельной многопоточной программы. Возьмите следующий код:

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
    ...  
    Future<..> ... = exService.submit(..);
    ...
}

В моем случае проблема - это submit() не блокируется если все NUMBER_THREADS заняты. Последствие - то, что очередь Задачи становится лавинно рассылаемой многими задачами. Последствие этого, это закрывающее сервис выполнения с ExecutorService.shutdown() берет возрасты (ExecutorService.isTerminated() будет ложь в течение долгого времени). Причина состоит в том, что очередь задачи все еще довольно полна.

На данный момент мое обходное решение должно работать с семафорами для запрещения, чтобы иметь ко многим записям в очереди задачи ExecutorService:

...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
    ...
    semaphore.aquire();  
    // internally the task calls a finish callback, which invokes semaphore.release()
    // -> now another task is added to queue
    Future<..> ... = exService.submit(..); 
    ...
}

Я уверен, что существует лучшее более инкапсулированное решение?

29
задан HugoTeixeira 10 October 2018 в 18:53
поделиться

3 ответа

Вам лучше создать ThreadPoolExecutor самостоятельно (что в любом случае делает Executors.newXXX()).

В конструкторе вы можете передать BlockingQueue для использования исполнителем в качестве очереди задач. Если вы передадите BlockingQueue с ограничением размера (например, LinkedBlockingQueue), это позволит добиться желаемого эффекта.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
5
ответ дан 28 November 2019 в 01:43
поделиться

Вы можете вызвать ThreadPoolExecutor.getQueue (). Size () , чтобы узнать размер очереди ожидания. Вы можете принять меры, если очередь слишком длинная. Я предлагаю запустить задачу в текущем потоке, если очередь слишком длинная, чтобы замедлить производителя (если это уместно).

6
ответ дан 28 November 2019 в 01:43
поделиться

Истинный блокирующий ThreadPoolExecutor был в списке желаний многих, в нем даже обнаружена ошибка JDC. Я столкнулся с той же проблемой и наткнулся на это: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

Это реализация BlockingThreadPoolExecutor, реализован с помощью RejectionPolicy, который использует предложение добавить задачу в очередь, ожидая, пока в очереди появится место. Это выглядит хорошо.

5
ответ дан 28 November 2019 в 01:43
поделиться
Другие вопросы по тегам:

Похожие вопросы: