awaitable очередь на основе задач

Мне интересно, существует ли реализация / оболочка для ConcurrentQueue , аналогичная BlockingCollection , где взятие из коллекции не блокируется, но вместо этого является асинхронным и вызовет асинхронное ожидание, пока элемент не будет помещен в очередь.

Я придумал свою собственную реализацию, но, похоже, она работает не так, как ожидалось. Мне интересно, изобретаю ли я что-то, что уже существует.

Вот моя реализация:

public class MessageQueue
{
    ConcurrentQueue queue = new ConcurrentQueue();

    ConcurrentQueue> waitingQueue = 
        new ConcurrentQueue>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task Dequeue()
    {
        TaskCompletionSource tcs = new TaskCompletionSource();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

33
задан Stephen Cleary 24 October 2011 в 13:14
поделиться