Реализация асинхронной операции «чтение всех доступных на данный момент данных из потока»

Недавно я дал ответ на этот вопрос: C # - Перенаправление вывода консоли в реальном времени .

Как часто бывает, объяснения (здесь «материал» - это то, как я решил подобную проблему) приводит вас к большему пониманию и / или, как в данном случае, к моментам «упс». Я понял, что мое решение в том виде, в котором оно реализовано, содержит ошибку. Ошибка не имеет большого практического значения, но имеет чрезвычайно большое значение для меня как разработчика: я не могу успокоиться, зная, что мой код может взорваться.

Цель этого вопроса - подавить ошибку. Прошу прощения за длинное вступление, так что давайте испачкаемся.

Я хотел создать класс, который позволяет мне получать ввод из стандартного вывода консоли Stream . Потоки вывода консоли имеют тип FileStream ; реализация может привести к этому, если необходимо. Также существует связанный StreamReader , который уже доступен для использования.

Есть только одна вещь, которую мне нужно реализовать в этом классе для достижения желаемой функциональности: асинхронная операция «прочитать все данные, доступные в данный момент». . Чтение до конца потока нецелесообразно, потому что поток не завершится, если процесс не закроет дескриптор вывода консоли, и он не будет этого делать, потому что он интерактивен и ожидает ввода перед продолжением.

Я буду использовать этот гипотетический асинхронная операция для реализации уведомления на основе событий, что будет более удобно для моих вызывающих.

Открытый интерфейс класса следующий:

public class ConsoleAutomator {
    public event EventHandler StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}

StartSendingEvents и StopSendingEvents делают то, что они рекламируют; для целей этого обсуждения, мы можем предположить, что события всегда отправляются без потери общности.

Класс использует эти два поля внутри:

    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];

Функциональные возможности класса реализованы в методах ниже. Для начала:

    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

Для чтения данных из Stream без блокировки, а также без использования символа возврата каретки, вызывается BeginRead :

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

Сложная часть :

BeginRead требует использования буфера. Это означает, что при чтении из потока возможно, что байты, доступные для чтения («входящий фрагмент»), больше, чем размер буфера.

    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

Для чтения данных из потока без блокировки, а также без использования символа возврата каретки, вызывается BeginRead :

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

Сложная часть:

BeginRead требует использования буфера. Это означает, что при чтении из потока возможно, что байты, доступные для чтения («входящий фрагмент»), больше, чем размер буфера.

    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

Для чтения данных из потока без блокировки, а также без использования символа возврата каретки, вызывается BeginRead :

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

Сложная часть:

BeginRead требует использования буфера. Это означает, что при чтении из потока возможно, что байты, доступные для чтения («входящий фрагмент»), больше, чем размер буфера. Помните, что здесь цель - прочитать весь блок и вызвать подписчиков событий ровно один раз для каждого блока .

Для этого, если буфер заполнен после EndRead , мы не отправляем его содержимое подписчикам сразу, а вместо этого добавляем его в StringBuilder . Содержимое StringBuilder отправляется обратно только тогда, когда больше нет для чтения из потока.

    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we 're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }

После любого чтения, которого недостаточно для заполнения буфера (в этом случае мы знаем, что не было больше данных, которые нужно прочитать во время последней операции чтения), все накопленные данные отправляются подписчикам:

    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this, 
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }

(я знаю, что пока нет подписчиков, данные накапливаются навсегда. Это осознанное решение).

Хорошее

Эта схема работает почти идеально:

  • Асинхронная функциональность без создания потоков
  • Очень удобна для вызывающего кода (просто подписывайтесь на событие)
  • Никогда больше более одного события за каждый раз, когда данные доступны для чтения
  • Почти не зависит от размера буфера

Плохо

Последнее почти очень большое. Рассмотрим, что происходит, когда есть входящий фрагмент, длина которого точно равна размеру буфера. Чанк будет прочитан и буферизован, но событие не будет запущено. За этим последует BeginRead , который ожидает найти больше данных, принадлежащих текущему блоку, чтобы отправить их обратно целиком, но ... в потоке больше не будет данных.

Фактически, пока данные помещаются в поток кусками, длина которых точно равна размеру буфера, данные будут буферизоваться, и событие никогда не будет запущено.

Этот сценарий может быть очень маловероятным на практике, тем более что мы можно выбрать любое число для размера буфера, но проблема есть.

Решение?

К сожалению, после проверки доступных методов в FileStream и StreamReader , я могу ' я не могу найти что-либо, что позволяет мне заглядывать в поток, а также позволяет использовать в нем асинхронные методы.

Одним из «решений» может быть ожидание потока на ManualResetEvent после «заполнения буфера» состояние обнаружено. Если событие не сигнализируется (асинхронным обратным вызовом) в течение небольшого промежутка времени, тогда больше данных из потока поступать не будет, и данные, накопленные на данный момент, должны быть отправлены подписчикам. Однако это приводит к необходимости в другом потоке, требует синхронизации потоков и является совершенно неэлегантным.

Также достаточно указать тайм-аут для BeginRead (время от времени обращайтесь к моему коду, чтобы я мог проверить если есть данные, которые нужно отправить обратно; большую часть времени делать нечего, поэтому я ожидаю, что снижение производительности будет незначительным). Но похоже, что таймауты не поддерживаются в FileStream .

Поскольку я предполагаю, что асинхронные вызовы с таймаутами являются вариантом в голом Win32, другим подходом может быть PInvoke, черт возьми проблемы. Но это также нежелательно, так как приведет к усложнению кода и просто затруднит программирование.

Есть ли элегантный способ обойти проблему?

Спасибо, что проявили достаточно терпения, чтобы прочитать все это.

Обновление:

Я определенно плохо изложил сценарий в своей первоначальной записи. С тех пор я немного изменил описание, но для большей уверенности:

Вопрос заключается в том, как реализовать асинхронную операцию «прочитать все данные, доступные на данный момент».

Приношу свои извинения людям, которые приняли время, чтобы прочитать и ответить без того, чтобы я достаточно ясно выразил свое намерение.

7
задан Community 23 May 2017 в 09:58
поделиться