Анализ Observable Network IO

Я пытаюсь использовать Rx для чтения из потока приема TCPClient и анализа данных в IObservable строки, разделенной символом новой строки "\r\n". Вот как я Получаю из сокета поток...

var messages = new Subject<string>();

var functionReceiveSocketData =
            Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (client.Client.BeginReceive, client.Client.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
        {
            var rs = new byte[buffer.Length];
            bs.CopyTo(rs, 0);
            return rs;
        };

Observable
   .Defer(() =>
            {
                var buffer = new byte[50];
                return
                    from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
                select copy(buffer, n);
            }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));

Вот что я придумал для разбора строки. В настоящее время это не работает...

obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

Субъект сообщения получает сообщение порциями, поэтому я пытаюсь объединить их и проверить, содержит ли объединенная строка новую строку, тем самым сигнализируя буферу о закрытии и выводе буферизованных порций. Не уверен, почему это не работает. Кажется, я получаю только первый фрагмент из obsStrings.

Итак, я ищу две вещи. Я хотел бы упростить чтение потока ввода-вывода и исключить использование темы сообщений. Во-вторых, я хотел бы, чтобы синтаксический анализ строк работал. Я немного взломал это и не могу найти рабочего решения. Я новичок с Rx.

РЕДАКТИРОВАТЬ :Вот готовый продукт после решения проблемы....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
           .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
           .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
           .Where(x => x.EndsWith("\r\n"))
           .Select(buffered => String.Join("", buffered))
           .Select(a => a.Replace("\n", ""));

"ReceiveUntilCompleted" является расширением проекта RXX.

7
задан TK3 7 May 2012 в 04:44
поделиться