Хотя код, о котором я буду говорить здесь, я записал в F#, он основан на.NET 4 платформы, не конкретно в зависимости от любой особенности F# (по крайней мере, это кажется так!).
У меня есть некоторые части данных по моему диску, который я должен обновить из сети, сохранив последнюю версию к диску:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
Проблема - это к loadAndSaveAndUpdate
все мои данные, я должен был бы много раз выполнять функцию:
{1 .. 5000} |> loadAndSaveAndUpdate
Каждый шаг сделал бы
Разве не было бы хорошо сделать это параллельно, до некоторой степени? К сожалению, ни одно из моего чтения и парсинга функций не является "async-workflows-ready".
Первая вещь, которую я сделал, состояла в том, чтобы настроить a Task[]
и запустите их всех:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
Затем я поразил CTRL+ESC только для наблюдения, сколько потоков он использовал. 15, 17..., 35..., 170..., пока не уничтожено приложение! Что-то шло не так, как надо.
Я сделал почти то же самое, но использование Parallel.ForEach(...)
и результатами было то же: партии и партии и много потоков.
Затем я решил запуститься только n
потоки, Task.WaitAll(of them)
, затем другой n
, до больше не было доступных задач.
Это работает, но проблема - это, когда она закончила обрабатывать, скажем, n-1
задачи, это будет ожидать, ожидать, ожидать проклятой последней Задачи, которые настаивают на том, чтобы блокироваться из-за большого количества сетевой задержки. Это не хорошо!
Так, как Вы принялись бы за решение этой проблемы? Я ценил бы для просмотра различных решений, включая любой Асинхронные Рабочие процессы (и в этом случае как адаптировать мои неасинхронные функции), Параллельные Расширения, странные параллельные шаблоны, и т.д.
Спасибо.
Вы уверены, что ваши индивидуальные задания выполняются своевременно? Я считаю, что оба класса Parallel.ForEach
и Task
уже используют нитевой пул .NET. Как правило, задачи должны быть недолговечными рабочими элементами, в этом случае нитевой пул породит только небольшое количество реальных потоков, но если ваши задачи не прогрессируют и есть другие задачи, стоящие в очереди, то количество используемых потоков будет неуклонно увеличиваться до максимума (который по умолчанию составляет 250/процессор в .NET 2.0 SP1, но отличается в разных версиях фреймворка). Стоит также отметить, что (по крайней мере, в .NET 2.0 SP1) создание новых потоков дросселируется до 2-х новых потоков в секунду, поэтому увеличение количества потоков свидетельствует о том, что задачи не выполняются в течение короткого промежутка времени (поэтому может быть не совсем верно повесить вину на Parallel.ForEach
).
Я думаю, что предложение Брайана использовать async
workflows является хорошим, особенно если источником долгоживущих задач является IO, так как async
вернет ваши потоки в потоковую пул до тех пор, пока IO не завершится. Другой вариант - просто признать, что ваши задачи не завершаются быстро, и позволить порождать множество потоков (что в некоторой степени можно контролировать с помощью System.Threading.ThreadPool.SetMaxThreads
) - в зависимости от вашей ситуации, может быть не так уж и сложно, что вы используете много потоков.
Использование "асинхронизации" позволит вам выполнять работу, связанную с вводом/выводом, без прожигания потоков, в то время как различные вызовы ввода/вывода находятся "в море", так что это было бы моим первым предложением. Должно быть просто преобразовать код в асинхронный, обычно вдоль строк
async{...}
, добавить return
, где необходимо Async. FromBeginEnd
let r = Foo()
to let! r = AsyncFoo()
Async.Parallel
to convert the 5000 async objects into a single Async that run in parallelДля этого существуют различные руководства; одно из них - here.
.Вы всегда можете использовать ThreadPool
.
http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx
в основном:
QueueUserWorkItem(WaitCallback)
ParallelOptions.MaxDegreeOfParallelism ограничивает количество параллельных операций, выполняемых вызовами метода Parallel