Я задаюсь вопросом - ли это лучший способ сделать это. У меня есть приблизительно 500 потоков, которые работают неограниченно долго, но Thread.sleep в течение минуты при выполнении один цикл обработки.
ExecutorService es = Executors.newFixedThreadPool(list.size()+1);
for (int i = 0; i < list.size(); i++) {
es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects
}
Код, который выполняется, действительно прост и в основном просто это
class aThread extends Thread {
public void run(){
while(true){
Thread.sleep(ONE_MINUTE);
//Lots of computation every minute
}
}
}
Мне действительно нужны отдельные потоки для каждой выполняющейся задачи, так изменение архитектуры не является опцией. Я пытался делать свой размер пула потоков равным Runtime.getRuntime () .availableProcessors (), который попытался выполнить все 500 потоков, но только позволить 8 (4xhyperthreading) их, выполняются. Другие потоки не сдали бы и позволили бы другим потокам иметь свою очередь. Я пытался включить ожидание (), и уведомьте (), но все еще никакая удача. Если бы у кого-либо есть простой пример или некоторые подсказки, я был бы благодарен!
Ну, дизайн возможно испорчен. Потоки реализуют Генетическое Программирование или GP, тип изучения алгоритма. Каждый поток анализирует усовершенствованные тенденции, делает прогнозы. Если поток когда-нибудь завершается, изучение потеряно. Тем не менее я надеялся, что сон () позволит мне совместно использовать некоторые ресурсы, в то время как один поток не "учится"
Таким образом, фактические требования
как я могу запланировать задачи, которые поддерживают состояние и работают каждые 2 минуты, но управляют, сколько выполняется когда-то.
Почему бы не использовать ScheduledExecutorService
для планирования выполнения каждой задачи раз в минуту, вместо того, чтобы оставлять все эти потоки простаивать целую минуту?
ScheduledExecutorService workers =
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (Runnable task : list) {
workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES);
}
Что вы имеете в виду, говоря "изменение архитектуры - не вариант"? Если вы имеете в виду, что вы не можете изменить задачу вообще (в частности, задачи должны зацикливаться, а не выполняться один раз, и вызов Thread.sleep()
не может быть удален), то "хорошая производительность - не вариант".
Я не уверен, что ваш код семантически верен в том, как он использует пул потоков. ExecutionService создает потоки и управляет ими внутри, клиент должен просто предоставить экземпляр Runnable, чей метод run () будет выполняться в контексте одного из потоков в пуле. Вы можете проверить мой пример . Также обратите внимание, что каждый запущенный поток занимает ~ 10 МБ системной памяти для стека, а в Linux отображение Java-к-родным потокам выполняется 1 к 1.
Если ваши потоки не завершаются, это вина кода внутри потока, а не пула потоков. Для получения более подробной помощи вам необходимо опубликовать код, который выполняется.
Также, почему вы переводите каждый поток в спящий режим, когда он завершен; не лучше ли просто позволить ему завершиться?
Кроме того, я думаю, что вы неправильно используете пул потоков, имея количество потоков, равное количеству задач, которые вы хотите выполнить. Смысл пула потоков в том, чтобы ограничить количество используемых ресурсов; такой подход не лучше, чем вообще не использовать пул потоков.
Наконец, вам не нужно передавать экземпляры Thread
вашему ExecutorService
, только экземпляры Runnable
. ExecutorService
поддерживает свой собственный пул потоков, который циклически повторяется бесконечно, извлекая работу из внутренней очереди (работа - это Runnable
, которые вы отправляете).
Вместо того, чтобы усыплять тред, вы должны позволить ему вернуться и использовать ThreadPoolexecutor для выполнения работы, публикуемой каждую минуту в вашу очередь работ.
Чтобы ответить на ваш вопрос, какой тип пула потоков?
Я опубликовал свои комментарии, но это действительно должно решить вашу проблему. У вас есть вычисление, которое может занять 2 секунды. У вас много задач (500), которые вы хотите выполнить как можно быстрее. Максимально возможная пропускная способность, которую вы можете достичь, при условии отсутствия ввода-вывода или сетевого трафика, достигается при Runtime.getRuntime (). AvailableProcessors ()
количестве потоков.
Если вы увеличите число до 500 потоков, то каждая задача будет выполняться в своем собственном потоке, но ОС будет периодически планировать поток, чтобы передать его другому потоку. Это 125 переключений контекста в любой момент. Каждое переключение контекста увеличивает время выполнения каждой задачи.
Общая картина заключается в том, что добавление большего количества потоков НЕ увеличивает пропускную способность, когда количество процессоров значительно превышает их.
Edit: быстрое обновление. Тебе здесь не нужно спать. Когда вы выполняете 500 задач с 8 процессорами, каждая задача завершается за 2 секунды, завершается, и поток, в котором она выполнялся, затем берет следующую задачу и завершает ее.
Это должно делать то, что вы хотите, но не то, о чем вы просили :-) Вы должны удалить Thread.sleep ()
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledRunnable
{
public static void main(final String[] args)
{
final int numTasks = 10;
final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (int i = 0; i < numTasks; i++)
{
ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS);
}
}
private static class MyRunnable implements Runnable
{
private int id;
private int numRuns;
private MyRunnable(final int id)
{
this.id = id;
this.numRuns = 0;
}
@Override
public void run()
{
this.numRuns += 1;
System.out.format("%d - %d\n", this.id, this.numRuns);
}
}
}
Это планирует Runnables
каждые 10 СЕКУНД, чтобы показать поведение.
Если вам действительно нужно подождать фиксированное время ПОСЛЕ завершения обработки , возможно, вам придется поиграть с тем, какой метод .scheduleXXX
вам нужен. Я думаю, что fixedWait будет запускать его каждые N раз, независимо от времени выполнения.
Вам нужен семафор.
class AThread extends Thread {
Semaphore sem;
AThread(Semaphore sem) {
this.sem = sem;
}
public void run(){
while(true){
Thread.sleep(ONE_MINUTE);
sem.acquire();
try {
//Lots of computation every minute
} finally {
sem.release();
}
}
}
}
При создании экземпляра AThreads вам нужно передать один и тот же экземпляр семафора:
Semaphore sem = new Semaphore(MAX_AVAILABLE, true);
Edit: Кто проголосовал против, можете объяснить, почему? Что-то не так в моем решении?
Мне нужны отдельные потоки для каждой выполняемой задачи, поэтому изменение архитектуры не вариант.
Если, что это верно (например, вызов внешней функции блокировки), то создайте для них отдельные потоки и запустите их. Вы не можете создать пул потоков с ограниченным количеством потоков,как блокирующая функция в одном из потоков предотвратит попадание в него любого другого запускаемого потока и не получит много, создавая пул потоков с одним потоком на задачу.
Я попытался сделать размер threadPool равным Runtime.getRuntime().availableProcessors(), который попытался запустить все 500 потоков, но позволил выполнить только 8 (4xhyperthreading) из них.
Когда вы передаете создаваемые объекты Thread в пул потоков, он видит только то, что они реализуют Runnable
. Поэтому он будет запускать каждый Runnable
до завершения. Любой цикл, который останавливает возврат метода run()
, не позволит выполнить следующую задачу, поставленную в очередь; eg:
public static void main (String...args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; ++i) {
final int task = i;
executor.execute(new Runnable () {
private long lastRunTime = 0;
@Override
public void run () {
for (int iteration = 0; iteration < 4; )
{
if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT)
{
// do your work here
++iteration;
System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread());
this.lastRunTime = System.currentTimeMillis();
}
else
{
Thread.yield(); // otherwise, let other threads run
}
}
}
});
}
executor.shutdown();
}
распечатывает:
Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}.
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
...
показывает, что первые (размер пула потоков) задачи выполняются до завершения до того, как будут запланированы следующие задачи.
Что вам нужно сделать, так это создать задачи, которые будут выполняться некоторое время, а затем позволить другим задачам работать. То, как вы их структурируете, зависит от того, чего вы хотите достичь
В зависимости от ответов на них для координации задач может использоваться некоторая комбинация ScheduledExecutorService, семафоров или мьютексов. Простейшим случаем являются неблокирующие, несинхронные задачи, и в этом случае используйте ScheduledExecutorService непосредственно для запуска ваших runnables один раз в минуту.
Вы, безусловно, можете найти некоторое улучшение пропускной способности, уменьшив количество потоков до того, что система может реально обработать. Вы готовы немного изменить дизайн резьбы? Планировщик избавится от нагрузки и поместит спящие в очередь вместо того, чтобы иметь сотни спящих потоков.
class RepeatingWorker implements Runnable {
private ExecutorService executor;
private Date lastRan;
//constructor takes your executor
@Override
public void run() {
try {
if (now > lastRan + ONE_MINUTE) {
//do job
lastRan = now;
} else {
return;
} finally {
executor.submit(this);
}
}
}
Это сохраняет вашу основную семантику «задание повторяется бесконечно, но между выполнениями ожидается не менее одной минуты», но теперь вы можете настроить пул потоков на то, что машина может обрабатывать, а те, которые не работают, вместо этого помещаются в очередь. бездельничать в планировщике как спящие потоки. Если на самом деле никто ничего не делает, есть некоторое поведение ожидания, но я предполагаю из вашего сообщения, что вся цель приложения состоит в том, чтобы запускать эти потоки, и в настоящее время оно ругает ваши процессоры. Возможно, вам придется настроить это, если нужно освободить место для других вещей :)
Можете ли вы переписать свой проект для использования некоторой среды параллелизма на основе агентов, например Akka ?
8 потоков - это максимум, который может выдержать ваша система, если больше, то вы будете тормозить себя переключением контекста.
Посмотрите эту статью http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4 Она даст вам общее представление о том, как это сделать.