Я использую ExecutorService
для простоты параллельной многопоточной программы. Возьмите следующий код:
while(xxx) {
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
Future<..> ... = exService.submit(..);
...
}
В моем случае проблема - это submit()
не блокируется если все NUMBER_THREADS
заняты. Последствие - то, что очередь Задачи становится лавинно рассылаемой многими задачами. Последствие этого, это закрывающее сервис выполнения с ExecutorService.shutdown()
берет возрасты (ExecutorService.isTerminated()
будет ложь в течение долгого времени). Причина состоит в том, что очередь задачи все еще довольно полна.
На данный момент мое обходное решение должно работать с семафорами для запрещения, чтобы иметь ко многим записям в очереди задачи ExecutorService
:
...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);
while(xxx) {
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
semaphore.aquire();
// internally the task calls a finish callback, which invokes semaphore.release()
// -> now another task is added to queue
Future<..> ... = exService.submit(..);
...
}
Я уверен, что существует лучшее более инкапсулированное решение?
Вам лучше создать ThreadPoolExecutor самостоятельно (что в любом случае делает Executors.newXXX()).
В конструкторе вы можете передать BlockingQueue для использования исполнителем в качестве очереди задач. Если вы передадите BlockingQueue с ограничением размера (например, LinkedBlockingQueue), это позволит добиться желаемого эффекта.
ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
Вы можете вызвать ThreadPoolExecutor.getQueue (). Size ()
, чтобы узнать размер очереди ожидания. Вы можете принять меры, если очередь слишком длинная. Я предлагаю запустить задачу в текущем потоке, если очередь слишком длинная, чтобы замедлить производителя (если это уместно).
Истинный блокирующий ThreadPoolExecutor был в списке желаний многих, в нем даже обнаружена ошибка JDC. Я столкнулся с той же проблемой и наткнулся на это: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html
Это реализация BlockingThreadPoolExecutor, реализован с помощью RejectionPolicy, который использует предложение добавить задачу в очередь, ожидая, пока в очереди появится место. Это выглядит хорошо.