ExecutorService, как ожидать всех задач закончиться

Что является самым простым путем к ожидать всех задач ExecutorService закончиться? Моя задача, прежде всего, вычислительна, таким образом, я просто хочу выполнить большое количество заданий - один на каждом ядре. Прямо сейчас моя установка похожа на это:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask выполнимые реализации. Это, кажется, выполняет задачи правильно, но катастрофические отказы кода на wait() с IllegalMonitorStateException. Это нечетно, потому что я играл вокруг с некоторыми игрушечными примерами, и это, казалось, работало.

uniquePhrases содержит несколько десятков тысяч элементов. Я должен использовать другой метод? Я ищу что-то максимально простое

186
задан gstackoverflow 16 February 2017 в 03:37
поделиться

5 ответов

Если вы хотите дождаться завершения выполнения службы исполнителя, вызовите shutdown () , а затем awaitTermination (units, unitType) , например awaitTermination (1, МИНУТА) . ExecutorService не блокирует свой собственный монитор, поэтому вы не можете использовать wait и т. Д.

11
ответ дан 23 November 2019 в 05:49
поделиться

Если вы хотите дождаться завершения всех задач, используйте метод shutdown вместо wait . Затем введите awaitTermination .

Кроме того, вы можете использовать Runtime.availableProcessors , чтобы получить количество аппаратных потоков, чтобы вы могли правильно инициализировать пул потоков.

57
ответ дан 23 November 2019 в 05:49
поделиться

Самый простой подход - использовать ExecutorService.invokeAll () , который делает то, что вы хотите в однострочном письме. Выражаясь языком, вам нужно будет изменить или обернуть ComputeDTask для реализации Callable <> , что может дать вам немного больше гибкости. Возможно, в вашем приложении есть значимая реализация Callable.call () , но вот способ обернуть ее, если не используется Executors.callable () .

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

Как отмечали другие, при необходимости вы можете использовать версию invokeAll () с тайм-аутом. В этом примере ответы будут содержать группу Future , которые будут возвращать нули (см. Определение Executors.callable () . Вероятно, то, что вы хотите нужно сделать небольшой рефакторинг, чтобы вы могли получить полезный ответ, или ссылку на лежащую в основе ComputeDTask , но я не могу сказать по вашему примеру.

Если это непонятно, обратите внимание что invokeAll () не вернется, пока все задачи не будут выполнены. (то есть все Future в вашей коллекции ответов будут сообщать .isDone () , если спросят.) Это позволяет избежать ручного завершения работы, ожидания завершения и т. Д. И позволяет при желании повторно использовать эту ExecutorService аккуратно для нескольких циклов.

Есть несколько связанных вопросов по SO:

Ни один из них не относится к вашему вопросу, но они дают представление о том, как люди думают Executor / ExecutorService следует использовать.

205
ответ дан 23 November 2019 в 05:49
поделиться

Вы можете ждать завершения задания в течение определенного интервала:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

Или вы можете использовать ExecutorService . отправить ( Runnable ) и собрать возвращаемые им объекты Future и вызвать get () для каждого по очереди, чтобы дождаться их выполнения финиш.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException чрезвычайно важно обрабатывать правильно. Это то, что позволяет вам или пользователям вашей библиотеки безопасно завершить длительный процесс.

7
ответ дан 23 November 2019 в 05:49
поделиться

Если ожидание завершения всех задач в ExecutorService не является именно вашей целью, а скорее ожидание завершения определенной партии задач, вы можете использовать CompletionService - в частности, ExecutorCompletionService.

Идея заключается в том, чтобы создать ExecutorCompletionService, обернув в него Executor, отправить некоторое известное количество задач через CompletionService, затем извлечь такое же количество результатов из очереди завершения, используя либо take() (который блокирует), либо poll() (который не блокирует). Как только вы нарисовали все ожидаемые результаты, соответствующие представленным вами задачам, вы знаете, что все они выполнены.

Позвольте мне сказать это еще раз, потому что это не очевидно из интерфейса: Вы должны знать, сколько вещей вы положили в CompletionService, чтобы знать, сколько вещей пытаться вытащить. Это особенно важно для метода take(): вызовите его слишком много раз, и он заблокирует вызывающий поток, пока какой-нибудь другой поток не отправит другое задание в тот же CompletionService.

В книге Java Concurrency in Practice есть несколько примеров использования CompletionService .

48
ответ дан 23 November 2019 в 05:49
поделиться
Другие вопросы по тегам:

Похожие вопросы: