Задержка и дедупликация с использованием реактивных расширений (Rx)

Я хочу использовать Reactive Extensions для преобразования некоторых сообщений и ретрансляции их после небольшой задержки.

Сообщения выглядят примерно так:

class InMsg
{
   int GroupId { get; set; }
   int Delay { get; set; }
   string Content { get; set; }
}

Результат выглядит примерно так:

class OutMsg
{ 
   int GroupId { get; set; }
   string Content { get; set; }
   OutMsg(InMsg in)
   {
       GroupId = in.GroupId;
       Content = Transform(in.Content);  // function omitted
   }
}

Есть пара требований:

  • Продолжительность задержки зависит от содержания сообщения.
  • Каждое сообщение имеет GroupId
  • . Если новое сообщение приходит с тем же GroupId, что и отложенное сообщение, ожидающее передачи, то первое сообщение должно быть отброшено и только второе сообщение должно быть передано после нового периода задержки.

Учитывая Observable и функция отправки:

IObservable<InMsg> inMsgs = ...;

void Send(OutMsg o)
{
     ... // publishes transformed messages
}

Я понимаю, что могу использовать Select для выполнения преобразования.

void SetUp()
{
     inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
  • Как я могу применить задержку для указания сообщения? (Обратите внимание, что это может / должно привести к нарушению порядка доставки сообщений. )
  • Как я могу удалить дубликаты сообщений с тем же GroupId?
  • Способен ли Rx решить эту проблему?
  • Есть ли другой способ решения этой проблемы?
9
задан chillitom 19 January 2011 в 06:52
поделиться