C# Begin/EndReceive - как я считываю большие данные?

from collections import defaultdict

l = [ ["blablabla", "blabliblou"], ["blablablou", "blibloubla"], ["oubabababa", "baboulila"] ]

d = defaultdict(list)
for i, line in enumerate(l):
    [d[word].append(i) for word in line]

print(dict(d))
>>> {'blablabla': [0], 'oubabababa': [2], 'blablablou': [1], 'blabliblou': [0], 'baboulila': [2], 'blibloubla': [1]}

12
задан ryeguy 25 February 2009 в 15:17
поделиться

4 ответа

Нет - вызов BeginReceive снова от обработчика обратного вызова, до EndReceive возвраты 0. В основном необходимо продолжить получать асинхронно, предположив желание самого полного преимущества асинхронного IO.

Если Вы смотрите на страницу MSDN для Socket.BeginReceive Вы будете видеть пример этого. (По общему признанию не столь легко следовать, как это могло бы быть.)

14
ответ дан 2 December 2019 в 06:27
поделиться

Вы считали бы префикс длины сначала. После того как у Вас есть это, Вы просто продолжали бы читать байты в блоках (и можно сделать это асинхронное, когда Вы предположили), пока Вы не исчерпали число байтов, которые Вы знаете, входят от провода.

Обратите внимание, что в какой-то момент, при чтении последнего блока Вы не захотите читать полные 1 024 байта, в зависимости от того, что префикс длины говорит, что общее количество, и сколько байтов Вы читали.

3
ответ дан 2 December 2019 в 06:27
поделиться

Для получения информации (общий Начинают/Заканчивают использование), Вы могли бы хотеть видеть это сообщение в блоге; этот подход работает хорошо на меня и экономит много усилий...

0
ответ дан 2 December 2019 в 06:27
поделиться

Блин. Я не решаюсь даже отвечать на это, учитывая высокопоставленных лиц, которые уже взвесились, но здесь идет. Будьте нежны, O Большие!

Не обладая преимуществом чтения блога Marc (это заблокировано здесь должное корпоративная интернет-политика), я собираюсь предложить "иначе".

Прием, в моем уме, должен разделить получение данных из обработки тех данных.

Я использую класс StateObject, определенный как это. Это отличается от реализации MSDN StateObject, в которую это не включает объект StringBuilder, константа BUFFER_SIZE является частной, и это включает конструктора для удобства.

public class StateObject
{
    private const int BUFFER_SIZE = 65535;
    public byte[] Buffer = new byte[BUFFER_SIZE];
    public readonly Socket WorkSocket = null;

    public StateObject(Socket workSocket)
    {
        WorkSocket = workSocket;
    }
}

У меня также есть Пакетный класс, который является просто оберткой вокруг буфера и метки времени.

public class Packet
{
    public readonly byte[] Buffer;
    public readonly DateTime Timestamp;

    public Packet(DateTime timestamp, byte[] buffer, int size)
    {
        Timestamp = timestamp;
        Buffer = new byte[size];
        System.Buffer.BlockCopy(buffer, 0, Buffer, 0, size);
    }
}

Мой ReceiveCallback () функция похож на это.

public static ManualResetEvent PacketReceived = new ManualResetEvent(false);
public static List<Packet> PacketList = new List<Packet>();
public static object SyncRoot = new object();
public static void ReceiveCallback(IAsyncResult ar)
{
    try {
        StateObject so = (StateObject)ar.AsyncState;
        int read = so.WorkSocket.EndReceive(ar);

        if (read > 0) {
            Packet packet = new Packet(DateTime.Now, so.Buffer, read);
            lock (SyncRoot) {
                PacketList.Add(packet);
            }
            PacketReceived.Set();
        }

        so.WorkSocket.BeginReceive(so.Buffer, 0, so.Buffer.Length, 0, ReceiveCallback, so);
    } catch (ObjectDisposedException) {
        // Handle the socket being closed with an async receive pending
    } catch (Exception e) {
        // Handle all other exceptions
    }
}

Заметьте, что эта реализация не делает абсолютно никакой обработки полученных данных, и при этом это не имеет никаких ожиданий относительно того, сколько байтов, как предполагается, были получены. Это просто получает любые данные, оказывается, находится на сокете (до 65 535 байтов) и хранит те данные в пакетном списке, и затем это сразу стоит в очереди, другой асинхронный получает.

Начиная с обработки больше не происходит в потоке, который обрабатывает, каждый асинхронный получает, данные будут, очевидно, обработаны другим потоком, который является, почему Добавление () операция синхронизируется через оператор блокировки. Кроме того, поток обработки (является ли это основным потоком или некоторым другим специализированным потоком) должен знать, когда существуют данные для обработки. Чтобы сделать это, я обычно использую ManualResetEvent, который является тем, что я показал выше.

Вот то, как обработка работает.

static void Main(string[] args)
{
    Thread t = new Thread(
        delegate() {
            List<Packet> packets;
            while (true) {
                PacketReceived.WaitOne();
                PacketReceived.Reset();
                lock (SyncRoot) {
                    packets = PacketList;
                    PacketList = new List<Packet>();
                }

                foreach (Packet packet in packets) {
                    // Process the packet
                }
            }
        }
    );
    t.IsBackground = true;
    t.Name = "Data Processing Thread";
    t.Start();
}

Это - основная инфраструктура, которую я использую для всей моей связи с сокетом. Это обеспечивает хорошее разделение между получением данных и обработкой тех данных.

Относительно другого вопроса Вы имели, важно помнить с этим подходом, что каждый Пакетный экземпляр не обязательно представляет полное сообщение в контексте Вашего приложения. Пакетный экземпляр мог бы содержать частичное сообщение, единственное сообщение или несколько сообщений, и Ваши сообщения могли бы охватить несколько Пакетных экземпляров. Я обратился, как знать, когда Вы получили полное сообщение в связанном вопросе, Вы отправили здесь.

6
ответ дан 2 December 2019 в 06:27
поделиться
Другие вопросы по тегам:

Похожие вопросы: