Оператор Rx для различных последовательностей

ВАЖНО:описание результатов и более подробную информацию см. также в моем ответе

Мне нужно сгруппировать и отфильтровать последовательность объектов/событий, которые обычно реплицируются, буферизируя их с интервалом TimeSpan. Я пытаюсь объяснить это лучше с помощью мраморных диаграмм :

X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z

. произведет

X---Y---Z---X---Y---Z

где X, Y и Z — разные типы событий, а «---» означает интервал. Кроме того, я также хотел бы выделить ключевое свойство, которое доступно для всех типов, поскольку они имеют общий базовый класс :

X, Y, Z : A

. и A содержит свойство Key. Используя обозначение X.a, означающее X.Key = a, конечным образцом будет:

X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c

произведет

X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b

Может ли кто-нибудь помочь мне собрать необходимые операторы Linq (, возможно, DistinctUntilChanged и Buffer )для достижения такого поведения? Спасибо

ОБНОВЛЕНИЕ 18.08.12:

по просьбе, я пытаюсь дать лучшее объяснение. У нас есть устройства, собирающие и отправляющие события в веб-службу. Эти устройства имеют старую логику (, и мы не можем изменить ее из-за обратной совместимости ), и они постоянно отправляют событие, пока не получат подтверждение; после подтверждения они отправляют следующее событие в своей очереди и так далее. События содержат сетевой адрес объекта и некоторые другие свойства, отличающие события в очереди для каждого устройства. Событие выглядит так:

class Event
{
    public string NetworkAddress { get; }

    public string EventCode { get; }

    public string AdditionalAttribute { get; }
}

Цель состоит в том, чтобы каждые 5 секунд обрабатывать выделенные события, полученные от всех устройств,хранение информации в базе данных (, поэтому мы не хотим делать это пачками )и отправлять акк на устройство. Давайте сделаем пример только с двумя устройствами и некоторыми событиями:

Device 'a':
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'

Device 'b':
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'

Pn are the operations done by our server, explained later

Возможная мраморная диаграмма (входные потоки + выходной поток):

Device 'a'          : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
Device 'b'          : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...

Time                : ------------[1s]-----------[2s]------------[3s]------------[4s]-
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-

P1: Server stores and acknowledges [a1] and [b1]
P2: "      "      "   "            [b2]
P3: "      "      "   "            [a2] and [b3]
P4: "      "      "   "            [a3] and [b4]

В конце я думаю, что это, вероятно, простая комбинация основных операторов, но я новичок в Rx, и я немного сбит с толку, так как кажется, что есть много операторов (или комбинаций операторов ), чтобы получить тот же выходной поток.

Обновление 19.08.12:

Пожалуйста, имейте в виду, что этот код работает на сервере и должен работать в течение нескольких дней без утечек памяти... Я не уверен в поведении субъектов. На данный момент для каждого события я вызываю операцию push для службы, которая вызывает OnNext субъекта, поверх которого я должен построить запрос (, если я не ошибаюсь в использовании субъектов ).

Обновление 20.08.12:

Текущая реализация, включая проверочные испытания; это то, что я пробовал, и похоже, что то же самое предложил @yamen

public interface IEventService
{
    // Persists the events
    void Add(IEnumerable<Event> events);
}

public class Event
{
    public string Description { get; set; }
}

/// <summary>
/// Implements the logic to handle events.
/// </summary>
public class EventManager : IDisposable
{
    private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);

    private readonly Subject<EventMessage> subject = new Subject<EventMessage>();

    private readonly IDisposable subscription;

    private readonly object locker = new object();

    private readonly IEventService eventService;

    /// <summary>
    /// Initializes a new instance of the <see cref="EventManager"/> class.
    /// </summary>
    /// <param name="scheduler">The scheduler.</param>
    public EventManager(IEventService eventService, IScheduler scheduler)
    {
        this.eventService = eventService;
        this.subscription = this.CreateQuery(scheduler);
    }

    /// <summary>
    /// Pushes the event.
    /// </summary>
    /// <param name="eventMessage">The event message.</param>
    public void PushEvent(EventMessage eventMessage)
    {
        Contract.Requires(eventMessage != null);
        this.subject.OnNext(eventMessage);
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    /// <filterpriority>2</filterpriority>
    public void Dispose()
    {
        this.Dispose(true);
    }

    private void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Dispose unmanaged resources
        }

        this.subject.Dispose();
        this.subscription.Dispose();
    }

    private IDisposable CreateQuery(IScheduler scheduler)
    {
        var buffered = this.subject
           .DistinctUntilChanged(new EventComparer())
           .Buffer(EventHandlingPeriod, scheduler);

        var query = buffered
           .Subscribe(this.HandleEvents);
        return query;
    }

    private void HandleEvents(IList<EventMessage> eventMessages)
    {
        Contract.Requires(eventMessages != null);
        var events = eventMessages.Select(this.SelectEvent);
        this.eventService.Add(events);
    }

    private Event SelectEvent(EventMessage message)
    {
        return new Event { Description = "evaluated description" };
    }

    private class EventComparer : IEqualityComparer<EventMessage>
    {
        public bool Equals(EventMessage x, EventMessage y)
        {
            return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
        }

        public int GetHashCode(EventMessage obj)
        {
            var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
            return s.GetHashCode();
        }
    }
}

public class EventMessage
{
    public string NetworkAddress { get; set; }

    public byte EventCode { get; set; }

    public byte Attribute { get; set; }

    // Other properties
}

И испытание:

public void PushEventTest()
    {
        const string Address1 = "A:2.1.1";
        const string Address2 = "A:2.1.2";

        var eventServiceMock = new Mock<IEventService>();

        var scheduler = new TestScheduler();
        var target = new EventManager(eventServiceMock.Object, scheduler);
        var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
        var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        scheduler.Schedule(() => target.PushEvent(eventMessageA1));
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => target.PushEvent(eventMessageB1));
        scheduler.Schedule(TimeSpan.FromSeconds(2), () => target.PushEvent(eventMessageA1));

        scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);

        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());

        scheduler.Schedule(TimeSpan.FromSeconds(3), () => target.PushEvent(eventMessageB1));

        scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);

        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
    }

Кроме того, я еще раз отмечаю, что очень важно, чтобы программное обеспечение могло работать без проблем в течение нескольких дней, обрабатывая тысячи сообщений. Чтобы было ясно :, тест не проходит с текущей реализацией.

10
задан fra 26 August 2012 в 07:54
поделиться