Существует огромное количество задач. Каждая задача, принадлежат единственной группе. Требование является каждой группой задач, должен выполняемый последовательно, точно так же, как выполняется в единственном потоке, и пропускная способность должна быть максимизирована в многоядерном (или мульти-CPU) среда.Примечание: существует также огромная сумма групп, которая пропорциональна количеству задач.
Наивное решение использует ThreadPoolExecutor, и синхронизируйтесь (или блокировка). Однако потоки заблокировали бы друг друга, и пропускная способность не максимизируется.
Какая-либо лучшая идея? Или есть ли, существуют, сторонняя библиотека удовлетворяет требование?
Простым подходом было бы «объединить» все групповые задачи в одну суперзадачу, чтобы подзадачи выполнялись последовательно. Но это, вероятно, вызовет задержку в других группах, которая не запустится, если какая-то другая группа полностью не завершит работу и не освободит место в пуле потоков.
В качестве альтернативы рассмотрите возможность объединения задач группы в цепочку. Следующий код иллюстрирует это:
public class MultiSerialExecutor {
private final ExecutorService executor;
public MultiSerialExecutor(int maxNumThreads) {
executor = Executors.newFixedThreadPool(maxNumThreads);
}
public void addTaskSequence(List<Runnable> tasks) {
executor.execute(new TaskChain(tasks));
}
private void shutdown() {
executor.shutdown();
}
private class TaskChain implements Runnable {
private List<Runnable> seq;
private int ind;
public TaskChain(List<Runnable> seq) {
this.seq = seq;
}
@Override
public void run() {
seq.get(ind++).run(); //NOTE: No special error handling
if (ind < seq.size())
executor.execute(this);
}
}
Преимущество состоит в том, что не используется дополнительный ресурс (поток / очередь) и что степень детализации задач лучше, чем в наивном подходе. Недостатком является то, что все задачи группы должны быть известны заранее .
- править -
Чтобы сделать это решение универсальным и законченным, вы можете решить, как обрабатывать ошибки (то есть будет ли цепочка продолжаться даже в случае возникновения ошибки), а также было бы неплохо реализовать ExecutorService и делегируйте все вызовы базовому исполнителю.
Я в основном согласен с ответом Дэйва, но если вам нужно нарезать процессор время во всех «группах», т.е. все группы задач должны развиваться параллельно, вы можете найти эту конструкцию полезной (с использованием удаления как «блокировки». В моем случае это сработало нормально, хотя я предполагаю, что она обычно использует больше памяти):
class TaskAllocator {
private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
= childQueuePerTaskGroup();
public Queue<Runnable> lockTaskGroup(){
return entireWork.poll();
}
public void release(Queue<Runnable> taskGroup){
entireWork.offer(taskGroup);
}
}
и
class DoWork implmements Runnable {
private final TaskAllocator allocator;
public DoWork(TaskAllocator allocator){
this.allocator = allocator;
}
pubic void run(){
for(;;){
Queue<Runnable> taskGroup = allocator.lockTaskGroup();
if(task==null){
//No more work
return;
}
Runnable work = taskGroup.poll();
if(work == null){
//This group is done
continue;
}
//Do work, but never forget to release the group to
// the allocator.
try {
work.run();
} finally {
allocator.release(taskGroup);
}
}//for
}
}
Затем вы можете использовать оптимальное количество потоков для выполнения задачи DoWork
. Это своего рода циклический перебор балансировки нагрузки.
Вы можете даже сделать что-то более сложное, используя это вместо простой очереди в TaskAllocator
(группы задач с большим количеством оставшихся задач, как правило, выполняются)
ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue =
new ConcurrentSkipListSet(new SophisticatedComparator());
где SophisticatedComparator
равно
class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
int diff = o2.size() - o1.size();
if(diff==0){
//This is crucial. You must assign unique ids to your
//Subqueue and break the equality if they happen to have same size.
//Otherwise your queues will disappear...
return o1.id - o2.id;
}
return diff;
}
}
Я бы предложил использовать очереди задач:
Быстрый поиск в Google показывает, что в java api нет очередей задач/потоков. Однако есть много учебников по кодированию очередей. Все не стесняйтесь перечислить хорошие учебники / реализации, если вы знаете некоторые: