Я пытаюсь использовать ThreadPoolExecutor для планирования задач, но сталкиваюсь с некоторыми проблемами с его политиками. Вот его установленное поведение:
Поведение, которое я хочу, является этим:
В основном я не хочу, чтобы любые задачи были отклонены; я хочу, чтобы они были поставлены в очередь в неограниченной очереди. Но я действительно хочу иметь до потоков maximumPoolSize. Если я использую неограниченную очередь, она никогда не генерирует потоки после того, как она поражает coreSize. Если я использую ограниченную очередь, она отклоняет задачи. Есть ли какой-либо путь вокруг этого?
О чем я думаю, теперь выполняет ThreadPoolExecutor на SynchronousQueue, но не подает задачи непосредственно к нему - вместо этого питание их к отдельному неограниченному LinkedBlockingQueue. Затем другой поток питается от LinkedBlockingQueue в Исполнителя, и если Вы отклоняетесь, он просто попробовал еще раз, пока он не отклоняется. Это походит на боль и определенный взлом, хотя - там более чистый способ сделать это?
Ваш вариант использования является обычным, полностью законным и, к сожалению, более сложным, чем можно было бы ожидать. Для получения справочной информации вы можете прочитать это обсуждение и найти указатель на решение (также упомянутое в ветке) здесь . Решение Шэя работает нормально.
Обычно я бы немного опасался неограниченных очередей; Обычно лучше иметь явный контроль входящего потока, который плавно ухудшается и регулирует соотношение текущей / оставшейся работы, чтобы не перегружать ни производителя, ни потребителя.
Вероятно, нет необходимости в микроуправлении пулом потоков по запросу.
Пул кэшированных потоков будет повторно использовать незанятые потоки, одновременно разрешая потенциально неограниченное количество одновременных потоков. Это, конечно, может привести к снижению быстродействия из-за накладных расходов на переключение контекста во время периодов скачка.
Executors.newCachedThreadPool();
Лучше всего установить ограничение на общее количество потоков, отказавшись от идеи, что сначала будут использоваться незанятые потоки. Изменения конфигурации будут следующими:
corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);
Рассмотрение этого сценария: если у исполнителя меньше, чем corePoolSize
потоков, то он не должен быть очень занят. Если система не очень загружена, запуск нового потока не повредит. Это приведет к тому, что ваш ThreadPoolExecutor
всегда будет создавать нового рабочего, если он меньше максимально допустимого числа рабочих. Только когда максимальное количество рабочих «работает», работникам, праздно ожидающим выполнения задач, будут даны задания. Если рабочий ожидает aReasonableTimeDuration
без задачи, ему разрешается завершить работу. Используя разумные ограничения для размера пула (в конце концов, есть только определенное количество процессоров) и достаточно большой тайм-аут (чтобы потоки не завершались без необходимости), желаемые преимущества, вероятно, будут видны.
Последний вариант - хакерский. По сути, ThreadPoolExecutor
внутренне использует BlockingQueue.offer
, чтобы определить, есть ли у очереди емкость. Пользовательская реализация BlockingQueue
всегда могла отклонить попытку предложения
.Когда ThreadPoolExecutor
не может предложить
задачу в очередь, он попытается создать нового рабочего. Если новый рабочий не может быть создан, будет вызван RejectedExecutionHandler
. В этот момент пользовательский RejectedExecutionHandler
мог заставить поместить
в пользовательский BlockingQueue
.
/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
BlockingQueue<T> delegate;
public boolean offer(T item) {
return false;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
delegate.put(r);
}
//.... delegate methods
}
Вы ищете что-то более похожее на кэшированный пул потоков?
Просто установите corePoolsize = maximumPoolSize
и используйте неограниченную очередь?
В вашем списке точек 1 исключает 2, поскольку corePoolSize
всегда будет меньше или равно maximumPoolSize
.
Править
Все еще есть что-то несовместимое между тем, что вы хотите, и тем, что вам предложит TPE.
Если у вас неограниченная очередь, maximumPoolSize
игнорируется, поэтому, как вы заметили, никогда не будет создано и использовано не более corePoolSize
потоков.
Итак, опять же, если вы возьмете corePoolsize = maximumPoolSize
с неограниченной очередью, вы получите то, что хотите, нет?