Использование Java ThreadPool

Я пытаюсь записать многопоточный поисковый робот.

Мой основной класс записи имеет следующий код:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null)
         return;
exec.execute(new URLCrawler(this, url));
}

URLCrawler выбирает указанный URL, анализирует ссылки извлечений HTML от него и планирует невидимые ссылки назад на границу.

Граница является очередью непроверенных URL. Проблема состоит в том, как записать получение () метод. Если очередь пуста, она должна ожидать, пока любые URLCrawlers не заканчиваются и затем попробовали еще раз. Это должно возвратить пустой указатель только, когда очередь пуста и нет никакого в настоящее время активного URLCrawler.

Моя первая идея состояла в том, чтобы использовать AtomicInteger для подсчета текущего количества работы URLCrawlers, и вспомогательный объект для notifyAll () / ожидают () вызовы. Каждый поисковый робот на запуске увеличивает количество текущей работы, URLCrawlers, и на выходе постепенно уменьшает его, и уведомьте объект, что это завершилось.

Но я считал, что уведомляют ()/notifyAll () и ожидают (), несколько устаревшие методы сделать коммуникацию потока.

Что я должен использовать в этом графике работы? Это подобно производителям M и потребителям N, вопрос состоит в том, как иметь дело с exaustion производителей.

7
задан Anton Kazennikov 4 August 2010 в 05:39
поделиться

4 ответа

Я думаю, что использование ожидания / уведомления в этом случае оправдано. Не могу придумать какой-либо простой способ сделать это с помощью j.u.c.
В классе позвоните Координатору:

private final int numOfCrawlers;
private int waiting;

public boolean shouldTryAgain(){
    synchronized(this){
        waiting++;
        if(waiting>=numOfCrawlers){
            //Everybody is waiting, terminate
            return false;
        }else{
            wait();//spurious wake up is okay
            //waked up for whatever reason. Try again
            waiting--;
            return true;
        }
    }

public void hasEnqueued(){
    synchronized(this){
        notifyAll();
    }
} 

, затем

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)
2
ответ дан 6 December 2019 в 21:08
поделиться

Я не уверен, что понимаю ваш дизайн, но это может быть работа для Semaphore

3
ответ дан 6 December 2019 в 21:08
поделиться

Один из вариантов - сделать "frontier" блокирующей очередью, так что любой поток, пытающийся "получить" из нее, будет блокироваться. Как только любой другой URLCrawler поместит объекты в эту очередь, все остальные потоки будут автоматически уведомлены (при этом объект будет декеирован)

.
3
ответ дан 6 December 2019 в 21:08
поделиться

Я думаю, что основным строительным блоком для вашего варианта использования является «защелка», похожая на CountDownLatch, но в отличие от CountDownLatch, которая также позволяет увеличивать счетчик.

Интерфейс для такой защелки может иметь вид

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

Допустимые значения счетчиков - от 0 и выше. Метод await () позволит вам блокировать, пока счетчик не опустится до нуля.

Если у вас есть такая защелка, ваш вариант использования можно описать довольно легко. Я также подозреваю, что очередь (граница) может быть устранена в этом решении (исполнитель в любом случае предоставляет ее, поэтому она несколько избыточна). Я бы переписал вашу основную процедуру как

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

Ваш URLCrawler будет использовать защелку следующим образом:

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

Что касается реализаций защелки, может быть несколько возможных реализаций, начиная от той, которая основана на wait () и notifyAll ( ), который использует Lock и Condition, в реализацию, использующую AbstractQueuedSynchronizer. Я думаю, что все эти реализации будут довольно простыми. Обратите внимание, что версия wait () - notifyAll () и версия Lock-Condition будут основаны на взаимном исключении, тогда как версия AQS будет использовать CAS (сравнение и замена) и, таким образом, может лучше масштабироваться в определенных ситуациях.

2
ответ дан 6 December 2019 в 21:08
поделиться