У меня возникли проблемы с рекурсивной цепочкой наблюдаемых.
Я работаю с RxJS, который в настоящее время находится в версии 1.0.10621 и содержит большинство основных функций Rx в сочетании с Rx для jQuery.
Позвольте мне представить пример сценария для моей проблемы :Я опрашиваю API поиска Twitter(ответ JSON )на наличие твитов/обновлений, содержащих определенное ключевое слово. Ответ также включает «URL-адрес обновления _», который следует использовать для создания последующего запроса -. Ответ на этот запрос -up снова будет содержать новый URL-адрес обновления _и т. д.
Rx.jQuery позволяет мне сделать так, чтобы API поиска Twitter вызывал наблюдаемое событие, которое создает событие onNext, а затем завершается. До сих пор я пытался заставить обработчик onNext запомнить URL-адрес обновления _и использовать его в обработчике onCompleted для создания как нового наблюдаемого, так и соответствующего наблюдателя для следующего запроса. Сюда,одна пара наблюдаемая + наблюдатель бесконечно следует за другой.
Проблема с этим подходом состоит в том, что:
последующие -наблюдаемые объекты/наблюдатели уже активны, когда их предшественники еще не утилизированы.
Мне приходится вести множество неприятных бухгалтерских операций, чтобы поддерживать действительную ссылку на живого в данный момент наблюдателя, которых на самом деле может быть два. (Один в onCompleted, а другой где-то еще в его жизненном -цикле )Эта ссылка, конечно, нужна для отмены подписки/удаления наблюдателя. Альтернативой бухгалтерскому учету было бы реализовать побочный эффект с помощью «все еще работает?» -boolean, как я сделал в своем примере.
Пример кода:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function () {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function () {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
Не дайте себя обмануть двойным слоем наблюдаемых/наблюдателей от запросов до твитов. Мой пример касается в основном первого уровня :, запрашивающего данные из Twitter. Если бы при решении этой задачи второй слой (, конвертирующий ответы в твиты ), смог стать единым целым с первым, это было бы фантастикой; Но я думаю, что это совсем другое. Сейчас.
Эрик Мейер указал мне на оператор Expand (см. пример ниже )и предложил в качестве альтернативы шаблоны соединений .
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => ( i == 10 ? Observable.Empty() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
Это должно быть скопировано -и вставлено в LINQPad. Он предполагает одноэлементные наблюдаемые и создает одного конечного наблюдателя.
Итак, мой вопрос: :Как я могу лучше всего реализовать трюк с расширением в RxJS?
РЕДАКТИРОВАТЬ:
Оператор расширения, вероятно, может быть реализован, как показано в этой теме . Но нужно генераторов (, а у меня только JS К сожалению, RxJS 2.0.20304 -бета-версия не реализует метод Extend.