У нас с коллегой спор. Мы пишем приложение.NET, которое обрабатывает огромные объемы данных. Он получает элементы данных, группирует их подмножества в блоки по какому-либо критерию и обрабатывает эти блоки.
Допустим, у нас есть элементы данных типа Foo
, поступающие в какой-то источник (из сети, например )один за другим. Мы хотим собрать подмножеств связанных объектов типа Foo
, построить объект типа Bar
из каждого такого подмножества и обработать объекты типа Bar
.
Один из нас предложил следующую конструкцию. Его основная тема — предоставление IObservable<T>
объектов непосредственно из интерфейсов наших компонентов.
// ********* Interfaces **********
interface IFooSource
{
// this is the event-stream of objects of type Foo
IObservable<Foo> FooArrivals { get; }
}
interface IBarSource
{
// this is the event-stream of objects of type Bar
IObservable<Bar> BarArrivals { get; }
}
/ ********* Implementations *********
class FooSource : IFooSource
{
// Here we put logic that receives Foo objects from the network and publishes them to the FooArrivals event stream.
}
class FooSubsetsToBarConverter : IBarSource
{
IFooSource fooSource;
IObservable<Bar> BarArrivals
{
get
{
// Do some fancy Rx operators on fooSource.FooArrivals, like Buffer, Window, Join and others and return IObservable<Bar>
}
}
}
// this class will subscribe to the bar source and do processing
class BarsProcessor
{
BarsProcessor(IBarSource barSource);
void Subscribe();
}
// ******************* Main ************************
class Program
{
public static void Main(string[] args)
{
var fooSource = FooSourceFactory.Create();
var barsProcessor = BarsProcessorFactory.Create(fooSource) // this will create FooSubsetToBarConverter and BarsProcessor
barsProcessor.Subscribe();
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
Другой предложил другой дизайн, основной темой которого является использование наших собственных интерфейсов издателя/подписчика и использование Rx внутри реализаций только при необходимости.
//********** interfaces *********
interface IPublisher<T>
{
void Subscribe(ISubscriber<T> subscriber);
}
interface ISubscriber<T>
{
Action<T> Callback { get; }
}
//********** implementations *********
class FooSource : IPublisher<Foo>
{
public void Subscribe(ISubscriber<Foo> subscriber) { /*... */ }
// here we put logic that receives Foo objects from some source (the network?) publishes them to the registered subscribers
}
class FooSubsetsToBarConverter : ISubscriber<Foo>, IPublisher<Bar>
{
void Callback(Foo foo)
{
// here we put logic that aggregates Foo objects and publishes Bars when we have received a subset of Foos that match our criteria
// maybe we use Rx here internally.
}
public void Subscribe(ISubscriber<Bar> subscriber) { /*... */ }
}
class BarsProcessor : ISubscriber<Bar>
{
void Callback(Bar bar)
{
// here we put code that processes Bar objects
}
}
//********** program *********
class Program
{
public static void Main(string[] args)
{
var fooSource = fooSourceFactory.Create();
var barsProcessor = barsProcessorFactory.Create(fooSource) // this will create BarsProcessor and perform all the necessary subscriptions
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
Как вы думаете, какой из них лучше? Открывая IObservable<T>
и заставляя наши компоненты создавать новые потоки событий от операторов Rx,или определить наши собственные интерфейсы издателя/подписчика и использовать Rx внутри, если это необходимо?
Вот некоторые вещи, которые следует учитывать при проектировании:
В первом дизайне потребитель наших интерфейсов имеет всю мощь Rx в своих руках и может выполнять любые операторы Rx. Один из нас утверждает, что это преимущество, а другой утверждает, что это недостаток.
Второй дизайн позволяет нам использовать любую архитектуру издатель/подписчик под капотом. Первый дизайн связывает нас с Rx.
Если мы хотим использовать мощность Rx, потребуется больше работы во втором дизайне, потому что нам нужно перевести пользовательскую реализацию издателя/подписчика на Rx и обратно. Это требует написания связующего кода для каждого класса, который хочет выполнить некоторую обработку событий.