Политика ThreadPoolExecutor

Я пытаюсь использовать ThreadPoolExecutor для планирования задач, но сталкиваюсь с некоторыми проблемами с его политиками. Вот его установленное поведение:

  1. Если меньше, чем потоки corePoolSize работают, Исполнитель всегда предпочитает добавлять новый поток вместо организации очередей.
  2. Если corePoolSize или больше потоков работают, Исполнитель всегда предпочитает ставить запрос в очередь вместо того, чтобы добавить новый поток.
  3. Если бы запрос не может быть поставлен в очередь, новый поток создается, если это не превысило бы maximumPoolSize, в этом случае, задача будет отклонена.

Поведение, которое я хочу, является этим:

  1. то же как выше
  2. Если больше, чем corePoolSize, но меньше, чем потоки maximumPoolSize работают, предпочитает добавлять новый поток по организации очередей и использованию неактивного потока по добавлению нового потока.
  3. то же как выше

В основном я не хочу, чтобы любые задачи были отклонены; я хочу, чтобы они были поставлены в очередь в неограниченной очереди. Но я действительно хочу иметь до потоков maximumPoolSize. Если я использую неограниченную очередь, она никогда не генерирует потоки после того, как она поражает coreSize. Если я использую ограниченную очередь, она отклоняет задачи. Есть ли какой-либо путь вокруг этого?

О чем я думаю, теперь выполняет ThreadPoolExecutor на SynchronousQueue, но не подает задачи непосредственно к нему - вместо этого питание их к отдельному неограниченному LinkedBlockingQueue. Затем другой поток питается от LinkedBlockingQueue в Исполнителя, и если Вы отклоняетесь, он просто попробовал еще раз, пока он не отклоняется. Это походит на боль и определенный взлом, хотя - там более чистый способ сделать это?

13
задан Joe K 5 August 2010 в 21:44
поделиться

4 ответа

Ваш вариант использования является обычным, полностью законным и, к сожалению, более сложным, чем можно было бы ожидать. Для получения справочной информации вы можете прочитать это обсуждение и найти указатель на решение (также упомянутое в ветке) здесь . Решение Шэя работает нормально.

Обычно я бы немного опасался неограниченных очередей; Обычно лучше иметь явный контроль входящего потока, который плавно ухудшается и регулирует соотношение текущей / оставшейся работы, чтобы не перегружать ни производителя, ни потребителя.

1
ответ дан 2 December 2019 в 01:49
поделиться

Вероятно, нет необходимости в микроуправлении пулом потоков по запросу.

Пул кэшированных потоков будет повторно использовать незанятые потоки, одновременно разрешая потенциально неограниченное количество одновременных потоков. Это, конечно, может привести к снижению быстродействия из-за накладных расходов на переключение контекста во время периодов скачка.

Executors.newCachedThreadPool();

Лучше всего установить ограничение на общее количество потоков, отказавшись от идеи, что сначала будут использоваться незанятые потоки. Изменения конфигурации будут следующими:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

Рассмотрение этого сценария: если у исполнителя меньше, чем corePoolSize потоков, то он не должен быть очень занят. Если система не очень загружена, запуск нового потока не повредит. Это приведет к тому, что ваш ThreadPoolExecutor всегда будет создавать нового рабочего, если он меньше максимально допустимого числа рабочих. Только когда максимальное количество рабочих «работает», работникам, праздно ожидающим выполнения задач, будут даны задания. Если рабочий ожидает aReasonableTimeDuration без задачи, ему разрешается завершить работу. Используя разумные ограничения для размера пула (в конце концов, есть только определенное количество процессоров) и достаточно большой тайм-аут (чтобы потоки не завершались без необходимости), желаемые преимущества, вероятно, будут видны.

Последний вариант - хакерский. По сути, ThreadPoolExecutor внутренне использует BlockingQueue.offer , чтобы определить, есть ли у очереди емкость. Пользовательская реализация BlockingQueue всегда могла отклонить попытку предложения .Когда ThreadPoolExecutor не может предложить задачу в очередь, он попытается создать нового рабочего. Если новый рабочий не может быть создан, будет вызван RejectedExecutionHandler . В этот момент пользовательский RejectedExecutionHandler мог заставить поместить в пользовательский BlockingQueue .

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}
4
ответ дан 2 December 2019 в 01:49
поделиться

Вы ищете что-то более похожее на кэшированный пул потоков?

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool()

1
ответ дан 2 December 2019 в 01:49
поделиться

Просто установите corePoolsize = maximumPoolSize и используйте неограниченную очередь?

В вашем списке точек 1 исключает 2, поскольку corePoolSize всегда будет меньше или равно maximumPoolSize .

Править

Все еще есть что-то несовместимое между тем, что вы хотите, и тем, что вам предложит TPE.

Если у вас неограниченная очередь, maximumPoolSize игнорируется, поэтому, как вы заметили, никогда не будет создано и использовано не более corePoolSize потоков.

Итак, опять же, если вы возьмете corePoolsize = maximumPoolSize с неограниченной очередью, вы получите то, что хотите, нет?

1
ответ дан 2 December 2019 в 01:49
поделиться