Что является самым простым путем к ожидать всех задач ExecutorService
закончиться? Моя задача, прежде всего, вычислительна, таким образом, я просто хочу выполнить большое количество заданий - один на каждом ядре. Прямо сейчас моя установка похожа на это:
ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {
es.execute(new ComputeDTask(singleTable));
}
try{
es.wait();
}
catch (InterruptedException e){
e.printStackTrace();
}
ComputeDTask
выполнимые реализации. Это, кажется, выполняет задачи правильно, но катастрофические отказы кода на wait()
с IllegalMonitorStateException
. Это нечетно, потому что я играл вокруг с некоторыми игрушечными примерами, и это, казалось, работало.
uniquePhrases
содержит несколько десятков тысяч элементов. Я должен использовать другой метод? Я ищу что-то максимально простое
Если вы хотите дождаться завершения выполнения службы исполнителя, вызовите shutdown ()
, а затем awaitTermination (units, unitType) , например awaitTermination (1, МИНУТА)
. ExecutorService не блокирует свой собственный монитор, поэтому вы не можете использовать wait
и т. Д.
Если вы хотите дождаться завершения всех задач, используйте метод shutdown
вместо wait
. Затем введите awaitTermination
.
Кроме того, вы можете использовать Runtime.availableProcessors
, чтобы получить количество аппаратных потоков, чтобы вы могли правильно инициализировать пул потоков.
Самый простой подход - использовать ExecutorService.invokeAll ()
, который делает то, что вы хотите в однострочном письме. Выражаясь языком, вам нужно будет изменить или обернуть ComputeDTask
для реализации Callable <>
, что может дать вам немного больше гибкости. Возможно, в вашем приложении есть значимая реализация Callable.call ()
, но вот способ обернуть ее, если не используется Executors.callable ()
.
ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());
for (DataTable singleTable: uniquePhrases) {
todo.add(Executors.callable(new ComputeDTask(singleTable)));
}
List<Future<Object>> answers = es.invokeAll(todo);
Как отмечали другие, при необходимости вы можете использовать версию invokeAll ()
с тайм-аутом. В этом примере ответы
будут содержать группу Future
, которые будут возвращать нули (см. Определение Executors.callable ()
. Вероятно, то, что вы хотите нужно сделать небольшой рефакторинг, чтобы вы могли получить полезный ответ, или ссылку на лежащую в основе ComputeDTask
, но я не могу сказать по вашему примеру.
Если это непонятно, обратите внимание что invokeAll ()
не вернется, пока все задачи не будут выполнены. (то есть все Future
в вашей коллекции ответов
будут сообщать .isDone ()
, если спросят.) Это позволяет избежать ручного завершения работы, ожидания завершения и т. Д. И позволяет при желании повторно использовать эту ExecutorService
аккуратно для нескольких циклов.
Есть несколько связанных вопросов по SO:
Ни один из них не относится к вашему вопросу, но они дают представление о том, как люди думают Executor
/ ExecutorService
следует использовать.
Вы можете ждать завершения задания в течение определенного интервала:
int maxSecondsPerComputeDTask = 20;
try {
while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
// consider giving up with a 'break' statement under certain conditions
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Или вы можете использовать ExecutorService . отправить ( Runnable ) и собрать возвращаемые им объекты Future и вызвать get () для каждого по очереди, чтобы дождаться их выполнения финиш.
ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
try {
future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
InterruptedException чрезвычайно важно обрабатывать правильно. Это то, что позволяет вам или пользователям вашей библиотеки безопасно завершить длительный процесс.
Если ожидание завершения всех задач в ExecutorService
не является именно вашей целью, а скорее ожидание завершения определенной партии задач, вы можете использовать CompletionService
- в частности, ExecutorCompletionService
.
Идея заключается в том, чтобы создать ExecutorCompletionService
, обернув в него Executor
, отправить некоторое известное количество задач через CompletionService
, затем извлечь такое же количество результатов из очереди завершения, используя либо take()
(который блокирует), либо poll()
(который не блокирует). Как только вы нарисовали все ожидаемые результаты, соответствующие представленным вами задачам, вы знаете, что все они выполнены.
Позвольте мне сказать это еще раз, потому что это не очевидно из интерфейса: Вы должны знать, сколько вещей вы положили в CompletionService
, чтобы знать, сколько вещей пытаться вытащить. Это особенно важно для метода take()
: вызовите его слишком много раз, и он заблокирует вызывающий поток, пока какой-нибудь другой поток не отправит другое задание в тот же CompletionService
.
В книге Java Concurrency in Practice есть несколько примеров использования CompletionService
.