Какая конструкция синхронизации Java, вероятно, обеспечит лучшую производительность для параллельного, повторяющегося сценария обработки с постоянным числом потоков как тот, обрисованный в общих чертах ниже? После экспериментирования самостоятельно некоторое время (использующий ExecutorService и CyclicBarrier) и несколько удивляемый результатами, я был бы благодарен за некоторый совет специалиста и возможно некоторые новые идеи. Существующие вопросы здесь, кажется, не фокусируются, прежде всего, на производительности, следовательно эта новая.Заранее спасибо!
Ядро приложения является простым повторяющимся алгоритмом обработки данных, параллелизированным к распространению вычислительная загрузка через 8 ядер на Mac Pro, рабочий OS X 10.6 и Java 1.6.0_07. Данные, которые будут обработаны, разделяются на 8 блоков, и каждый блок питается к Выполнимому, которое будет выполняться одним из постоянного числа потоков. Параллелизация алгоритма была довольно проста, и это функционально работает, как желаемый, но его производительность еще не, что я думаю, что это могло быть. Приложение, кажется, проводит много времени в синхронизации системных вызовов, поэтому после того, как некоторое профилирование интересно, выбрал ли я самый соответствующий механизм (механизмы) синхронизации.
Ключевое требование алгоритма - то, что он должен продолжиться шаг за шагом, таким образом, потоки должны синхронизировать в конце каждого этапа. Основной поток готовит работу (очень низко наверху), передает ее потокам, позволяет им работать над нею, затем продолжается, когда все потоки сделаны, перестраивает работу (снова очень низко наверху) и повторяет цикл. Машина выделена этой задаче, Сборка "мусора" минимизирована при помощи на пулы потоков предварительно выделенных объектов, и количество потоков может быть зафиксировано (никакие входящие запросы и т.п., всего один поток на ядро процессора).
Моя первая реализация использовала ExecutorService с 8 рабочими потоками. Программа создает 8 задач, содержащих работу, и затем позволяет им работать над нею, примерно как это:
// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
// package data into 8 work items
...
// create one Callable task per work item
...
// submit the Callables to the worker threads
executorService.invokeAll( taskList );
}
Это работает хорошо функционально (это делает то, что это должно), и для очень больших объектов работы действительно все 8 центральных процессоров становятся очень загруженными, так, как алгоритм обработки, как ожидали бы, позволит (некоторые объекты работы будут заканчиваться быстрее, чем другие, затем бездействовать). Однако, поскольку объекты работы становятся меньшими (и это действительно не находится под контролем программы), пользовательская загрузка ЦП уменьшается существенно:
blocksize | system | user | cycles/sec
256k 1.8% 85% 1.30
64k 2.5% 77% 5.6
16k 4% 64% 22.5
4096 8% 56% 86
1024 13% 38% 227
256 17% 19% 420
64 19% 17% 948
16 19% 13% 1626
Легенда: - размер блока = размер объекта работы (= вычислительные шаги) - система = системная нагрузка, как показано в Мониторе Действия OS X (красная панель) - пользователь = пользовательская нагрузка, как показано в Мониторе Действия OS X (зеленая панель) - циклы/секунда = повторения через основной цикл с условием продолжения, больше лучше
Основной проблемной областью здесь является высокий процент времени, проведенного в системе, которая, кажется, управляется вызовами синхронизации потока. Как ожидалось, для меньших объектов работы, ExecutorService.invokeAll () потребует относительно большего усилия синхронизировать потоки по сравнению с объемом работы, выполняемым в каждом потоке. Но так как ExecutorService более универсален, чем это должно было бы быть для этого варианта использования (это может поставить задачи в очередь для потоков, если бы существует больше задач, чем ядра), я, хотя, возможно, была бы более минимизированная конструкция синхронизации.
Следующая реализация использовала CyclicBarrier для синхронизации потоков прежде, чем получить работу и после завершения его, примерно следующим образом:
main() {
// create the barrier
barrier = new CyclicBarrier( 8 + 1 );
// create Runable for thread, tell it about the barrier
Runnable task = new WorkerThreadRunnable( barrier );
// start the threads
for( int i = 0; i < 8; i++ )
{
// create one thread per core
new Thread( task ).start();
}
while( ... ) {
// tell threads about the work
...
// N threads + this will call await(), then system proceeds
barrier.await();
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }
public void run()
{
while( true )
{
// wait for work
barrier.await();
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Снова, это работает хорошо функционально (это делает то, что это должно), и для очень больших объектов работы действительно все 8 центральных процессоров становятся очень загруженными, как прежде. Однако, поскольку объекты работы становятся меньшими, загрузка все еще уменьшается существенно:
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.7% 78% 6.1
16k 5.5% 52% 25
4096 9% 29% 64
1024 11% 15% 117
256 12% 8% 169
64 12% 6.5% 285
16 12% 6% 377
Для больших объектов работы синхронизация незначительна, и производительность идентична V1. Но неожиданно, результаты (узкоспециализированного) CyclicBarrier кажутся НАМНОГО ХУЖЕ, чем результаты для (универсального) ExecutorService: пропускная способность (циклы/секунда) только о 1/4-м из V1. Предварительное заключение состояло бы в том, что даже при том, что это, кажется, рекламируемый идеальный вариант использования для CyclicBarrier, он работает намного хуже, чем универсальный ExecutorService.
Это казалось стоящим попытки заменить первый циклический барьер, ждут () с простым, ожидают/уведомляют механизм:
main() {
// create the barrier
// create Runable for thread, tell it about the barrier
// start the threads
while( ... ) {
// tell threads about the work
// for each: workerThreadRunnable.setWorkItem( ... );
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
@NotNull volatile private Callable<Integer> workItem;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
synchronized( this )
{
workItem = callable;
notify();
}
}
public void run()
{
while( true )
{
// wait for work
while( true )
{
synchronized( this )
{
if( workItem != NO_WORK ) break;
try
{
wait();
}
catch( InterruptedException e ) { e.printStackTrace(); }
}
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Снова, это работает хорошо функционально (это делает то, что это должно).
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.4% 80% 6.3
16k 4.6% 60% 30.1
4096 8.6% 41% 98.5
1024 12% 23% 202
256 14% 11.6% 299
64 14% 10.0% 518
16 14.8% 8.7% 679
Пропускная способность для небольших объектов работы еще намного хуже, чем пропускная способность ExecutorService, но о 2x пропускная способность CyclicBarrier. Устранение того CyclicBarrier устраняет половину разрыва.
Так как это приложение является основным, работающим на системе и ядрах, неактивных так или иначе, если они не заняты объектом работы, почему бы не попробовать активное ожидание объектов работы в каждом потоке, даже если это вращает ЦП напрасно. Код рабочего потока изменяется следующим образом:
class WorkerThreadRunnable implements Runnable {
// as before
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
workItem = callable;
}
public void run()
{
while( true )
{
// busy-wait for work
while( true )
{
if( workItem != NO_WORK ) break;
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Также работы хорошо функционально (это делает то, что это должно).
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.2% 81% 6.3
16k 4.2% 62% 33
4096 7.5% 40% 107
1024 10.4% 23% 210
256 12.0% 12.0% 310
64 11.9% 10.2% 550
16 12.2% 8.6% 741
Для небольших объектов работы это увеличивается, пропускная способность на дальнейшие 10% по CyclicBarrier + ожидают/уведомляют вариант, который является весьма значительным. Но это - все еще много более низкой пропускной способности, чем V1 с ExecutorService.
Таким образом, каков лучший механизм синхронизации для такого (по-видимому, весьма распространенный) проблема? Я являюсь утомленным от записи моего собственного синхронизирующего механизма для завершенной замены ExecutorService (предполагающий, что это слишком универсально и должно быть что-то, что может все еще быть вынуто для создания этого более эффективным). Это не моя область знаний, и я обеспокоен, что провел бы много времени, отладив его (так как я даже не уверен, что мои ожидать/уведомлять и варианты активного ожидания корректны) для неопределенного усиления.
Любой совет значительно ценился бы.
Обновление: V5 - ожидание занятости во всех потоках (пока кажется оптимальным)
Поскольку все ядра выделены для этой задачи, казалось, что стоит попробовать просто исключить все сложные конструкции синхронизации и выполнять ожидание занятости в каждом из них. точка синхронизации во всех потоках. Оказывается, это с большим отрывом превосходит все остальные подходы.
Настройка выглядит следующим образом: начните с V4 выше (CyclicBarrier + Busy Wait). Замените CyclicBarrier на AtomicInteger, который основной поток сбрасывает в ноль каждый цикл. Каждый рабочий поток Runnable, который завершает свою работу, увеличивает атомарное целое число на единицу. Основной поток занят ждет:
while( true ) {
// busy-wait for threads to complete their work
if( atomicInt.get() >= workerThreadCount ) break;
}
Вместо 8 запускаются только 7 рабочих потоков (поскольку все потоки, включая основной поток, теперь почти полностью загружают ядро). Результаты следующие:
blocksize | system | user | cycles/sec
256k 1.0% 98% 1.36
64k 1.0% 98% 6.8
16k 1.0% 98% 44.6
4096 1.0% 98% 354
1024 1.0% 98% 1189
256 1.0% 98% 3222
64 1.5% 98% 8333
16 2.0% 98% 16129
Использование ожидания / уведомления в рабочих потоках снижает пропускную способность примерно до 1/3 от этого решения.
Обновление: V6 - ожидание занятости, основной поток также работает
Очевидное улучшение V5 (занятое ожидание работы в 7 рабочих потоках, занятое ожидание завершения в основном потоке) снова разделило работу на 7+ 1 и позволить основному потоку обрабатывать одну часть одновременно с другими рабочими потоками (вместо простого ожидания-занятости), а затем для последующего ожидания-занятости завершения рабочих элементов всех других потоков. Это позволит использовать 8-й процессор (в примере с 8-ядерной конфигурацией) и добавить его циклы в доступный пул вычислительных ресурсов.
Это было действительно просто реализовать. И результаты действительно снова немного лучше:
blocksize | system | user | cycles/sec
256k 1.0% 98% 1.39
64k 1.0% 98% 6.8
16k 1.0% 98% 50.4
4096 1.0% 98% 372
1024 1.0% 98% 1317
256 1.0% 98% 3546
64 1.5% 98% 9091
16 2.0% 98% 16949
Так что, похоже, это лучшее решение на данный момент.
Мне также интересно, можете ли вы попробовать более 8 потоков. Если ваш процессор поддерживает HyperThreading, тогда (по крайней мере, теоретически) вы можете сжать 2 потока на ядро и посмотреть, что из этого получится.
Обновление: V7 - ожидание занятости, которое возвращается к ожиданию / уведомлению
После некоторой игры с V6 оказывается, что занятые ожидания немного скрывают реальные горячие точки приложения при профилировании. Кроме того, вентилятор в системе продолжает работать с перегрузкой, даже если никакие рабочие элементы не обрабатываются. Таким образом, дальнейшее улучшение заключалось в том, что занятое ожидание рабочих элементов в течение фиксированного периода времени (скажем, около 2 миллисекунд), а затем возвращение к более «приятной» комбинации wait () / notify (). Рабочие потоки просто публикуют свой текущий режим ожидания в основном потоке через атомарное логическое значение, которое указывает, заняты ли они ожиданием (и, следовательно, им просто нужно установить рабочий элемент), или они ожидают вызова notify (), потому что они находятся в ждать().
Еще одно усовершенствование, которое оказалось довольно простым, заключалось в том, что потоки, завершившие свой основной рабочий элемент, могли многократно вызывать обратный вызов, предоставляемый клиентом, в то время как они ожидают, пока другие потоки завершат свои основные рабочие элементы. Таким образом, время ожидания (которое происходит из-за того, что потоки должны получать немного разные рабочие нагрузки) не нужно полностью терять для приложения.
Мне все еще очень интересно услышать от других пользователей, которые сталкивались с подобным вариантом использования.