У меня есть несколько вызовов, которые должны выполняться последовательно. Рассмотрим IService, у которого есть методы Query и Load. Запрос дает список виджетов, а загрузка предоставляет виджет «по умолчанию». Следовательно, мой сервис выглядит так.
void IService.Query(Action<IEnumerable<Widget>,Exception> callback);
void IService.Load(Action<Widget,Exception> callback);
Имея это в виду, вот грубый набросок модели представления:
public class ViewModel : BaseViewModel
{
public ViewModel()
{
Widgets = new ObservableCollection<Widget>();
WidgetService.Query((widgets,exception) =>
{
if (exception != null)
{
throw exception;
}
Widgets.Clear();
foreach(var widget in widgets)
{
Widgets.Add(widget);
}
WidgetService.Load((defaultWidget,ex) =>
{
if (ex != null)
{
throw ex;
}
if (defaultWidget != null)
{
CurrentWidget = defaultWidget;
}
}
});
}
public IService WidgetService { get; set; } // assume this is wired up
public ObservableCollection<Widget> Widgets { get; private set; }
private Widget _currentWidget;
public Widget CurrentWidget
{
get { return _currentWidget; }
set
{
_currentWidget = value;
RaisePropertyChanged(()=>CurrentWidget);
}
}
}
Я бы хотел упростить последовательный рабочий процесс вызова запроса, а затем значения по умолчанию. Возможно, лучший способ сделать это - вложить лямбда-выражения, как я показал, но я подумал, что может быть более элегантный способ с Rx. Я не хочу использовать Rx ради Rx, но если это позволяет мне организовать приведенную выше логику, чтобы ее было легче читать / поддерживать в методе, я воспользуюсь этим. В идеале, что-то вроде:
Observable.Create(
()=>firstAction(),
()=>secondAction())
.Subscribe(action=>action(),error=>{ throw error; });
С библиотекой power threading я бы сделал что-то вроде:
Service.Query(list=>{result=list};
yield return 1;
ProcessList(result);
Service.Query(widget=>{defaultWidget=widget};
yield return 1;
CurrentWidget = defaultWidget;
Это делает гораздо более очевидным, что рабочий процесс является последовательным и исключает вложенность (выходы являются частью асинхронного перечислителя и являются границами этот блок, пока не вернутся результаты).
Все подобное имело бы смысл для меня.
Итак, суть вопроса: пытаюсь ли я вставить квадратный стержень в круглое отверстие или есть способ переопределить вложенные асинхронные вызовы с помощью Rx?
Вы можете преобразовать методы обслуживания, чтобы они возвращали IObservable вместо того, чтобы принимать обратный вызов в качестве параметра. В этом случае последовательный рабочий процесс может быть реализован с использованием SelectMany , что-то вроде этого ...
WidgetService.Query()
.SelectMany(
widgets =>
{
Widgets.Clear();
foreach (var w in widgets)
{
Widgets.Add(w);
}
return WidgetService.Load();
}
)
.Do(
defaultWidget =>
{
if (defaultWidget != null)
Default = defaultWidget;
}
)
.Subscribe(
_ => { },
e => { throw e; }
);
Однако асинхронные функции IMO F # будут выглядеть намного яснее (в примере я предполагаю, что методы службы возвращают Async> и Async соответственно ). Обратите внимание, что в примере не учитывается, какой поток изменяет поля данных, в реальном коде вы должны обратить на это внимание:
let load = async {
let! widgets = WidgetService.Query()
Widgets.Clear()
for w in widgets do
Widgets.Add(w)
let! defaultWidget = WidgetService.Load()
if defaultWidget <> null then
Default <- defaultWidget
return ()
}
Async.StartWithContinuations(
load,
ignore, // success continuation - ignore result
raise, // error continuation - reraise exception
ignore // cancellation continuation - ignore
)
EDITED
На самом деле можно использовать технику с итераторами, которые вы упомянули в своем вопросе. :
private IEnumerable<IObservable<object>> Intialize()
{
var widgetsList = WidgetService.Query().Start();
yield return widgetsList;
Widgets.Clear();
foreach (var w in widgetsList[0])
{
Widgets.Add(w);
}
var defaultWidgetList = WidgetService.Load().Start();
yield return defaultWidgetList;
if (defaultWidgetList[0] != null)
Default = defaultWidgetList[0];
}
Observable
.Iterate(Intialize)
.Subscribe(
_ => { },
ex => { throw ex; }
);
Вы также можете сделать это с помощью ReactiveXaml , хотя, поскольку ваш CurrentWidget и виджеты являются изменяемыми, вы не можете сделать его таким чистым (есть класс под названием ObservableAsPropertyHelper , который будет обновлять свойство, основанное на IObservable и активирующее RaisePropertyChanged):
public class ViewModel
{
public ViewModel()
{
// These return a Func that wraps an async call in an IObservable<T>
// that always yields only one item (the result of the call)
var QueryAsObservable = Observable.FromAsyncCommand<IEnumerable<Widget>>(WebService.BeginQuery, WebService.EndQuery);
var LoadAsObservable = Observable.FromAsyncCommand<Widget>(WebService.BeginLoad, WebService.EndLoad);
// Create a new command
QueryAndLoad = new ReactiveAsyncCommand();
// QueryAndLoad fires every time someone calls ICommand.Execute
// The .Do is the hacky part, for sync calls it's hidden by RegisterAsyncFunction
var async_results = QueryAndLoad.SelectMany(_ => QueryAsObservable())
.Do(_ => DoTranslate.AsyncCompletedNotification.OnNext(new Unit()));
// Query up the Widgets
async_results.Subscribe(x => x.Run(Widgets.Add));
// Now execute the Load
async_results.SelectMany(_ => LoadAsObservable())
.Subscribe(x => CurrentWidget = x);
QueryAndLoad.Execute();
}
public ReactiveAsyncCommand QueryAndLoad {get; private set; }
public ObservableCollection<Widget> Widgets {get; private set; }
public Widget CurrentWidget {get; set; }
}