Используя параллельные расширения Linq для объединения двух последовательностей, как можно сначала получить самые быстрые результаты?

Допустим, у меня есть две последовательности, возвращающие целые числа от 1 до 5.

Первая возвращает 1, 2 и 3 очень быстро, но 4 и 5 занимают 200 мс каждая.

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}

Второй возвращает 1, 2 и 3 с задержкой 200 мс, но 4 и 5 возвращаются быстро.

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}

Объединение обеих этих последовательностей дает мне только числа от 1 до 5 .

FastFirst().Union(SlowFirst());

Я не могу гарантировать, какой из двух методов имеет задержки в какой момент, поэтому порядок выполнения не может гарантировать мне решение. Поэтому я хотел бы распараллелить объединение, чтобы минимизировать (искусственную) задержку. в моем примере.

Реальный сценарий: у меня есть кеш, который возвращает некоторые сущности, и источник данных, который возвращает все сущности.Я хотел бы иметь возможность возвращать итератор из метода, который внутренне распараллеливает запрос как к кешу, так и к источнику данных, чтобы кешированные результаты выдавались как можно быстрее.

Примечание 1: я понимаю, что это все еще приводит к потере циклов процессора; Я не спрашиваю, как я могу предотвратить повторение последовательностей по их медленным элементам, просто как я могу объединить их как можно быстрее.

Обновление 1: Я адаптировал отличный ответ Ахитаки-сан для приема нескольких производителей и использования ContinueWhenAll для установки CompleteAdding в BlockingCollection только один раз. Я просто поместил его сюда, потому что он теряется из-за отсутствия форматирования комментариев. Любые дальнейшие отзывы были бы замечательными!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producer)
{
    var resultsQueue = new BlockingCollection<TResult>();

    var taskList = new HashSet<Task>();
    foreach (var result in producer)
    {
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in result)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());

    return resultsQueue.GetConsumingEnumerable();
}
9
задан Alex Norcliffe 11 November 2011 в 23:10
поделиться