Добавить / расширить поведение Future, созданное ListeningExecutorService

Конечная цель - добавить дополнительные поведение ListenableFuture s на основе типа аргумента Callable / Runnable . Я хочу добавить дополнительное поведение к каждому из Будущие методы. (Примеры использования можно найти в javadoc AbstractExecutorService и в разделе 7.1.7 документа Goetz Java Concurrency in Practice )

У меня есть существующий ExecutorService , который переопределяет newTaskFor . Он проверяет тип аргумента и создает подкласс FutureTask . T Естественно, он поддерживает submit, а также invokeAny и invokeAll .

Как получить тот же эффект для ListenableFuture , возвращаемых службой ListeningExecutorService ?

Другими словами, где я могу поместить этот код

if (callable instanceof SomeClass) {
   return new FutureTask(callable) {
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("Canceling Task");
            return super.cancel(mayInterruptIfRunning);
        }
    };
} else {
    return new FutureTask(callable);
}

, чтобы мой клиент может выполнить оператор println с

ListeningExecutorService executor = ...;
Collection callables = ImmutableSet.of(new SomeClass());
List> futures = executor.invokeAll(callables);
for (Future future : futures) {
    future.cancel(true);
}

Failed Solutions

Вот список вещей, которые я уже пробовал, и почему они не работают.

Решение A

Передайте MyExecutorService в MoreExecutors.listeningDecorator .

Проблема 1: К сожалению, результирующая ListeningExecutorService ( AbstractListeningExecutorService ) не делегируется методам ExecutorService , она делегирует ] выполнить метод (Runnable) на Executor . В результате метод newTaskFor в MyExecutorService никогда не вызывается.

Проблема 2: AbstractListeningExecutorService создает Runnable ( ListenableFutureTask ) с помощью статического фабричного метода, который я не могу расширить.

Решение B

Внутри newTaskFor , обычно создайте MyRunnableFuture , а затем заключите его в ListenableFutureTask .

Проблема 1: фабричные методы ListenableFutureTask не принимают RunnableFuture s, они принимают Runnable и Callable .Если я передаю MyRunnableFuture как Runnable, результирующая ListenableFutureTask просто вызовет run () , а не какой-либо из методов Future (где мой поведение есть).

Проблема 2: Даже если он вызвал мои Future методы, MyRunnableFuture не является вызываемым , поэтому я должен предоставить возвращаемое значение когда я создаю ListenableFutureTask ... которого у меня нет ... отсюда и Callable .

Решение C

Пусть MyRunnableFuture расширяет ListenableFutureTask вместо FutureTask

Проблема: ListenableFutureTask теперь является окончательным (начиная с r10 / r11).

Решение D

Пусть MyRunnableFuture расширит ForwardingListenableFuture и реализует RunnableFuture . Затем оберните аргумент SomeClass в ListenableFutureTask и верните его из delegate ()

Проблема: Зависает. Я недостаточно хорошо понимаю проблему, чтобы объяснить ее, но эта конфигурация вызывает тупик в FutureTask.Sync.

Исходный код: По запросу, вот источник зависшего Решения D:

import java.util.*;
import java.util.concurrent.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.*;

/** See http://stackoverflow.com/q/8931215/290943 */
public final class MyListeningExecutorServiceD extends ThreadPoolExecutor implements ListeningExecutorService {

    // ===== Test Harness =====

    private static interface SomeInterface {
        public String getName();
    }

    private static class SomeClass implements SomeInterface, Callable, Runnable {
        private final String name;

        private SomeClass(String name) {
            this.name = name;
        }

        public Void call() throws Exception {
            System.out.println("SomeClass.call");
            return null;
        }

        public void run() {
            System.out.println("SomeClass.run");
        }

        public String getName() {
            return name;
        }
    }

    private static class MyListener implements FutureCallback {
        public void onSuccess(Void result) {
            System.out.println("MyListener.onSuccess");
        }

        public void onFailure(Throwable t) {
            System.out.println("MyListener.onFailure");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Main.start");

        SomeClass someClass = new SomeClass("Main.someClass");

        ListeningExecutorService executor = new MyListeningExecutorServiceD();
        Collection> callables = ImmutableSet.>of(someClass);
        List> futures = executor.invokeAll(callables);

        for (Future future : futures) {
            Futures.addCallback((ListenableFuture) future, new MyListener());
            future.cancel(true);
        }

        System.out.println("Main.done");
    }

    // ===== Implementation =====

    private static class MyRunnableFutureD extends ForwardingListenableFuture implements RunnableFuture {

        private final ListenableFuture delegate;
        private final SomeInterface someClass;

        private MyRunnableFutureD(SomeInterface someClass, Runnable runnable, T value) {
            assert someClass == runnable;
            this.delegate = ListenableFutureTask.create(runnable, value);
            this.someClass = someClass;
        }

        private MyRunnableFutureD(SomeClass someClass, Callable callable) {
            assert someClass == callable;
            this.delegate = ListenableFutureTask.create(callable);
            this.someClass = someClass;
        }

        @Override
        protected ListenableFuture delegate() {
            return delegate;
        }

        public void run() {
            System.out.println("MyRunnableFuture.run");
            try {
                delegate.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("MyRunnableFuture.cancel " + someClass.getName());
            return super.cancel(mayInterruptIfRunning);
        }
    }

    public MyListeningExecutorServiceD() {
        // Same as Executors.newSingleThreadExecutor for now
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }

    @Override
    protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
        if (runnable instanceof SomeClass) {
            return new MyRunnableFutureD((SomeClass) runnable, runnable, value);
        } else {
            return new FutureTask(runnable, value);
        }
    }

    @Override
    protected  RunnableFuture newTaskFor(Callable callable) {
        if (callable instanceof SomeClass) {
            return new MyRunnableFutureD((SomeClass) callable, callable);
        } else {
            return new FutureTask(callable);
        }
    }

    /** Must override to supply co-variant return type */
    @Override
    public ListenableFuture submit(Runnable task) {
        return (ListenableFuture) super.submit(task);
    }

    /** Must override to supply co-variant return type */
    @Override
    public  ListenableFuture submit(Runnable task, T result) {
        return (ListenableFuture) super.submit(task, result);
    }

    /** Must override to supply co-variant return type */
    @Override
    public  ListenableFuture submit(Callable task) {
        return (ListenableFuture) super.submit(task);
    }
}

9
задан Drew 19 January 2012 в 12:24
поделиться