Совместим ли CorrelationManager.LogicalOperationStack с Parallel.For, Tasks, Threads и т. Д.

Дополнительную информацию см. В этом вопросе:

Как задачи в параллельной библиотеке задач влияют на ActivityID?

В этом вопросе задается вопрос, как задачи влияют на Trace.CorrelationManager.ActivityId . @Greg Samson ответил на свой вопрос с помощью тестовой программы, показывающей, что ActivityId надежен в контексте задач. Программа тестирования устанавливает ActivityId в начале делегата задачи, засыпает для моделирования работы, затем проверяет ActivityId в конце, чтобы убедиться, что это то же значение (т. е. что оно не было изменено другим потоком). Программа работает успешно.

Изучая другие параметры «контекста» для потоковой передачи, задач и параллельных операций (в конечном счете, чтобы обеспечить лучший контекст для ведения журнала), я столкнулся со странной проблемой с Trace.CorrelationManager.LogicalOperationStack (мне все равно было странно). Я скопировал свой «ответ» на его вопрос ниже.

Я думаю, что он адекватно описывает проблему, с которой я столкнулся (Trace.CorrelationManager.LogicalOperationStack, по-видимому, поврежден - или что-то в этом роде - при использовании в контексте Parallel.For, но только если сам Parallel.For заключен в логическую операцию).

Вот мои вопросы:

  1. Должен ли Trace.CorrelationManager. LogicalOperationStack можно использовать с Parallel.For? Если да, то должно ли это иметь значение, если логическая операция уже выполняется с Parallel.For is start?

  2. Существует ли «правильный» способ использования LogicalOperationStack с Parallel.For? Могу ли я по-другому закодировать этот пример программы, чтобы он «работал»? Под "работает" я имею в виду, что LogicalOperationStack всегда имеет ожидаемое количество записей, а сами записи являются ожидаемыми записями.

Я провел дополнительное тестирование с использованием потоков Threads и ThreadPool, но мне пришлось бы вернуться и повторить попытку эти тесты, чтобы увидеть, не сталкивался ли я с аналогичными проблемами.

Я скажу, что действительно кажется, что потоки задач / параллелей и потоки ThreadPool «наследуют» Trace.CorrelationManager.ActivityId и Trace.CorrelationManager. LogicalOperationStack значения из родительского потока. Это ожидается, поскольку эти значения сохраняются в CorrelationManager с использованием метода LogicalSetData CallContext (в отличие от SetData).

Опять же, вернитесь к этому вопросу, чтобы получить исходный контекст для "ответа" ", который я опубликовал ниже:

Как задачи в параллельной библиотеке задач влияют на ActivityID?

См. также этот аналогичный вопрос (на который пока нет ответа) на форуме параллельных расширений Microsoft:

http: // social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9[11110457 provided

[BEGIN PASTE]

Пожалуйста, простите меня за то, что я разместил это как ответ, поскольку это не совсем ответ на ваш вопрос, однако он связан с вашим вопросом, поскольку он касается поведения CorrelationManager и потоков / задач / и т.п. Я искал использование методов CorrelationManager LogicalOperationStack StartLogicalOperation / StopLogicalOperation ) для обеспечения дополнительного контекста в сценариях многопоточности.

Я взял ваш пример и немного изменил его, чтобы добавить возможность выполнять работу параллельно с помощью Parallel.For. Кроме того, я использую StartLogicalOperation / StopLogicalOperation для заключения в скобки (внутри) DoLongRunningWork . По сути, DoLongRunningWork делает что-то подобное каждый раз при запуске:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

Я обнаружил, что если я добавлю эти логические операции в ваш код (более или менее как есть), все логические операции останутся синхронизированными (всегда ожидаемое количество операций в стеке и значения операций в стеке всегда такие, как ожидалось).

В ходе некоторых моих собственных испытаний я обнаружил, что это не всегда так. Стек логических операций «повреждался». Лучшее объяснение, которое я мог придумать, заключается в том, что «слияние» обратной информации CallContext с «родительским» контекстом потока при выходе из «дочернего» потока приводило к тому, что «старая» контекстная информация дочернего потока (логическая операция) становилась « унаследовано "другим дочерним потоком-братом.

Проблема также может быть связана с тем, что Parallel. Ведь очевидно, что использует основной поток (по крайней мере, в коде примера, как написано) как один из «рабочих потоков» (или как там они должны вызываться в параллельном домене). Каждый раз, когда выполняется DoLongRunningWork, новая логическая операция запускается (в начале) и останавливается (в конце) (то есть помещается в LogicalOperationStack и возвращается из него). Если в основном потоке уже действует логическая операция и если DoLongRunningWork выполняется НА ГЛАВНОЙ НИТИ, то запускается новая логическая операция, поэтому LogicalOperationStack основного потока теперь имеет ДВЕ операции. Любые последующие выполнения DoLongRunningWork (пока эта «итерация» DoLongRunningWork выполняется в основном потоке) (по-видимому) унаследуют LogicalOperationStack основного потока (который теперь имеет две операции над ним, а не только одна ожидаемая операция).

Мне потребовалось много времени, чтобы понять, почему поведение LogicalOperationStack отличалось в моем примере от моей модифицированной версии вашего примера. Наконец, я увидел, что в своем коде я заключил всю программу в скобки в виде логической операции, тогда как в моей модифицированной версии вашей тестовой программы я этого не сделал. Подразумевается, что в моей тестовой программе каждый раз, когда выполнялась моя «работа» (аналогично DoLongRunningWork), уже выполнялась логическая операция. В моей модифицированной версии вашей тестовой программы я не заключал в скобки всю программу в логической операции.

Итак, когда я модифицировал вашу тестовую программу, чтобы заключить в скобки всю программу в логической операции, И если я использую Parallel.For, Я столкнулся с той же проблемой.

Используя концептуальную модель выше, это будет выполнено успешно:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

Хотя это в конечном итоге будет подтверждено из-за очевидной рассинхронизации LogicalOperationStack:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

Вот мой пример программы. Он похож на ваш тем, что в нем есть метод DoLongRunningWork, который управляет ActivityId, а также LogicalOperationStack. У меня также есть два варианта использования DoLongRunningWork. Один вариант использует задачи, другой - Parallel.For. Каждый вариант также может быть выполнен так, что вся распараллеленная операция заключена в логическую операцию или нет. Итак, всего существует 4 способа выполнить параллельную операцию. Чтобы попробовать каждый из них, просто раскомментируйте нужный метод «Использовать ...», перекомпилируйте и запустите. UseTasks , UseTasks (true) и UseParallelFor должны все выполняться до завершения. UseParallelFor (true) будет утверждать в какой-то момент, потому что LogicalOperationStack не имеет ожидаемого количества записей.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List threadIds = new List();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

Вся эта проблема в том, можно ли использовать LogicalOperationStack с Parallel.For (и / или другим потоком / Конструкции задач) или как это можно использовать, вероятно, заслуживает отдельного вопроса. Может, я поставлю вопрос. А пока мне интересно, есть ли у вас какие-нибудь мысли по этому поводу (или, интересно, рассматривали ли вы возможность использования LogicalOperationStack, поскольку ActivityId кажется безопасным).

[КОНЕЦ ВСТАВКИ]

Есть ли у кого-нибудь мысли по этому поводу?

10
задан Community 23 May 2017 в 12:34
поделиться