Мертвая блокировка в ThreadPoolExecutor

Функциональные атрибуты могут использоваться для записи легких закрытий, которые обертывают код и связанные данные вместе:

#!/usr/bin/env python

SW_DELTA = 0
SW_MARK  = 1
SW_BASE  = 2

def stopwatch():
   import time

   def _sw( action = SW_DELTA ):

      if action == SW_DELTA:
         return time.time() - _sw._time

      elif action == SW_MARK:
         _sw._time = time.time()
         return _sw._time

      elif action == SW_BASE:
         return _sw._time

      else:
         raise NotImplementedError

   _sw._time = time.time() # time of creation

   return _sw

# test code
sw=stopwatch()
sw2=stopwatch()
import os
os.system("sleep 1")
print sw() # defaults to "SW_DELTA"
sw( SW_MARK )
os.system("sleep 2")
print sw()
print sw2()

1.00934004784

2.00644397736

3.01593494415

13
задан aioobe 2 July 2014 в 21:13
поделиться

4 ответа

Я не вижу блокировки в коде ThreadPoolExecutor execute (Runnable) . Единственная переменная - это workQueue . Какого рода BlockingQueue вы предоставили своему ThreadPoolExecutor ?

По теме взаимоблокировок:

Вы можете подтвердить, что это взаимоблокировка, изучив полный дамп потока, поскольку предоставляется в Windows или kill -QUIT в системах UNIX.

Получив эти данные, вы можете исследовать потоки. Вот подходящая выдержка из статьи Sun об изучении дампов потоков (рекомендуется прочитать) :

Для зависающих, заблокированных или зависших программ: если вы считаете, что ваша программа зависает, генерировать трассировку стека и проверять потоки в состояниях MW или CW. Если программа зашла в тупик, то некоторые из системных потоков, вероятно, будут отображаться как текущие, потому что JVM больше нечего делать.

Более легкое замечание: если вы работаете в среде IDE, можете ли вы гарантировать что в этих методах не включены точки останова.

2
ответ дан 2 December 2019 в 01:21
поделиться

Как кто-то уже упоминал, это звучит как нормальное поведение, ThreadPoolExecutor просто ждет, чтобы поработать. Если вы хотите остановить его, вам нужно вызвать:

executeor.shutdown ()

, чтобы завершить его, обычно за ним следует executeor.awaitTermination

0
ответ дан 2 December 2019 в 01:21
поделиться

Ниже приведен исходный код библиотеки (на самом деле это класс из http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip ),
- немного сложно - добавлена ​​защита от повторных вызовов FutureTask, если я не ошибаюсь - но не похоже на тупик - очень простое использование ThreadPool:

package net.spy.memcached.transcoders;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.spy.memcached.CachedData;
import net.spy.memcached.compat.SpyObject;

/**
 * Asynchronous transcoder.
 */
public class TranscodeService extends SpyObject {

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L,
            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.DiscardPolicy());

    /**
     * Perform a decode.
     */
    public <T> Future<T> decode(final Transcoder<T> tc,
            final CachedData cachedData) {

        assert !pool.isShutdown() : "Pool has already shut down.";

        TranscodeService.Task<T> task = new TranscodeService.Task<T>(
                new Callable<T>() {
                    public T call() {
                        return tc.decode(cachedData);
                    }
                });

        if (tc.asyncDecode(cachedData)) {
            this.pool.execute(task);
        }
        return task;
    }

    /**
     * Shut down the pool.
     */
    public void shutdown() {
        pool.shutdown();
    }

    /**
     * Ask whether this service has been shut down.
     */
    public boolean isShutdown() {
        return pool.isShutdown();
    }

    private static class Task<T> extends FutureTask<T> {
        private final AtomicBoolean isRunning = new AtomicBoolean(false);

        public Task(Callable<T> callable) {
            super(callable);
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            this.run();
            return super.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            this.run();
            return super.get(timeout, unit);
        }

        @Override
        public void run() {
            if (this.isRunning.compareAndSet(false, true)) {
                super.run();
            }
        }
    }

}
0
ответ дан 2 December 2019 в 01:21
поделиться

Определенно странно.

Но прежде чем писать свой собственный TPE, попробуйте:

  • другой BlockingQueue impl., например, LinkedBlockingQueue

  • укажите fairness=true в ArrayBlockingQueue, т.е. используйте new ArrayBlockingQueue(n, true)

Из этих двух вариантов я бы выбрал второй, потому что очень странно, что offer() блокируется; Одна из причин, которая приходит на ум - политика планирования потоков на вашем Linux. Просто как предположение.

0
ответ дан 2 December 2019 в 01:21
поделиться
Другие вопросы по тегам:

Похожие вопросы: