Возвращаемые значения от Потоков Java

У меня есть Поток Java как следующее:

   public class MyThread extends Thread {
        MyService service;
        String id;
        public MyThread(String id) {
            this.id = node;
        }
        public void run() {
            User user = service.getUser(id)
        }
    }

У меня есть приблизительно 300 идентификаторов, и каждые несколько секунд - я разжигаю потоки для создания призыва к каждому идентификатору. Например.

for(String id: ids) {
    MyThread thread = new MyThread(id);
    thread.start();
}

Теперь, я хотел бы собраться, результаты каждого распараллеливает, и сделайте пакетная вставка к базе данных, вместо того, чтобы делать 300 баз данных вставляет каждые 2 секунды.

Какая-либо идея, как я могу выполнить это?

18
задан Langali 22 February 2010 в 21:37
поделиться

8 ответов

Если вы хотите собрать все результаты перед обновлением базы данных, вы можете использовать метод invokeAll . Это позаботится о бухгалтерском учете, который потребуется, если вы отправляете задачи по одной, как предлагает daveb .

private static final ExecutorService workers = Executors.newCachedThreadPool();

...

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>();
for (final String id : ids) {
  tasks.add(new Callable<User>()
  {

    public User call()
      throws Exception
    {
      return svc.getUser(id);
    }

  });
}
/* invokeAll blocks until all service requests complete, 
 * or a max of 10 seconds. */
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
for (Future<User> f : results) {
  User user = f.get();
  /* Add user to batch update. */
  ...
}
/* Commit batch. */
...
19
ответ дан 30 November 2019 в 05:53
поделиться

Канонический подход заключается в использовании Callable и ExecutorService . submit ting Callable в ExecutorService возвращает (typesafe) Future , из которого вы можете получить результат .

class TaskAsCallable implements Callable<Result> {
    @Override
    public Result call() {
        return a new Result() // this is where the work is done.
    }
}

ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready

В вашем случае вы, вероятно, захотите использовать invokeAll , который возвращает List of Futures , или создайте этот список самостоятельно, добавляя задачи к исполнителю. . Чтобы собрать результаты, просто вызовите get для каждого из них.

35
ответ дан 30 November 2019 в 05:53
поделиться

Сохраните результат в своем объекте. Когда он завершится, поместите его в синхронизированную коллекцию (на ум приходит синхронизированная очередь).

Если вы хотите собрать свои результаты для отправки, возьмите все из очереди и прочтите свои результаты с объектов. Вы даже можете указать каждому объекту, как «публиковать» свои результаты в базе данных, таким образом можно отправлять разные классы и обрабатывать их одним и тем же крошечным элегантным циклом.

В JDK есть множество инструментов, которые помогут с этим, но это действительно легко, если вы начнете думать о своем потоке как о настоящем объекте, а не просто о кучке чуши вокруг метода «run». Как только вы начнете думать об объектах, программирование станет намного проще и приятнее.

4
ответ дан 30 November 2019 в 05:53
поделиться

Вы можете создать очередь или список, который вы передаете создаваемым вами потокам, потоки добавляют свой результат в список, который очищается потребителем, который выполняет пакетная вставка.

1
ответ дан 30 November 2019 в 05:53
поделиться
public class TopClass {
     List<User> users = new ArrayList<User>();
     void addUser(User user) {
         synchronized(users) {
             users.add(user);
         }
     }
     void store() throws SQLException {
        //storing code goes here
     }
     class MyThread extends Thread {
            MyService service;
            String id;
            public MyThread(String id) {
                this.id = node;
            }
            public void run() {
                User user = service.getUser(id)
                addUser(user);
            }
        }
}
1
ответ дан 30 November 2019 в 05:53
поделиться

Самый простой подход - передать каждому потоку (по одному объекту на поток) объект, который впоследствии будет содержать результат. Главный поток должен хранить ссылку на каждый объект результата. Когда все потоки объединятся, вы сможете использовать результаты.

1
ответ дан 30 November 2019 в 05:53
поделиться

Это очень распространенная практика. В C . Я стараюсь думать об этом так, будто вы хотите лгать себе в путь «Я не использую goto ». Размышляя об этом, не было бы ничего плохого в goto , используемом аналогично. Фактически это также снизит уровень отступа.

Это сказал, хотя, я заметил, очень часто это делать.. в то время как петли, как правило, растут. И затем они получают , если s и else s внутри, делая код на самом деле не очень читаемым, не говоря уже о тестируемом.

Эти делают.., в то время как обычно предназначены для очистки . Всеми возможными способами я бы предпочел использовать RAI и вернуть ранний из короткой функции. С другой стороны, C не предоставляет вам столько удобств, сколько C++ , делая делать.. в то время как один из лучших подходов для очистки.

-121--687433-

Канонический подход заключается в использовании Callable и ExecutingService . отправка в Callable службе ExecisingService возвращает (typesafe) Future , из которого можно получить результат.

class TaskAsCallable implements Callable<Result> {
    @Override
    public Result call() {
        return a new Result() // this is where the work is done.
    }
}

ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready

В вашем случае вы, вероятно, хотите использовать invokeAll , который возвращает Список из Фьючерсов , или создать этот список самостоятельно при добавлении задач исполнителю. Чтобы собрать результаты, просто вызовите get для каждого.

-121--2233342-

Вы должны сохранить результат в нечто вроде синглтона. Это должно быть правильно синхронизировано.

EDIT : Я знаю, что это не лучший совет, так как не рекомендуется обрабатывать необработанные потоки . Но с учетом вопроса это сработает, не так ли? Я не могу быть поднят, но почему я голосую вниз?

1
ответ дан 30 November 2019 в 05:53
поделиться

Вы можете создать класс, который расширяет Observable. Тогда ваш поток может вызывать метод в классе Observable, который будет уведомлять все классы, зарегистрированные в этом наблюдателе, вызывая Observable.notifyObservers(Object).

Наблюдающий класс реализует Observer и регистрирует себя в Observable. Затем вы реализуете метод update(Observable, Object), который будет вызываться при вызове Observable.notifyObservers(Object).

1
ответ дан 30 November 2019 в 05:53
поделиться