Каков самый быстрый способ программной массовой загрузки данных в HBase?

У меня есть простой текстовый файл, возможно, с миллионами строк, который требует специального синтаксического анализа, и я хочу как можно быстрее загрузить его в таблицу HBase (используя Hadoop или клиент HBase Java).

Мое текущее решение основано на задании MapReduce без части Reduce. Я использую FileInputFormat для чтения текстового файла, чтобы каждая строка передавалась методу map моего класса Mapper . На этом этапе строка анализируется для формирования объекта Put , который записывается в контекст . Затем TableOutputFormat берет объект Put и вставляет его в таблицу.

Это решение дает среднюю скорость вставки 1000 строк в секунду, что меньше, чем я ожидал. Моя установка HBase находится в псевдораспределенном режиме на одном сервере.

Интересно то, что при вставке 1 000 000 строк порождаются 25 Mapper (задач), но они запускаются последовательно (один за другим); это нормально?

Вот код моего текущего решения:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());

        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
        }

        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);

    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime-startTime));

    System.exit(errCode);
}
17
задан Cihan Keser 6 January 2012 в 06:49
поделиться