Управление порядком выполнения Задачи с ExecutorService

У меня есть процесс, который делегирует асинхронные задачи к пулу потоков. Я должен удостовериться, что определенные задачи выполняются в порядке. Так, например,

Задачи прибывают в порядок

Задачи a1, b1, c1, d1, e1, a2, a3, b2, f1

Задачи могут быть выполнены в любом порядке, кроме того, где существует естественная зависимость, таким образом, a1, a2, a3 должен быть обработан в том порядке или выделяющий тому же потоку или блокирующий их, пока я не знаю, что предыдущая a# задача была выполнена.

В настоящее время это не использует пакет Параллелизма Java, но я полагаю, что изменение берет avantage управления потоком.

Делает у любого есть аналогичное решение или предложения того, как достигнуть этого

31
задан Wiretap 28 January 2010 в 10:08
поделиться

3 ответа

При подаче Runnable или Callable в ExecutorService вы получаете взамен Future. Пусть потоки, зависящие от a1, будут переданы a1's Future и вызовут Future.get(). Это будет блокироваться до тех пор, пока поток не завершит свою работу.

Итак:

ExecutorService exec = Executor.newFixedThreadPool(5);
Runnable a1 = ...
final Future f1 = exec.submit(a1);
Runnable a2 = new Runnable() {
  @Override
  public void run() {
    f1.get();
    ... // do stuff
  }
}
exec.submit(a2);

и так далее.

3
ответ дан 27 November 2019 в 22:43
поделиться

Другой вариант — создать собственный исполнитель, назвать его OrderedExecutor и создать массив инкапсулированных объектов ThreadPoolExecutor с 1 потоком на каждого внутреннего исполнителя. Затем вы предоставляете механизм для выбора одного из внутренних объектов, например, вы можете сделать это, предоставив интерфейс, который может реализовать пользователь вашего класса:

executor = new OrderedExecutor( 10 /* pool size */, new OrderedExecutor.Chooser() {
  public int choose( Runnable runnable ) {
     MyRunnable myRunnable = (MyRunnable)runnable;
     return myRunnable.someId();
  });

executor.execute( new MyRunnable() );

Реализация OrderedExecutor.execute() будет использовать Chooser для получения int, вы изменяете это с размером пула, и это ваш индекс во внутреннем массиве. Идея состоит в том, что "someId()" будет возвращать одно и то же значение для всех "а" и т. д.

2
ответ дан 27 November 2019 в 22:43
поделиться

Когда я делал это в прошлом, у меня обычно порядок обрабатывался компонентом, который затем отправлял вызываемые/выполняемые объекты исполнителю.

Что-то вроде.

  • Получен список задач для запуска, некоторые с зависимостями.
  • Создание Executor и обертка с помощью ExecutorCompletionService.
  • Поиск всех задач, любых без зависимостей, планирование их с помощью службы завершения.
  • Опрос службы завершения.
  • По мере выполнения каждой задачи
    • Добавить его в список "завершенных"
    • Переоценить все ожидающие задачи по отношению к "списку завершенных", чтобы увидеть, являются ли они "завершенными зависимостями". Если это так, запланируйте их
    • Повторять полоскание до тех пор, пока все задачи не будут отправлены/выполнены.

Служба завершения — это хороший способ получить задачи по мере их выполнения, а не пытаться опросить кучу фьючерсов. Однако вы, вероятно, захотите сохранить Map, которая заполняется, когда задача назначается через службу завершения, чтобы, когда служба завершения дает вам завершенное будущее, вы могли выяснить, какое TaskIdentifier это так.

Если вы когда-либо оказывались в состоянии, когда задачи все еще ожидают запуска, но ничего не выполняется и ничего не может быть запланировано, то у вас есть проблема циклической зависимости.

14
ответ дан 27 November 2019 в 22:43
поделиться
Другие вопросы по тегам:

Похожие вопросы: