У меня в настоящее время есть проблема потери сообщения. Эта ошибка редко происходит, но, оказывается, достаточно часто является раздражающей. Вот контекст проблемы:
Действительно ли это возможно для тайм-аута, и сообщение получают для появления одновременно? Существует ли лучший способ для меня обработать сервисную остановку проверить, чтобы помочь избежать этой ошибки?
private void workerFunction()
{
logger.Info("Connecting to queue: " + Settings.Default.goldmine_service_queue);
MessageQueue q = new MessageQueue(Settings.Default.goldmine_service_queue);
q.Formatter = new ActiveXMessageFormatter();
while (serviceStarted)
{
Message currentMessage = null;
try
{
currentMessage = q.Peek(new TimeSpan(0,0,30));
}
catch (System.Messaging.MessageQueueException mqEx)
{
if (mqEx.ToString().Contains("Timeout for the requested operation has expired"))
{
logger.Info("Check for service stop request");
}
else
{
logger.Error("Exception while peeking into MSMQ: " + mqEx.ToString());
}
}
catch (Exception e)
{
logger.Error("Exception while peeking into MSMQ: " + e.ToString());
}
if (currentMessage != null)
{
logger.Info(currentMessage.Body.ToString());
try
{
ProcessMessage(currentMessage);
}
catch (Exception processMessageException)
{
logger.Error("Error in process message: " + processMessageException.ToString());
}
//Remove message from queue.
logger.Info("Message removed from queue.");
q.Receive();
//logPerformance(ref transCount, ref startTime);
}
}//end while
Thread.CurrentThread.Abort();
}
Я не думаю, что сообщения должны быть пропущено на основе быстрого обзора, но вы работаете очень странным образом с большим количеством возможностей для условий гонки.
Почему бы просто не получить сообщение и не передать его в ProcessMessage
(если ProcessMessage
не работает, вы все равно выполняете чтение). Если вам нужно обрабатывать несколько получателей, тогда выполняйте получение в транзакции MSMQ, чтобы сообщение было недоступно для других получателей, но не удалялось из очереди до тех пор, пока транзакция не будет зафиксирована.
Кроме того, вместо опроса очереди, почему бы не сделать асинхронный прием и позволить пулу потоков обработать завершение (где вы должны вызвать EndReceive
). Это избавляет от необходимости связывать нить, а вы не делаете этого.
Я буду изменение currentMessage = q.Peek (new TimeSpan (0,0,30));
на currentMessage = q.Receive ();
решит вашу проблему. Я использую MSMQ для передачи сообщений в том же контексте, но использую только peek (и ожидаю исключения тайм-аута), чтобы определить, пуста ли очередь. Вызов Receive
блокируется, поэтому планируйте его соответствующим образом.
Изменить: также для перехвата исключений - вы можете проверить, является ли ваше исключение исключением из-за тайм-аута, сравнив код ошибки.
mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout
где mqe - это исключение вашей очереди сообщений. Возможно, вам это понравится больше, чем сравнение строк.
Просто некоторые комментарии для пояснения того, как здесь работает MSMQ.
"Я могу доказать, что сообщение вставляется в goldmine_service_queue, так как оно появляется в журнале сообщений.
Сообщение попадает в очередь журнала, когда оригинальное сообщение удаляется из goldmine_service_queue. Таким образом, вы можете сказать, что сообщение успешно доставлено в очередь И успешно удалено из очереди
"Я сильно подозреваю, что моя проблема может быть связана с MessageQueue.Peek и поведением тайм-аута".
"Peek ничего не делает, чтобы удалить сообщение из очереди". Только "q.Receive();" делает это. В вашем коде нет явной связи между подглядываемым и принимаемым сообщением. "q.Receive();" просто говорит "получить сообщение из верхней части очереди". В многопоточной среде можно ожидать непоследовательного прочтения сообщений - некоторые из них могут быть просмотрены и обработаны несколько раз. Вы должны получить ID Peeked сообщения и использовать ReceiveByID, чтобы вы могли получить только прочитанное сообщение.
.