определение проблемы ThreadPoolExecutor

Yourkit Это является низко служебным, стабильным, легким установить на JVM, которая будет представлена (всего один dll) и мощное. Для анализа "кучи" выводит, это - единственный профилировщик, который близко подходит Память Eclipse Анализатор .

5
задан Stu Thompson 18 September 2009 в 09:18
поделиться

3 ответа

Может, вам подойдет что-то подобное? Я только что поднял его, пожалуйста, ткните в него. По сути, он реализует пул потоков переполнения, который используется для подпитки базового ThreadPoolExecutor

Я вижу два основных недостатка, которые я вижу в нем:

  • Отсутствие возвращаемого объекта Future в submit () . Но, возможно, это не проблема для вас.
  • Вторичная очередь будет очищаться только в ThreadPoolExecutor , когда задания будут отправлены. Должно быть элегантное решение, но я его пока не вижу. Если вы знаете, что в StusMagicExecutor будет поток задач, то это может не быть проблемой. («Май» - ключевое слово. ) Возможно, вы захотите, чтобы отправленные вами задачи указывали на StusMagicExecutor после завершения?

Магический исполнитель Стью:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}
5
ответ дан 14 December 2019 в 08:55
поделиться

Документы javadoc для ThreadPoolExecutor довольно ясно показывают, что после создания потоков corePoolSize новые потоки будут созданы только после заполнения очереди. Таким образом, если вы установите core на 5 и max на 20, вы никогда не получите желаемого поведения.

Однако, если вы установите и core , и max до 20, то задачи будут добавлены в очередь только в том случае, если все 20 потоков заняты. Конечно, это делает ваше требование «минимум 5 потоков» немного бессмысленным, поскольку все 20 будут сохранены (в любом случае, пока они не отключатся).

1
ответ дан 14 December 2019 в 08:55
поделиться

Я думаю, что эта проблема является недостатком класса и очень обманчива, учитывая комбинации параметров конструктора. Вот решение, взятое из внутреннего ThreadPoolExecutor SwingWorker, который я сделал классом верхнего уровня. В нем нет минимума, но, по крайней мере, используется верхняя граница. Единственное, чего я не знаю, так это какой выигрыш в производительности вы получите от выполнения блокировки.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
1
ответ дан 14 December 2019 в 08:55
поделиться
Другие вопросы по тегам:

Похожие вопросы: