Excel Interop - Найти сумму значений столбца

Скажите, что у вас 1000 URL-адресов, и вы хотите, чтобы одновременно открывалось 50 запросов; но как только один запрос завершается, вы открываете соединение со следующим URL-адресом в списке. Таким образом, всегда открыто ровно 50 подключений за раз, пока список URL не исчерпан.

Следующее простое решение появилось много раз здесь, на SO. Он не использует блокирующий код и не создает потоки явно, поэтому он очень хорошо масштабируется:

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

Дело в том, что обработка загруженных данных должна быть выполненный на другом конвейере, с другим уровнем параллелизма, особенно если это обработка, связанная с процессором.

Например, вы, вероятно, захотите иметь 4 потока одновременно с обработкой данных (количество ядер ЦП) и до 50 ожидающих запросов для большего количества данных (которые вообще не используют потоки). AFAICT, это не то, что делает ваш код в настоящее время.

Вот где TPL Dataflow или Rx может пригодиться в качестве предпочтительного решения. Однако, безусловно, можно реализовать что-то подобное с помощью простого TPL. Примечание. Единственным кодом блокировки является тот, который выполняет фактическую обработку данных внутри Task.Run:

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}
0
задан Sohel 16 January 2019 в 18:16
поделиться