Использование Rx для блокировки (и, возможно, тайм-аута) асинхронной операции

Я пытаюсь переписать код, используя Reactive Extensions для .NET, но мне нужно руководство, как достичь моей цели.

У меня есть класс, который инкапсулирует некоторое асинхронное поведение в библиотеке низкого уровня. Подумайте что-нибудь, что либо читает, либо записывает в сеть. Когда класс запущен, он попытается подключиться к среде, и в случае успеха он сообщит об этом, вызвав из рабочего потока.

Я хочу превратить это асинхронное поведение в синхронный вызов, и ниже я создал значительно упрощенный пример того, как этого можно достичь:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

Запуск AsyncStart в рабочем потоке - это всего лишь способ имитировать асинхронное поведение библиотеки и не является частью моего реального кода, где библиотека нижнего уровня предоставляет поток и вызывает мой код при обратном вызове.

Обратите внимание, что метод Start выдаст ] TimeoutException , если запуск не завершился в течение интервала тайм-аута.

Я хочу переписать этот код, чтобы использовать Rx. Вот моя первая попытка:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

Это неплохая попытка, но, к сожалению, она содержит состояние гонки. Если запуск завершается быстро (например, если delay равен 0) и если есть дополнительная задержка в точке A, то OnNext будет вызываться на readySubject до того, как First выполнит . По сути, IObservable Я применяю Тайм-аут и Первый никогда не видит, что запуск завершен, и вместо этого будет сгенерировано TimeoutException . 12199] Похоже, что Observable.Defer был создан для решения подобных проблем. Вот несколько более сложная попытка использовать Rx:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

Теперь асинхронная операция запускается не сразу, а только тогда, когда используется IObservable . К сожалению, состояние гонки все еще сохраняется, но на этот раз в точке B. Если запущенная асинхронная операция вызывает OnNext до возврата лямбда-выражения Defer , оно все равно теряется, и TimeoutException будет вызвано Timeout . . 12201] Я знаю, что могу использовать такие операторы, как Replay , для буферизации событий, но мой первоначальный пример без Rx не использует никакой буферизации. Есть ли у меня способ использовать Rx для решения моей проблемы без условий гонки? По сути, запуск асинхронной операции только после подключения IObservable в этом случае Тайм-аут и Первый ?


На основании ответа Пола Беттса здесь работает решение:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

Интересная часть - это когда в точке C возникает задержка, превышающая время, необходимое для завершения AsyncStart . AsyncSubject сохраняет последнее отправленное уведомление, а Тайм-аут и Первый по-прежнему будут работать должным образом.

9
задан Martin Liversage 19 January 2011 в 10:02
поделиться