Я знаю... Я не использую TplDataflow в полной мере. ATM я просто использую BufferBlockкак безопасную очередь для передачи сообщений, где производитель и потребитель работают с разной скоростью. Я наблюдаю какое-то странное поведение, которое ставит меня в тупик относительно того, как продолжить.
private BufferBlock
В приведенном выше коде (, который является частью распределенного решения на 2000 строк), Sendвызывается периодически каждые 100 мс или около того. Это означает, что элемент Postпреобразуется в messageQueueпримерно 10 раз в секунду. Это проверено. Однако иногда оказывается, что ReceiveAsyncне завершается в течение времени ожидания (, т.е. Postне приводит к завершению ReceiveAsync), а TimeoutExceptionподнимается через 30 с. На данный момент messageQueue.Countисчисляется сотнями. Это неожиданно. Эта проблема наблюдалась и при более низкой скорости публикации (1 сообщение в секунду)и обычно возникает до того, как через BufferBlockпройдет 1000 элементов.
Итак, чтобы обойти эту проблему, я использую следующий код, который работает, но иногда вызывает 1-секундную задержку при получении (из-за ошибки, описанной выше)
public async Task GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
Это похоже на состояние гонки в TDF для меня., но я не могу понять, почему этого не происходит в других местах, где я использую BufferBlockаналогичным образом. Экспериментальное изменение с ReceiveAsyncна Receiveне помогает. Я не проверял, но думаю, что приведенный выше код работает отлично. Это шаблон, который я видел в документации «Введение в поток данных TPL» tpldataflow.docx .
Что я могу сделать, чтобы разобраться в этом? Существуют ли какие-либо показатели, которые могут помочь сделать вывод о том, что происходит? Если я не могу создать надежный тестовый пример, какую дополнительную информацию я могу предложить?