C# межраспараллеливают коммуникацию

Я хочу, чтобы сотрудничали два потока; производитель и потребитель. потребитель является довольно медленным, и производитель очень быстр и работает в пакетах.

например, потребитель может обработать одно сообщение в 20 секунд, и производитель может создать 10 сообщений через одну секунду, но делает это об однажды в долго, таким образом, потребитель может нагнать.

Я хочу что-то как:

Stream commonStream;
AutoResetEvent commonLock;

void Producer()
{
    while (true)
    {
        magic.BlockUntilMagicAvalible();
        byte[] buffer = magic.Produce();
        commonStream.Write(buffer);
        commonLock.Set();
    }
}

void Consumer()
{
    while(true)
    { 
        commonLock.WaitOne();
        MagicalObject o = binarySerializer.Deserialize(commonStream);
        DoSomething(o);
    }
}
5
задан AbdelAziz AbdelLatef 6 October 2019 в 03:26
поделиться

3 ответа

Я бы прочитал следующие статьи, они описывают вашу проблему. По сути вы не получаете правильную изоляцию для вашей единицы работы.

http://blogs.msdn.com/b/ricom/archive/2006/04/24/582643.aspx http://blogs.msdn.com/b/ricom/archive/2006/04/26/584802.aspx

1
ответ дан 13 December 2019 в 19:20
поделиться

Если у вас .Net 4.0 или выше, вы можете сделать это таким образом, используя BlockingCollection

int maxBufferCap = 500;
BlockingCollection<MagicalObject> Collection 
                           = new BlockingCollection<MagicalObject>(maxBufferCap);
void Producer()
{
    while (magic.HasMoreMagic)
    {
        this.Collection.Add(magic.ProduceMagic());
    }
    this.Collection.CompleteAdding();
}

void Consumer()
{
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable())
    {
        DoSomthing(magicalObject);
    }
}

Строка foreach будет спать, если в буфере нет данных, она также будет автоматически просыпаться, когда что-то добавляется в коллекцию.

Причина, по которой я устанавливаю максимальный размер буфера, заключается в том, что если ваш производитель намного быстрее потребителя, вы можете занять много памяти, поскольку все больше и больше объектов попадает в коллекцию. Установив максимальный размер буфера при создании блокирующей коллекции, когда размер буфера будет достигнут, вызов Add на производителе будет блокироваться до тех пор, пока потребитель не удалит объект из коллекции.

Еще один плюс класса BlockingCollection в том, что он может иметь столько производителей и потребителей, сколько вы хотите, не обязательно соотношение 1:1. Если DoSomthing поддерживает это, вы можете иметь foreach цикл на каждое ядро компьютера (или даже использовать Parallel. ForEach и использовать потребляющий enumerable как источник данных)

void ConsumersInParalell()
{
    //This assumes the method signature of DoSomthing is one of the following:
    //    Action<MagicalObject>
    //    Action<MagicalObject, ParallelLoopState>
    //    Action<MagicalObject, ParallelLoopState, long>
    Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing);
}
11
ответ дан 13 December 2019 в 19:20
поделиться

Вы можете получить желаемое, используя очередь и таймер. Производитель добавляет значения в очередь и запускает таймер потребителя. Истекшее событие таймера потребителя (которое находится в потоке Threadpool) останавливает таймер, прокручивает очередь до тех пор, пока она не станет пустой, а затем исчезает (без лишних опросов). Производитель может добавлять в очередь, пока потребитель все еще работает.

System.Timers.Timer consumerTimer;
Queue<byte[]> queue = new Queue<byte[]>();

void Producer()
{
    consumerTimer = new System.Timers.Timer(1000);
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed);
    while (true)
    {
        magic.BlockUntilMagicAvailable();
        lock (queue)
        {
            queue.Enqueue(magic.Produce());
            if (!consumerTimer.Enabled)
            {
                consumerTimer.Start();
            }
        }
    }
}

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
    while (true)
    {
        consumerTimer.Stop();
        lock (queue)
        {
            if (queue.Count > 0)
            {
                DoSomething(queue.Dequeue());
            }
            else
            {
                break;
            }
        }
    }
}
0
ответ дан 13 December 2019 в 19:20
поделиться
Другие вопросы по тегам:

Похожие вопросы: