Как распределенные транзакции ведут себя с многочисленными связями к тому же DB в потоковой среде?

Я пытаюсь определить поведение нескольких соединение с базой данных в распределенной транзакции.

У меня есть длительный процесс, который порождает серию потоков, и каждый поток затем ответственен за соединения с БД руководящих it и такой. Все это работает в области транзакций, и каждый поток включается в список в транзакцию через a DependentTransaction объект.

Когда я пошел для помещения этого процесса параллельно, я столкнулся с несколькими проблемами, а именно, что, кажется, существует своего рода блок, препятствующий тому, чтобы запросы выполнились одновременно на транзакции.

То, что я хотел бы знать, - то, как координатор транзакции обрабатывает запросы от многочисленных связей до того же DB и если даже желательно передать объект соединения через потоки?

Я читал, тот SQL MS только позволяет одно соединение на транзакцию, но я ясно могу создать и инициализировать больше чем одно соединение с тем же DB в той же транзакции. Я просто не в состоянии выполнить потоки параллельно, не получая “Контекст транзакции, используемый другой сессией” исключение при открытии соединений. Результат состоит в том, что соединения должны ожидать для выполнения вместо того, чтобы работать одновременно и в конце выполнения кода к завершению, но нет никакой чистой прибыли к поточной обработке приложения из-за этой проблемы блокировки.

Код выглядит примерно так.

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connection to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

Править

Мои тесты имеют, окончательно показал, что это просто не может быть сделано. Даже если существует больше чем одно соединение, или то же соединение используется, весь запрос s в транзакции или вопросах обрабатывается последовательно.

Возможно, они изменят это поведение в будущем.

6
задан Middletone 11 February 2010 в 22:04
поделиться

1 ответ

Во-первых, вы должны разделить то, что вы читаете здесь и там о транзакциях SQL Server, на два разных случая: локальный и распределенный.

Локальные транзакции SQL :

  • SQL Server позволяет выполнять только один запрос для каждой локальной транзакции.
  • По умолчанию только один сеанс может быть зарегистрирован в локальной транзакции. С помощью процедур sp_getbindtoken и sp_bindsession можно зарегистрировать несколько сеансов в локальной транзакции. Сеансы по-прежнему ограничены только одним выполнением запроса в любое время.
  • При использовании нескольких активных наборов результатов (MARS) один сеанс может выполнять несколько запросов. Все запросы должны быть зарегистрированы в одной локальной транзакции.

Распределенные транзакции :

  • Несколько сеансов могут иметь свою локальную транзакцию, зарегистрированную в одной распределенной транзакции.
  • Каждый сеанс по-прежнему регистрируется в локальной транзакции с учетом всех ограничений, упомянутых выше для локальных транзакций
  • Локальные транзакции, зарегистрированные в распределенной транзакции, подлежат двухфазной фиксации, координируемой распределенной транзакцией
  • Все локальные транзакции на экземпляр, зарегистрированный в распределенной транзакции, по-прежнему независимые локальные транзакции, в первую очередь это означает, что у них есть конфликтующие пространства имен блокировок.

Итак, когда клиент создает .Net TransactionScope и в рамках этой транзакции выполняет несколько запросов на одном сервере, все эти запросы являются локальными транзакциями, зарегистрированными в распределенной транзакции.Простой пример:

class Program
    {
        static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
    insert into test (a) values (replicate('a',100));
    set @i = @i+1;
end";

        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                    {
                        connA.Open();
                        using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                        {
                            connB.Open();

                            SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                            SqlCommand cmdB = new SqlCommand(sqlBatch, connB);

                            IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                            IAsyncResult arB = cmdB.BeginExecuteNonQuery();

                            WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });

                            cmdA.EndExecuteNonQuery(arA);
                            cmdB.EndExecuteNonQuery(arB);
                        }
                    }
                    scp.Complete();
                }
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    }

Создайте фиктивную тестовую таблицу:

create table test (id int not null identity(1,1) primary key, a varchar(100));

и запустите код в моем примере. Вы увидите, что оба запроса выполняются параллельно, каждый из которых содержит 100 тыс. Строк в таблице, а затем оба фиксируются, когда область транзакции завершена. Таким образом, проблемы, которые вы видите, не связаны ни с SQL Server, ни с TransactionScope, они могут легко справиться с описанным вами сценарием. Более того, код очень прост и понятен, и нет необходимости ни в создании зависимых транзакций, ни в клонировании, ни в продвижении транзакций.

Обновлено

Использование явных потоков и зависимых транзакций:

 private class ThreadState
    {
        public DependentTransaction Transaction {get; set;}
        public EventWaitHandle Done {get; set;}
        public SqlConnection Connection { get; set; }
    }
    static void Main(string[] args)
    {
        try
        {
            TransactionOptions to = new TransactionOptions();
            to.IsolationLevel = IsolationLevel.ReadCommitted;
            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
            {
                ThreadState stateA = new ThreadState 
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateA.Connection.Open();
                ThreadState stateB = new ThreadState
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateB.Connection.Open();

                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);

                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });

                scp.Complete();

                //TODO: dispose the open connections
            }

        }
        catch (Exception e)
        {
            Console.Error.Write(e);
        }
    }

    private static void Worker(object args)
    {
        Debug.Assert(args is ThreadState);
        ThreadState state = (ThreadState) args;
        try
        {
            using (TransactionScope scp = new TransactionScope(state.Transaction))
            {
                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                cmd.ExecuteNonQuery();
                scp.Complete();
            }
            state.Transaction.Complete();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine(e);
            state.Transaction.Rollback();
        }
        finally
        {
            state.Done.Set();
        }

    }
9
ответ дан 10 December 2019 в 02:47
поделиться
Другие вопросы по тегам:

Похожие вопросы: