Java ThreadPoolExecutor, застревающий при использовании ArrayBlockingQueue

Я работаю над некоторым приложением и использую ThreadPoolExecutor для того, чтобы справиться с различными задачами. ThreadPoolExecutor застревает после некоторой продолжительности. Для моделирования этого в более простой среде я написал простой код, где я могу моделировать проблему.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor {
    private int poolSize = 10;
    private int maxPoolSize = 50;
    private long keepAliveTime = 10;
    private ThreadPoolExecutor threadPool = null;
    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            100000);

    public MyThreadPoolExecutor() {
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, queue);
        threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable runnable,
                    ThreadPoolExecutor threadPoolExecutor) {
                System.out
                        .println("Execution rejected. Please try restarting the application.");
            }

        });
    }

    public void runTask(Runnable task) {
        threadPool.execute(task);
    }

    public void shutDown() {
        threadPool.shutdownNow();
    }
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = threadPool;
    }

    public static void main(String[] args) {
        MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
        for (int i = 0; i < 1000; i++) {
            final int j = i;
            mtpe.runTask(new Runnable() {

                @Override
                public void run() {
                    System.out.println(j);
                }

            });
        }
    }
}

Попытайтесь выполнить этот код несколько раз. Это обычно печатает outs число на консоли и когда все потоки заканчиваются, это существует. Но время от времени, это закончило всю задачу и затем не становится завершенным. Дамп потока следующие:

MyThreadPoolExecutor [Java Application] 
    MyThreadPoolExecutor at localhost:2619 (Suspended)  
        Daemon System Thread [Attach Listener] (Suspended)  
        Daemon System Thread [Signal Dispatcher] (Suspended)    
        Daemon System Thread [Finalizer] (Suspended)    
            Object.wait(long) line: not available [native method]   
            ReferenceQueue<T>.remove(long) line: not available    
            ReferenceQueue<T>.remove() line: not available    
            Finalizer$FinalizerThread.run() line: not available 
        Daemon System Thread [Reference Handler] (Suspended)    
            Object.wait(long) line: not available [native method]   
            Reference$Lock(Object).wait() line: 485 
            Reference$ReferenceHandler.run() line: not available    
        Thread [pool-1-thread-1] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-2] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-3] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-4] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-6] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-8] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-5] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-10] (Suspended)   
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-9] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [pool-1-thread-7] (Suspended)    
            Unsafe.park(boolean, long) line: not available [native method]  
            LockSupport.park(Object) line: not available    
            AbstractQueuedSynchronizer$ConditionObject.await() line: not available  
            ArrayBlockingQueue<E>.take() line: not available  
            ThreadPoolExecutor.getTask() line: not available    
            ThreadPoolExecutor$Worker.run() line: not available 
            Thread.run() line: not available    
        Thread [DestroyJavaVM] (Suspended)  
    C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM)  

В моем реальном приложении потоки ThreadPoolExecutor входят в это состояние, и затем оно прекращает отвечать.

С уважением, Ravi Rao

10
задан Ravi Rao 17 June 2010 в 05:54
поделиться

1 ответ

В вашем методе main вы никогда не вызываете mtpe.shutdown(). ThreadPoolExecutor будет пытаться поддерживать свой corePoolSize неопределенно долго. Иногда вам повезет, и у вас будет больше, чем corePoolSize потоков, тогда каждый рабочий поток перейдет в ветвь условной логики, которая позволит ему завершиться после заданного вами периода тайм-аута в 10 секунд. Однако, как вы заметили, иногда это не так, поэтому каждый поток в исполнителе будет блокироваться на ArrayBlockingQueue.take() и ждать нового задания.

Также обратите внимание, что существует существенная разница между ExecutorService.shutdown() и ExecutorService.shutdownNow(). Если вы вызываете ExecutorService.shutdownNow(), как указано в вашей реализации обертки, вы будете иногда сбрасывать некоторые задачи, которые не были назначены для выполнения.

Обновление: Со времени моего первоначального ответа реализация ThreadPoolExecutor изменилась таким образом, что программа в исходном сообщении никогда не должна завершаться.

9
ответ дан 4 December 2019 в 01:55
поделиться
Другие вопросы по тегам:

Похожие вопросы: