Это то, что я использую и отлично работаю. Он дополнительно поддерживает обработку исключений ошибок и имеет режим отладки, который значительно облегчает отслеживание вещей вниз
public static ConcurrentQueue<Exception> Parallel<T>(this IEnumerable<T> items, Action<T> action, int? parallelCount = null, bool debugMode = false)
{
var exceptions = new ConcurrentQueue<Exception>();
if (debugMode)
{
foreach (var item in items)
{
try
{
action(item);
}
// Store the exception and continue with the loop.
catch (Exception e)
{
exceptions.Enqueue(e);
}
}
}
else
{
var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() =>
{
while (partition.MoveNext())
{
try
{
action(partition.Current);
}
// Store the exception and continue with the loop.
catch (Exception e)
{
exceptions.Enqueue(e);
}
}
}));
Task.WaitAll(partitions.ToArray());
}
return exceptions;
}
. Вы используете его так, как показано ниже, где db - это исходный DbContext и db.CreateInstance ( ) создает новый экземпляр, используя ту же строку соединения.
var batch = db.Set<SomeListToIterate>().ToList();
var exceptions = batch.Parallel((item) =>
{
using (var batchDb = db.CreateInstance())
{
var batchTime = batchDb.GetDBTime();
var someData = batchDb.Set<Permission>().Where(x=>x.ID = item.ID).ToList();
//do stuff to someData
item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called
batchDb.SaveChanges();
}
});
if (exceptions.Count > 0)
{
logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions));
throw new AggregateException(exceptions); //optionally throw an exception
}
db.SaveChanges(); //save the item modifications