Я хочу, чтобы сотрудничали два потока; производитель и потребитель. потребитель является довольно медленным, и производитель очень быстр и работает в пакетах.
например, потребитель может обработать одно сообщение в 20 секунд, и производитель может создать 10 сообщений через одну секунду, но делает это об однажды в долго, таким образом, потребитель может нагнать.
Я хочу что-то как:
Stream commonStream;
AutoResetEvent commonLock;
void Producer()
{
while (true)
{
magic.BlockUntilMagicAvalible();
byte[] buffer = magic.Produce();
commonStream.Write(buffer);
commonLock.Set();
}
}
void Consumer()
{
while(true)
{
commonLock.WaitOne();
MagicalObject o = binarySerializer.Deserialize(commonStream);
DoSomething(o);
}
}
Я бы прочитал следующие статьи, они описывают вашу проблему. По сути вы не получаете правильную изоляцию для вашей единицы работы.
http://blogs.msdn.com/b/ricom/archive/2006/04/24/582643.aspx http://blogs.msdn.com/b/ricom/archive/2006/04/26/584802.aspx
Если у вас .Net 4.0 или выше, вы можете сделать это таким образом, используя BlockingCollection
int maxBufferCap = 500;
BlockingCollection<MagicalObject> Collection
= new BlockingCollection<MagicalObject>(maxBufferCap);
void Producer()
{
while (magic.HasMoreMagic)
{
this.Collection.Add(magic.ProduceMagic());
}
this.Collection.CompleteAdding();
}
void Consumer()
{
foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable())
{
DoSomthing(magicalObject);
}
}
Строка foreach
будет спать, если в буфере нет данных, она также будет автоматически просыпаться, когда что-то добавляется в коллекцию.
Причина, по которой я устанавливаю максимальный размер буфера, заключается в том, что если ваш производитель намного быстрее потребителя, вы можете занять много памяти, поскольку все больше и больше объектов попадает в коллекцию. Установив максимальный размер буфера при создании блокирующей коллекции, когда размер буфера будет достигнут, вызов Add
на производителе будет блокироваться до тех пор, пока потребитель не удалит объект из коллекции.
Еще один плюс класса BlockingCollection
в том, что он может иметь столько производителей и потребителей, сколько вы хотите, не обязательно соотношение 1:1. Если DoSomthing
поддерживает это, вы можете иметь foreach
цикл на каждое ядро компьютера (или даже использовать Parallel. ForEach
и использовать потребляющий enumerable как источник данных)
void ConsumersInParalell()
{
//This assumes the method signature of DoSomthing is one of the following:
// Action<MagicalObject>
// Action<MagicalObject, ParallelLoopState>
// Action<MagicalObject, ParallelLoopState, long>
Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing);
}
Вы можете получить желаемое, используя очередь и таймер. Производитель добавляет значения в очередь и запускает таймер потребителя. Истекшее событие таймера потребителя (которое находится в потоке Threadpool) останавливает таймер, прокручивает очередь до тех пор, пока она не станет пустой, а затем исчезает (без лишних опросов). Производитель может добавлять в очередь, пока потребитель все еще работает.
System.Timers.Timer consumerTimer;
Queue<byte[]> queue = new Queue<byte[]>();
void Producer()
{
consumerTimer = new System.Timers.Timer(1000);
consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed);
while (true)
{
magic.BlockUntilMagicAvailable();
lock (queue)
{
queue.Enqueue(magic.Produce());
if (!consumerTimer.Enabled)
{
consumerTimer.Start();
}
}
}
}
void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
while (true)
{
consumerTimer.Stop();
lock (queue)
{
if (queue.Count > 0)
{
DoSomething(queue.Dequeue());
}
else
{
break;
}
}
}
}