Я пытаюсь переписать код, используя Reactive Extensions для .NET, но мне нужно руководство, как достичь моей цели.
У меня есть класс, который инкапсулирует некоторое асинхронное поведение в библиотеке низкого уровня. Подумайте что-нибудь, что либо читает, либо записывает в сеть. Когда класс запущен, он попытается подключиться к среде, и в случае успеха он сообщит об этом, вызвав из рабочего потока.
Я хочу превратить это асинхронное поведение в синхронный вызов, и ниже я создал значительно упрощенный пример того, как этого можно достичь:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
Запуск AsyncStart
в рабочем потоке - это всего лишь способ имитировать асинхронное поведение библиотеки и не является частью моего реального кода, где библиотека нижнего уровня предоставляет поток и вызывает мой код при обратном вызове.
Обратите внимание, что метод Start
выдаст ] TimeoutException
, если запуск не завершился в течение интервала тайм-аута.
Я хочу переписать этот код, чтобы использовать Rx. Вот моя первая попытка:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Это неплохая попытка, но, к сожалению, она содержит состояние гонки. Если запуск завершается быстро (например, если delay
равен 0) и если есть дополнительная задержка в точке A, то OnNext
будет вызываться на readySubject
до того, как First
выполнит . По сути, IObservable
Я применяю Тайм-аут
и Первый
никогда не видит, что запуск завершен, и вместо этого будет сгенерировано TimeoutException
. 12199] Похоже, что Observable.Defer
был создан для решения подобных проблем. Вот несколько более сложная попытка использовать Rx:
Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Теперь асинхронная операция запускается не сразу, а только тогда, когда используется IObservable
. К сожалению, состояние гонки все еще сохраняется, но на этот раз в точке B. Если запущенная асинхронная операция вызывает OnNext
до возврата лямбда-выражения Defer
, оно все равно теряется, и TimeoutException
будет вызвано Timeout
. . 12201] Я знаю, что могу использовать такие операторы, как Replay
, для буферизации событий, но мой первоначальный пример без Rx не использует никакой буферизации. Есть ли у меня способ использовать Rx для решения моей проблемы без условий гонки? По сути, запуск асинхронной операции только после подключения IObservable
в этом случае Тайм-аут
и Первый
?
На основании ответа Пола Беттса здесь работает решение:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
Интересная часть - это когда в точке C возникает задержка, превышающая время, необходимое для завершения AsyncStart
. AsyncSubject
сохраняет последнее отправленное уведомление, а Тайм-аут
и Первый
по-прежнему будут работать должным образом.