Я пытаюсь получить голову вокруг правильных вариантов использования для Реактивных Расширений (Rx). Примерами, который продолжает подходить, являются события UI (перетаскивание, таща), и предположения, что Rx подходит для асинхронных приложений/операций, таких как вызовы веб-сервиса.
Я работаю над приложением, где я должен записать крошечному клиенту API для сервиса REST. Я должен назвать четыре конечных точки REST, три для получения некоторых справочных данных (Аэропорты, Авиакомпании и Состояния), и четвертым является основной сервис, который даст Вам время полета для данного аэропорта.
Я создал классы, подвергающие три сервиса справочных данных, и методы выглядят примерно так:
public Observable<IEnumerable<Airport>> GetAirports()
public Observable<IEnumerable<Airline>> GetAirlines()
public Observable<IEnumerable<Status>> GetStatuses()
public Observable<IEnumerable<Flights>> GetFlights(string airport)
В моем методе GetFlights я хочу, чтобы каждый Полет содержал ссылку Аэропорт, от которого это отступает, и Авиакомпания, выполняющая рейс. Чтобы сделать это, мне нужны данные GetAirports и GetAirlines, чтобы быть доступным. Каждый Аэропорт, Авиакомпания и Состояние будут добавлены к Dictionar (ie.e Словарь) так, чтобы я мог легко установить ссылку при парсинге каждого полета.
flight.Airport = _airports[flightNode.Attribute("airport").Value]
flight.Airline = _airlines[flightNode.Attribute("airline").Value]
flight.Status = _statuses[flightNode.Attribute("status").Value]
Моя текущая реализация теперь похожа на это:
public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
var airports = new AirportNamesService().GetAirports();
var airlines = new AirlineNamesService().GetAirlines();
var statuses = new StatusService().GetStautses();
var referenceData = airports
.ForkJoin(airlines, (allAirports, allAirlines) =>
{
Airports.AddRange(allAirports);
Airlines.AddRange(allAirlines);
return new Unit();
})
.ForkJoin(statuses, (nothing, allStatuses) =>
{
Statuses.AddRange(allStatuses);
return new Unit();
});
string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);
var flights = from data in referenceData
from flight in GetFlightsFrom(url)
select flight;
return flights;
}
private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}
Текущая реализация основана на ответе Sergey и использует ForkJoin для обеспечения последовательного выполнения и что я ссылаюсь на данные, загружается перед Полетами. Эта реализация, выделяют более изящный, чем необходимость запустить событие "ReferenceDataLoaded" как моя предыдущая реализация.
Я думаю, если вы получаете список сущностей от каждого вызова REST, ваш вызов должен иметь немного другую сигнатуру - вы не наблюдаете каждое значение в возвращаемой коллекции, вы наблюдаете событие завершения вызова. Так, для аэропортов он должен иметь сигнатуру:
public IObservable<Aiports> GetAirports()
Следующим шагом будет запуск первых трех параллельно и ожидание их всех:
var ports_lines_statuses =
Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());
Третьим шагом будет компоновка вышеуказанного abservable с GetFlights():
var decoratedFlights =
from pls in ports_lines_statuses
let airport = MyAirportFunc(pls)
from flight in GetFlights(airport)
select flight;
EDIT: Я все еще не понимаю, почему ваши сервисы возвращают
IObservable<Airport>
вместо
IObservable<IEnumerable<Airport>>
AFAIK, из REST вызова вы получаете все сущности сразу - но может быть вы делаете пейджинг? В любом случае, если вы хотите, чтобы RX делал буферизацию, вы можете использовать .BufferWithCount() :
var allAirports = new AirportNamesService()
.GetAirports().BufferWithCount(int.MaxValue);
...
Затем вы можете применить ForkJoin:
var ports_lines_statuses =
allAirports
.ForkJoin(allAirlines, PortsLinesSelector)
.ForkJoin(statuses, ...
ports_lines_statuses будет содержать одно событие на временной шкале, которое будет содержать все ссылочные данные.
EDIT: Вот еще один вариант, использующий свежеиспеченный ListObservable (только последний выпуск):
allAiports = airports.Start();
allAirlines = airlines.Start();
allStatuses = statuses.Start();
...
whenReferenceDataLoaded =
Observable.Join(airports.WhenCompleted()
.And(airlines.WhenCompleted())
.And(statuses.WhenCompleted())
Then((p, l, s) => new Unit()));
public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
{
return source
.Materialize()
.Where(n => n.Kind == NotificationKind.OnCompleted)
.Select(_ => new Unit());
}
Вариант использования основан на вытягивании - IEnumerable в порядке. Если вы хотите сказать, уведомить, где приходит новый рейс, то упаковка REST-вызова на основе pull в Observable.Generate может иметь некоторую ценность.