У меня есть Поток Java как следующее:
public class MyThread extends Thread {
MyService service;
String id;
public MyThread(String id) {
this.id = node;
}
public void run() {
User user = service.getUser(id)
}
}
У меня есть приблизительно 300 идентификаторов, и каждые несколько секунд - я разжигаю потоки для создания призыва к каждому идентификатору. Например.
for(String id: ids) {
MyThread thread = new MyThread(id);
thread.start();
}
Теперь, я хотел бы собраться, результаты каждого распараллеливает, и сделайте пакетная вставка к базе данных, вместо того, чтобы делать 300 баз данных вставляет каждые 2 секунды.
Какая-либо идея, как я могу выполнить это?
Если вы хотите собрать все результаты перед обновлением базы данных, вы можете использовать метод invokeAll
. Это позаботится о бухгалтерском учете, который потребуется, если вы отправляете задачи по одной, как предлагает daveb .
private static final ExecutorService workers = Executors.newCachedThreadPool();
...
Collection<Callable<User>> tasks = new ArrayList<Callable<User>>();
for (final String id : ids) {
tasks.add(new Callable<User>()
{
public User call()
throws Exception
{
return svc.getUser(id);
}
});
}
/* invokeAll blocks until all service requests complete,
* or a max of 10 seconds. */
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
for (Future<User> f : results) {
User user = f.get();
/* Add user to batch update. */
...
}
/* Commit batch. */
...
Канонический подход заключается в использовании Callable
и ExecutorService
. submit
ting Callable
в ExecutorService
возвращает (typesafe) Future
, из которого вы можете получить
результат .
class TaskAsCallable implements Callable<Result> {
@Override
public Result call() {
return a new Result() // this is where the work is done.
}
}
ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready
В вашем случае вы, вероятно, захотите использовать invokeAll
, который возвращает List
of Futures
, или создайте этот список самостоятельно, добавляя задачи к исполнителю. . Чтобы собрать результаты, просто вызовите get
для каждого из них.
Сохраните результат в своем объекте. Когда он завершится, поместите его в синхронизированную коллекцию (на ум приходит синхронизированная очередь).
Если вы хотите собрать свои результаты для отправки, возьмите все из очереди и прочтите свои результаты с объектов. Вы даже можете указать каждому объекту, как «публиковать» свои результаты в базе данных, таким образом можно отправлять разные классы и обрабатывать их одним и тем же крошечным элегантным циклом.
В JDK есть множество инструментов, которые помогут с этим, но это действительно легко, если вы начнете думать о своем потоке как о настоящем объекте, а не просто о кучке чуши вокруг метода «run». Как только вы начнете думать об объектах, программирование станет намного проще и приятнее.
Вы можете создать очередь или список, который вы передаете создаваемым вами потокам, потоки добавляют свой результат в список, который очищается потребителем, который выполняет пакетная вставка.
public class TopClass {
List<User> users = new ArrayList<User>();
void addUser(User user) {
synchronized(users) {
users.add(user);
}
}
void store() throws SQLException {
//storing code goes here
}
class MyThread extends Thread {
MyService service;
String id;
public MyThread(String id) {
this.id = node;
}
public void run() {
User user = service.getUser(id)
addUser(user);
}
}
}
Самый простой подход - передать каждому потоку (по одному объекту на поток) объект, который впоследствии будет содержать результат. Главный поток должен хранить ссылку на каждый объект результата. Когда все потоки объединятся, вы сможете использовать результаты.
Это очень распространенная практика. В C . Я стараюсь думать об этом так, будто вы хотите лгать себе в путь «Я не использую goto
». Размышляя об этом, не было бы ничего плохого в goto
, используемом аналогично. Фактически это также снизит уровень отступа.
Это сказал, хотя, я заметил, очень часто это делать.. в то время как
петли, как правило, растут. И затем они получают , если
s и else
s внутри, делая код на самом деле не очень читаемым, не говоря уже о тестируемом.
Эти делают.., в то время как
обычно предназначены для очистки . Всеми возможными способами я бы предпочел использовать RAI и вернуть ранний из короткой функции. С другой стороны, C не предоставляет вам столько удобств, сколько C++ , делая делать.. в то время как
один из лучших подходов для очистки.
Канонический подход заключается в использовании Callable
и ExecutingService
. отправка
в Callable
службе ExecisingService
возвращает (typesafe) Future
, из которого можно получить
результат.
class TaskAsCallable implements Callable<Result> {
@Override
public Result call() {
return a new Result() // this is where the work is done.
}
}
ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready
В вашем случае вы, вероятно, хотите использовать invokeAll
, который возвращает Список
из Фьючерсов
, или создать этот список самостоятельно при добавлении задач исполнителю. Чтобы собрать результаты, просто вызовите get
для каждого.
Вы должны сохранить результат в нечто вроде синглтона. Это должно быть правильно синхронизировано.
EDIT : Я знаю, что это не лучший совет, так как не рекомендуется обрабатывать необработанные потоки
. Но с учетом вопроса это сработает, не так ли? Я не могу быть поднят, но почему я голосую вниз?
Вы можете создать класс, который расширяет Observable. Тогда ваш поток может вызывать метод в классе Observable, который будет уведомлять все классы, зарегистрированные в этом наблюдателе, вызывая Observable.notifyObservers(Object).
Наблюдающий класс реализует Observer и регистрирует себя в Observable. Затем вы реализуете метод update(Observable, Object), который будет вызываться при вызове Observable.notifyObservers(Object).