У меня есть Потоковый объект, который иногда получает некоторые данные по нему, но в непредсказуемых интервалах. Сообщения, которые появляются на Потоке, четко определены и объявляют размер своей полезной нагрузки заранее (размер является 16-разрядным целым числом, содержавшимся в первых двух байтах каждого сообщения).
Я хотел бы иметь класс StreamWatcher, который обнаруживает, когда Поток имеет некоторые данные по нему. После того как это делает, я хотел бы, чтобы событие было повышено так, чтобы подписанный экземпляр StreamProcessor мог обработать новое сообщение.
Это может быть сделано с событиями C#, не используя Потоки непосредственно? Кажется, что это должно быть просто, но я не могу добраться, вполне получают мою голову вокруг правильного способа разработать это.
Нормальный подход заключается в использовании .NET паттерна асинхронного программирования, раскрываемого Stream
. По сути, вы начинаете асинхронное чтение, вызывая Stream.BeginRead
, передавая ему byte[]
буфер и метод обратного вызова, который будет вызван, когда данные будут считаны из потока. В методе обратного вызова вы вызываете Stream.EndRead
, передавая ему аргумент IAsncResult
, который был передан вашему обратному вызову. Возвращаемое значение EndRead
сообщает вам, сколько байт было прочитано в буфер.
Получив таким образом несколько первых байт, вы можете дождаться остальной части сообщения (если в первый раз вы не получили достаточно данных), снова вызвав BeginRead
. Получив все сообщение, вы можете поднять событие.
Когда вы говорите не использовать потоки напрямую, я предполагаю, что вы все еще хотите использовать их косвенно через асинхронные вызовы, иначе это было бы не очень полезно.
Все, что вам нужно сделать, это обернуть методы async в Stream
и сохранить результат в буфере. Сначала определим событийную часть спецификации:
public delegate void MessageAvailableEventHandler(object sender,
MessageAvailableEventArgs e);
public class MessageAvailableEventArgs : EventArgs
{
public MessageAvailableEventArgs(int messageSize) : base()
{
this.MessageSize = messageSize;
}
public int MessageSize { get; private set; }
}
Теперь асинхронно прочитайте одно 16-битное целое число из потока и сообщите, когда оно будет готово:
public class StreamWatcher
{
private readonly Stream stream;
private byte[] sizeBuffer = new byte[2];
public StreamWatcher(Stream stream)
{
if (stream == null)
throw new ArgumentNullException("stream");
this.stream = stream;
WatchNext();
}
protected void OnMessageAvailable(MessageAvailableEventArgs e)
{
var handler = MessageAvailable;
if (handler != null)
handler(this, e);
}
protected void WatchNext()
{
stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback),
null);
}
private void ReadCallback(IAsyncResult ar)
{
int bytesRead = stream.EndRead(ar);
if (bytesRead != 2)
throw new InvalidOperationException("Invalid message header.");
int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0];
OnMessageAvailable(new MessageAvailableEventArgs(messageSize));
WatchNext();
}
public event MessageAvailableEventHandler MessageAvailable;
}
Думаю, это то, что нужно. Это предполагает, что класс, обрабатывающий сообщение, также имеет доступ к Stream
и готов прочитать его, синхронно или асинхронно, в зависимости от размера сообщения в событии. Если вы хотите, чтобы класс наблюдателя действительно прочитал все сообщение, вам придется добавить дополнительный код для этого.
Да, это можно сделать. Используйте неблокирующий метод Stream.BeginRead с AsyncCallback. Обратный вызов вызывается асинхронно, когда данные становятся доступными. В обратном вызове вызывается Stream.EndRead, чтобы получить данные, и снова вызывается Stream.BeginRead, чтобы получить следующий фрагмент данных. Буферизация входящих данных в массиве байтов, достаточно большом для хранения сообщения. Как только массив байтов будет заполнен (может потребоваться несколько вызовов обратного вызова), поднимите событие. Затем прочитайте сообщение следующего размера, создайте новый буфер, повторите, готово.