Почему итерация по GetConsumingEnumerable() не полностью очищает базовую блокирующую коллекцию

У меня есть количественная и повторяемая проблема с использованием библиотеки параллельных задач, BlockingCollection, ConcurrentQueue& GetConsumingEnumerableпри попытке создать простой конвейер.

В двух словах, добавление записей в коллекцию по умолчанию BlockingCollection(которая внутри полагается на ConcurrentQueue) из одного потока не гарантирует, что они будут извлечены из BlockingCollectionиз другого потока, вызывающего метод GetConsumingEnumerable().

Я создал очень простое приложение Winforms для воспроизведения/симуляции этого, которое просто выводит на экран целые числа.

  • Timer1отвечает за постановку рабочих элементов в очередь...Он использует параллельный словарь с именем _tracker, чтобы знать, что уже добавлено в блокирующий набор.
  • Timer2просто регистрирует состояние счетчика BlockingCollectionи _tracker
  • Кнопка START запускает Paralell.ForEach, который просто перебирает блокирующие коллекции GetConsumingEnumerable()и начинает печатать их во втором списке.
  • Кнопка STOP останавливает Таймер 1, предотвращая добавление новых записей в коллекцию блокировки.
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for(var i = 0; i < 3; i++,Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
            _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }

Вот последовательность событий:

  • Нажмите «Старт»
  • Таймер 1 тикает, и ListBox1 немедленно обновляется 3 сообщениями (Добавление 0, 1, 2)
  • ListBox2 затем обновляется 3 сообщениями с интервалом в 1 секунду
    • Обработка 0
    • Обработка 1
    • Обработка 2
  • Такты Timer1 и ListBox1 немедленно обновляются 3 сообщениями (Добавление 3, 4, 5)
  • ListBox2 последовательно обновляется 2 сообщениями с интервалом в 1 секунду
    • Обработка 3
    • Обработка 4
    • Обработка 5не распечатывается... кажется, что она "пропала"
  • Нажмите STOP, чтобы предотвратить добавление новых сообщений таймером 1
  • Подождите ... «Обработка 5» по-прежнему не отображается.

Missing Entry

Вы можете видеть, что параллельный словарь все еще отслеживает, что 1 элемент еще не был обработан и впоследствии удален из _tracker

Если я снова нажму «Пуск», то timer1 начинает добавлять еще 3 записи, и цикл Parallel возвращается к жизни, печатая 5, 6, 7 и 8.

Entry returned after subsequent items shoved in behind it

Я совершенно не понимаю, почему это происходит.Вызов start снова, очевидно, вызывает новую задачу, которая вызывает Paralell foreach и повторно выполняет GetConsumingEnumerable(), который волшебным образом находит отсутствующую запись... I

Почему BlockingCollection.GetConsumingEnumerable()не гарантирует для перебора каждого элемента, добавленного в коллекцию.

Почему последующее добавление новых записей приводит к «отклеиванию» и продолжению обработки?

17
задан Gennady Vanin Геннадий Ванин 6 April 2013 в 03:51
поделиться