Как заставить сокет UDP заменять старые сообщения (еще не recv () 'd) при поступлении новых?

Сначала немного контекста, чтобы объяснить, почему я нахожусь на маршруте "UDP-сэмплирования":
Я хотел бы попробовать данные, полученные с высокой скоростью в течение неизвестного периода времени. Данные, которые я хочу получить, находятся на другом компьютере, а не на том, который потребляет данные. У меня есть выделенное соединение Ethernet между ними, поэтому пропускная способность не является проблемой. У меня проблема в том, что машина, потребляющая данные, работает намного медленнее, чем та, которая их производит. Дополнительным ограничением является то, что хотя все в порядке, что я не получаю все сэмплы (это просто сэмплы), обязательно я получу последний последний . производитель данных отправляет дейтаграмму UDP для каждого созданного образца и позволяет потребителю данных пытаться получить образцы, которые он может, и позволяет другим отбрасываться уровнем сокетов, когда сокет UDP заполнен. Проблема с этим решением состоит в том, что, когда прибывают новые дейтаграммы UDP, и сокет полон, это новые дейтаграммы, которые удаляются, а не старые . Поэтому я не гарантирую, что последний получится!

Мой вопрос: есть ли способ заставить сокет UDP заменять старые дейтаграммы при поступлении новых?

В настоящее время получатель является машиной Linux, но это может измениться в предпочтение другой Unix-подобной ОС в будущем (Windows может быть возможной, поскольку она реализует сокеты BSD, но менее вероятно)
Идеальное решение будет использовать широко распространенные механизмы (например, setsockopt () s).

PS: я думал о других решениях, но они более сложны (включают в себя тяжелую модификацию отправителя), поэтому я хотел бы сначала иметь определенный Статус о целесообразности того, что я прошу! :)

Обновление: - Я знаю, что ОС на принимающей машине может обрабатывать сетевую нагрузку + повторную сборку трафика, генерируемого отправителем. Просто его поведение по умолчанию - отбрасывать новые дейтаграммы, когда буфер сокета заполнен. И из-за времени обработки в процессе получения, я знаю, что он будет заполнен, что бы я ни делал (напрасно тратить половину памяти в буфере сокета:)).
- Я действительно хотел бы избежать того, чтобы вспомогательный процесс делал то, что могла бы сделать ОС во время диспетчеризации пакетов, и тратил бы впустую ресурсы, просто копируя сообщения в SHM.
- Проблема, с которой я сталкиваюсь при изменении отправителя, состоит в том, что код, к которому у меня есть доступ, является просто функцией PleaseSendThisData (), она не знает, что это может быть последний раз, когда она вызывается раньше, поэтому я не увидеть любые выполнимые трюки на этом конце ... но я открыт для предложений! :)

Если на самом деле нет способа изменить поведение приема UDP в BSD-сокете, тогда хорошо ... просто скажите, я готов принять эту ужасную правду и начну работать над решением "вспомогательного процесса" когда я вернусь к нему:)

6
задан Nawak 11 August 2010 в 17:42
поделиться

5 ответов

Это будет сложно получить полностью правильно только на стороне слушателя, поскольку он может фактически пропустить последний пакет в микросхеме сетевого интерфейса, что не позволит вашей программе когда-либо увидеть его.

UDP-код операционной системы был бы лучшим местом, чтобы попытаться справиться с этим, поскольку он будет получать новые пакеты, даже если он решит отбросить их, потому что у него уже слишком много очереди. Затем он может принять решение отказаться от старого или отказаться от нового, но я не знаю, как сказать ему, что это именно то, что вы хотели бы, чтобы он сделал.

Вы можете попытаться справиться с этим на получателе, имея одну программу или поток, который всегда пытается прочитать самый новый пакет, а другой, который всегда пытается получить этот самый новый пакет. Как это сделать, будет зависеть от того, делали ли вы это как две отдельные программы или как два потока.

В качестве потоков вам понадобится мьютекс (семафор или что-то в этом роде) для защиты указателя (или ссылки) на структуру, используемую для хранения 1 полезной нагрузки UDP и всего, что вы хотите там (размер, IP-адрес отправителя, порт отправителя, отметка времени и т. д.).

Поток, который фактически считывает пакеты из сокета, будет хранить данные пакета в структуре, получить мьютекс, защищающий этот указатель, заменить текущий указатель на указатель на только что созданную структуру, освободить мьютекс, передать сигнал процессору. поток, что он должен что-то сделать, а затем очистить структуру, на которую он только что получил указатель, и использовать ее для удержания следующего входящего пакета.

Поток, который фактически обработал полезные данные пакета, должен дождаться сигнала от другого потока и / или периодически (примерно 500 мс, вероятно, является хорошей отправной точкой для этого, но вы решаете) и получить мьютекс, поменяв его указатель на структура полезной нагрузки UDP с той, которая есть, освободите мьютекс, а затем, если в структуре есть какие-либо пакетные данные, она должна обработать их и затем дождаться следующего сигнала. Если у него нет данных, он должен просто продолжить и дождаться следующего сигнала.

Поток процессора, вероятно, должен работать с более низким приоритетом, чем слушатель UDP, чтобы слушатель с меньшей вероятностью когда-либо пропустил пакет. При обработке последнего пакета (действительно важного для вас) процессор не будет прерван, потому что нет новых пакетов, которые слушатель мог бы услышать.

Вы можете расширить это, используя очередь, а не просто один указатель в качестве места обмена для двух потоков. Единственный указатель - это просто очередь длиной 1, и ее очень легко обработать.

Вы также можете расширить это, попытавшись заставить поток слушателя определять наличие нескольких ожидающих пакетов и фактически помещать только последний из них в очередь для потока процессора. То, как вы это делаете, зависит от платформы, но если вы используете * nix, тогда он должен вернуть 0 для сокетов, ничего не ожидающих:

while (keep_doing_this()) {
    ssize_t len = read(udp_socket_fd, my_udp_packet->buf, my_udp_packet->buf_len); 
    // this could have been recv or recvfrom
    if (len < 0) {
        error();
    }
    int sz;
    int rc = ioctl(udp_socket_fd, FIONREAD, &sz);
    if (rc < 0) {
        error();
    }
    if (!sz) {
        // There aren't any more packets ready, so queue up the one we got
        my_udp_packet->current_len = len;

        my_udp_packet = swap_udp_packet(my_ucp_packet);
        /* swap_udp_packet is code you would have to write to implement what I talked
           about above. */

        tgkill(this_group, procesor_thread_tid, SIGUSR1);
    } else if (sz > my_udp_packet->buf_len) {
        /* You could resize the buffer for the packet payload here if it is too small.*/
    }
}

udp_packet должен быть выделен для каждого потока, а также 1 для указателя подкачки. Если вы используете очередь для обмена, тогда у вас должно быть достаточно udp_packets для каждой позиции в очереди - поскольку указатель представляет собой просто очередь длиной 1, ему нужна только 1.

Если вы используете систему POSIX, подумайте о том, чтобы не использовать сигнал реального времени для сигнализации, потому что они выстраиваются в очередь. Использование обычного сигнала позволит вам относиться к многократному сигналу так же, как к сигналу только один раз, пока сигнал не будет обработан, пока сигналы реального времени не выстраиваются в очередь. Периодическое пробуждение для проверки очереди также позволяет вам обрабатывать возможность поступления последнего сигнала сразу после того, как вы проверили, есть ли у вас новые пакеты, но перед вызовом pause для ожидания сигнала.

2
ответ дан 8 December 2019 в 18:29
поделиться

Просто установите для сокета неблокирующий режим и выполняйте цикл recv () , пока он не вернет <0 с errno == EAGAIN . Затем обработайте последний полученный пакет, промойте и повторите.

5
ответ дан 8 December 2019 в 18:29
поделиться

Я почти уверен, что это доказуемая неразрешимая проблема, тесно связанная с проблемой двух армий .

Я могу придумать грязное решение: установить TCP-соединение по боковой полосе "управления", которое переносит последний пакет, который также является индикатором "окончания передачи". В противном случае вам нужно использовать один из более общих прагматических средств, указанных в Engineering Approaches .

1
ответ дан 8 December 2019 в 18:29
поделиться

Другая идея состоит в том, чтобы иметь специальный процесс чтения, который ничего не делает, кроме цикла в сокете и считывает входящие пакеты в кольцевой буфер в общей памяти (вам придется беспокоиться о правильном порядке записи). Что-то вроде kfifo . Неблокирование здесь тоже подойдет. Новые данные заменяют старые. Тогда другой процесс (а) всегда будет иметь доступ к последнему блоку в начале очереди и ко всем предыдущим блокам, еще не перезаписанным.

Может быть слишком сложным для простого одностороннего читателя, просто вариант.

1
ответ дан 8 December 2019 в 18:29
поделиться

Я согласен с "caf". Установите сокет в неблокирующий режим.

Всякий раз, когда вы получаете что-то в сокете - читайте в цикле, пока ничего не останется. Затем обработайте последнюю прочитанную дейтаграмму.

Только одно замечание: вы должны установить большой системный буфер приема для сокета

int nRcvBufSize = 5*1024*1024; // or whatever you think is ok
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &nRcvBufSize, sizeof(nRcvBufSize));
4
ответ дан 8 December 2019 в 18:29
поделиться
Другие вопросы по тегам:

Похожие вопросы: