Функциональные атрибуты могут использоваться для записи легких закрытий, которые обертывают код и связанные данные вместе:
#!/usr/bin/env python
SW_DELTA = 0
SW_MARK = 1
SW_BASE = 2
def stopwatch():
import time
def _sw( action = SW_DELTA ):
if action == SW_DELTA:
return time.time() - _sw._time
elif action == SW_MARK:
_sw._time = time.time()
return _sw._time
elif action == SW_BASE:
return _sw._time
else:
raise NotImplementedError
_sw._time = time.time() # time of creation
return _sw
# test code
sw=stopwatch()
sw2=stopwatch()
import os
os.system("sleep 1")
print sw() # defaults to "SW_DELTA"
sw( SW_MARK )
os.system("sleep 2")
print sw()
print sw2()
1.00934004784
2.00644397736
3.01593494415
Я не вижу блокировки в коде ThreadPoolExecutor
execute (Runnable)
. Единственная переменная - это workQueue
. Какого рода BlockingQueue
вы предоставили своему ThreadPoolExecutor
?
По теме взаимоблокировок:
Вы можете подтвердить, что это взаимоблокировка, изучив полный дамп потока, поскольку предоставляется
в Windows или kill -QUIT
в системах UNIX.
Получив эти данные, вы можете исследовать потоки. Вот подходящая выдержка из статьи Sun об изучении дампов потоков (рекомендуется прочитать) :
Для зависающих, заблокированных или зависших программ: если вы считаете, что ваша программа зависает, генерировать трассировку стека и проверять потоки в состояниях MW или CW. Если программа зашла в тупик, то некоторые из системных потоков, вероятно, будут отображаться как текущие, потому что JVM больше нечего делать.
Более легкое замечание: если вы работаете в среде IDE, можете ли вы гарантировать что в этих методах не включены точки останова.
Как кто-то уже упоминал, это звучит как нормальное поведение, ThreadPoolExecutor просто ждет, чтобы поработать. Если вы хотите остановить его, вам нужно вызвать:
executeor.shutdown ()
, чтобы завершить его, обычно за ним следует executeor.awaitTermination
Ниже приведен исходный код библиотеки (на самом деле это класс из http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip ),
- немного сложно - добавлена защита от повторных вызовов FutureTask, если я не ошибаюсь - но не похоже на тупик - очень простое использование ThreadPool:
package net.spy.memcached.transcoders;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import net.spy.memcached.CachedData;
import net.spy.memcached.compat.SpyObject;
/**
* Asynchronous transcoder.
*/
public class TranscodeService extends SpyObject {
private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),
new ThreadPoolExecutor.DiscardPolicy());
/**
* Perform a decode.
*/
public <T> Future<T> decode(final Transcoder<T> tc,
final CachedData cachedData) {
assert !pool.isShutdown() : "Pool has already shut down.";
TranscodeService.Task<T> task = new TranscodeService.Task<T>(
new Callable<T>() {
public T call() {
return tc.decode(cachedData);
}
});
if (tc.asyncDecode(cachedData)) {
this.pool.execute(task);
}
return task;
}
/**
* Shut down the pool.
*/
public void shutdown() {
pool.shutdown();
}
/**
* Ask whether this service has been shut down.
*/
public boolean isShutdown() {
return pool.isShutdown();
}
private static class Task<T> extends FutureTask<T> {
private final AtomicBoolean isRunning = new AtomicBoolean(false);
public Task(Callable<T> callable) {
super(callable);
}
@Override
public T get() throws InterruptedException, ExecutionException {
this.run();
return super.get();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
this.run();
return super.get(timeout, unit);
}
@Override
public void run() {
if (this.isRunning.compareAndSet(false, true)) {
super.run();
}
}
}
}
Определенно странно.
Но прежде чем писать свой собственный TPE, попробуйте:
другой BlockingQueue
impl., например, LinkedBlockingQueue
укажите fairness=true в ArrayBlockingQueue, т.е. используйте new ArrayBlockingQueue(n, true)
Из этих двух вариантов я бы выбрал второй, потому что очень странно, что offer()
блокируется; Одна из причин, которая приходит на ум - политика планирования потоков на вашем Linux. Просто как предположение.