Я пытаюсь определить поведение нескольких соединение с базой данных в распределенной транзакции.
У меня есть длительный процесс, который порождает серию потоков, и каждый поток затем ответственен за соединения с БД руководящих it и такой. Все это работает в области транзакций, и каждый поток включается в список в транзакцию через a DependentTransaction
объект.
Когда я пошел для помещения этого процесса параллельно, я столкнулся с несколькими проблемами, а именно, что, кажется, существует своего рода блок, препятствующий тому, чтобы запросы выполнились одновременно на транзакции.
То, что я хотел бы знать, - то, как координатор транзакции обрабатывает запросы от многочисленных связей до того же DB и если даже желательно передать объект соединения через потоки?
Я читал, тот SQL MS только позволяет одно соединение на транзакцию, но я ясно могу создать и инициализировать больше чем одно соединение с тем же DB в той же транзакции. Я просто не в состоянии выполнить потоки параллельно, не получая “Контекст транзакции, используемый другой сессией” исключение при открытии соединений. Результат состоит в том, что соединения должны ожидать для выполнения вместо того, чтобы работать одновременно и в конце выполнения кода к завершению, но нет никакой чистой прибыли к поточной обработке приложения из-за этой проблемы блокировки.
Код выглядит примерно так.
Sub StartThreads()
Using Scope As New TransactionScope
Dim TL(100) As Tasks.Task
Dim dTx As DependentTransaction
For i As Int32 = 0 To 100
Dim A(1) As Object
dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
'A(0) = some_other_data
A(1) = dTx 'the Dependent Transaction
TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
Next
Tasks.Task.WaitAll(TL) 'Wait for threads to finish
Scope.Complete()
End Using
End Sub
Dim TransLock As New Object
Sub ProcessData(ByVal A As Object)
Dim DTX As DependentTransaction = A(1)
Dim Trans As Transactions.TransactionScope
Dim I As Int32
Do While True
Try
SyncLock (TransLock)
Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
End SyncLock
Exit Do
Catch ex As TransactionAbortedException
If ex.ToString.Contains("Failure while attempting to promote transaction") Then
ElseIf ex.Message = "The transaction has aborted." Then
Throw New Exception(ex.ToString)
Exit Sub
End If
I += 1
If I > 5 Then
Throw New Exception(ex.ToString)
End If
Catch ex As Exception
End Try
Thread.Sleep(10)
Loop
Using Trans
Using DALS As New DAC.DALScope
Do While True
Try
SyncLock (TransLock)
'This opens two connection to the same DB for later use.
DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
End SyncLock
Exit Do
Catch ex As Exception
'This is usually where I find the bottleneck
'"Transaction context in use by another session" is the exception that I get
Thread.Sleep(100)
End Try
Loop
'*****************
'Do some work here
'*****************
Trans.Complete()
End Using
End Using
DTX.Complete()
End Sub
Править
Мои тесты имеют, окончательно показал, что это просто не может быть сделано. Даже если существует больше чем одно соединение, или то же соединение используется, весь запрос s в транзакции или вопросах обрабатывается последовательно.
Возможно, они изменят это поведение в будущем.
Во-первых, вы должны разделить то, что вы читаете здесь и там о транзакциях SQL Server, на два разных случая: локальный и распределенный.
Локальные транзакции SQL :
Распределенные транзакции :
Итак, когда клиент создает .Net TransactionScope и в рамках этой транзакции выполняет несколько запросов на одном сервере, все эти запросы являются локальными транзакциями, зарегистрированными в распределенной транзакции.Простой пример:
class Program
{
static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
insert into test (a) values (replicate('a',100));
set @i = @i+1;
end";
static void Main(string[] args)
{
try
{
TransactionOptions to = new TransactionOptions();
to.IsolationLevel = IsolationLevel.ReadCommitted;
using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
{
using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
{
connA.Open();
using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
{
connB.Open();
SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
SqlCommand cmdB = new SqlCommand(sqlBatch, connB);
IAsyncResult arA = cmdA.BeginExecuteNonQuery();
IAsyncResult arB = cmdB.BeginExecuteNonQuery();
WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });
cmdA.EndExecuteNonQuery(arA);
cmdB.EndExecuteNonQuery(arB);
}
}
scp.Complete();
}
}
catch (Exception e)
{
Console.Error.Write(e);
}
}
}
Создайте фиктивную тестовую таблицу:
create table test (id int not null identity(1,1) primary key, a varchar(100));
и запустите код в моем примере. Вы увидите, что оба запроса выполняются параллельно, каждый из которых содержит 100 тыс. Строк в таблице, а затем оба фиксируются, когда область транзакции завершена. Таким образом, проблемы, которые вы видите, не связаны ни с SQL Server, ни с TransactionScope, они могут легко справиться с описанным вами сценарием. Более того, код очень прост и понятен, и нет необходимости ни в создании зависимых транзакций, ни в клонировании, ни в продвижении транзакций.
Обновлено
Использование явных потоков и зависимых транзакций:
private class ThreadState
{
public DependentTransaction Transaction {get; set;}
public EventWaitHandle Done {get; set;}
public SqlConnection Connection { get; set; }
}
static void Main(string[] args)
{
try
{
TransactionOptions to = new TransactionOptions();
to.IsolationLevel = IsolationLevel.ReadCommitted;
using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
{
ThreadState stateA = new ThreadState
{
Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
Done = new AutoResetEvent(false),
Connection = new SqlConnection(Settings.Default.connString),
};
stateA.Connection.Open();
ThreadState stateB = new ThreadState
{
Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
Done = new AutoResetEvent(false),
Connection = new SqlConnection(Settings.Default.connString),
};
stateB.Connection.Open();
ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);
WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });
scp.Complete();
//TODO: dispose the open connections
}
}
catch (Exception e)
{
Console.Error.Write(e);
}
}
private static void Worker(object args)
{
Debug.Assert(args is ThreadState);
ThreadState state = (ThreadState) args;
try
{
using (TransactionScope scp = new TransactionScope(state.Transaction))
{
SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
cmd.ExecuteNonQuery();
scp.Complete();
}
state.Transaction.Complete();
}
catch (Exception e)
{
Console.Error.WriteLine(e);
state.Transaction.Rollback();
}
finally
{
state.Done.Set();
}
}