Я нахожусь в ситуации, когда у меня есть список задач, над которыми я работаю (включить диск, изменить положение, дождаться остановки, отключить ).
«Ожидание» отслеживает IObservable<Status>
, которое я хочу ожидать на (, чтобы я мог передать его через ContinueWith
и другие задачи ).
Я начал со следующих задач внутри обработки OnNext подписчика, но это было просто уродливо. Теперь я придумал этот метод расширения:
public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
var tcs = new TaskCompletionSource<T>();
source
.Where(pred)
.DistinctUntilChanged()
.Take(1) //OnCompletes the observable, subscription will self-dispose
.Subscribe(val => tcs.TrySetResult(val),
ex => tcs.TrySetException(ex),
() => tcs.TrySetCanceled());
return tcs.Task;
}
(ОБНОВЛЕНО с предложением Свика по обработке OnCompleted
иOnError
)
Where
и DistinctUntilChanged
? (Я думаю, они)