Как напрямую отправить вывод преобразователя -в редюсер другого преобразователя -без сохранения вывода в hdfs

Проблема решена В конце концов проверьте мое решение внизу


Недавно я пытаюсь запустить пример рекомендателя в chaper6 (, листинге 6.1 ~6.4 )от Mahout в действии. Но я столкнулся с проблемой, и я погуглил, но не могу найти решение.

Вот проблема :У меня есть пара картографов -редуктор

public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}
}

public class WikipediaToUserVectorReducer
    extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

public void reduce(VarLongWritable userID,
        Iterable<VarLongWritable> itemPrefs, Context context)
        throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
        userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
}
}

Редюсер выводит идентификатор пользователя и пользовательский вектор, и это выглядит так :98955 {590 :1.0 22 :1.0 9059 :1.0 3 :1.0 2 :1.0 1 :1.0}

Затем я хочу использовать другую пару преобразователей -для обрабатывать эти данные

public class UserVectorSplitterMapper
    extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

public void map(VarLongWritable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
        Vector.Element e = it.next();
        int itemIndex = e.index();
        float preferenceValue = (float) e.get();
        itemIndexWritable.set(itemIndex);
        context.write(itemIndexWritable, 
                new VectorOrPrefWritable(userID, preferenceValue));
    }
}
}

Когда я пытаюсь запустить задание, возникает ошибка приведения:

org.apache.hadoop.io.Text не может быть приведен к org.apache.mahout.math.VectorWritable

первый преобразователь -редюсер записывает вывод в hdfs,и второй преобразователь -пытается прочитать вывод, преобразователь может преобразовать 98955 в VarLongWritable, но не может преобразовать {590 :1.0 22 :1.0 9059 :1.0 3 :1.0 2 :1.0 1 :1.0} для VectorWritable, поэтому мне интересно, есть ли способ сделать первый преобразователь -редуктор напрямую отправляет вывод второй паре, тогда нет необходимости выполнять преобразование данных. Я просмотрел Hadoop в действии и полное руководство по hadoop :, похоже, такого способа сделать это не существует, какие-нибудь предложения?


Проблема решена

Решение :С помощью SequenceFileOutputFormat мы можем вывести и сохранить результат сокращения первого рабочего процесса MapReduce в DFS, после чего второй рабочий процесс MapReduce сможет прочитать временный файл как ввод с использованием класса SequenceFileInputFormat в качестве параметра при создании преобразователя. Поскольку вектор будет сохранен в двоичном файле последовательности, который имеет определенный формат, SequenceFileInputFormat может прочитать его и преобразовать обратно в векторный формат.

Вот пример кода.:

confFactory ToItemPrefsWorkFlow = new confFactory
            (new Path("/dbout"), //input file path
             new Path("/mahout/output.txt"), //output file path
             TextInputFormat.class, //input format
             VarLongWritable.class, //mapper key format
             Item_Score_Writable.class, //mapper value format
             VarLongWritable.class, //reducer key format
             VectorWritable.class, //reducer value format
             **SequenceFileOutputFormat.class** //The reducer output format             

    );
    ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();


    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
            (new Path("/mahout/output.txt"),
             new Path("/mahout/UserVectorToCooccurrence"),
             SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class
             //UserVectorToCooccurrenceMapper.class,
             IntWritable.class,
             IntWritable.class,
             IntWritable.class,
             VectorWritable.class,
             SequenceFileOutputFormat.class                                      
             );
     UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
     UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

    JobClient.runJob(conf1);
    JobClient.runJob(conf2);

Если у вас возникли проблемы с этим, пожалуйста, свяжитесь со мной

6
задан YoungHobbit 26 December 2015 в 18:53
поделиться