RxJava Как заставить наблюдателей обрабатывать в порядке [дублировать]

Другие ответы верны, что обычно удалить плохую идею из списка, который вы повторяете. Обратное повторение позволяет избежать ошибок, но гораздо сложнее следовать коду, который делает это, поэтому обычно вам лучше использовать понимание списка или filter.

Однако есть один случай, когда безопасно удалять элементы из последовательности, которую вы выполняете: если вы удаляете только один элемент во время повтора. Это может быть обеспечено с помощью return или break. Например:

for i, item in enumerate(lst):
    if item % 4 == 0:
        foo(item)
        del lst[i]
        break

Это часто легче понять, чем понимание списка, когда вы выполняете некоторые операции с побочными эффектами для первого элемента в списке, который удовлетворяет некоторым условиям, а затем удаляет этот элемент из список сразу после.

2
задан Martynas Jurkus 8 May 2014 в 07:26
поделиться

3 ответа

Я не совсем уверен, что вы пытаетесь сделать в IdlingWrapper, но я думаю, что текущая реализация очень хрупкая.

Я думаю, что самое важное, что должно произойти, - это гарантировать наблюдаемый может быть вызван только один раз.

Вот краткая реализация, чтобы продемонстрировать это, а также мою реализацию wrapObservable.

public class Test {

    private static int counter = 0;

    private static final List<Observable<?>> list = Collections.synchronizedList(new ArrayList<>());

    protected static <T> Observable<T> wrapObservable(final Observable<T> original) {
        // run atleast once???
        synchronized (list) {
            list.add(original);
        }

        return Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                synchronized (list) {
                    counter++;
                    if (!list.contains(original)) {
                        subscriber.onError(new Exception("You can only subscribe once!"));
                        return;
                    }
                    list.remove(original);
                }

                // Sleep to make it easier to see things happening...
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException ignored) {
                }

                subscriber.onCompleted();
            }
        }).flatMap(new Func1<Void, Observable<? extends T>>() {
            @Override
            public Observable<? extends T> call(Void o) {
                return original;
            }
        }).finallyDo(new Action0() {
            @Override
            public void call() {
                synchronized (list) {
                    counter--;
                    if (list.size() == 0 && counter == 0) {
                        System.err.println("finally");
                    }
                }
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i = 0; i < 10; i++) {
            // running in io thread for simulating async call.
            Observable<String> test = wrapObservable(Observable.from("TEST!!!!!!")).subscribeOn(Schedulers.io());
            test.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.err.println("completed");
                }

                @Override
                public void onError(Throwable e) {
                    System.err.println("error");
                }

                @Override
                public void onNext(String s) {
                    System.err.println("next");
                }
            });

            // example of calling the same observable twice.
            test.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.err.println("completed");
                }

                @Override
                public void onError(Throwable e) {
                    System.err.println("error");
                }

                @Override
                public void onNext(String s) {
                    System.err.println("next");
                }
            });
        }

        Thread.sleep(10000);
    }
}
1
ответ дан tanoshi 26 August 2018 в 08:36
поделиться

Кажется, что это сработало отлично.

protected <T> Observable<T> wrapObservable(final Observable<T> original) {
    return Observable.create(new Observable.OnSubscribeFunc<T>() {
        @Override
        public Subscription onSubscribe(final Observer<? super T> t1) {
            original.subscribe(new Observer<T>() {
                @Override
                public void onCompleted() {
                    t1.onCompleted();
                    uiThreadHandler.post(new Runnable() {
                        @Override
                        public void run() {
                            counter.decrementAndGet();
                            notifyIdle();
                        }
                    });
                }

                @Override
                public void onError(Throwable e) {
                    t1.onError(e);
                    uiThreadHandler.post(new Runnable() {
                        @Override
                        public void run() {
                            counter.decrementAndGet();
                            notifyIdle();
                        }
                    });
                }

                @Override
                public void onNext(T args) {
                    t1.onNext(args);
                }
            });

            return Subscriptions.empty();
        }
    });
}
0
ответ дан Martynas Jurkus 26 August 2018 в 08:36
поделиться

Если вы хотите просто использовать встроенные методы RxJava для заказа своих наблюдателей, вы можете использовать flatMap и диапазон, чтобы каждый элемент был разбит на несколько элементов с приоритетом, а затем на приоритет. Наблюдатели упорядочиваются на основе того, как они фильтруются.

Вот тривиальный пример:

Observable<Pair<Integer, Object>> shared = RxView.clicks(findViewById(R.id.textView))
        .flatMap(c -> Observable.range(0, 2).map(i -> Pair.create(i, c)))
        .share();

shared.filter(p -> p.first == 1)
        .map(p -> p.second)
        .doOnSubscribe(c -> Log.d(TAG, "first subscribed doOnSubscribe"))
        .subscribe(c -> Log.d(TAG, "first subscribed onNext"));

shared.filter(p -> p.first == 0)
        .map(p -> p.second)
        .doOnSubscribe(c -> Log.d(TAG, "second subscribed doOnSubscribe"))
        .subscribe(c -> Log.d(TAG, "second subscribed onNext"));

Если вы делаете это повсюду

0
ответ дан nick allen 26 August 2018 в 08:36
поделиться
Другие вопросы по тегам:

Похожие вопросы: