Как мне написать функцию расширения с поддержкой потоков для PLINQ?

Кто-нибудь знает, как написать функцию расширения, возвращающую ParallelQuery в PLINQ?

В частности, у меня есть следующая проблема: я хочу выполнить преобразование в запросе PLINQ, для которого требуется механизм, создание которого требует больших затрат и к которому нельзя обращаться одновременно.

Я мог бы сделать следующее:

var result = source.AsParallel ().Select ( (i) => { var e = new Engine (); return e.Process(i); } )

Здесь движок создается один раз для каждого предмета, что слишком дорого.

Я хочу, чтобы движок создавался один раз для каждого потока.

С помощью Aggregate я могу приблизиться к тому, что хочу, с чем-то вроде

// helper class: engine to use plus list of results obtained in thread so far
class EngineAndResults {
   public Engine engine = null;
   public IEnumerable<ResultType> results;
}

var result = source.AsParallel ().Aggregate (

   // done once per block of items (=thread),
   // returning an empty list, but a new engine
   () => new EngineAndList () {
       engine = new Engine (),
       results = Enumerable.Empty<ResultType> ()
   },

   // we process a new item and put it to the thread-local list,
   // preserving the engine for further use
   (engineAndResults, item) => new EngineAndResults () {
       engine = engineAndResults.engine,
       results = Enumerable.Concat (
           engineAndResults.results,
           new ResultType [] { engineAndResults.engine.Process (item) }
       )
   },

   // tell linq how to aggregate across threads
   (engineAndResults1, engineAndResults2) => new EngineAndResults () {
       engine = engineAndResults1.engine,
       results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results)
   },

   // after all aggregations, how do we come to the result?
   engineAndResults => engineAndResults.results
);

Как видите, я неправильно использую аккумулятор для переноса движка на поток. Проблема здесь в том, что PLINQ в конце концов агрегирует результаты в один IEnumerable, что приводит к синхронизации потоков. Это не очень хорошо, если я хочу впоследствии добавить еще одно расширение PLINQ.

Я был бы признателен за что-то вроде

   var result = source.AsParallel ()
                  .SelectWithThreadwiseInitWhichIAmLookingFor (
                       () => new Engine (),
                       (engine, item) => engine.Process (item)
              )

Кто-нибудь знает, как этого добиться?

8
задан JohnB 22 June 2012 в 13:11
поделиться