Как я могу создать постоянный «поток» обработки с использованием TPL в C # 4

Я не уверен, возможно ли следующее, но я хотел бы вызвать ряд действий в параллелле в ограниченном режиме , но сохраняйте поток обработки непрерывным, не возвращаясь к использованию таймеров или циклов цикла / сна.

Пока что у меня получилось, что он загружает большой пакет входных данных из какого-то источника ... а затем обрабатывает их параллельно контролируемым образом и зацикливается, как показано ниже.

static void Main(string[] args)
{
    while(true) //Simulate a Timer Elapsing...
    {
        IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};  
        //Simulate querying database queue tables for next batch of entries

        RunAllActions(inputs, 3); //Max 3 at a time.
    }
}

static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
{
    var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency};

    Parallel.ForEach<int>(inputs, options, DoWork);
    //Blocks here until all inputs are processed.
    Console.WriteLine("Batch of Work Done!!!");
}

static void DoWork(int input)
{
    Console.WriteLine("Starting Task {0}", input);
    System.Threading.Thread.Sleep(3000);
    Console.WriteLine("Finishing Task {0}", input);
}

я хотел бы знать, есть ли в TPL конструкция, которую я мог бы использовать, чтобы она всегда работала ... чтобы я мог заменить «Истечение таймера» и «Опрос базы данных» полученным MessageQueue событие.

Ниже приводится приблизительная версия того, чего я хотел бы достичь ... есть другие способы, которыми я могу это сделать, но я хочу знать, встроен ли этот тип паттерна в TPL.

internal class Engine
{
    private MessageQueue mq;
    private Queue<int> myInternalApplicationQueue;

    public Engine()
    {
        //Message Queue to get new task inputs from
        mq = new MessageQueue();
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

        // internal Queue to put them in.
        myInternalApplicationQueue = new Queue<int>();
    }

    void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        //On MQ Receive, pop the input in a queue in my app
        int input = (int) e.Message.Body;

        myInternalApplicationQueue.Enqueue(input);
    }

    public void StartWorking()
    {
        //Once this gets called, it doesn't stop... it just keeps processing/watching that queue
        //processing the tasks as fast as it's allowed while the app is running.
        var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
        Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
        //       ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
    }

}
5
задан Eoin Campbell 10 February 2012 в 18:22
поделиться