Подводные камни при попытке использовать PLINQ над долгоиграющими генераторами?

У меня есть несколько методов бесконечных генераторов, включая некоторые долгоиграющие и бесконечно долгоиграющие генераторы.

IEnumerable<T> ExampleOne() { 
    while(true) // this one blocks for a few seconds at a time
        yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() { 
    while(true) //this one blocks for a really long time
        yield return OtherLongRunningFunction();
}

Моя цель - получить бесконечную последовательность, которая объединяет элементы из двух примеров. Вот что я попробовал, используя PLINQ:

 IEnumerable<T> combined =  new[] { ExampleOne(), ExampleTwo() }
           .AsParallel()
           .WithMergeOptions(ParallelMergeOptions.NotBuffered)
           .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
           .SelectMany(source => source.GetRequests());

Кажется, что это правильно объединяет две IEnumerables в новую, при этом элементы из IEnumerable #1 и #2 доступны всякий раз, когда они появляются в любой из двух исходных IEnumerables:

//assuming ExampleTwo yields TWO but happens roughly 5 times 
//less often then ExampleOne
Example output:  one one one one one TWO one one one one one one TWO

Однако, похоже, что иногда (обычно после многих часов работы) OtherLongRunningFunction() будет долгое время не возвращаться, и при условиях, которые трудно воспроизвести, комбинированная последовательность будет блокироваться на ней, а не продолжать возвращать результаты из первой LongRunningFunction. Похоже, что хотя комбинированный параллельный запрос начал с использования двух потоков, позже он решил переключиться на один поток.

Моей первой мыслью было "это, вероятно, работа для RX Observable.Merge, а не для PLINQ". Но я был бы признателен как за ответы, показывающие правильные альтернативные способы справиться с этой ситуацией, так и за объяснения механики того, как PLINQ может изменить степень параллелизма спустя несколько часов после начала выполнения запроса.

6
задан Jimmy 25 January 2012 в 09:51
поделиться