500 Рабочих потоков, какой пул потоков?

Я задаюсь вопросом - ли это лучший способ сделать это. У меня есть приблизительно 500 потоков, которые работают неограниченно долго, но Thread.sleep в течение минуты при выполнении один цикл обработки.

   ExecutorService es = Executors.newFixedThreadPool(list.size()+1);
   for (int i = 0; i < list.size(); i++) {
      es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects
   }

Код, который выполняется, действительно прост и в основном просто это

class aThread extends Thread {
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         //Lots of computation every minute
      }
   }
}

Мне действительно нужны отдельные потоки для каждой выполняющейся задачи, так изменение архитектуры не является опцией. Я пытался делать свой размер пула потоков равным Runtime.getRuntime () .availableProcessors (), который попытался выполнить все 500 потоков, но только позволить 8 (4xhyperthreading) их, выполняются. Другие потоки не сдали бы и позволили бы другим потокам иметь свою очередь. Я пытался включить ожидание (), и уведомьте (), но все еще никакая удача. Если бы у кого-либо есть простой пример или некоторые подсказки, я был бы благодарен!

Ну, дизайн возможно испорчен. Потоки реализуют Генетическое Программирование или GP, тип изучения алгоритма. Каждый поток анализирует усовершенствованные тенденции, делает прогнозы. Если поток когда-нибудь завершается, изучение потеряно. Тем не менее я надеялся, что сон () позволит мне совместно использовать некоторые ресурсы, в то время как один поток не "учится"

Таким образом, фактические требования

как я могу запланировать задачи, которые поддерживают состояние и работают каждые 2 минуты, но управляют, сколько выполняется когда-то.

8
задан Submerged 19 May 2010 в 09:41
поделиться

11 ответов

Почему бы не использовать ScheduledExecutorService для планирования выполнения каждой задачи раз в минуту, вместо того, чтобы оставлять все эти потоки простаивать целую минуту?

ScheduledExecutorService workers = 
  Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (Runnable task : list) { 
  workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES);
}

Что вы имеете в виду, говоря "изменение архитектуры - не вариант"? Если вы имеете в виду, что вы не можете изменить задачу вообще (в частности, задачи должны зацикливаться, а не выполняться один раз, и вызов Thread.sleep() не может быть удален), то "хорошая производительность - не вариант".

10
ответ дан 3 November 2019 в 13:35
поделиться

Я не уверен, что ваш код семантически верен в том, как он использует пул потоков. ExecutionService создает потоки и управляет ими внутри, клиент должен просто предоставить экземпляр Runnable, чей метод run () будет выполняться в контексте одного из потоков в пуле. Вы можете проверить мой пример . Также обратите внимание, что каждый запущенный поток занимает ~ 10 МБ системной памяти для стека, а в Linux отображение Java-к-родным потокам выполняется 1 к 1.

3
ответ дан 3 November 2019 в 13:35
поделиться

Если ваши потоки не завершаются, это вина кода внутри потока, а не пула потоков. Для получения более подробной помощи вам необходимо опубликовать код, который выполняется.

Также, почему вы переводите каждый поток в спящий режим, когда он завершен; не лучше ли просто позволить ему завершиться?

Кроме того, я думаю, что вы неправильно используете пул потоков, имея количество потоков, равное количеству задач, которые вы хотите выполнить. Смысл пула потоков в том, чтобы ограничить количество используемых ресурсов; такой подход не лучше, чем вообще не использовать пул потоков.

Наконец, вам не нужно передавать экземпляры Thread вашему ExecutorService, только экземпляры Runnable. ExecutorService поддерживает свой собственный пул потоков, который циклически повторяется бесконечно, извлекая работу из внутренней очереди (работа - это Runnable, которые вы отправляете).

13
ответ дан 3 November 2019 в 13:35
поделиться

Вместо того, чтобы усыплять тред, вы должны позволить ему вернуться и использовать ThreadPoolexecutor для выполнения работы, публикуемой каждую минуту в вашу очередь работ.

2
ответ дан 3 November 2019 в 13:35
поделиться

Чтобы ответить на ваш вопрос, какой тип пула потоков?

Я опубликовал свои комментарии, но это действительно должно решить вашу проблему. У вас есть вычисление, которое может занять 2 секунды. У вас много задач (500), которые вы хотите выполнить как можно быстрее. Максимально возможная пропускная способность, которую вы можете достичь, при условии отсутствия ввода-вывода или сетевого трафика, достигается при Runtime.getRuntime (). AvailableProcessors () количестве потоков.

Если вы увеличите число до 500 потоков, то каждая задача будет выполняться в своем собственном потоке, но ОС будет периодически планировать поток, чтобы передать его другому потоку. Это 125 переключений контекста в любой момент. Каждое переключение контекста увеличивает время выполнения каждой задачи.

Общая картина заключается в том, что добавление большего количества потоков НЕ увеличивает пропускную способность, когда количество процессоров значительно превышает их.

Edit: быстрое обновление. Тебе здесь не нужно спать. Когда вы выполняете 500 задач с 8 процессорами, каждая задача завершается за 2 секунды, завершается, и поток, в котором она выполнялся, затем берет следующую задачу и завершает ее.

2
ответ дан 3 November 2019 в 13:35
поделиться

Это должно делать то, что вы хотите, но не то, о чем вы просили :-) Вы должны удалить Thread.sleep ()

ScheduledRunnable.java

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledRunnable
{
    public static void main(final String[] args)
    {
        final int numTasks = 10;
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < numTasks; i++)
        {
            ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS);
        }
    }

    private static class MyRunnable implements Runnable
    {
        private int id;
        private int numRuns;

        private MyRunnable(final int id)
        {
            this.id = id;
            this.numRuns = 0;
        }

        @Override
        public void run()
        {
            this.numRuns += 1;
            System.out.format("%d - %d\n", this.id, this.numRuns);
        }
    }
}

Это планирует Runnables каждые 10 СЕКУНД, чтобы показать поведение. Если вам действительно нужно подождать фиксированное время ПОСЛЕ завершения обработки , возможно, вам придется поиграть с тем, какой метод .scheduleXXX вам нужен. Я думаю, что fixedWait будет запускать его каждые N раз, независимо от времени выполнения.

1
ответ дан 3 November 2019 в 13:35
поделиться

Вам нужен семафор.

class AThread extends Thread {
   Semaphore sem;
   AThread(Semaphore sem) {
     this.sem = sem;
   }
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         sem.acquire();
         try {
           //Lots of computation every minute
         } finally {
           sem.release();
         }
      }
   }
}

При создании экземпляра AThreads вам нужно передать один и тот же экземпляр семафора:

Semaphore sem = new Semaphore(MAX_AVAILABLE, true);

Edit: Кто проголосовал против, можете объяснить, почему? Что-то не так в моем решении?

-1
ответ дан 3 November 2019 в 13:35
поделиться

Мне нужны отдельные потоки для каждой выполняемой задачи, поэтому изменение архитектуры не вариант.

Если, что это верно (например, вызов внешней функции блокировки), то создайте для них отдельные потоки и запустите их. Вы не можете создать пул потоков с ограниченным количеством потоков,как блокирующая функция в одном из потоков предотвратит попадание в него любого другого запускаемого потока и не получит много, создавая пул потоков с одним потоком на задачу.

Я попытался сделать размер threadPool равным Runtime.getRuntime().availableProcessors(), который попытался запустить все 500 потоков, но позволил выполнить только 8 (4xhyperthreading) из них.

Когда вы передаете создаваемые объекты Thread в пул потоков, он видит только то, что они реализуют Runnable. Поэтому он будет запускать каждый Runnable до завершения. Любой цикл, который останавливает возврат метода run(), не позволит выполнить следующую задачу, поставленную в очередь; eg:

public static void main (String...args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);

    for (int i = 0; i < 10; ++i) {
        final int task = i;

        executor.execute(new Runnable () {
        private long lastRunTime = 0;
            @Override
            public void run () {

                for (int iteration = 0; iteration < 4; )
                {
                    if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT)
                    {
                        // do your work here
                        ++iteration;
                        System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread());

                        this.lastRunTime = System.currentTimeMillis();
                    }
                    else
                    {
                        Thread.yield(); // otherwise, let other threads run
                    }
                }
            }
        });
    }

    executor.shutdown();
}

распечатывает:

Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}.
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
...

показывает, что первые (размер пула потоков) задачи выполняются до завершения до того, как будут запланированы следующие задачи.

Что вам нужно сделать, так это создать задачи, которые будут выполняться некоторое время, а затем позволить другим задачам работать. То, как вы их структурируете, зависит от того, чего вы хотите достичь

  • , хотите ли вы, чтобы все задачи запускалися одновременно,Все ждут минуту, затем все запускаются в одно и то же время снова, или задачи не синхронизированы друг с другом
  • Действительно ли вы хотите, чтобы каждая задача запускалась с интервалом в одну минуту
  • , являются ли ваши задачи потенциально блокирующими или нет, и поэтому действительно требуют отдельных потоков
  • Какое поведение ожидается, если задача блокируется дольше ожидаемого окна для запуска
  • какое поведение ожидается, если блоки задач длиннее частоты повторения (блоки более одной минуты)

В зависимости от ответов на них для координации задач может использоваться некоторая комбинация ScheduledExecutorService, семафоров или мьютексов. Простейшим случаем являются неблокирующие, несинхронные задачи, и в этом случае используйте ScheduledExecutorService непосредственно для запуска ваших runnables один раз в минуту.

0
ответ дан 3 November 2019 в 13:35
поделиться

Вы, безусловно, можете найти некоторое улучшение пропускной способности, уменьшив количество потоков до того, что система может реально обработать. Вы готовы немного изменить дизайн резьбы? Планировщик избавится от нагрузки и поместит спящие в очередь вместо того, чтобы иметь сотни спящих потоков.

class RepeatingWorker implements Runnable {

private ExecutorService executor;
private Date lastRan;

//constructor takes your executor

@Override
public void run() {

  try {
    if (now > lastRan + ONE_MINUTE) {
      //do job
      lastRan = now;
    } else {
      return;
  } finally {
    executor.submit(this);
  }
}
}

Это сохраняет вашу основную семантику «задание повторяется бесконечно, но между выполнениями ожидается не менее одной минуты», но теперь вы можете настроить пул потоков на то, что машина может обрабатывать, а те, которые не работают, вместо этого помещаются в очередь. бездельничать в планировщике как спящие потоки. Если на самом деле никто ничего не делает, есть некоторое поведение ожидания, но я предполагаю из вашего сообщения, что вся цель приложения состоит в том, чтобы запускать эти потоки, и в настоящее время оно ругает ваши процессоры. Возможно, вам придется настроить это, если нужно освободить место для других вещей :)

-1
ответ дан 3 November 2019 в 13:35
поделиться

Можете ли вы переписать свой проект для использования некоторой среды параллелизма на основе агентов, например Akka ?

0
ответ дан 3 November 2019 в 13:35
поделиться

8 потоков - это максимум, который может выдержать ваша система, если больше, то вы будете тормозить себя переключением контекста.

Посмотрите эту статью http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4 Она даст вам общее представление о том, как это сделать.

1
ответ дан 3 November 2019 в 13:35
поделиться
Другие вопросы по тегам:

Похожие вопросы: