Как я могу объединить два потока, упорядоченных, а затем сгруппированных по отметке времени?

У меня есть два потока объектов, каждый из которых имеет значение Timestamp. Оба потока идут по порядку, поэтому, например, метки времени могут быть T a= 1,3,6,6,7в одном потоке и T b= 1,2,5,5,6,8в другой. Объекты в обоих потоках одного типа.

Что я хотел бы сделать, так это поместить каждое из этих событий на шину в порядке метки времени, т.е. поставить A 1, затем B 1, В 2 , А 3 и так далее. Кроме того, поскольку некоторые потоки имеют несколько (последовательных) элементов с одинаковой отметкой времени, я хочу, чтобы эти элементы были сгруппированы так, чтобы каждое новое событие представляло собой массив.Таким образом, мы помещаем [A 3] на шину, за которой следуют [A 15, A 25] и так далее. .

Я попытался реализовать это, создав две структуры ConcurrentQueue, помещая каждое событие в конец очереди, затем просматривая каждое начало очереди, выбирая сначала более раннее событие, а затем проходя по поставить в очередь так, чтобы присутствовали все события с этой меткой времени.

Однако я столкнулся с двумя проблемами:

  • Если я оставлю эти очереди неограниченными, у меня быстро закончится память, так как операция чтения выполняется намного быстрее, чем обработчики, получающие события. (У меня есть несколько гигабайт данных).
  • Иногда я сталкиваюсь с ситуацией, когда я обрабатываю событие, скажем, A 15до того, как наступит A 25. Мне как-то нужно остерегаться этого.

Я думаю, что Rx может помочь в этом отношении, но я не вижу явного комбинатора(ов), чтобы сделать это возможным. Таким образом, любой совет очень ценится.

6
задан Dmitri Nesteruk 16 March 2012 в 18:06
поделиться