Apparent BufferBlock.Post/Receive/ReceiveAsync race/bug

перекрестный-отправлен вhttp://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

Я знаю... Я не использую TplDataflow в полной мере. ATM я просто использую BufferBlockкак безопасную очередь для передачи сообщений, где производитель и потребитель работают с разной скоростью. Я наблюдаю какое-то странное поведение, которое ставит меня в тупик относительно того, как продолжить.

private BufferBlock messageQueue = new BufferBlock();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}

В приведенном выше коде (, который является частью распределенного решения на 2000 строк), Sendвызывается периодически каждые 100 мс или около того. Это означает, что элемент Postпреобразуется в messageQueueпримерно 10 раз в секунду. Это проверено. Однако иногда оказывается, что ReceiveAsyncне завершается в течение времени ожидания (, т.е. Postне приводит к завершению ReceiveAsync), а TimeoutExceptionподнимается через 30 с. На данный момент messageQueue.Countисчисляется сотнями. Это неожиданно. Эта проблема наблюдалась и при более низкой скорости публикации (1 сообщение в секунду)и обычно возникает до того, как через BufferBlockпройдет 1000 элементов.

Итак, чтобы обойти эту проблему, я использую следующий код, который работает, но иногда вызывает 1-секундную задержку при получении (из-за ошибки, описанной выше)

    public async Task GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;

            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
   }

Это похоже на состояние гонки в TDF для меня., но я не могу понять, почему этого не происходит в других местах, где я использую BufferBlockаналогичным образом. Экспериментальное изменение с ReceiveAsyncна Receiveне помогает. Я не проверял, но думаю, что приведенный выше код работает отлично. Это шаблон, который я видел в документации «Введение в поток данных TPL» tpldataflow.docx .

Что я могу сделать, чтобы разобраться в этом? Существуют ли какие-либо показатели, которые могут помочь сделать вывод о том, что происходит? Если я не могу создать надежный тестовый пример, какую дополнительную информацию я могу предложить?

Помогите!

17
задан i3arnon 1 January 2014 в 03:36
поделиться