Динамическое изменение размеров java.util.concurrent. ThreadPoolExecutor, в то время как это имеет ожидающие задачи

Я работаю с a java.util.concurrent.ThreadPoolExecutor обработать много объектов параллельно. Хотя поточная обработка себя хорошо работает, время от времени мы сталкивались с другими ограничениями ресурсов из-за действий, происходящих в потоках, которые заставили нас хотеть набрать вниз номер Потоков в пуле.

Я хотел бы знать, существует ли способ набрать вниз номер потоков, в то время как потоки на самом деле работают. Я знаю, что можно звонить setMaximumPoolSize() и/или setCorePoolSize(), но они только изменяют размер пула, после того как потоки становятся неактивными, но они не становятся неактивными, пока нет никаких задач, ожидающих в очереди.

18
задан Ravindra babu 26 November 2017 в 16:20
поделиться

3 ответа

Насколько я могу судить, чистым способом это невозможно.

Вы можете реализовать метод beforeExecute для проверки некоторого логического значения и принудительной остановки потоков. Имейте в виду, что они будут содержать задачу, которая не будет выполнена, пока они не будут повторно включены.

В качестве альтернативы вы можете реализовать afterExecute, чтобы генерировать исключение RuntimeException, когда вы находитесь в состоянии насыщения. Это фактически приведет к тому, что поток умрет, и, поскольку Executor будет выше максимума, новый не будет создан.

Я тоже не рекомендую. Вместо этого попробуйте найти другой способ контроля одновременного выполнения задач, которые вызывают у вас проблемы. Возможно, выполняя их в отдельном пуле потоков с более ограниченным числом рабочих.

5
ответ дан 30 November 2019 в 06:39
поделиться

Я прочитал документацию по setMaximumPoolSize() и setCorePoolSize(), и кажется, что они могут обеспечить нужное вам поведение.

-- EDIT --

Мой вывод был неверным: пожалуйста, смотрите обсуждение ниже для деталей...

0
ответ дан 30 November 2019 в 06:39
поделиться

Совершенно верно. Вызов setCorePoolSize (int) изменит размер ядра пула. Вызов этого метода является потокобезопасным и переопределяет настройки, предоставленные конструктору ThreadPoolExecutor . Если вы обрезаете размер пула, оставшиеся потоки будут отключены после завершения их текущей очереди заданий (если они простаивают, они немедленно отключатся). Если вы увеличиваете размер пула, новые потоки будут выделены как можно скорее. Временные рамки для выделения новых потоков недокументированы, но в реализации выделение новых потоков выполняется при каждом вызове метода execute .

Чтобы связать это с настраиваемой во время выполнения фермой заданий, вы можете предоставить это свойство (либо с помощью оболочки, либо с помощью динамического экспортера MBean) в качестве атрибута JMX для чтения и записи, чтобы создать довольно приятный настраиваемый на лету пакетный процессор.

Чтобы принудительно уменьшить размер пула во время выполнения (что является вашим запросом), вы должны создать подкласс ThreadPoolExecutor и добавить нарушение в метод beforeExecute (Thread, Runnable) . Прерывание потока не является достаточным нарушением, поскольку оно взаимодействует только с состояниями ожидания, а во время обработки потоки задач ThreadPoolExecutor не переходят в состояние прерывания.

Недавно у меня была такая же проблема при попытке принудительного завершения пула потоков до выполнения всех отправленных задач. Чтобы это произошло, я прервал поток, выдав исключение времени выполнения только после замены UncaughtExceptionHandler потока тем, который ожидает мое конкретное исключение и отбрасывает его.

/**
 * A runtime exception used to prematurely terminate threads in this pool.
 */
static class ShutdownException
extends RuntimeException {
    ShutdownException (String message) {
        super(message);
    }
}

/**
 * This uncaught exception handler is used only as threads are entered into
 * their shutdown state.
 */
static class ShutdownHandler 
implements UncaughtExceptionHandler {
    private UncaughtExceptionHandler handler;

    /**
     * Create a new shutdown handler.
     *
     * @param handler The original handler to deligate non-shutdown
     * exceptions to.
     */
    ShutdownHandler (UncaughtExceptionHandler handler) {
        this.handler = handler;
    }
    /**
     * Quietly ignore {@link ShutdownException}.
     * <p>
     * Do nothing if this is a ShutdownException, this is just to prevent
     * logging an uncaught exception which is expected.  Otherwise forward
     * it to the thread group handler (which may hand it off to the default
     * uncaught exception handler).
     * </p>
     */
    public void uncaughtException (Thread thread, Throwable throwable) {
        if (!(throwable instanceof ShutdownException)) {
            /* Use the original exception handler if one is available,
             * otherwise use the group exception handler.
             */
            if (handler != null) {
                handler.uncaughtException(thread, throwable);
            }
        }
    }
}
/**
 * Configure the given job as a spring bean.
 *
 * <p>Given a runnable task, configure it as a prototype spring bean,
 * injecting any necessary dependencices.</p>
 *
 * @param thread The thread the task will be executed in.
 * @param job The job to configure.
 *
 * @throws IllegalStateException if any error occurs.
 */
protected void beforeExecute (final Thread thread, final Runnable job) {
    /* If we're in shutdown, it's because spring is in singleton shutdown
     * mode.  This means we must not attempt to configure the bean, but
     * rather we must exit immediately (prematurely, even).
     */
    if (!this.isShutdown()) {
        if (factory == null) {
            throw new IllegalStateException(
                "This class must be instantiated by spring"
                );
        }

        factory.configureBean(job, job.getClass().getName());
    }
    else {
        /* If we are in shutdown mode, replace the job on the queue so the
         * next process will see it and it won't get dropped.  Further,
         * interrupt this thread so it will no longer process jobs.  This
         * deviates from the existing behavior of shutdown().
         */
        workQueue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

В вашем случае вы можете создать семафор (только для использования в качестве поточно-безопасного счетчика), у которого нет разрешений, и при завершении работы потоков высвободите ему количество разрешений, которое соответствует дельте предыдущего пула ядра. size и новый размер пула (требуется переопределить метод setCorePoolSize (int) ). Это позволит вам завершить ваши потоки после завершения их текущей задачи.

private Semaphore terminations = new Semaphore(0);

protected void beforeExecute (final Thread thread, final Runnable job) {
    if (terminations.tryAcquire()) {
        /* Replace this item in the queue so it may be executed by another
         * thread
         */
        queue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

public void setCorePoolSize (final int size) {
    int delta = getActiveCount() - size;

    super.setCorePoolSize(size);

    if (delta > 0) {
        terminations.release(delta);
    }
}

Это должно прервать n потоков для f (n) = активно - запрошено .Если есть какие-либо проблемы, стратегия распределения ThreadPoolExecutor достаточно надежна. Он учитывает преждевременное завершение с помощью блока finally , который гарантирует выполнение. По этой причине, даже если вы завершите слишком много потоков, они снова заполнятся.

36
ответ дан 30 November 2019 в 06:39
поделиться