Используя Rx для синхронизации асинхронных событий

Я хочу поместить Реактивные Расширения для.NET (Rx) к хорошему использованию и хотел бы получить некоторый вход при выполнении некоторых основных задач. Для иллюстрирования, что я пытаюсь сделать, у меня есть изобретенный пример, где у меня есть внешний компонент с асинхронными событиями:

class Component {

  public void BeginStart() { ... }

  public event EventHandler Started;

}

Компонент запускается путем вызова BeginStart(). Этот метод возвращается сразу, и позже, когда компонент завершил запуск, Started огни события.

Я хочу создать синхронный метод запуска путем обертывания компонента и ожидать до Started событие запущено. Это - то, что я придумал до сих пор:

class ComponentWrapper {

  readonly Component component = new Component();

  void StartComponent() {
    var componentStarted =
      Observable.FromEvent(this.component, "Started");
    using (var startedEvent = new ManualResetEvent(false))
      using (componentStarted.Take(1).Subscribe(e => { startedEvent.Set(); })) {
        this.componenet.BeginStart();
        startedEvent.WaitOne();
      }
  }

}

Я хотел бы избавиться от ManualResetEvent, и я ожидаю, что Rx имеет решение. Но как?

7
задан Martin Liversage 2 June 2010 в 08:54
поделиться

2 ответа

Ответ PL , если он идеально подходит для вашей спецификации, но я подумал, что вы могли бы получить лучшие результаты, не борясь с RX с помощью .First (), но приняв его, создав наблюдаемую для вашего компонента:

    public static IObservable<Unit> AsObservable(this Component component)
    {
        return Observable.Defer(() =>
        {
            component.BeginStart();
            return Observable
                .FromEvent<EventArgs>(component, "Started")
                .Select(_ => new Unit());
        });
    }

Затем вы можете использовать его как блокировку:

new Component().AsObservable().First();

Неблокирующий:

new Component().AsObservable().Subscribe(_ => Console.WriteLine("Done"));

Горячий:

var pub = new Component().AsObservable().Publish();
pub.Subscribe(_ => Console.WriteLine("Sub1"));
pub.Subscribe(_ => Console.WriteLine("Sub2"));
pub.Connect();  // started just once per two subscriptions

Составной:

new Component().AsObservable().Delay(TimeSpan.FromSeconds(1));

и т. Д.

РЕДАКТИРОВАТЬ: В случае нескольких событий, которые вы должны ждать и собирать информацию, может использоваться следующий вариант:

public static IObservable<EventArgs> AsObservable(this Component component)
{
    return Observable.Defer(() =>
    {
        component.BeginStart();
        return 
            Observable.FromEvent<EventArgs>(component, "Started1").Take(1)
                .Merge(
            Observable.FromEvent<EventArgs>(component, "Started2").Take(1))
                .Select(evt => evt.EventArgs);
    });
}

В этом случае, если вы хотите заблокировать до завершения, вы можете использовать .AsObservable.Last () .

10
ответ дан 6 December 2019 в 19:33
поделиться

Что-то вроде этого должно сделать это:

var replay = Observable
    .FromEvent<EventArgs>(this.component, "Started")
    .Replay();
replay.Connect();
component.BeginStart();
replay.First();
3
ответ дан 6 December 2019 в 19:33
поделиться
Другие вопросы по тегам:

Похожие вопросы: