Допустим, у меня есть две последовательности, возвращающие целые числа от 1 до 5.
Первая возвращает 1, 2 и 3 очень быстро, но 4 и 5 занимают 200 мс каждая.
public static IEnumerable<int> FastFirst()
{
for (int i = 1; i < 6; i++)
{
if (i > 3) Thread.Sleep(200);
yield return i;
}
}
Второй возвращает 1, 2 и 3 с задержкой 200 мс, но 4 и 5 возвращаются быстро.
public static IEnumerable<int> SlowFirst()
{
for (int i = 1; i < 6; i++)
{
if (i < 4) Thread.Sleep(200);
yield return i;
}
}
Объединение обеих этих последовательностей дает мне только числа от 1 до 5 .
FastFirst().Union(SlowFirst());
Я не могу гарантировать, какой из двух методов имеет задержки в какой момент, поэтому порядок выполнения не может гарантировать мне решение. Поэтому я хотел бы распараллелить объединение, чтобы минимизировать (искусственную) задержку. в моем примере.
Реальный сценарий: у меня есть кеш, который возвращает некоторые сущности, и источник данных, который возвращает все сущности.Я хотел бы иметь возможность возвращать итератор из метода, который внутренне распараллеливает запрос как к кешу, так и к источнику данных, чтобы кешированные результаты выдавались как можно быстрее.
Примечание 1: я понимаю, что это все еще приводит к потере циклов процессора; Я не спрашиваю, как я могу предотвратить повторение последовательностей по их медленным элементам, просто как я могу объединить их как можно быстрее.
Обновление 1: Я адаптировал отличный ответ Ахитаки-сан для приема нескольких производителей и использования ContinueWhenAll для установки CompleteAdding в BlockingCollection только один раз. Я просто поместил его сюда, потому что он теряется из-за отсутствия форматирования комментариев. Любые дальнейшие отзывы были бы замечательными!
public static IEnumerable<TResult> SelectAsync<TResult>(
params IEnumerable<TResult>[] producer)
{
var resultsQueue = new BlockingCollection<TResult>();
var taskList = new HashSet<Task>();
foreach (var result in producer)
{
taskList.Add(
Task.Factory.StartNew(
() =>
{
foreach (var product in result)
{
resultsQueue.Add(product);
}
}));
}
Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());
return resultsQueue.GetConsumingEnumerable();
}