Как (и если), чтобы записать очереди единственного потребителя, использующей TPL?

Я услышал набор подкастов недавно о TPL в.NET 4.0. Большинство из них описывает фоновые операции как загрузка изображений или выполнение вычисления, с помощью задач так, чтобы работа не вмешивалась в поток GUI.

Большая часть кода я продолжаю работать, имеет больше нескольких-производителей / разновидность единственного потребителя, где объекты работы от многочисленных источников должны быть поставлены в очередь и затем обработаны в порядке. Один пример зарегистрировался бы, где строки журнала от нескольких потоков упорядочены в единственную очередь для возможной записи в файл или базу данных. Все записи из любого единственного источника должны остаться в порядке, и записи с того же момента вовремя должны быть "близки" друг к другу в возможном выводе.

Так несколько потоков или задач или независимо от того, что все вызывает queuer:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

И специализированный рабочий поток обрабатывает очередь:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

Всегда казалось разумным выделить рабочий поток для потребительской стороны этих задач. Я должен записать будущие программы с помощью некоторой конструкции от TPL вместо этого? Какой? Почему?

16
задан abatishchev 23 May 2012 в 12:09
поделиться

3 ответа

Вы можете использовать длительную задачу Task для обработки элементов из BlockingCollection, как предлагает Wilka. Вот пример, который в значительной степени соответствует требованиям ваших приложений. Вы увидите что-то вроде этого:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

Не то, чтобы выходы из A, B, C и D выглядели случайными, потому что они зависят от времени начала потоков, но B всегда появляется перед B1.

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}
13
ответ дан 30 November 2019 в 21:53
поделиться

Я не уверен, что TPL подходит для вашего варианта использования. Насколько я понимаю, основным вариантом использования TPL является разделение одной огромной задачи на несколько более мелких задач, которые можно запускать бок о бок. Например, если у вас большой список, и вы хотите применить одно и то же преобразование к каждому элементу. В этом случае у вас может быть несколько задач, применяющих преобразование к подмножеству списка.

Случай, который вы описываете, мне кажется, не укладывается в эту картину. В вашем случае у вас нет нескольких задач, которые выполняют одно и то же параллельно. У вас есть несколько разных задач, каждая из которых выполняет свою работу (производителей) и одна задача, которая потребляет. Возможно, TPL можно использовать для потребительской части, если вы хотите иметь несколько потребителей, потому что в этом случае каждый потребитель выполняет одну и ту же работу (при условии, что вы найдете логику для обеспечения временной согласованности, которую вы ищете).

Ну, это, конечно, просто мой личный взгляд на эту тему

Живи долго и процветай

3
ответ дан 30 November 2019 в 21:53
поделиться

Похоже, вам пригодится BlockingCollection . Поэтому для приведенного выше кода вы можете использовать что-то вроде (при условии, что _queue является экземпляром BlockingCollection ):

// for your producers 
_queue.Add(some_work);

Выделенный рабочий поток, обрабатывающий очередь:

foreach (var some_work in _queue.GetConsumingEnumerable())
{
    deal_with(some_work);
}

Примечание: когда все ваши продюсеры закончили производство материалов, вам нужно вызвать CompleteAdding () в _queue , иначе ваш потребитель застрянет в ожидании дополнительной работы.

2
ответ дан 30 November 2019 в 21:53
поделиться
Другие вопросы по тегам:

Похожие вопросы: