У меня есть несколько методов бесконечных генераторов, включая некоторые долгоиграющие и бесконечно долгоиграющие генераторы.
IEnumerable<T> ExampleOne() {
while(true) // this one blocks for a few seconds at a time
yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() {
while(true) //this one blocks for a really long time
yield return OtherLongRunningFunction();
}
Моя цель - получить бесконечную последовательность, которая объединяет элементы из двух примеров. Вот что я попробовал, используя PLINQ:
IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() }
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.SelectMany(source => source.GetRequests());
Кажется, что это правильно объединяет две IEnumerables в новую, при этом элементы из IEnumerable
#1 и #2 доступны всякий раз, когда они появляются в любой из двух исходных IEnumerables
:
//assuming ExampleTwo yields TWO but happens roughly 5 times
//less often then ExampleOne
Example output: one one one one one TWO one one one one one one TWO
Однако, похоже, что иногда (обычно после многих часов работы) OtherLongRunningFunction()
будет долгое время не возвращаться, и при условиях, которые трудно воспроизвести, комбинированная
последовательность будет блокироваться на ней, а не продолжать возвращать результаты из первой LongRunningFunction
. Похоже, что хотя комбинированный параллельный запрос начал с использования двух потоков, позже он решил переключиться на один поток.
Моей первой мыслью было "это, вероятно, работа для RX Observable.Merge
, а не для PLINQ". Но я был бы признателен как за ответы, показывающие правильные альтернативные способы справиться с этой ситуацией, так и за объяснения механики того, как PLINQ может изменить степень параллелизма спустя несколько часов после начала выполнения запроса.