Много UDP-запросов, потерянных на UDP-сервере с Netty

Я написал простой UDP-сервер с Netty, который просто распечатывает в журналах полученные сообщения (фреймы). Для этого я создал простой декодер декодера кадров и простой обработчик сообщений. У меня также есть клиент, который может отправлять несколько запросов последовательно и/или параллельно.

Когда я настраиваю свой клиентский тестер на отправку, например, нескольких сотен запросов последовательно с небольшой задержкой между ними, мой сервер, написанный с помощью Netty, получает их все правильно. Но в данный момент я увеличиваю количество одновременных запросов в моем клиенте (например, 100) в сочетании с последовательными и несколькими повторами, мой сервер начинает терять много запросов. Например, когда я отправляю 50000 запросов, мой сервер получает около 49000 только при использовании простого ChannelHandler, который распечатывает полученное сообщение.

И когда я добавляю простой декодер кадров (который распечатывает кадр и копирует его в другой буфер) перед этим обработчиком, сервер обрабатывает только половину запросов!!

Я заметил, что независимо от количества рабочих процессов, которые я указываю для созданной NioDatagramChannelFactory, всегда есть один и только один поток, который обрабатывает запросы (я использую рекомендуемый Executors.newCachedThreadPool() в качестве другого параметра).

Я также создал еще один похожий простой UDP-сервер на основе DatagramSocket, поставляемого с JDK, и он отлично обрабатывает все запросы с 0 (нулем) потерянным!! Когда я отправляю 50000 запросов в своем клиенте (например, с 1000 потоков), я получаю 50000 запросов на своем сервере.

Я делаю что-то неправильно при настройке своего UDP-сервера с помощью Netty? А может Netty просто не рассчитана на такую ​​нагрузку?? Почему данный кэшированный пул потоков использует только один поток (я заметил, что только один поток и всегда один и тот же используется при просмотре JMX jconsole и при проверке имени потока в выходных журналах)? Я думаю, что если бы больше потоков использовалось, как ожидалось, сервер мог бы легко справиться с такой нагрузкой, потому что я могу сделать это без каких-либо проблем, не используя Netty!

См. мой код инициализации ниже:

...

lChannelfactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool(), nbrWorkers );
lBootstrap = new ConnectionlessBootstrap( lChannelfactory );

lBootstrap.setPipelineFactory( new ChannelPipelineFactory() {
    @Override
    public ChannelPipeline getPipeline()
    {
        ChannelPipeline lChannelPipeline = Channels.pipeline();
        lChannelPipeline.addLast( "Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder( null ) );            
        lChannelPipeline.addLast( "Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler( lOuterFrameStatsCollector ) );            
        return lChannelPipeline;
    }
} );

bindChannel = lBootstrap.bind( socketAddress );

...

И содержимое метода decode() в моем декодере:

protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception
{
    ChannelBuffer lDuplicatedChannelBuffer = null;
    sLogger.debug( "Decode method called." );

    if ( iBuffer.readableBytes() < 8 ) return null;
    if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();

    if ( iBuffer.readable() ) 
    {        
        sLogger.debug( convertToAsciiHex( iBuffer.array(), iBuffer.readableBytes() ) );                     
        lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer( iBuffer.readableBytes() );            
        iBuffer.readBytes( lDuplicatedChannelBuffer );
    }

    return lDuplicatedChannelBuffer;
}

И содержимое метода messageReceived() в моем обработчике:

public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception
{
    ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage();
    if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();

    if ( lMessageBuffer.readable() ) 
    {        
        sLogger.debug( convertToAsciiHex( lMessageBuffer.array(), lMessageBuffer.readableBytes() ) );            
        lMessageBuffer.discardReadBytes();
    }
}
6
задан dzikoysk 4 August 2017 в 07:19
поделиться