Я пытаюсь записать многопоточный поисковый робот.
Мой основной класс записи имеет следующий код:
ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
URL url = frontier.get();
if(url == null)
return;
exec.execute(new URLCrawler(this, url));
}
URLCrawler выбирает указанный URL, анализирует ссылки извлечений HTML от него и планирует невидимые ссылки назад на границу.
Граница является очередью непроверенных URL. Проблема состоит в том, как записать получение () метод. Если очередь пуста, она должна ожидать, пока любые URLCrawlers не заканчиваются и затем попробовали еще раз. Это должно возвратить пустой указатель только, когда очередь пуста и нет никакого в настоящее время активного URLCrawler.
Моя первая идея состояла в том, чтобы использовать AtomicInteger для подсчета текущего количества работы URLCrawlers, и вспомогательный объект для notifyAll () / ожидают () вызовы. Каждый поисковый робот на запуске увеличивает количество текущей работы, URLCrawlers, и на выходе постепенно уменьшает его, и уведомьте объект, что это завершилось.
Но я считал, что уведомляют ()/notifyAll () и ожидают (), несколько устаревшие методы сделать коммуникацию потока.
Что я должен использовать в этом графике работы? Это подобно производителям M и потребителям N, вопрос состоит в том, как иметь дело с exaustion производителей.
Я думаю, что использование ожидания / уведомления в этом случае оправдано. Не могу придумать какой-либо простой способ сделать это с помощью j.u.c.
В классе позвоните Координатору:
private final int numOfCrawlers;
private int waiting;
public boolean shouldTryAgain(){
synchronized(this){
waiting++;
if(waiting>=numOfCrawlers){
//Everybody is waiting, terminate
return false;
}else{
wait();//spurious wake up is okay
//waked up for whatever reason. Try again
waiting--;
return true;
}
}
public void hasEnqueued(){
synchronized(this){
notifyAll();
}
}
, затем
ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
URL url = frontier.get();
if(url == null){
if(!coordinator.shouldTryAgain()){
//all threads are waiting. No possibility of new jobs.
return;
}else{
//Possible that there are other jobs. Try again
continue;
}
}
exec.execute(new URLCrawler(this, url));
}//while(true)
Я не уверен, что понимаю ваш дизайн, но это может быть работа для Semaphore
Один из вариантов - сделать "frontier" блокирующей очередью, так что любой поток, пытающийся "получить" из нее, будет блокироваться. Как только любой другой URLCrawler поместит объекты в эту очередь, все остальные потоки будут автоматически уведомлены (при этом объект будет декеирован)
.Я думаю, что основным строительным блоком для вашего варианта использования является «защелка», похожая на CountDownLatch, но в отличие от CountDownLatch, которая также позволяет увеличивать счетчик.
Интерфейс для такой защелки может иметь вид
public interface Latch {
public void countDown();
public void countUp();
public void await() throws InterruptedException;
public int getCount();
}
Допустимые значения счетчиков - от 0 и выше. Метод await () позволит вам блокировать, пока счетчик не опустится до нуля.
Если у вас есть такая защелка, ваш вариант использования можно описать довольно легко. Я также подозреваю, что очередь (граница) может быть устранена в этом решении (исполнитель в любом случае предоставляет ее, поэтому она несколько избыточна). Я бы переписал вашу основную процедуру как
ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();
Ваш URLCrawler будет использовать защелку следующим образом:
public class URLCrawler implements Runnable {
private final Latch latch;
public URLCrawler(..., Latch l) {
...
latch = l;
latch.countUp(); // increment the count as early as possible
}
public void run() {
try {
List<URL> secondaryUrls = crawl();
for (URL url: secondaryUrls) {
// submit new tasks directly
executor.execute(new URLCrawler(..., latch));
}
} finally {
// as a last step, decrement the count
latch.countDown();
}
}
}
Что касается реализаций защелки, может быть несколько возможных реализаций, начиная от той, которая основана на wait () и notifyAll ( ), который использует Lock и Condition, в реализацию, использующую AbstractQueuedSynchronizer. Я думаю, что все эти реализации будут довольно простыми. Обратите внимание, что версия wait () - notifyAll () и версия Lock-Condition будут основаны на взаимном исключении, тогда как версия AQS будет использовать CAS (сравнение и замена) и, таким образом, может лучше масштабироваться в определенных ситуациях.