Как заставить ThreadPoolExecutor отправить () блок метода, если он насыщается?

Я хочу создать a ThreadPoolExecutor таким образом, что то, когда это достигло своего максимального размера и очереди, полно, submit() блоки метода при попытке добавить новые задачи. Сделайте я должен реализовать пользовательское RejectedExecutionHandler для этого или там существующий способ сделать это пользующееся стандартной библиотекой Java?

99
задан Hearen 20 March 2019 в 04:06
поделиться

3 ответа

Одно из возможных решений, которое я только что нашел:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

Есть ли другие решения? Я бы предпочел что-то, основанное на RejectedExecutionHandler, так как это кажется стандартным способом работы с подобными ситуациями.

.
45
ответ дан 24 November 2019 в 05:06
поделиться

Следует использовать CallerRunsPolicy, который выполняет отвергнутую задачу в вызывающем потоке. Таким образом, он не сможет отправить исполнителю новые задачи до тех пор, пока эта задача не будет выполнена, и тогда появятся какие-то свободные потоки пула или процесс повторится.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

Из документа docs:

Rejected tasks

New tasks submitted in method execute(java.lang.Runnable) will be отвергнутый, когда исполнитель закрывается, а также когда исполнитель использует конечные пределы для обоих максимумов пропускная способность резьбовых и рабочих очередей, и насыщен. В любом случае метод выполнения вызывает RejectedExecutionHandler.rejectedExecution(java.lang.Runnable), java.util.concurrent.ThreadPoolExecutor)) его метод RejectedExecutionHandler. Четыре предопределённые политики обработчика при условии:

  1. В ThreadPoolExecutor.AbortPolicy, по умолчанию:

    1. В ThreadPoolExecutor.AbortPolicy, по умолчанию:

      1. . обработчик бросает время прогона RejectedExecutionException on отказ.
      2. В ThreadPoolExecutor.CallerRunsPolicy, вызывающая нить выполняет сама себя выполняет задание. Это обеспечивает простой механизм управления обратной связью, который будет замедлить скорость выполнения новых задач Представлено.
      3. В ThreadPoolExecutor.DiscardPolicy, a задача, которая не может быть выполнена просто упал.
      4. В ThreadPoolExecutor.DiscardOldestPolicy, если душеприказчик не будет закрыт, то задачей во главе очереди работы является сброшен, а затем повторное исполнение (который может опять провалиться, в результате чего повторить.)

Также при вызове конструктора ThreadPoolExecutor обязательно используйте ограниченную очередь, например ArrayBlockingQueueue. В противном случае ничто не будет отклонено.

Edit: в ответ на ваш комментарий установите размер ArrayBlockingQueue равным максимальному размеру пула потоков и используйте AbortPolicy.

Edit 2: Ok, я вижу к чему вы клоните. А как насчет этого: переопределите метод beforeExecute(), чтобы проверить, что getActiveCount() не превышает getMaximumPoolSize(), а если превысит, то переспите и попробуйте еще раз?

.
12
ответ дан 24 November 2019 в 05:06
поделиться

Создайте свою собственную блокирующую очередь, которая будет использоваться Исполнителем, с блокирующим поведением, которое вы ищете, при этом всегда возвращая доступную оставшуюся емкость (гарантируя, что Исполнитель не будет пытаться создать больше потоков, чем его пул ядра, или запустить обработчик отказа).

Думаю, это даст вам именно то блокирующее поведение, которое вы ищете. Обработчик отказа никогда не будет соответствовать счету, так как это указывает на то, что исполнитель не может выполнить задачу. Что я могу себе представить, так это то, что вы получаете в обработчике нечто вроде "занятого ожидания". Это не то, чего вы хотите, вам нужна очередь на исполнителя, которая блокирует вызывающего...

.
0
ответ дан 24 November 2019 в 05:06
поделиться
Другие вопросы по тегам:

Похожие вопросы: