у меня есть алгоритм, который пройдет большой набор данных, читает некоторые текстовые файлы и ищут определенные условия в тех строках. Мне реализовали его в Java, но я не хотел к почтовому индексу так, чтобы не выглядело, что я ищу кого-то для реализации его для меня, но это верно, что я действительно нуждаюсь в большом количестве помощи!!! Это не было запланировано мой проект, но набор данных оказался огромным, таким образом, учитель сказал мне, что я должен сделать это как это.
РЕДАКТИРОВАНИЕ (я не сделал разъяснил меня предыдущая версия), набор данных, который я имею, находится на кластере Hadoop, и я должен сделать его реализацию MapReduce
Я читал о MapReduce и учил, что сначала делаю стандартную реализацию, и затем будет больше/меньше легче сделать это с mapreduce. Но не произошел, так как алгоритм довольно глуп, и ничто специальное, и карта не уменьшают..., я наклоняюсь, переносят мой ум вокруг этого.
Таким образом, вот вскоре псевдо код моего алгоритма
LIST termList (there is method that creates this list from lucene index)
FOLDER topFolder
INPUT topFolder
IF it is folder and not empty
list files (there are 30 sub folders inside)
FOR EACH sub folder
GET file "CheckedFile.txt"
analyze(CheckedFile)
ENDFOR
END IF
Method ANALYZE(CheckedFile)
read CheckedFile
WHILE CheckedFile has next line
GET line
FOR(loops through termList)
GET third word from line
IF third word = term from list
append whole line to string buffer
ENDIF
ENDFOR
END WHILE
OUTPUT string buffer to file
Кроме того, как Вы видите, каждый раз, когда "анализируют", называют, новый файл должен быть создан, я понял, что карта уменьшает, является трудным записать во многие выводы???
Я понимаю mapreduce интуицию, и мой пример кажется совершенно подходящим для mapreduce, но когда дело доходит до делают это, очевидно, я не знаю достаточно, и я застреваю!
Помогите.
Вы можете просто использовать пустой редуктор и разделить свое задание для запуска одного преобразователя для каждого файла. Каждый картограф создаст свой собственный выходной файл в вашей выходной папке.
Map Reduce легко реализуется с использованием некоторых хороших функций параллелизма Java 6, особенно Future, Callable и ExecutorService.
Я создал вызываемый объект, который будет анализировать файл указанным вами способом.
public class FileAnalyser implements Callable<String> {
private Scanner scanner;
private List<String> termList;
public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
this.termList = termList;
scanner = new Scanner(new File(filename));
}
@Override
public String call() throws Exception {
StringBuilder buffer = new StringBuilder();
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
String[] tokens = line.split(" ");
if ((tokens.length >= 3) && (inTermList(tokens[2])))
buffer.append(line);
}
return buffer.toString();
}
private boolean inTermList(String term) {
return termList.contains(term);
}
}
Нам нужно создать новый вызываемый объект для каждого найденного файла и отправить его в службу-исполнитель. Результатом отправки является будущее, которое мы можем использовать позже, чтобы получить результат синтаксического анализа файла.
public class Analayser {
private static final int THREAD_COUNT = 10;
public static void main(String[] args) {
//All callables will be submitted to this executor service
//Play around with THREAD_COUNT for optimum performance
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
//Store all futures in this list so we can refer to them easily
List<Future<String>> futureList = new ArrayList<Future<String>>();
//Some random term list, I don't know what you're using.
List<String> termList = new ArrayList<String>();
termList.add("terma");
termList.add("termb");
//For each file you find, create a new FileAnalyser callable and submit
//this to the executor service. Add the future to the list
//so we can check back on the result later
for each filename in all files {
try {
Callable<String> worker = new FileAnalyser(filename, termList);
Future<String> future = executor.submit(worker);
futureList.add(future);
}
catch (FileNotFoundException fnfe) {
//If the file doesn't exist at this point we can probably ignore,
//but I'll leave that for you to decide.
System.err.println("Unable to create future for " + filename);
fnfe.printStackTrace(System.err);
}
}
//You may want to wait at this point, until all threads have finished
//You could maybe loop through each future until allDone() holds true
//for each of them.
//Loop over all finished futures and do something with the result
//from each
for (Future<String> current : futureList) {
String result = current.get();
//Do something with the result from this future
}
}
}
Мой пример здесь далеко не полный и далеко не эффективный. Я не рассматривал размер выборки, если она действительно огромна, вы можете продолжать перебирать futureList, удаляя завершенные элементы, что-то вроде:
while (futureList.size() > 0) {
for (Future<String> current : futureList) {
if (current.isDone()) {
String result = current.get();
//Do something with result
futureList.remove(current);
break; //We have modified the list during iteration, best break out of for-loop
}
}
}
В качестве альтернативы вы можете реализовать настройку типа производитель-потребитель, при которой производитель отправляет вызываемые объекты в служба исполнителя создает будущее, а потребитель берет результат будущего и отбрасывает будущее.
Это может потребовать, чтобы производящий и потребительский потоки были самими потоками и синхронизированным списком для добавления / удаления фьючерсов.
Если есть вопросы, задавайте.