Конечная цель - добавить дополнительные поведение ListenableFuture s на основе типа аргумента Callable
/ Runnable
. Я хочу добавить дополнительное поведение к каждому из Будущие методы. (Примеры использования можно найти в javadoc AbstractExecutorService и в разделе 7.1.7 документа Goetz Java Concurrency in Practice )
У меня есть существующий ExecutorService , который переопределяет newTaskFor . Он проверяет тип аргумента и создает подкласс FutureTask
. T Естественно, он поддерживает submit, а также invokeAny и invokeAll .
Как получить тот же эффект для ListenableFuture , возвращаемых службой ListeningExecutorService ?
Другими словами, где я могу поместить этот код
if (callable instanceof SomeClass) {
return new FutureTask(callable) {
public boolean cancel(boolean mayInterruptIfRunning) {
System.out.println("Canceling Task");
return super.cancel(mayInterruptIfRunning);
}
};
} else {
return new FutureTask(callable);
}
, чтобы мой клиент может выполнить оператор println
с
ListeningExecutorService executor = ...;
Collection callables = ImmutableSet.of(new SomeClass());
List> futures = executor.invokeAll(callables);
for (Future> future : futures) {
future.cancel(true);
}
Вот список вещей, которые я уже пробовал, и почему они не работают.
Передайте MyExecutorService
в MoreExecutors.listeningDecorator .
Проблема 1: К сожалению, результирующая ListeningExecutorService ( AbstractListeningExecutorService
) не делегируется методам ExecutorService , она делегирует ] выполнить метод (Runnable) на Executor . В результате метод newTaskFor
в MyExecutorService
никогда не вызывается.
Проблема 2: AbstractListeningExecutorService
создает Runnable ( ListenableFutureTask ) с помощью статического фабричного метода, который я не могу расширить.
Внутри newTaskFor
, обычно создайте MyRunnableFuture
, а затем заключите его в ListenableFutureTask
.
Проблема 1: фабричные методы ListenableFutureTask не принимают RunnableFuture s, они принимают Runnable
и Callable
.Если я передаю MyRunnableFuture
как Runnable, результирующая ListenableFutureTask
просто вызовет run ()
, а не какой-либо из методов Future
(где мой поведение есть).
Проблема 2: Даже если он вызвал мои Future
методы, MyRunnableFuture
не является вызываемым
, поэтому я должен предоставить возвращаемое значение когда я создаю ListenableFutureTask
... которого у меня нет ... отсюда и Callable
.
Пусть MyRunnableFuture расширяет ListenableFutureTask
вместо FutureTask
Проблема: ListenableFutureTask
теперь является окончательным (начиная с r10 / r11).
Пусть MyRunnableFuture
расширит ForwardingListenableFuture и реализует RunnableFuture . Затем оберните аргумент SomeClass
в ListenableFutureTask
и верните его из delegate ()
Проблема: Зависает. Я недостаточно хорошо понимаю проблему, чтобы объяснить ее, но эта конфигурация вызывает тупик в FutureTask.Sync.
Исходный код: По запросу, вот источник зависшего Решения D:
import java.util.*;
import java.util.concurrent.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.*;
/** See http://stackoverflow.com/q/8931215/290943 */
public final class MyListeningExecutorServiceD extends ThreadPoolExecutor implements ListeningExecutorService {
// ===== Test Harness =====
private static interface SomeInterface {
public String getName();
}
private static class SomeClass implements SomeInterface, Callable, Runnable {
private final String name;
private SomeClass(String name) {
this.name = name;
}
public Void call() throws Exception {
System.out.println("SomeClass.call");
return null;
}
public void run() {
System.out.println("SomeClass.run");
}
public String getName() {
return name;
}
}
private static class MyListener implements FutureCallback {
public void onSuccess(Void result) {
System.out.println("MyListener.onSuccess");
}
public void onFailure(Throwable t) {
System.out.println("MyListener.onFailure");
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("Main.start");
SomeClass someClass = new SomeClass("Main.someClass");
ListeningExecutorService executor = new MyListeningExecutorServiceD();
Collection> callables = ImmutableSet.>of(someClass);
List> futures = executor.invokeAll(callables);
for (Future future : futures) {
Futures.addCallback((ListenableFuture) future, new MyListener());
future.cancel(true);
}
System.out.println("Main.done");
}
// ===== Implementation =====
private static class MyRunnableFutureD extends ForwardingListenableFuture implements RunnableFuture {
private final ListenableFuture delegate;
private final SomeInterface someClass;
private MyRunnableFutureD(SomeInterface someClass, Runnable runnable, T value) {
assert someClass == runnable;
this.delegate = ListenableFutureTask.create(runnable, value);
this.someClass = someClass;
}
private MyRunnableFutureD(SomeClass someClass, Callable callable) {
assert someClass == callable;
this.delegate = ListenableFutureTask.create(callable);
this.someClass = someClass;
}
@Override
protected ListenableFuture delegate() {
return delegate;
}
public void run() {
System.out.println("MyRunnableFuture.run");
try {
delegate.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
System.out.println("MyRunnableFuture.cancel " + someClass.getName());
return super.cancel(mayInterruptIfRunning);
}
}
public MyListeningExecutorServiceD() {
// Same as Executors.newSingleThreadExecutor for now
super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
@Override
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
if (runnable instanceof SomeClass) {
return new MyRunnableFutureD((SomeClass) runnable, runnable, value);
} else {
return new FutureTask(runnable, value);
}
}
@Override
protected RunnableFuture newTaskFor(Callable callable) {
if (callable instanceof SomeClass) {
return new MyRunnableFutureD((SomeClass) callable, callable);
} else {
return new FutureTask(callable);
}
}
/** Must override to supply co-variant return type */
@Override
public ListenableFuture> submit(Runnable task) {
return (ListenableFuture>) super.submit(task);
}
/** Must override to supply co-variant return type */
@Override
public ListenableFuture submit(Runnable task, T result) {
return (ListenableFuture) super.submit(task, result);
}
/** Must override to supply co-variant return type */
@Override
public ListenableFuture submit(Callable task) {
return (ListenableFuture) super.submit(task);
}
}