Какова самая быстрая циклическая синхронизация в Java (ExecutorService по сравнению с CyclicBarrier по сравнению с X)?

Какая конструкция синхронизации Java, вероятно, обеспечит лучшую производительность для параллельного, повторяющегося сценария обработки с постоянным числом потоков как тот, обрисованный в общих чертах ниже? После экспериментирования самостоятельно некоторое время (использующий ExecutorService и CyclicBarrier) и несколько удивляемый результатами, я был бы благодарен за некоторый совет специалиста и возможно некоторые новые идеи. Существующие вопросы здесь, кажется, не фокусируются, прежде всего, на производительности, следовательно эта новая.Заранее спасибо!

Ядро приложения является простым повторяющимся алгоритмом обработки данных, параллелизированным к распространению вычислительная загрузка через 8 ядер на Mac Pro, рабочий OS X 10.6 и Java 1.6.0_07. Данные, которые будут обработаны, разделяются на 8 блоков, и каждый блок питается к Выполнимому, которое будет выполняться одним из постоянного числа потоков. Параллелизация алгоритма была довольно проста, и это функционально работает, как желаемый, но его производительность еще не, что я думаю, что это могло быть. Приложение, кажется, проводит много времени в синхронизации системных вызовов, поэтому после того, как некоторое профилирование интересно, выбрал ли я самый соответствующий механизм (механизмы) синхронизации.

Ключевое требование алгоритма - то, что он должен продолжиться шаг за шагом, таким образом, потоки должны синхронизировать в конце каждого этапа. Основной поток готовит работу (очень низко наверху), передает ее потокам, позволяет им работать над нею, затем продолжается, когда все потоки сделаны, перестраивает работу (снова очень низко наверху) и повторяет цикл. Машина выделена этой задаче, Сборка "мусора" минимизирована при помощи на пулы потоков предварительно выделенных объектов, и количество потоков может быть зафиксировано (никакие входящие запросы и т.п., всего один поток на ядро процессора).

V1 - ExecutorService

Моя первая реализация использовала ExecutorService с 8 рабочими потоками. Программа создает 8 задач, содержащих работу, и затем позволяет им работать над нею, примерно как это:

// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
    // package data into 8 work items
    ...

    // create one Callable task per work item
    ...

    // submit the Callables to the worker threads
    executorService.invokeAll( taskList );
}

Это работает хорошо функционально (это делает то, что это должно), и для очень больших объектов работы действительно все 8 центральных процессоров становятся очень загруженными, так, как алгоритм обработки, как ожидали бы, позволит (некоторые объекты работы будут заканчиваться быстрее, чем другие, затем бездействовать). Однако, поскольку объекты работы становятся меньшими (и это действительно не находится под контролем программы), пользовательская загрузка ЦП уменьшается существенно:

blocksize | system | user | cycles/sec
256k        1.8%    85%     1.30
64k         2.5%    77%     5.6
16k         4%      64%     22.5
4096        8%      56%     86
1024       13%      38%     227
256        17%      19%     420
64         19%      17%     948
16         19%      13%     1626

Легенда: - размер блока = размер объекта работы (= вычислительные шаги) - система = системная нагрузка, как показано в Мониторе Действия OS X (красная панель) - пользователь = пользовательская нагрузка, как показано в Мониторе Действия OS X (зеленая панель) - циклы/секунда = повторения через основной цикл с условием продолжения, больше лучше

Основной проблемной областью здесь является высокий процент времени, проведенного в системе, которая, кажется, управляется вызовами синхронизации потока. Как ожидалось, для меньших объектов работы, ExecutorService.invokeAll () потребует относительно большего усилия синхронизировать потоки по сравнению с объемом работы, выполняемым в каждом потоке. Но так как ExecutorService более универсален, чем это должно было бы быть для этого варианта использования (это может поставить задачи в очередь для потоков, если бы существует больше задач, чем ядра), я, хотя, возможно, была бы более минимизированная конструкция синхронизации.

V2 - CyclicBarrier

Следующая реализация использовала CyclicBarrier для синхронизации потоков прежде, чем получить работу и после завершения его, примерно следующим образом:

main() {
    // create the barrier
    barrier = new CyclicBarrier( 8 + 1 );

    // create Runable for thread, tell it about the barrier
    Runnable task = new WorkerThreadRunnable( barrier );

    // start the threads
    for( int i = 0; i < 8; i++ )
    {
        // create one thread per core
        new Thread( task ).start();
    }

    while( ... ) {
        // tell threads about the work
        ...

        // N threads + this will call await(), then system proceeds
        barrier.await();

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

    public void run()
    {
        while( true )
        {
            // wait for work
            barrier.await();

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Снова, это работает хорошо функционально (это делает то, что это должно), и для очень больших объектов работы действительно все 8 центральных процессоров становятся очень загруженными, как прежде. Однако, поскольку объекты работы становятся меньшими, загрузка все еще уменьшается существенно:

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.7%     78%    6.1
16k         5.5%     52%    25
4096        9%       29%    64
1024       11%       15%    117
256        12%        8%    169
64         12%        6.5%  285
16         12%        6%    377

Для больших объектов работы синхронизация незначительна, и производительность идентична V1. Но неожиданно, результаты (узкоспециализированного) CyclicBarrier кажутся НАМНОГО ХУЖЕ, чем результаты для (универсального) ExecutorService: пропускная способность (циклы/секунда) только о 1/4-м из V1. Предварительное заключение состояло бы в том, что даже при том, что это, кажется, рекламируемый идеальный вариант использования для CyclicBarrier, он работает намного хуже, чем универсальный ExecutorService.

V3 - Ожидайте/Уведомляйте + CyclicBarrier

Это казалось стоящим попытки заменить первый циклический барьер, ждут () с простым, ожидают/уведомляют механизм:

main() {
    // create the barrier
    // create Runable for thread, tell it about the barrier
    // start the threads

    while( ... ) {
        // tell threads about the work
        // for each: workerThreadRunnable.setWorkItem( ... );

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;
    @NotNull volatile private Callable<Integer> workItem;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        synchronized( this )
        {
            workItem = callable;
            notify();
        }
    }

    public void run()
    {
        while( true )
        {
            // wait for work
            while( true )
            {
                synchronized( this )
                {
                    if( workItem != NO_WORK ) break;

                    try
                    {
                        wait();
                    }
                    catch( InterruptedException e ) { e.printStackTrace(); }
                }
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Снова, это работает хорошо функционально (это делает то, что это должно).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.4%     80%    6.3
16k         4.6%     60%    30.1
4096        8.6%     41%    98.5
1024       12%       23%    202
256        14%       11.6%  299
64         14%       10.0%  518
16         14.8%      8.7%  679

Пропускная способность для небольших объектов работы еще намного хуже, чем пропускная способность ExecutorService, но о 2x пропускная способность CyclicBarrier. Устранение того CyclicBarrier устраняет половину разрыва.

V4 - Активное ожидание вместо ожидает/уведомляет

Так как это приложение является основным, работающим на системе и ядрах, неактивных так или иначе, если они не заняты объектом работы, почему бы не попробовать активное ожидание объектов работы в каждом потоке, даже если это вращает ЦП напрасно. Код рабочего потока изменяется следующим образом:

class WorkerThreadRunnable implements Runnable {
    // as before

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        workItem = callable;
    }

    public void run()
    {
        while( true )
        {
            // busy-wait for work
            while( true )
            {
                if( workItem != NO_WORK ) break;
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Также работы хорошо функционально (это делает то, что это должно).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.2%     81%    6.3
16k         4.2%     62%     33
4096        7.5%     40%    107
1024       10.4%     23%    210
256        12.0%    12.0%   310
64         11.9%    10.2%   550
16         12.2%     8.6%   741

Для небольших объектов работы это увеличивается, пропускная способность на дальнейшие 10% по CyclicBarrier + ожидают/уведомляют вариант, который является весьма значительным. Но это - все еще много более низкой пропускной способности, чем V1 с ExecutorService.

V5-?

Таким образом, каков лучший механизм синхронизации для такого (по-видимому, весьма распространенный) проблема? Я являюсь утомленным от записи моего собственного синхронизирующего механизма для завершенной замены ExecutorService (предполагающий, что это слишком универсально и должно быть что-то, что может все еще быть вынуто для создания этого более эффективным). Это не моя область знаний, и я обеспокоен, что провел бы много времени, отладив его (так как я даже не уверен, что мои ожидать/уведомлять и варианты активного ожидания корректны) для неопределенного усиления.

Любой совет значительно ценился бы.

15
задан krlmlr 27 September 2012 в 22:26
поделиться

4 ответа

Обновление: V5 - ожидание занятости во всех потоках (пока кажется оптимальным)

Поскольку все ядра выделены для этой задачи, казалось, что стоит попробовать просто исключить все сложные конструкции синхронизации и выполнять ожидание занятости в каждом из них. точка синхронизации во всех потоках. Оказывается, это с большим отрывом превосходит все остальные подходы.

Настройка выглядит следующим образом: начните с V4 выше (CyclicBarrier + Busy Wait). Замените CyclicBarrier на AtomicInteger, который основной поток сбрасывает в ноль каждый цикл. Каждый рабочий поток Runnable, который завершает свою работу, увеличивает атомарное целое число на единицу. Основной поток занят ждет:

while( true ) {
    // busy-wait for threads to complete their work
    if( atomicInt.get() >= workerThreadCount ) break;
}

Вместо 8 запускаются только 7 рабочих потоков (поскольку все потоки, включая основной поток, теперь почти полностью загружают ядро). Результаты следующие:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.36
64k         1.0%     98%       6.8
16k         1.0%     98%      44.6
4096        1.0%     98%     354
1024        1.0%     98%    1189
256         1.0%     98%    3222
64          1.5%     98%    8333
16          2.0%     98%   16129

Использование ожидания / уведомления в рабочих потоках снижает пропускную способность примерно до 1/3 от этого решения.

1
ответ дан 1 December 2019 в 04:40
поделиться

Обновление: V6 - ожидание занятости, основной поток также работает

Очевидное улучшение V5 (занятое ожидание работы в 7 рабочих потоках, занятое ожидание завершения в основном потоке) снова разделило работу на 7+ 1 и позволить основному потоку обрабатывать одну часть одновременно с другими рабочими потоками (вместо простого ожидания-занятости), а затем для последующего ожидания-занятости завершения рабочих элементов всех других потоков. Это позволит использовать 8-й процессор (в примере с 8-ядерной конфигурацией) и добавить его циклы в доступный пул вычислительных ресурсов.

Это было действительно просто реализовать. И результаты действительно снова немного лучше:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.39
64k         1.0%     98%       6.8
16k         1.0%     98%      50.4
4096        1.0%     98%     372
1024        1.0%     98%    1317
256         1.0%     98%    3546
64          1.5%     98%    9091
16          2.0%     98%   16949

Так что, похоже, это лучшее решение на данный момент.

3
ответ дан 1 December 2019 в 04:40
поделиться

Мне также интересно, можете ли вы попробовать более 8 потоков. Если ваш процессор поддерживает HyperThreading, тогда (по крайней мере, теоретически) вы можете сжать 2 потока на ядро ​​и посмотреть, что из этого получится.

1
ответ дан 1 December 2019 в 04:40
поделиться

Обновление: V7 - ожидание занятости, которое возвращается к ожиданию / уведомлению

После некоторой игры с V6 оказывается, что занятые ожидания немного скрывают реальные горячие точки приложения при профилировании. Кроме того, вентилятор в системе продолжает работать с перегрузкой, даже если никакие рабочие элементы не обрабатываются. Таким образом, дальнейшее улучшение заключалось в том, что занятое ожидание рабочих элементов в течение фиксированного периода времени (скажем, около 2 миллисекунд), а затем возвращение к более «приятной» комбинации wait () / notify (). Рабочие потоки просто публикуют свой текущий режим ожидания в основном потоке через атомарное логическое значение, которое указывает, заняты ли они ожиданием (и, следовательно, им просто нужно установить рабочий элемент), или они ожидают вызова notify (), потому что они находятся в ждать().

Еще одно усовершенствование, которое оказалось довольно простым, заключалось в том, что потоки, завершившие свой основной рабочий элемент, могли многократно вызывать обратный вызов, предоставляемый клиентом, в то время как они ожидают, пока другие потоки завершат свои основные рабочие элементы. Таким образом, время ожидания (которое происходит из-за того, что потоки должны получать немного разные рабочие нагрузки) не нужно полностью терять для приложения.

Мне все еще очень интересно услышать от других пользователей, которые сталкивались с подобным вариантом использования.

1
ответ дан 1 December 2019 в 04:40
поделиться
Другие вопросы по тегам:

Похожие вопросы: