Fastest way to load bulk data in to HBase through a program

0 votes

I have a text document with a million lines in it. I need to parse the text file through some customized parsing methods by spending less time. Can anyone help me with the procedure to load data into the HBase in must faster way?

I am currently using MapReduce job excluding the Reduce part. I used FileInputFormat to read the text file so that each line is passed to the map method of my Mapper class. At this point, the line is parsed to form a Put object which is written to the context. Then, TableOutputFormat takes the Put object and inserts it to the table.

This solution yields an average insertion rate of 1,000 rows per second, which is less than what I expected. My HBase setup is in pseudo-distributed mode on a single server.

One interesting thing is that during insertion of 1,000,000 rows, 25 Mappers (tasks) are spawned but they run serially (one after another); is this normal?

My code is mentioned as follows:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());

        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
        }

        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);

    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime-startTime));

    System.exit(errCode);
}

Can someone help me out on this one?

Aug 8 in Big Data Hadoop by nitinrawat895
• 10,670 points
24 views

1 answer to this question.

0 votes

I've gone through a process that is probably very similar to yours of attempting to find an efficient way to load data from an MR into HBase. What I found to work is using HFileOutputFormat as the OutputFormatClass of the MR.

Below is the basis of my code that I have to generate the job and the Mapper map function which writes out the data. This was fast. We don't use it anymore, so I don't have numbers on hand, but it was around 2.5 million records in under a minute.

Here is the (stripped down) function I wrote to generate the job for my MapReduce process to put data into HBase

private Job createCubeJob(...) {
    //Build and Configure Job
    Job job = new Job(conf);
    job.setJobName(jobName);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
    job.setJarByClass(CubeBuilderDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    TextInputFormat.setInputPaths(job, hiveOutputDir);
    HFileOutputFormat.setOutputPath(job, cubeOutputPath);

    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

    HTable hTable = new HTable(hConf, tableName);

    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    return job;
}

This is my map function from the HiveToHBaseMapper class (slightly edited ).

public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    try{
        Configuration config = context.getConfiguration();
        String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
        String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
        String column = strs[COLUMN_INDEX];
        String Value = strs[VALUE_INDEX];
        String sKey = generateKey(strs, config);
        byte[] bKey = Bytes.toBytes(sKey);
        Put put = new Put(bKey);
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
                        ? Bytes.toBytes(Double.MIN_VALUE)
                        : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);

        context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
    }
    catch(Exception e){
        context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);    
    }

}

I pretty sure this isn't going to be a Copy&Paste solution for you. Obviously the data I was working with here didn't need any custom processing (that was done in a MR job before this one). The main thing I want to provide out of this is the HFileOutputFormat.

answered Aug 8 by ravikiran
• 4,560 points

Related Questions In Big Data Hadoop

+1 vote
1 answer

Unable to load data in Hbase table from dataset

Try these steps (make necessary changes): First upload ...READ MORE

answered Dec 18, 2018 in Big Data Hadoop by Omkar
• 67,480 points
265 views
0 votes
1 answer

Is there any way to load data from MySql to HDFS?

The generic command i.e used to import ...READ MORE

answered Apr 10, 2018 in Big Data Hadoop by nitinrawat895
• 10,670 points
542 views
0 votes
1 answer

Is there a way to rebalance single Datanode in Hadoop.

Currently Hadoop does not automatically do this. ...READ MORE

answered Apr 15, 2018 in Big Data Hadoop by kurt_cobain
• 9,240 points
62 views
0 votes
1 answer
0 votes
1 answer
0 votes
1 answer

Moving files in Hadoop using the Java API?

I would recommend you to use FileSystem.rename(). ...READ MORE

answered Apr 15, 2018 in Big Data Hadoop by Shubham
• 13,290 points
800 views
0 votes
1 answer

Hadoop giving java.io.IOException, in mkdir Java code.

I am not sure about the issue. ...READ MORE

answered May 3, 2018 in Big Data Hadoop by Shubham
• 13,290 points
400 views
0 votes
1 answer

Hadoop Mapreduce word count Program

Firstly you need to understand the concept ...READ MORE

answered Mar 16, 2018 in Data Analytics by nitinrawat895
• 10,670 points
2,723 views
0 votes
1 answer

How to Sqoop in a Java Program?

You can use the following sample code for ...READ MORE

answered Jul 22 in Big Data Hadoop by ravikiran
• 4,560 points
70 views
0 votes
2 answers

How to use scoop in a Java Program

There is a trick which worked out ...READ MORE

answered Sep 4 in Big Data Hadoop by ravikiran
• 4,560 points
37 views