Стратегии параллельных конвейеров в Java

Рассмотрите следующий сценарий оболочки:

gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 

Это имеет три процесса, работающие параллельно, чтобы распаковать поток, изменить его и повторно сжать его. Выполнение time Я вижу, что мое пользовательское время приблизительно дважды больше чем это моего реального времени, которое указывает, что программа эффективно работает параллельно.

Я попытался создать ту же программу в Java путем размещения каждой задачи в свой собственный поток. К сожалению, многопоточная программа Java только приблизительно на 30% быстрее, чем единственная потоковая версия для вышеупомянутого образца. Я попытался использовать и Обменник и ConcurrentLinkedQueue. ConcurrentLinkedQueue связал причины очереди много конкуренции, хотя все три потока обычно заставляются напряженно трудиться. Обменник имеет более низкую конкуренцию, но более сложен, и не, кажется, сохраняют самого медленного рабочего рабочими 100% времени.

Я пытаюсь выяснить чистое решение для Java этой проблемы, не смотря на один из кода байта ткацкие платформы или JNI базирующийся MPI.

Большая часть исследования параллелизма и API интересуются алгоритмами делить-и-побеждать, давая каждую работу узла, которая является ортогональной и не зависит от предшествующих вычислений. Другой подход к параллелизму является конвейерным подходом, где каждый рабочий делает некоторую работу и передает данные на следующего рабочего.

Я не пытаюсь найти самый эффективный путь к sed gzip'd файлом, а скорее я смотрю на то, как эффективно сломать задачи в конвейере для сокращения времени выполнения до той из самой медленной задачи.

Текущие синхронизации для файла строки на 10 м следующие:

Testing via shell

real    0m31.848s
user    0m58.946s
sys     0m1.694s

Testing SerialTest

real    0m59.997s
user    0m59.263s
sys     0m1.121s

Testing ParallelExchangerTest

real    0m41.573s
user    1m3.436s
sys     0m1.830s

Testing ConcurrentQueueTest

real    0m44.626s
user    1m24.231s
sys     0m10.856s

Я предлагаю щедрость для 10%-го улучшения Java, как измеряется к реальному времени в четырех базовых системах со строками на 10 м данных тестирования. Текущие источники доступны на Битоприемнике.

25
задан brianegge 19 January 2010 в 03:28
поделиться

5 ответов

Я индивидуально проверил время, необходимое, кажется, что чтение занимает менее 10% времени, а чтение плюс обработка занимает менее 30% всего времени. Поэтому я взял ParallelexchangerTertest (лучший исполнитель в вашем коде) и изменил его Просто провести 2 нить, первый поток делает чтение и заменить, а вторая поток делает запись.

Вот цифры для сравнения (на моей машине Intel Dual Core (не Core2) работают Ubuntu с 1 ГБ ОЗУ)

> Тестирование через Shell

Real 0M41.601S

Пользователь 0M58.604S

SYS 0M1.032S

> Тестирование ParalleLexchangertest

Real 1M55.424S

Пользователь 2M14.160S

SYS 0M4.768S

> ParAlLELEXCHANGERTESTMOD (2 нить)

REAL 1M35.524S

Пользователь 1M55.319S

SYS 0M3.580S

Я знал, что обработка строки занимает больше времени, поэтому я заменил Line.Repalce С MOSTER.REPLACEALL, я получил эту цифры

> ParallelexchangertestMod_regex (2 нить)

Real 1M12.781s

Пользователь 1M33.382S

SYS 0M2.916S

Теперь я сделал шаг вперед, вместо читать одну строчку за раз, я читаю CHAR [] Буфер различных размеров и вовремя (с поиском / заменой Regexp,) Я получил эти цифры

> Тестирование ParalleLexchangertestMod_regex_buff (100 байтов Обработка за раз)

Real 1m13.804s

Пользователь 1M32.494S

SYS 0M2.676S

> Тестирование ParallelexchangertestMod_regex_buff (500 байтов обработки в Время)

REAL 1M6.286S

Пользователь 1M29.334S

SYS 0M2.324S

> Тестирование ParallelexchangertestMod_regex_buff (800 байтов обработки по времени)

REAL 1M12.309S

Пользователь 1M33.910

SYS 0M2.476S

выглядит как 500 байтов, оптимально для размера данных.

Я развесил и имею копию моих изменений здесь

https://bitbucket.org/chinmaya/java-concurrent_Response/

6
ответ дан 28 November 2019 в 21:46
поделиться
- [1125592-

Учитывая, что вы не говорите, как вы измеряете прошедшее время, я предполагаю, что вы используете что-то вроде:

time java org.egge.concurrent.SerialTest < in.gz > out.gz
time java org.egge.concurrent.ConcurrentQueueTest < in.gz > out.gz

проблема с этим, что вы измеряете две вещи здесь:

  1. Как долго JVM принимает, чтобы начать, а
  2. Как долго требуется программа для запуска.

Вы можете изменить только второй с изменениями кода. Используя цифры, которые вы дали:

Testing SerialTest
real    0m6.736s
user    0m6.924s
sys     0m0.245s

Testing ParallelExchangerTest
real    0m4.967s
user    0m7.491s
sys     0m0.850s

, если мы предполагаем, что запуск JVM занимает три секунды, то «время выполнения программы» составляет 3,7 и 1,9 секунды соответственно, это в значительной степени ускорение 100%. Я решительно предположим, что вы используете большой набор набора данных для тестирования, чтобы вы могли минимизировать влияние запуска JVM в результате вашего времени.

Редактировать : На основании ваших ответов на этот вопрос вы вполне можете страдать от конфликта блокировки. Лучший способ решить, что в Java, вероятно, использовать трубопроводные читатели и писатели, прочитанные с труб, байт за раз, и заменить любые @ ' символов в входном потоке с помощью «_AT _» в выходном потоке. Вы можете страдать от того факта, что каждая строка отсканирована три раза, и любая замена требует, чтобы новый объект был создан, и строка заканчивается снова скопированным. Надеюсь, это поможет ...

3
ответ дан 28 November 2019 в 21:46
поделиться

На Java тоже можно использовать трубы. Они реализованы в виде потоков, подробнее см. PipedInputStream и PipedOutputStream.

Чтобы предотвратить блокировку, я бы порекомендовал установить размер пропперной трубы.

3
ответ дан 28 November 2019 в 21:46
поделиться

Уменьшение количества читателей и объектов дают мне более чем на 10% лучшую производительность.

Но производительность Java.util.concurrent еще немного разочаровывает.

ConcularentQueetest:

private static class Reader implements Runnable {

@Override
  public void run() {
   final char buf[] = new char[8192];
   try {

    int len;
    while ((len = reader.read(buf)) != -1) {
     pipe.put(new String(buf,0,len));
    }
    pipe.put(POISON);

   } catch (IOException e) {
    throw new RuntimeException(e);
   } catch (InterruptedException e) {
    throw new RuntimeException(e);
   }
  }
0
ответ дан 28 November 2019 в 21:46
поделиться

Во-первых, процесс будет таким же быстрым, как и самый медленный фрагмент. Если разбивка по времени выглядит так:

  • gunzip: 1 секунда
  • sed: 5 секунд
  • gzip: 1 секунда

, при многопоточности вы будете работать в в лучшем случае 5 секунд вместо 7.

Во-вторых, вместо использования очередей, которые вы используете, вместо этого попытайтесь воспроизвести функциональность того, что вы копируете, и используйте PipedInputStream и PipedOutputStream для объединить процессы.

Изменить: Есть несколько способов обработки связанных задач с помощью утилит параллелизма Java. Разделите его на нитки. Сначала создайте общий базовый класс:

public interface Worker {
  public run(InputStream in, OutputStream out);
}

Этот интерфейс представляет некоторую произвольную работу, которая обрабатывает ввод и генерирует вывод. Соедините их вместе, и у вас будет конвейер. Вы также можете абстрагироваться от шаблона. Для этого нам понадобится класс:

public class UnitOfWork implements Runnable {
  private final InputStream in;
  private final OutputStream out;
  private final Worker worker;

  public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
    if (in == null) {
      throw new NullPointerException("in is null");
    }
    if (out == null) {
      throw new NullPointerException("out is null");
    }
    if (worker == null) {
      throw new NullPointerException("worker is null");
    }
    this.in = in;
    this.out = out;
    this.worker = worker;
  }

  public final void run() {
    worker.run(in, out);
  }
}

Так, например, Unzip PART:

public class Unzip implements Worker {
  protected void run(InputStream in, OutputStream out) {
    ...
  }
}

и так далее для Sed и Zip . Что затем связывает это воедино, так это следующее:

public static void pipe(InputStream in, OutputStream out, Worker... workers) {
  if (workers.length == 0) {
    throw new IllegalArgumentException("no workers");
  }
  OutputStream last = null;
  List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
  PipedOutputStream last = null;
  for (int i=0; i<workers.length-2; i++) {
    PipedOutputStream out = new PipedOutputStream();
    work.add(new UnitOfWork(
      last == null ? in, new PipedInputStream(last), out, workers[i]);
    last = out;
  }
  work.add(new UnitOfWork(new PipedInputStream(last),
    out, workers[workers.length-1);
  ExecutorService exec = Executors.newFixedThreadPool(work.size());
  for (UnitOfWork w : work) {
    exec.submit(w);
  }
  exec.shutdown();
  try {
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedExxception e) {
    // do whatever
  }
}

Я не уверен, что вы можете сделать что-то лучше, чем это, и для каждой работы нужно писать минимальный код. Тогда ваш код станет:

public static processFile(String inputName, String outputName) {
  pipe(new FileInputStream(inputFile), new FileOutputStream(outputFile),
    new Zip(), new Sed(), new Unzip());
}
14
ответ дан 28 November 2019 в 21:46
поделиться