Что самый легкий путь состоит в том, чтобы параллелизировать задачу в Java?

Скажите, что у меня есть задача как:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

Что состоит в том, чтобы параллелизировать самый легкий путь, каждый вычисляет () (предположение, что они уже parallelizable)?

Мне не нужен ответ, который соответствует строго коду выше, просто общий ответ. Но если Вам нужно больше информации: моими задачами является связанный IO, и это для веб-приложения Spring, и задачи будут выполняемыми в Запросе HTTP.

40
задан blacktide 18 December 2018 в 13:02
поделиться

7 ответов

Рекомендую взглянуть на ExecutorService .

В частности, что-то вроде этого:

ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

Обратите внимание, что использование newCachedThreadPool может быть плохим, если объекты составляют большой список. Кэшированный пул потоков может создать поток для каждой задачи! Вы можете захотеть использовать newFixedThreadPool(n), где n - это что-то разумное (например, количество имеющихся у вас ядер, предполагая, что compute() привязано к процессору).

Вот полный код, который на самом деле выполняется:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other exectuors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
60
ответ дан 27 November 2019 в 01:42
поделиться

Параллельный массив Fork/Join является одним из вариантов

.
0
ответ дан 27 November 2019 в 01:42
поделиться

Можно просто создать несколько потоков и получить результат.

Thread t = new Mythread(object);

if (t.done()) {
   // get result
   // add result
}

EDIT : Я думаю, что другие решения круче.

0
ответ дан 27 November 2019 в 01:42
поделиться

Для более подробного ответа прочитайте Java Concurrency in Practice и используйте java.util.concurrent .

2
ответ дан 27 November 2019 в 01:42
поделиться

Вы можете использовать ThreadPoolExecutor . Вот пример кода: http://programmingexamples.wikidot.com/threadpoolexecutor (слишком длинный, чтобы привести его сюда)

.
0
ответ дан 27 November 2019 в 01:42
поделиться

Вот кое-что, что я использую в своих собственных проектах:

public class ParallelTasks
{
    private final Collection<Runnable> tasks = new ArrayList<Runnable>();

    public ParallelTasks()
    {
    }

    public void add(final Runnable task)
    {
        tasks.add(task);
    }

    public void go() throws InterruptedException
    {
        final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors());
        try
        {
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            for (final Runnable task : tasks)
                threads.execute(new Runnable() {
                    public void run()
                    {
                        try
                        {
                            task.run();
                        }
                        finally
                        {
                            latch.countDown();
                        }
                    }
                });
            latch.await();
        }
        finally
        {
            threads.shutdown();
        }
    }
}

// ...

public static void main(final String[] args) throws Exception
{
    ParallelTasks tasks = new ParallelTasks();
    final Runnable waitOneSecond = new Runnable() {
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
            }
        }
    };
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    final long start = System.currentTimeMillis();
    tasks.go();
    System.err.println(System.currentTimeMillis() - start);
}

, что печатает чуть больше 2000 на моей двухъядерной коробке.

1
ответ дан 27 November 2019 в 01:42
поделиться

Я хотел упомянуть класс исполнителя. Вот пример кода, который вы поместили бы в класс исполнителя.

    private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);

    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();

    public void addCallable(Callable<Object> callable) {
        this.callableList.add(callable);
    }

    public void clearCallables(){
        this.callableList.clear();
    }

    public void executeThreads(){
        try {
        threadLauncher.invokeAll(this.callableList);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public Object[] getResult() {

        List<Future<Object>> resultList = null;
        Object[] resultArray = null;
        try {

            resultList = threadLauncher.invokeAll(this.callableList);

            resultArray = new Object[resultList.size()];

            for (int i = 0; i < resultList.size(); i++) {
                resultArray[i] = resultList.get(i).get();
            }

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return resultArray;
    }

Затем, чтобы его использовать, вы бы вызывали класс исполнителя для его заполнения и выполнения.

executor.addCallable( some implementation of callable) // do this once for each task 
Object[] results = executor.getResult();
0
ответ дан 27 November 2019 в 01:42
поделиться
Другие вопросы по тегам:

Похожие вопросы: