Нуждаюсь в помощи реализовывая этот алгоритм с картой MapReduce Hadoop

у меня есть алгоритм, который пройдет большой набор данных, читает некоторые текстовые файлы и ищут определенные условия в тех строках. Мне реализовали его в Java, но я не хотел к почтовому индексу так, чтобы не выглядело, что я ищу кого-то для реализации его для меня, но это верно, что я действительно нуждаюсь в большом количестве помощи!!! Это не было запланировано мой проект, но набор данных оказался огромным, таким образом, учитель сказал мне, что я должен сделать это как это.

РЕДАКТИРОВАНИЕ (я не сделал разъяснил меня предыдущая версия), набор данных, который я имею, находится на кластере Hadoop, и я должен сделать его реализацию MapReduce

Я читал о MapReduce и учил, что сначала делаю стандартную реализацию, и затем будет больше/меньше легче сделать это с mapreduce. Но не произошел, так как алгоритм довольно глуп, и ничто специальное, и карта не уменьшают..., я наклоняюсь, переносят мой ум вокруг этого.

Таким образом, вот вскоре псевдо код моего алгоритма

LIST termList   (there is method that creates this list from lucene index)
FOLDER topFolder

INPUT topFolder
IF it is folder and not empty
    list files (there are 30 sub folders inside)
    FOR EACH sub folder
        GET file "CheckedFile.txt"
        analyze(CheckedFile)
    ENDFOR
END IF


Method ANALYZE(CheckedFile)

read CheckedFile
WHILE CheckedFile has next line
    GET line
    FOR(loops through termList)
            GET third word from line
          IF third word = term from list
        append whole line to string buffer
    ENDIF
ENDFOR
END WHILE
OUTPUT string buffer to file

Кроме того, как Вы видите, каждый раз, когда "анализируют", называют, новый файл должен быть создан, я понял, что карта уменьшает, является трудным записать во многие выводы???

Я понимаю mapreduce интуицию, и мой пример кажется совершенно подходящим для mapreduce, но когда дело доходит до делают это, очевидно, я не знаю достаточно, и я застреваю!

Помогите.

6
задан Julia 7 June 2010 в 17:17
поделиться

2 ответа

Вы можете просто использовать пустой редуктор и разделить свое задание для запуска одного преобразователя для каждого файла. Каждый картограф создаст свой собственный выходной файл в вашей выходной папке.

3
ответ дан 17 December 2019 в 04:42
поделиться

Map Reduce легко реализуется с использованием некоторых хороших функций параллелизма Java 6, особенно Future, Callable и ExecutorService.

Я создал вызываемый объект, который будет анализировать файл указанным вами способом.

public class FileAnalyser implements Callable<String> {

  private Scanner scanner;
  private List<String> termList;

  public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
    this.termList = termList;
    scanner = new Scanner(new File(filename));
  }

  @Override
  public String call() throws Exception {
    StringBuilder buffer = new StringBuilder();
    while (scanner.hasNextLine()) {
      String line = scanner.nextLine();
      String[] tokens = line.split(" ");
      if ((tokens.length >= 3) && (inTermList(tokens[2])))
        buffer.append(line);
    }
    return buffer.toString();
  }

  private boolean inTermList(String term) {
    return termList.contains(term);
  }
}

Нам нужно создать новый вызываемый объект для каждого найденного файла и отправить его в службу-исполнитель. Результатом отправки является будущее, которое мы можем использовать позже, чтобы получить результат синтаксического анализа файла.

public class Analayser {

  private static final int THREAD_COUNT = 10;

  public static void main(String[] args) {

    //All callables will be submitted to this executor service
    //Play around with THREAD_COUNT for optimum performance
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    //Store all futures in this list so we can refer to them easily
    List<Future<String>> futureList = new ArrayList<Future<String>>();

    //Some random term list, I don't know what you're using.
    List<String> termList = new ArrayList<String>();
    termList.add("terma");
    termList.add("termb");

    //For each file you find, create a new FileAnalyser callable and submit
    //this to the executor service. Add the future to the list
    //so we can check back on the result later
    for each filename in all files {
      try {
        Callable<String> worker = new FileAnalyser(filename, termList);
        Future<String> future = executor.submit(worker);
        futureList.add(future);
      }
      catch (FileNotFoundException fnfe) {
        //If the file doesn't exist at this point we can probably ignore,
        //but I'll leave that for you to decide.
        System.err.println("Unable to create future for " + filename);
        fnfe.printStackTrace(System.err);
      }
    }

    //You may want to wait at this point, until all threads have finished
    //You could maybe loop through each future until allDone() holds true
    //for each of them.

    //Loop over all finished futures and do something with the result
    //from each
    for (Future<String> current : futureList) {
      String result = current.get();
      //Do something with the result from this future
    }
  }
}

Мой пример здесь далеко не полный и далеко не эффективный. Я не рассматривал размер выборки, если она действительно огромна, вы можете продолжать перебирать futureList, удаляя завершенные элементы, что-то вроде:

while (futureList.size() > 0) {
      for (Future<String> current : futureList) {
        if (current.isDone()) {
          String result = current.get();
          //Do something with result
          futureList.remove(current);
          break; //We have modified the list during iteration, best break out of for-loop
        }
      }
}

В качестве альтернативы вы можете реализовать настройку типа производитель-потребитель, при которой производитель отправляет вызываемые объекты в служба исполнителя создает будущее, а потребитель берет результат будущего и отбрасывает будущее.

Это может потребовать, чтобы производящий и потребительский потоки были самими потоками и синхронизированным списком для добавления / удаления фьючерсов.

Если есть вопросы, задавайте.

2
ответ дан 17 December 2019 в 04:42
поделиться
Другие вопросы по тегам:

Похожие вопросы: