Мне нужно реализовать пул потоков в Java (java.util.concurrent), количество потоков которого находится на некотором минимальном значении в режиме ожидания, растет до верхней границы (но никогда больше), когда задания отправляются в него быстрее, чем они завершают выполнение, и сжимается до нижней границы, когда все задания выполнены и больше не отправляется заданий.
Как бы вы реализовали нечто подобное? Я предполагаю, что это был бы довольно распространенный сценарий использования, но очевидно, что фабричные методы java.util.concurrent.Executors
могут создавать только пулы фиксированного размера и пулы, которые неограниченно увеличиваются при отправке большого количества заданий. Класс ThreadPoolExecutor
предоставляет параметры corePoolSize
и maxPoolSize
, но его документация, по-видимому, подразумевает, что единственный способ когда-либо иметь больше потоков, чем corePoolSize
. в то же время заключается в использовании ограниченной очереди заданий, и в этом случае, если вы достигли maxPoolSize
потоков, вы получите отказы в работе, с которыми вам придется иметь дело самостоятельно? Я придумал следующее:
//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(minSize));
...
//submitting jobs
for (Runnable job : ...) {
while (true) {
try {
pool.submit(job);
System.out.println("Job " + job + ": submitted");
break;
} catch (RejectedExecutionException e) {
// maxSize jobs executing concurrently atm.; re-submit new job after short wait
System.out.println("Job " + job + ": rejected...");
try {
Thread.sleep(300);
} catch (InterruptedException e1) {
}
}
}
}
Я что-то упускаю из виду? Есть лучший способ сделать это? Кроме того, в зависимости от требований, может быть проблематично, что приведенный выше код не завершится, по крайней мере, (я думаю) (общее количество заданий) - maxSize
заданий не будет завершено.Поэтому, если вы хотите иметь возможность отправить произвольное количество заданий в пул и продолжить работу немедленно, не дожидаясь завершения любого из них, я не понимаю, как вы могли бы сделать это, не имея выделенного потока «суммирования заданий», который управляет необходимая неограниченная очередь для хранения всех отправленных заданий. AFAICS, если вы используете неограниченную очередь для самого ThreadPoolExecutor, количество его потоков никогда не превысит значение corePoolSize.