Чтобы создать асинхронную одиночную очередь параллелизма в очереди, вы можете просто создать SemaphoreSlim
, инициализированный одним, а затем получить метод enqueing await
при приобретении этого семафора до начала запрошенной работы.
public class TaskQueue
{
private SemaphoreSlim semaphore;
public TaskQueue()
{
semaphore = new SemaphoreSlim(1);
}
public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
{
await semaphore.WaitAsync();
try
{
return await taskGenerator();
}
finally
{
semaphore.Release();
}
}
public async Task Enqueue(Func<Task> taskGenerator)
{
await semaphore.WaitAsync();
try
{
await taskGenerator();
}
finally
{
semaphore.Release();
}
}
}
Конечно, чтобы фиксированная степень параллелизма, отличная от одной, просто инициализировала семафор другому ряду.
На самом деле вам не нужно запускать задачи в одном потоке, вам нужно, чтобы они запускались последовательно (один за другим) и FIFO. У TPL нет класса для этого, но вот моя реализация с тестами. https://github.com/Gentlee/SerialQueue
Также есть реализация @Servy, тесты показывают, что он в два раза медленнее, чем мой, и он не гарантирует FIFO.
Пример:
private readonly SerialQueue queue = new SerialQueue();
async Task SomeAsyncMethod()
{
var result = await queue.Enqueue(DoSomething);
}
Ваш лучший вариант, как я вижу, это использовать TPL Dataflow
's ActionBlock
:
var actionBlock = new ActionBlock<string>(address =>
{
if (!IsDuplicate(address))
{
LocateAddress(address);
}
});
actionBlock.Post(context.Request.UserHostAddress);
TPL Dataflow
надежный, потокобезопасный, async
- уже и очень настраиваемая структура, основанная на актерах (доступна как nuget)
Вот простой пример для более сложного случая. Предположим, вы хотите:
LocateAddress
и вставка очереди async
.
var actionBlock = new ActionBlock<string>(async address =>
{
if (!IsDuplicate(address))
{
await LocateAddressAsync(address);
}
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});
await actionBlock.SendAsync(context.Request.UserHostAddress);
Используйте BlockingCollection<Action>
для создания шаблона производителя / потребителя с одним потребителем (только одна вещь работает в то время, как вы хотите) и один или несколько производителей.
Сначала определите общую очередь:
BlockingCollection<Action> queue = new BlockingCollection<Action>();
В вашем пользователе Thread
или Task
вы берете из него:
//This will block until there's an item available
Action itemToRun = queue.Take()
Затем из любого количества продюсеров на другие потоки просто добавьте в очередь:
queue.Add(() => LocateAddress(context.Request.UserHostAddress));
you need additional threads to add more parallelism
Нет, нет. В своем ответе я точно продемонстрирую, как создать N степеней параллелизма с помощью всего лишь zero i> потоков.
– Servy
5 September 2014 в 19:55
Interlocked.Increment(ref counter); await semaphore.WaitAsync(); [...] Interlocked.Decrement(ref counter); semaphore.Release();
или есть лучший способ сделать это? – Profet 29 November 2016 в 13:22