Параллель. Foreach, порождающий слишком много потоков

Проблема

Хотя код, о котором я буду говорить здесь, я записал в F#, он основан на.NET 4 платформы, не конкретно в зависимости от любой особенности F# (по крайней мере, это кажется так!).

У меня есть некоторые части данных по моему диску, который я должен обновить из сети, сохранив последнюю версию к диску:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

Проблема - это к loadAndSaveAndUpdate все мои данные, я должен был бы много раз выполнять функцию:

{1 .. 5000} |> loadAndSaveAndUpdate

Каждый шаг сделал бы

  • некоторый диск IO,
  • некоторое уплотнение данных,
  • некоторая сеть IO (с возможностью большого количества задержки),
  • больше уплотнения данных,
  • и некоторый диск IO.

Разве не было бы хорошо сделать это параллельно, до некоторой степени? К сожалению, ни одно из моего чтения и парсинга функций не является "async-workflows-ready".

Первые (не очень хорошие) решения я придумал

Задачи

Первая вещь, которую я сделал, состояла в том, чтобы настроить a Task[] и запустите их всех:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Затем я поразил CTRL+ESC только для наблюдения, сколько потоков он использовал. 15, 17..., 35..., 170..., пока не уничтожено приложение! Что-то шло не так, как надо.

Параллель

Я сделал почти то же самое, но использование Parallel.ForEach(...) и результатами было то же: партии и партии и много потоков.

Решение, которое работает... отчасти

Затем я решил запуститься только n потоки, Task.WaitAll(of them), затем другой n, до больше не было доступных задач.

Это работает, но проблема - это, когда она закончила обрабатывать, скажем, n-1 задачи, это будет ожидать, ожидать, ожидать проклятой последней Задачи, которые настаивают на том, чтобы блокироваться из-за большого количества сетевой задержки. Это не хорошо!

Так, как Вы принялись бы за решение этой проблемы? Я ценил бы для просмотра различных решений, включая любой Асинхронные Рабочие процессы (и в этом случае как адаптировать мои неасинхронные функции), Параллельные Расширения, странные параллельные шаблоны, и т.д.

Спасибо.

13
задан Gennady Vanin Геннадий Ванин 15 April 2013 в 10:50
поделиться

4 ответа

Вы уверены, что ваши индивидуальные задания выполняются своевременно? Я считаю, что оба класса Parallel.ForEach и Task уже используют нитевой пул .NET. Как правило, задачи должны быть недолговечными рабочими элементами, в этом случае нитевой пул породит только небольшое количество реальных потоков, но если ваши задачи не прогрессируют и есть другие задачи, стоящие в очереди, то количество используемых потоков будет неуклонно увеличиваться до максимума (который по умолчанию составляет 250/процессор в .NET 2.0 SP1, но отличается в разных версиях фреймворка). Стоит также отметить, что (по крайней мере, в .NET 2.0 SP1) создание новых потоков дросселируется до 2-х новых потоков в секунду, поэтому увеличение количества потоков свидетельствует о том, что задачи не выполняются в течение короткого промежутка времени (поэтому может быть не совсем верно повесить вину на Parallel.ForEach).

Я думаю, что предложение Брайана использовать async workflows является хорошим, особенно если источником долгоживущих задач является IO, так как async вернет ваши потоки в потоковую пул до тех пор, пока IO не завершится. Другой вариант - просто признать, что ваши задачи не завершаются быстро, и позволить порождать множество потоков (что в некоторой степени можно контролировать с помощью System.Threading.ThreadPool.SetMaxThreads) - в зависимости от вашей ситуации, может быть не так уж и сложно, что вы используете много потоков.

7
ответ дан 1 December 2019 в 20:43
поделиться

Использование "асинхронизации" позволит вам выполнять работу, связанную с вводом/выводом, без прожигания потоков, в то время как различные вызовы ввода/вывода находятся "в море", так что это было бы моим первым предложением. Должно быть просто преобразовать код в асинхронный, обычно вдоль строк

  • обернуть каждое тело функции в async{...}, добавить return, где необходимо
  • создать версии Async для любых примитивов ввода/вывода, которых еще нет в библиотеке с помощью Async. FromBeginEnd
  • Switch calls of the form let r = Foo() to let! r = AsyncFoo()
  • Use Async.Parallel to convert the 5000 async objects into a single Async that run in parallel

Для этого существуют различные руководства; одно из них - here.

.
10
ответ дан 1 December 2019 в 20:43
поделиться

Вы всегда можете использовать ThreadPool.

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

в основном:

  1. Создайте пул потоков
  2. Установите максимальное количество потоков
  3. Очередь всех задач с помощью QueueUserWorkItem(WaitCallback)
0
ответ дан 1 December 2019 в 20:43
поделиться

ParallelOptions.MaxDegreeOfParallelism ограничивает количество параллельных операций, выполняемых вызовами метода Parallel

12
ответ дан 1 December 2019 в 20:43
поделиться
Другие вопросы по тегам:

Похожие вопросы: