У меня есть количественная и повторяемая проблема с использованием библиотеки параллельных задач, BlockingCollection
, ConcurrentQueue
& GetConsumingEnumerable
при попытке создать простой конвейер.
В двух словах, добавление записей в коллекцию по умолчанию BlockingCollection
(которая внутри полагается на ConcurrentQueue
) из одного потока не гарантирует, что они будут извлечены из BlockingCollection
из другого потока, вызывающего метод GetConsumingEnumerable()
.
Я создал очень простое приложение Winforms для воспроизведения/симуляции этого, которое просто выводит на экран целые числа.
Timer1
отвечает за постановку рабочих элементов в очередь...Он использует параллельный словарь с именем _tracker
, чтобы знать, что уже добавлено в блокирующий набор.Timer2
просто регистрирует состояние счетчика BlockingCollection
и _tracker
Paralell.ForEach
, который просто перебирает блокирующие коллекции GetConsumingEnumerable()
и начинает печатать их во втором списке.Таймер 1
, предотвращая добавление новых записей в коллекцию блокировки.public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
Вот последовательность событий:
Вы можете видеть, что параллельный словарь все еще отслеживает, что 1 элемент еще не был обработан и впоследствии удален из _tracker
Если я снова нажму «Пуск», то timer1 начинает добавлять еще 3 записи, и цикл Parallel возвращается к жизни, печатая 5, 6, 7 и 8.
Я совершенно не понимаю, почему это происходит.Вызов start снова, очевидно, вызывает новую задачу, которая вызывает Paralell foreach и повторно выполняет GetConsumingEnumerable(), который волшебным образом находит отсутствующую запись... I
Почему BlockingCollection.GetConsumingEnumerable()
не гарантирует для перебора каждого элемента, добавленного в коллекцию.
Почему последующее добавление новых записей приводит к «отклеиванию» и продолжению обработки?