Java: распараллеливание быстрой сортировки с помощью многопоточности

Я экспериментирую с алгоритмами распараллеливания в Java. Я начал с сортировки слиянием, и опубликовал мою попытку в этом вопросе . Моя пересмотренная попытка в приведенном ниже коде, где я сейчас пытаюсь распараллелить быструю сортировку.

Есть ли какие-либо ошибки новичка в моей многопоточной реализации или подходе к этой проблеме? Если нет, разве я не должен ожидать увеличения скорости более чем на 32% между последовательным и параллельным алгоритмом на дуэльном ядре (см. Время внизу)?

Вот алгоритм многопоточности:

    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

        CountDownLatch doneSignal;
        static int num_threads = 1;

        int[] my_array;
        int start, end;

        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }

        public static void reset() {
            num_threads = 1;
        }

        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }

        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;

            if (len <= 1)
                return;

            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];

            swap(array, pivot_index, end);

            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }

            swap(array, storeIndex, end);

            if (num_threads < MAX_THREADS) {
                num_threads++;

                CountDownLatch completionSignal = new CountDownLatch(1);

                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);

                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }

Вот как я его запускаю:

ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}

Я проверил это с помощью Arrays.sort и аналогичного последовательного алгоритма быстрой сортировки. Вот результаты синхронизации на ноутбуке Intel Duel-Core Dell, в секундах:

Элементы: 500 000, не следует ли ожидать более 32% увеличения скорости между последовательным и распараллеленным алгоритмом на дуэльном ядре (см. время внизу)?

Вот алгоритм многопоточности:

    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

        CountDownLatch doneSignal;
        static int num_threads = 1;

        int[] my_array;
        int start, end;

        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }

        public static void reset() {
            num_threads = 1;
        }

        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }

        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;

            if (len <= 1)
                return;

            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];

            swap(array, pivot_index, end);

            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }

            swap(array, storeIndex, end);

            if (num_threads < MAX_THREADS) {
                num_threads++;

                CountDownLatch completionSignal = new CountDownLatch(1);

                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);

                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }

Вот как я его запускаю:

ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}

Я проверил это с помощью Arrays.sort и аналогичного последовательного алгоритма быстрой сортировки. Вот результаты синхронизации на ноутбуке Intel Duel-Core Dell, в секундах:

Элементы: 500 000, не следует ли ожидать более 32% увеличения скорости между последовательным и распараллеленным алгоритмом на дуэльном ядре (см. время внизу)?

Вот алгоритм многопоточности:

    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

        CountDownLatch doneSignal;
        static int num_threads = 1;

        int[] my_array;
        int start, end;

        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }

        public static void reset() {
            num_threads = 1;
        }

        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }

        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;

            if (len <= 1)
                return;

            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];

            swap(array, pivot_index, end);

            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }

            swap(array, storeIndex, end);

            if (num_threads < MAX_THREADS) {
                num_threads++;

                CountDownLatch completionSignal = new CountDownLatch(1);

                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);

                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }

Вот как я его запускаю:

ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}

Я проверил это с помощью Arrays.sort и аналогичного последовательного алгоритма быстрой сортировки. Вот результаты синхронизации на ноутбуке Intel Duel-Core Dell, в секундах:

Элементы: 500 000, последовательный: 0,068592, с резьбой: 0,046871, Arrays.sort: 0,079677

Элементы: 1 000 000, последовательный: 0,14416, с резьбой: 0,095492, Arrays.sort: 0,167155

Элементы: 2 000 000, последовательный: 0.301666, с резьбой: 0,205719, Arrays.sort: 0,350982

Элементы: 4 000 000, последовательный: 0,623291, с резьбой: 0,424119, Arrays.sort: 0,712698

Элементы: 8 000 000, последовательный: 1.279374, с резьбой: 0,859363, Arrays.sort: 1.487671

Каждое число, указанное выше, представляет собой среднее время 100 тестов, отбрасывая 3 самых низких и 3 самых высоких случая. Я использовал Random.nextInt (Integer.MAX_VALUE) для генерации массива для каждого теста, который инициализировался один раз каждые 10 тестов с одним и тем же начальным числом. Каждый тест состоял из синхронизации данного алгоритма с System.nanoTime. Я округлил до шести десятичных знаков после усреднения. И, очевидно, я проверил, работает ли каждый вид .

Как вы можете видеть, скорость между последовательными и многопоточными случаями увеличивается примерно на 32% в каждом наборе тестов. Как я уже говорил выше, не следует ли ожидать большего?

6
задан Community 23 May 2017 в 12:06
поделиться

2 ответа

Если сделать numThreads статическим, это может привести к проблемам, весьма вероятно, что в какой-то момент у вас будет работать больше, чем MAX_THREADS.

Вероятно, причина, по которой вы не получаете полного удвоения производительности, заключается в том, что ваша быстрая сортировка не может быть полностью распараллелена. Обратите внимание, что первый вызов quicksort сделает проход по всему массиву в начальном потоке, прежде чем он начнет действительно работать параллельно. Кроме того, при распараллеливании алгоритма возникают накладные расходы в виде переключения контекста и переходов между режимами при передаче в отдельные потоки.

Взгляните на фреймворк Fork/Join, возможно, эта проблема будет решена именно там.

Пара замечаний по реализации. Реализуйте Runnable, а не расширяйте Thread. Расширение Thread должно использоваться только тогда, когда вы создаете новую версию класса Thread. Когда вы просто хотите сделать некоторую работу, которая будет выполняться параллельно, лучше использовать Runnable. При создании Runnable вы также можете расширить другой класс, что дает вам больше гибкости в OO-проектировании. Используйте пул потоков, который ограничен количеством потоков, доступных в системе. Также не используйте numThreads для принятия решения о том, следует ли форкнуть новый поток или нет. Вы можете рассчитать это заранее. Используйте минимальный размер раздела, который представляет собой размер всего массива, деленный на количество доступных процессоров. Что-то вроде:

public class ThreadedQuick implements Runnable {

    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);

    final int[] my_array;
    final int start, end;

    private final int minParitionSize;

    public ThreadedQuick(int minParitionSize, int[] array, int start, int end) {
        this.minParitionSize = minParitionSize;
        this.my_array = array;
        this.start = start;
        this.end = end;
    }

    public void run() {
        quicksort(my_array, start, end);
    }

    public void quicksort(int[] array, int start, int end) {
        int len = end - start + 1;

        if (len <= 1)
            return;

        int pivot_index = medianOfThree(array, start, end);
        int pivotValue = array[pivot_index];

        swap(array, pivot_index, end);

        int storeIndex = start;
        for (int i = start; i < end; i++) {
            if (array[i] <= pivotValue) {
                swap(array, i, storeIndex);
                storeIndex++;
            }
        }

        swap(array, storeIndex, end);

        if (len > minParitionSize) {

            ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1);
            Future<?> future = executor.submit(quick);
            quicksort(array, storeIndex + 1, end);

            try {
                future.get(1000, TimeUnit.SECONDS);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } else {
            quicksort(array, start, storeIndex - 1);
            quicksort(array, storeIndex + 1, end);
        }
    }    
}

Вы можете запустить сортировку, сделав:

ThreadedQuick quick = new ThreadedQuick(array / ThreadedQuick.MAX_THREADS, array, 0, array.length - 1);
quick.run();

Это запустит сортировку в том же потоке, что позволит избежать ненужного переключения потоков при запуске.

Оговорка: не уверен, что приведенная выше реализация будет действительно быстрее, так как я не проводил сравнительных тестов.

10
ответ дан 8 December 2019 в 17:17
поделиться

Пара комментариев, если я правильно понимаю ваш код:

  1. Я не вижу блокировки вокруг объекта numthreads, хотя к нему можно получить доступ через несколько потоков. Возможно, вам стоит сделать его AtomicInteger.

  2. Используйте пул потоков и упорядочивайте задачи, то есть один вызов быстрой сортировки, чтобы воспользоваться преимуществами пула потоков. Используйте фьючерсы.

Ваш текущий метод разделения вещей так, как вы это делаете, может оставить меньшее деление с нитью, а большее деление без нити. Другими словами, он не отдает приоритет более крупным сегментам с их собственными потоками.

1
ответ дан 8 December 2019 в 17:17
поделиться
Другие вопросы по тегам:

Похожие вопросы: