Создание динамического (растущего/сокращающегося) пула потоков

Мне нужно реализовать пул потоков в Java (java.util.concurrent), количество потоков которого находится на некотором минимальном значении в режиме ожидания, растет до верхней границы (но никогда больше), когда задания отправляются в него быстрее, чем они завершают выполнение, и сжимается до нижней границы, когда все задания выполнены и больше не отправляется заданий.

Как бы вы реализовали нечто подобное? Я предполагаю, что это был бы довольно распространенный сценарий использования, но очевидно, что фабричные методы java.util.concurrent.Executorsмогут создавать только пулы фиксированного размера и пулы, которые неограниченно увеличиваются при отправке большого количества заданий. Класс ThreadPoolExecutorпредоставляет параметры corePoolSizeи maxPoolSize, но его документация, по-видимому, подразумевает, что единственный способ когда-либо иметь больше потоков, чем corePoolSize. в то же время заключается в использовании ограниченной очереди заданий, и в этом случае, если вы достигли maxPoolSizeпотоков, вы получите отказы в работе, с которыми вам придется иметь дело самостоятельно? Я придумал следующее:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}

Я что-то упускаю из виду? Есть лучший способ сделать это? Кроме того, в зависимости от требований, может быть проблематично, что приведенный выше код не завершится, по крайней мере, (я думаю) (общее количество заданий) - maxSizeзаданий не будет завершено.Поэтому, если вы хотите иметь возможность отправить произвольное количество заданий в пул и продолжить работу немедленно, не дожидаясь завершения любого из них, я не понимаю, как вы могли бы сделать это, не имея выделенного потока «суммирования заданий», который управляет необходимая неограниченная очередь для хранения всех отправленных заданий. AFAICS, если вы используете неограниченную очередь для самого ThreadPoolExecutor, количество его потоков никогда не превысит значение corePoolSize.

15
задан Gray 27 September 2016 в 14:24
поделиться