Операция параллельного ввода-вывода C # .NET (с дросселированием) [дубликат]

Многие объяснения уже присутствуют, чтобы объяснить, как это происходит и как это исправить, но вы также должны следовать рекомендациям, чтобы избежать NullPointerException вообще.

См. также: A хороший список лучших практик

Я бы добавил, очень важно, хорошо использовать модификатор final. Использование "окончательной" модификатор, когда это применимо в Java

Сводка:

  1. Используйте модификатор final для обеспечения хорошей инициализации.
  2. Избегайте возврата null в методы, например, при возврате пустых коллекций.
  3. Использовать аннотации @NotNull и @Nullable
  4. Быстрое завершение работы и использование утверждений, чтобы избежать распространения нулевых объектов через все приложение, когда они не должен быть пустым.
  5. Сначала используйте значения с известным объектом: if("knownObject".equals(unknownObject)
  6. Предпочитают valueOf() поверх toString ().
  7. Используйте null safe StringUtils StringUtils.isEmpty(null).

45
задан Josh Wyant 31 March 2014 в 19:11
поделиться

3 ответа

Как и было предложено, используйте TPL Dataflow.

Возможно, вы TransformBlock<TInput, TOutput> .

Вы определяете MaxDegreeOfParallelism для ограничения того, сколько строк может быть преобразовано (т. е. сколько URL-адресов может быть загружено) параллельно. Затем вы отправляете URL-адреса блоку, и когда вы закончите, вы сообщите блоку, что вы закончили добавление элементов, и вы получите ответы.

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

Примечание. Буферы TransformBlock как его ввод, так и вывод. Почему тогда нам нужно связать его с BufferBlock?

Поскольку TransformBlock не будет завершена до тех пор, пока все элементы (HttpResponse) не будут использованы, а await downloader.Completion будет висеть , Вместо этого мы позволяем downloader перенаправлять весь свой вывод на выделенный буферный блок - тогда мы ждем завершения downloader и проверяем буферный блок.

48
ответ дан JDawg 19 August 2018 в 17:29
поделиться
  • 1
    +1 Какое изящное решение. Так что меньше кода, так много функциональности. – BlueM 19 March 2014 в 00:53
  • 2
    Надеюсь, мой код хотя бы проиллюстрировал точку :) В моем случае параллелизм, вероятно, будет низким, но многие асинхронные задачи будут опубликованы. Как отключить асинхронные задачи с помощью этой модели? – Josh Wyant 19 March 2014 в 01:00
  • 3
    @JoshWyant Вы имеете в виду ограничение того, сколько URL-адресов может быть загружено сразу? Используя MaxDegreeOfParallelism – dcastro 19 March 2014 в 01:03
  • 4
    @JoshWyant С приведенным выше кодом вы просто публикуете столько URL-адресов, сколько хотите (используя SendAsync). Теза будет буферизирована блоком. Блок будет продолжать брать URL-адреса из буфера и обрабатывать не более 50 за раз. Затем результаты будут помещены в другой буфер. A TransformBlock буферизует как входной, так и выходной. – dcastro 19 March 2014 в 01:05
  • 5
    @dcastro Итак, я в конечном итоге пошел с решением Dataflow. Мой первоначальный страх заключался в том, что MaxDegreeOfParallelism работал точно так же, как Parallel.ForEach, просто создавая произвольное количество потоков для достижения параллелизма. Я ошибался, и параметр очень хорошо работает с async. Tpl.Dataflow работает красиво. Благодаря! – Josh Wyant 20 March 2014 в 17:21

В соответствии с запросом, вот код, в который я закончил.

Работа настраивается в конфигурации мастер-детали, и каждый мастер обрабатывается как пакет. Каждая единица работы ставится в очередь таким образом:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

Мастера буферизуют по одному, чтобы сохранить работу для других внешних процессов. Детали для каждого мастера отправляются для работы через masterTransform TransformManyBlock. A BatchedJoinBlock также создается для сбора деталей в одной партии.

Фактическая работа выполняется в detailTransform TransformBlock асинхронно, 150 за раз. BoundedCapacity установлено значение 300, чтобы гарантировать, что слишком много мастеров не получают буферизацию в начале цепочки, а также оставляют место для достаточно подробных записей, которые должны быть поставлены в очередь, чтобы позволить 150 записей обрабатываться за один раз. Блок выводит object на свои цели, потому что он фильтруется по ссылкам в зависимости от того, является ли это Detail или Exception.

batchAction ActionBlock собирает вывод из всех пакеты и выполняет массовые обновления баз данных, протоколирование ошибок и т. д. для каждой партии.

Будет несколько BatchedJoinBlock s, по одному для каждого мастера. Поскольку каждый ISourceBlock выводится последовательно, и каждая партия принимает только количество подробных записей, связанных с одним мастером, партии будут обрабатываться по порядку. Каждый блок выводит только одну группу и при завершении отсоединяется. Только последний пакетный блок передает свое завершение окончательному ActionBlock.

Сеть потока данных:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
3
ответ дан 7 revs 19 August 2018 в 17:29
поделиться

Скажите, что у вас 1000 URL-адресов, и вы хотите, чтобы одновременно открывалось 50 запросов; но как только один запрос завершается, вы открываете соединение со следующим URL-адресом в списке. Таким образом, всегда открыто ровно 50 подключений за раз, пока список URL не исчерпан.

Следующее простое решение появилось много раз здесь, на SO. Он не использует блокирующий код и не создает потоки явно, поэтому он очень хорошо масштабируется:

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

Дело в том, что обработка загруженных данных должна быть выполненный на другом конвейере, с другим уровнем параллелизма, особенно если это обработка, связанная с процессором.

Например, вы, вероятно, захотите иметь 4 потока одновременно с обработкой данных (количество ядер ЦП) и до 50 ожидающих запросов для большего количества данных (которые вообще не используют потоки). AFAICT, это не то, что делает ваш код в настоящее время.

Вот где TPL Dataflow или Rx может пригодиться в качестве предпочтительного решения. Однако, безусловно, можно реализовать что-то подобное с помощью простого TPL. Примечание. Единственным кодом блокировки является тот, который выполняет фактическую обработку данных внутри Task.Run:

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}
33
ответ дан abatishchev 19 August 2018 в 17:29
поделиться
  • 1
    Это самый простой и простой ответ. Это точно то, что я пытался сделать. Моя ошибка пыталась запустить семафор в отдельном потоке, но это делает его намного проще и исключает BlockingCollection. Я просто не понимал, что могу использовать WaitAsync таким образом. Благодарю вас @ Noseratio. – Josh Wyant 19 March 2014 в 03:04
  • 2
    @JoshWyant, никаких проблем. Я считаю, что это почти то, что TPL Dataflow будет делать за сценой, если его конвейер правильно спроектирован и собран. Просто у меня недостаточно навыков TPL Dataflow, но я собираюсь вкладывать в него больше времени. – Noseratio 19 March 2014 в 03:09
  • 3
    Ты прав. Как только вы это понимаете, TPL Dataflow работает красиво. Он решает проблему распространения работы async на несколько ядер, что было моей другой целью. Этот ответ касается моей первой цели, и Dataflow обращается к ним обоим. @Noseratio – Josh Wyant 20 March 2014 в 17:25
  • 4
Другие вопросы по тегам:

Похожие вопросы: