Который ThreadPool в Java я должен использовать?

Существует огромное количество задач. Каждая задача, принадлежат единственной группе. Требование является каждой группой задач, должен выполняемый последовательно, точно так же, как выполняется в единственном потоке, и пропускная способность должна быть максимизирована в многоядерном (или мульти-CPU) среда.Примечание: существует также огромная сумма групп, которая пропорциональна количеству задач.

Наивное решение использует ThreadPoolExecutor, и синхронизируйтесь (или блокировка). Однако потоки заблокировали бы друг друга, и пропускная способность не максимизируется.

Какая-либо лучшая идея? Или есть ли, существуют, сторонняя библиотека удовлетворяет требование?

8
задан James 14 October 2010 в 08:33
поделиться

3 ответа

Простым подходом было бы «объединить» все групповые задачи в одну суперзадачу, чтобы подзадачи выполнялись последовательно. Но это, вероятно, вызовет задержку в других группах, которая не запустится, если какая-то другая группа полностью не завершит работу и не освободит место в пуле потоков.

В качестве альтернативы рассмотрите возможность объединения задач группы в цепочку. Следующий код иллюстрирует это:

public class MultiSerialExecutor {
    private final ExecutorService executor;

    public MultiSerialExecutor(int maxNumThreads) {
        executor = Executors.newFixedThreadPool(maxNumThreads);
    }

    public void addTaskSequence(List<Runnable> tasks) {
        executor.execute(new TaskChain(tasks));
    }

    private void shutdown() {
        executor.shutdown();
    }

    private class TaskChain implements Runnable {
        private List<Runnable> seq;
        private int ind;

        public TaskChain(List<Runnable> seq) {
            this.seq = seq;
        }

        @Override
        public void run() {
            seq.get(ind++).run(); //NOTE: No special error handling
            if (ind < seq.size())
                executor.execute(this);
        }       
    }

Преимущество состоит в том, что не используется дополнительный ресурс (поток / очередь) и что степень детализации задач лучше, чем в наивном подходе. Недостатком является то, что все задачи группы должны быть известны заранее .

- править -

Чтобы сделать это решение универсальным и законченным, вы можете решить, как обрабатывать ошибки (то есть будет ли цепочка продолжаться даже в случае возникновения ошибки), а также было бы неплохо реализовать ExecutorService и делегируйте все вызовы базовому исполнителю.

3
ответ дан 6 December 2019 в 00:04
поделиться

Я в основном согласен с ответом Дэйва, но если вам нужно нарезать процессор время во всех «группах», т.е. все группы задач должны развиваться параллельно, вы можете найти эту конструкцию полезной (с использованием удаления как «блокировки». В моем случае это сработало нормально, хотя я предполагаю, что она обычно использует больше памяти):

class TaskAllocator {
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
         = childQueuePerTaskGroup();

    public Queue<Runnable> lockTaskGroup(){
        return entireWork.poll();
    }

    public void release(Queue<Runnable> taskGroup){
        entireWork.offer(taskGroup);
    }
 }

и

 class DoWork implmements Runnable {
     private final TaskAllocator allocator;

     public DoWork(TaskAllocator allocator){
         this.allocator = allocator;
     }

     pubic void run(){
        for(;;){
            Queue<Runnable> taskGroup = allocator.lockTaskGroup();
            if(task==null){
                //No more work
                return;
            }
            Runnable work = taskGroup.poll();
            if(work == null){
                //This group is done
                continue;
            }

            //Do work, but never forget to release the group to 
            // the allocator.
            try {
                work.run();
            } finally {
                allocator.release(taskGroup);
            }
        }//for
     }
 }

Затем вы можете использовать оптимальное количество потоков для выполнения задачи DoWork . Это своего рода циклический перебор балансировки нагрузки.

Вы можете даже сделать что-то более сложное, используя это вместо простой очереди в TaskAllocator (группы задач с большим количеством оставшихся задач, как правило, выполняются)

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator());

​​где SophisticatedComparator равно

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
        int diff = o2.size() - o1.size();
        if(diff==0){
             //This is crucial. You must assign unique ids to your 
             //Subqueue and break the equality if they happen to have same size.
             //Otherwise your queues will disappear...
             return o1.id - o2.id;
        }
        return diff;
    }
 }
1
ответ дан 6 December 2019 в 00:04
поделиться

Я бы предложил использовать очереди задач:

  • Для каждой группы задач Вы создаете очередь и вставляете в нее все задачи из этой группы.
  • Теперь все ваши очереди могут выполняться параллельно, в то время как задачи внутри одной очереди выполняются последовательно.

Быстрый поиск в Google показывает, что в java api нет очередей задач/потоков. Однако есть много учебников по кодированию очередей. Все не стесняйтесь перечислить хорошие учебники / реализации, если вы знаете некоторые:

2
ответ дан 6 December 2019 в 00:04
поделиться
Другие вопросы по тегам:

Похожие вопросы: