Почему асинхронные дейтаграммы UDP Haskell Server имеют потерю пакетов?

Я отправляю простые UDP-пакеты на этот сервер Haskell. В качестве источника пакетов я использую простой текстовый файл, сгенерированный "aspell -l en dump master". Однако любой список из более чем 120 000 сообщений должен работать. Если я запускаю потребителя и производителя одновременно, я не теряю пакеты. Однако я хочу иметь возможность имитировать очень занятого потребителя. Если я ввожу threadDelay на 20 секунд перед запуском потребителя, я получаю потерю пакетов. Для меня это противоречит интуиции, потому что я делаю меньше со стандартным выводом и дисковым вводом-выводом, когда откладываю потребление. Кто-нибудь может объяснить, почему я получаю потери с отложенной версией? Как я могу лучше управлять сокетом и TChan, чтобы не было никаких потерь (просто более высокое использование памяти), когда мой потребитель очень занят?

import Control.Monad (forever)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (writeTChan, readTChan, atomically)
import Control.Concurrent.STM.TChan
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Data.ByteString hiding(putStrLn, head)
import qualified Data.ByteString.Char8 as Char8 (putStrLn, putStr)
import System.IO

main :: IO ()
main = withSocketsDo $  do
    hSetBuffering stdout NoBuffering
    addrinfos <- getAddrInfo
                 (Just (defaultHints {addrFlags = [AI_PASSIVE]}))
                 Nothing (Just "2000")
    let serveraddr = head addrinfos
    sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
    bindSocket sock (addrAddress serveraddr)
    chan <- newTChanIO
    forkIO(producer chan sock)
    -- Uncomment the threadDelay below to see lossy version
    -- threadDelay (1000000 * 20)
    forkIO(consumer chan)
    forever $ threadDelay (1000000 * 60)

producer :: TChan ByteString -> Socket -> IO ()
producer chan sock = forever $ do
    (msg) <- recv sock 256
    atomically $ writeTChan chan msg

consumer :: TChan ByteString -> IO ()
consumer chan = forever $ do
    msg <- atomically $ readTChan chan
    Char8.putStr msg
6
задан rrmckinley 30 May 2012 в 20:44
поделиться