Как справиться с завершением запросов в очередь? `async` или событие завершения? [Дубликат]

new_list = list(old_list)

14
задан Josh 5 September 2014 в 19:12
поделиться

4 ответа

Чтобы создать асинхронную одиночную очередь параллелизма в очереди, вы можете просто создать SemaphoreSlim, инициализированный одним, а затем получить метод enqueing await при приобретении этого семафора до начала запрошенной работы.

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

Конечно, чтобы фиксированная степень параллелизма, отличная от одной, просто инициализировала семафор другому ряду.

36
ответ дан Servy 21 August 2018 в 11:34
поделиться
  • 1
    Обратите внимание, что этот код не выполняет задачи в отдельном выделенном потоке (как задано в заголовке), но вместо этого гарантирует, что задачи выполняются один за другим, как фактически необходимый OP. – Alexei Levenkov 2 June 2015 в 14:43
  • 2
    Предоставляет ли этот код даже FIFO? – Thibault D. 2 November 2016 в 13:10
  • 3
    @ThibaultD. Нет. – Servy 2 November 2016 в 14:09
  • 4
    @Thibault D. мой ответ ниже – Alexander Danilov 3 November 2016 в 21:13
  • 5
    Было бы нормально реализовать счетчик выполняемых задач следующим образом: Interlocked.Increment(ref counter); await semaphore.WaitAsync(); [...] Interlocked.Decrement(ref counter); semaphore.Release(); или есть лучший способ сделать это? – Profet 29 November 2016 в 13:22

На самом деле вам не нужно запускать задачи в одном потоке, вам нужно, чтобы они запускались последовательно (один за другим) и FIFO. У TPL нет класса для этого, но вот моя реализация с тестами. https://github.com/Gentlee/SerialQueue

Также есть реализация @Servy, тесты показывают, что он в два раза медленнее, чем мой, и он не гарантирует FIFO.

Пример:

private readonly SerialQueue queue = new SerialQueue();

async Task SomeAsyncMethod()
{
    var result = await queue.Enqueue(DoSomething);
}
3
ответ дан Alexander Danilov 21 August 2018 в 11:34
поделиться

Ваш лучший вариант, как я вижу, это использовать TPL Dataflow 's ActionBlock:

var actionBlock = new ActionBlock<string>(address =>
{
    if (!IsDuplicate(address))
    {
        LocateAddress(address);
    }
});

actionBlock.Post(context.Request.UserHostAddress);

TPL Dataflow надежный, потокобезопасный, async - уже и очень настраиваемая структура, основанная на актерах (доступна как nuget)

Вот простой пример для более сложного случая. Предположим, вы хотите:

  • Включить параллелизм (ограничено доступными ядрами).
  • Ограничить размер очереди (чтобы у вас не хватило памяти).
  • У вас есть LocateAddress и вставка очереди async.
  • Отменить все через час.

var actionBlock = new ActionBlock<string>(async address =>
{
    if (!IsDuplicate(address))
    {
        await LocateAddressAsync(address);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10000,
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});

await actionBlock.SendAsync(context.Request.UserHostAddress);
13
ответ дан i3arnon 21 August 2018 в 11:34
поделиться
  • 1
    Очень хорошо! Я обязательно проверю это. Я никогда не заглядывал в TPL Dataflow, но похоже, что он имеет много преимуществ по сравнению с выделенным потоком. – Josh 6 September 2014 в 21:00

Используйте BlockingCollection<Action> для создания шаблона производителя / потребителя с одним потребителем (только одна вещь работает в то время, как вы хотите) и один или несколько производителей.

Сначала определите общую очередь:

BlockingCollection<Action> queue = new BlockingCollection<Action>();

В вашем пользователе Thread или Task вы берете из него:

//This will block until there's an item available
Action itemToRun = queue.Take()

Затем из любого количества продюсеров на другие потоки просто добавьте в очередь:

queue.Add(() => LocateAddress(context.Request.UserHostAddress));
5
ответ дан Zer0 21 August 2018 в 11:34
поделиться
  • 1
    Это требует, чтобы потребитель обрабатывал задачи синхронно. Там должна быть нить, сидящая там, ничего не делающая, когда нет работы, вместо того, чтобы использовать операции асинхронно, так что нет нити, когда нет работы. – Servy 5 September 2014 в 19:35
  • 2
    @Servy Поток будет в состоянии ожидания, если нет работы. Это не будет тратить время на процессор, поэтому я не вижу никаких проблем с этим. Хотя я до сих пор не понимаю ваши комментарии к «синхронно», или "асинхронно". Это асинхронный дизайн. – Zer0 5 September 2014 в 19:37
  • 3
    @ Zer0 Это асинхронный производитель , но полностью синхронный потребитель . Потоки по-прежнему дорогие, их много крутит, просто чтобы они сидели, ничего не делая, все еще дорого, и их следует избегать. – Servy 5 September 2014 в 19:39
  • 4
    @ Zer0 BlockingCollection не будет дорогостоящим, вы создадите целую кучу нитей, чтобы быть потребителями будет быть дорогим или, по крайней мере, дороже, чем нужно. Это особенно верно, если потребители не полностью насыщены работой в 100% случаев. you need additional threads to add more parallelism Нет, нет. В своем ответе я точно продемонстрирую, как создать N степеней параллелизма с помощью всего лишь zero потоков. – Servy 5 September 2014 в 19:55
  • 5
    @ Zer0 Это просто , чтобы процессор не нуждался во всех для параллелизма. Параллелизм может произойти без каких-либо потоков. Например, вы можете иметь несколько ожидающих запросов ввода-вывода, каждый из которых выполняет работу, при этом нитки не используются вообще. Существует множество способов «работать» и «работать». без использования процессора. – Servy 5 September 2014 в 20:02
Другие вопросы по тегам:

Похожие вопросы: