Я происхожу из Java, где я отправил бы Runnable
s к ExecutorService
поддержанный пулом потоков. Очень ясно в Java, как установить пределы к размеру пула потоков.
Я интересуюсь использованием агентов Scala, но я неясен о том, как ограничить параллелизм.
Позвольте нам просто сказать, гипотетически, что я создаю веб-сервис, который принимает "задания". Задание отправлено с POST
запросы, и я хочу, чтобы мой сервис ставил в очередь задание, затем сразу возвращаются 202 Accepted
— т.е. задания обрабатываются асинхронно.
Если я использую агентов для обработки заданий в очереди, как я могу ограничить количество одновременных заданий, которые обрабатываются?
Я могу думать о нескольких различных способах приблизиться к этому; я задаюсь вопросом, существует ли общественная лучшая практика, или по крайней мере, некоторые явно установленные подходы, которые являются несколько стандартными в мире Scala.
Один подход, о котором я думал, имеет единственного агента координатора, который управлял бы очередью заданий и обрабатывающими задание агентами; я предполагаю, что это могло использовать простое международное поле для отслеживания, сколько заданий в настоящее время обрабатывается. Я уверен, что был бы некоторый gotchyas с тем подходом, однако, таким как проверка отследить, когда ошибка происходит, чтобы постепенно уменьшить число. Вот почему я задаюсь вопросом, обеспечивает ли Scala уже более простой или более инкапсулированный подход к этому.
BTW, который я пытался задать этому вопросу только что, но я спросил это плохо.
Спасибо!
Вы можете переопределить системные свойства ctors.maxPoolSize
и ctors.corePoolSize
, которые ограничивают размер пула потоков акторов, а затем помещают в пул столько заданий, сколько могут ваши актеры. ручка. Как вы думаете, почему вам нужно ограничивать свою реакцию?
Я бы очень советовал вам взглянуть на Akka, альтернативную реализацию Actor для Scala.
Akka уже имеет интеграцию JAX-RS[1], и вы можете использовать ее в сочетании с LoadBalancer[2] для дросселирования количества действий, которые могут выполняться параллельно:
[1] http://doc.akkasource.org/rest [2] http://github.com/jboner/akka/blob/master/akka-patterns/src/main/scala/Patterns.scala
На самом деле у вас две проблемы.
Первая - держать под контролем пул потоков, используемый акторами. Это можно сделать, установив системное свойство actors.maxPoolSize.
Вторая - неудержимый рост числа задач, которые были переданы в пул. Вы можете беспокоиться или не беспокоиться об этом, однако вполне возможно, что слишком быстрое генерирование большого количества заданий может привести к таким сбоям, как ошибки нехватки памяти, а в некоторых случаях и к потенциально более тонким проблемам.
Каждый рабочий поток поддерживает декею задач. Очередь реализована в виде массива, который рабочий поток динамически увеличивает до некоторого максимального размера. В 2.7.x очередь может увеличиваться до довольно больших размеров, и я видел, как это приводило к ошибкам нехватки памяти в сочетании с большим количеством параллельных потоков. Максимальный размер dequeue меньше в 2.8. Очередь также может переполняться.
Для решения этой проблемы необходимо контролировать количество задач, которые вы генерируете, что, вероятно, означает наличие какого-то координатора, как вы описали. Я сталкивался с этой проблемой, когда агенты, которые инициируют своего рода конвейер обработки данных, работают намного быстрее, чем те, которые находятся позже в этом конвейере. Чтобы контролировать процесс, я обычно заставляю агентов, находящихся позже в цепочке, пинговать агентов, находящихся раньше в цепочке, каждые X сообщений, а агентов, находящихся раньше в цепочке, останавливаться после X сообщений и ждать ответного пинга. Вы также можете сделать это с более централизованным координатором.