Мне интересно, существует ли реализация / оболочка для ConcurrentQueue , аналогичная BlockingCollection , где взятие из коллекции не блокируется, но вместо этого является асинхронным и вызовет асинхронное ожидание, пока элемент не будет помещен в очередь.
Я придумал свою собственную реализацию, но, похоже, она работает не так, как ожидалось. Мне интересно, изобретаю ли я что-то, что уже существует.
Вот моя реализация:
public class MessageQueue
{
ConcurrentQueue queue = new ConcurrentQueue();
ConcurrentQueue> waitingQueue =
new ConcurrentQueue>();
object queueSyncLock = new object();
public void Enqueue(T item)
{
queue.Enqueue(item);
ProcessQueues();
}
public async Task Dequeue()
{
TaskCompletionSource tcs = new TaskCompletionSource();
waitingQueue.Enqueue(tcs);
ProcessQueues();
return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
}
private void ProcessQueues()
{
TaskCompletionSource tcs=null;
T firstItem=default(T);
while (true)
{
bool ok;
lock (queueSyncLock)
{
ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
if (ok)
{
waitingQueue.TryDequeue(out tcs);
queue.TryDequeue(out firstItem);
}
}
if (!ok) break;
tcs.SetResult(firstItem);
}
}
}