Существует ли способ использовать Библиотеку параллели задачи (TPL) с SQLDataReader?

Мне нравится простота Параллели. Для и Параллель. Методы расширения ForEach в TPL. Я задавался вопросом, был ли способ использовать в своих интересах что-то подобное или даже с немного более усовершенствованными Задачами.

Ниже типичное использование для SqlDataReader, и я задавался вопросом, было ли возможно и раз так как заменить цикл с условием продолжения ниже чем-то в TPL. Поскольку читатель не может обеспечить, постоянное число повторений Для дополнительного метода не возможно, который оставляет контакт с Задачами, которые я собрал бы. Я надеялся, что кто-то уже, возможно, занялся этим и разработал некоторый do's и Дон с ADO.net.

using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
    conn.Open();

    SqlDataReader reader = comm.ExecuteReader();

    if (reader.HasRows)
    {
        while (reader.Read())
        {
            // Do something with Reader
        }
    }
}
20
задан BlackWasp 22 July 2012 в 14:11
поделиться

2 ответа

Вы почти у цели. Оберните код, который вы разместили в функции, этой подписью:

IEnumerable<IDataRecord> MyQuery()

, а затем замените свой код // Сделайте что-нибудь с Reader следующим образом:

yield return reader;

Теперь у вас есть что-то, что работает в одном потоке. К сожалению, когда вы читаете результаты запроса, он каждый раз возвращает ссылку на тот же объект, и этот объект просто мутирует себя для каждой итерации. Это означает, что если вы попытаетесь запустить его параллельно, вы получите действительно странные результаты, поскольку параллельное чтение изменяет объект, используемый в разных потоках. Вам нужен код, чтобы взять копию записи для отправки в параллельный цикл.

На данном этапе, однако, мне нравится пропускать лишнюю копию записи и сразу переходить к классу со строгой типизацией. Более того, мне нравится использовать для этого общий метод:

IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    {
        addParameters(cmd.Parameters);

        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                yield return factory(rdr);
            }
        }
    }
}

Предполагая, что ваши фабричные методы создают копию, как ожидалось, этот код должен быть безопасным для использования в цикле Parallel.ForEach. Вызов метода будет выглядеть примерно так (при условии, что это класс Employee со статическим фабричным методом с именем «Create»):

var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => {
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       });
Parallel.ForEach(UnderPaid, e => e.GiveRaise());

Важное обновление:
Я не так уверен в этом коде, как когда-то. Отдельный поток все еще может изменять читателя, пока другой поток делает его копию. Я мог бы заблокировать это, но меня также беспокоит, что другой поток может вызвать update читателя после того, как оригинал сам вызвал Read (), но до того, как он начнет делать копию. Следовательно, критическая секция здесь состоит из всего цикла while ... и на этом этапе вы снова возвращаетесь к однопоточности.Я ожидаю, что есть способ изменить этот код, чтобы он работал должным образом в многопоточных сценариях, но это потребует дополнительного изучения.

19
ответ дан 29 November 2019 в 23:36
поделиться

Вам будет сложно напрямую заменить этот цикл while. SqlDataReader не является потокобезопасный класс, поэтому вы не можете использовать его напрямую из нескольких потоков.

При этом вы потенциально можете обработать данные, которые вы читаете, используя TPL. Здесь есть несколько вариантов. Самым простым может быть создание собственной реализации IEnumerable , которая работает с устройством чтения и возвращает класс или структуру, содержащую ваши данные. Затем вы можете использовать PLINQ или оператор Parallel.ForEach для параллельной обработки данных:

public IEnumerable<MyDataClass> ReadData()
{
    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    {
        conn.Open();

        SqlDataReader reader = comm.ExecuteReader();

        if (reader.HasRows)
        {
            while (reader.Read())
            {
                yield return new MyDataClass(... data from reader ...);
            }
        }
    }
}

Если у вас есть этот метод, вы можете обрабатывать его напрямую, через PLINQ или TPL:

Parallel.ForEach(this.ReadData(), data =>
{
    // Use the data here...
});

Или:

this.ReadData().AsParallel().ForAll(data => 
{
    // Use the data here...
});
25
ответ дан 29 November 2019 в 23:36
поделиться
Другие вопросы по тегам:

Похожие вопросы: