Непрерывно читая из потока?

У меня есть Потоковый объект, который иногда получает некоторые данные по нему, но в непредсказуемых интервалах. Сообщения, которые появляются на Потоке, четко определены и объявляют размер своей полезной нагрузки заранее (размер является 16-разрядным целым числом, содержавшимся в первых двух байтах каждого сообщения).

Я хотел бы иметь класс StreamWatcher, который обнаруживает, когда Поток имеет некоторые данные по нему. После того как это делает, я хотел бы, чтобы событие было повышено так, чтобы подписанный экземпляр StreamProcessor мог обработать новое сообщение.

Это может быть сделано с событиями C#, не используя Потоки непосредственно? Кажется, что это должно быть просто, но я не могу добраться, вполне получают мою голову вокруг правильного способа разработать это.

14
задан John Saunders 7 May 2010 в 13:54
поделиться

3 ответа

Нормальный подход заключается в использовании .NET паттерна асинхронного программирования, раскрываемого Stream. По сути, вы начинаете асинхронное чтение, вызывая Stream.BeginRead, передавая ему byte[] буфер и метод обратного вызова, который будет вызван, когда данные будут считаны из потока. В методе обратного вызова вы вызываете Stream.EndRead, передавая ему аргумент IAsncResult, который был передан вашему обратному вызову. Возвращаемое значение EndRead сообщает вам, сколько байт было прочитано в буфер.

Получив таким образом несколько первых байт, вы можете дождаться остальной части сообщения (если в первый раз вы не получили достаточно данных), снова вызвав BeginRead. Получив все сообщение, вы можете поднять событие.

1
ответ дан 1 December 2019 в 13:33
поделиться

Когда вы говорите не использовать потоки напрямую, я предполагаю, что вы все еще хотите использовать их косвенно через асинхронные вызовы, иначе это было бы не очень полезно.

Все, что вам нужно сделать, это обернуть методы async в Stream и сохранить результат в буфере. Сначала определим событийную часть спецификации:

public delegate void MessageAvailableEventHandler(object sender,
    MessageAvailableEventArgs e);

public class MessageAvailableEventArgs : EventArgs
{
    public MessageAvailableEventArgs(int messageSize) : base()
    {
        this.MessageSize = messageSize;
    }

    public int MessageSize { get; private set; }
}

Теперь асинхронно прочитайте одно 16-битное целое число из потока и сообщите, когда оно будет готово:

public class StreamWatcher
{
    private readonly Stream stream;

    private byte[] sizeBuffer = new byte[2];

    public StreamWatcher(Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        this.stream = stream;
        WatchNext();
    }

    protected void OnMessageAvailable(MessageAvailableEventArgs e)
    {
        var handler = MessageAvailable;
        if (handler != null)
            handler(this, e);
    }

    protected void WatchNext()
    {
        stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback),
            null);
    }

    private void ReadCallback(IAsyncResult ar)
    {
        int bytesRead = stream.EndRead(ar);
        if (bytesRead != 2)
            throw new InvalidOperationException("Invalid message header.");
        int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0];
        OnMessageAvailable(new MessageAvailableEventArgs(messageSize));
        WatchNext();
    }

    public event MessageAvailableEventHandler MessageAvailable;
}

Думаю, это то, что нужно. Это предполагает, что класс, обрабатывающий сообщение, также имеет доступ к Stream и готов прочитать его, синхронно или асинхронно, в зависимости от размера сообщения в событии. Если вы хотите, чтобы класс наблюдателя действительно прочитал все сообщение, вам придется добавить дополнительный код для этого.

14
ответ дан 1 December 2019 в 13:33
поделиться

Да, это можно сделать. Используйте неблокирующий метод Stream.BeginRead с AsyncCallback. Обратный вызов вызывается асинхронно, когда данные становятся доступными. В обратном вызове вызывается Stream.EndRead, чтобы получить данные, и снова вызывается Stream.BeginRead, чтобы получить следующий фрагмент данных. Буферизация входящих данных в массиве байтов, достаточно большом для хранения сообщения. Как только массив байтов будет заполнен (может потребоваться несколько вызовов обратного вызова), поднимите событие. Затем прочитайте сообщение следующего размера, создайте новый буфер, повторите, готово.

2
ответ дан 1 December 2019 в 13:33
поделиться
Другие вопросы по тегам:

Похожие вопросы: