RxJS :Рекурсивный список наблюдаемых и один наблюдатель

У меня возникли проблемы с рекурсивной цепочкой наблюдаемых.

Я работаю с RxJS, который в настоящее время находится в версии 1.0.10621 и содержит большинство основных функций Rx в сочетании с Rx для jQuery.

Позвольте мне представить пример сценария для моей проблемы :Я опрашиваю API поиска Twitter(ответ JSON )на наличие твитов/обновлений, содержащих определенное ключевое слово. Ответ также включает «URL-адрес обновления _», который следует использовать для создания последующего запроса -. Ответ на этот запрос -up снова будет содержать новый URL-адрес обновления _и т. д.

Rx.jQuery позволяет мне сделать так, чтобы API поиска Twitter вызывал наблюдаемое событие, которое создает событие onNext, а затем завершается. До сих пор я пытался заставить обработчик onNext запомнить URL-адрес обновления _и использовать его в обработчике onCompleted для создания как нового наблюдаемого, так и соответствующего наблюдателя для следующего запроса. Сюда,одна пара наблюдаемая + наблюдатель бесконечно следует за другой.

Проблема с этим подходом состоит в том, что:

  1. последующие -наблюдаемые объекты/наблюдатели уже активны, когда их предшественники еще не утилизированы.

  2. Мне приходится вести множество неприятных бухгалтерских операций, чтобы поддерживать действительную ссылку на живого в данный момент наблюдателя, которых на самом деле может быть два. (Один в onCompleted, а другой где-то еще в его жизненном -цикле )Эта ссылка, конечно, нужна для отмены подписки/удаления наблюдателя. Альтернативой бухгалтерскому учету было бы реализовать побочный эффект с помощью «все еще работает?» -boolean, как я сделал в своем примере.

Пример кода:

            running = true;
            twitterUrl = "http://search.twitter.com/search.json";
            twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
            twitterMaxId = 0; //actually twitter ignores its since_id parameter

            newTweetObserver = function () {
                return Rx.Observer.create(
                        function (tweet) {
                            if (tweet.id > twitterMaxId) {
                                twitterMaxId = tweet.id;
                                displayTweet(tweet);
                            }
                        }
                    );
            }

            createTwitterObserver = function() {
                twitterObserver = Rx.Observer.create(
                        function (response) {
                            if (response.textStatus == "success") {
                                var data = response.data;
                                if (data.error == undefined) {
                                    twitterQuery = data.refresh_url;
                                    var tweetObservable;
                                    tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                                    tweetObservable.subscribe(newTweetObserver());
                                }
                            }
                        },
                        function(error) { alert(error); },
                        function () {
                            //create and listen to new observer that includes a delay 
                            if (running) {
                                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                                twitterObservable.subscribe(createTwitterObserver());
                            }
                        } 
                    );
                return twitterObserver;
            }
            twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
            twitterObservable.subscribe(createTwitterObserver());

Не дайте себя обмануть двойным слоем наблюдаемых/наблюдателей от запросов до твитов. Мой пример касается в основном первого уровня :, запрашивающего данные из Twitter. Если бы при решении этой задачи второй слой (, конвертирующий ответы в твиты ), смог стать единым целым с первым, это было бы фантастикой; Но я думаю, что это совсем другое. Сейчас.

Эрик Мейер указал мне на оператор Expand (см. пример ниже )и предложил в качестве альтернативы шаблоны соединений .

var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
                  , i => ( i == 10 ? Observable.Empty() // terminate
         : new[]{i+1}.ToObservable() // recurse
 )
);

ys.ToArray().Select(a => string.Join(",", a)).DumpLive();

Это должно быть скопировано -и вставлено в LINQPad. Он предполагает одноэлементные наблюдаемые и создает одного конечного наблюдателя.

Итак, мой вопрос: :Как я могу лучше всего реализовать трюк с расширением в RxJS?

РЕДАКТИРОВАТЬ:
Оператор расширения, вероятно, может быть реализован, как показано в этой теме . Но нужно генераторов (, а у меня только JS К сожалению, RxJS 2.0.20304 -бета-версия не реализует метод Extend.

8
задан derabbink 2 February 2013 в 16:50
поделиться