Fastest way to load bulk data into HBase through a program

0 votes

I have a text file with a million lines in it. Each line has to go through some custom parsing methods before it is loaded, and I want the whole process to take as little time as possible. Can anyone help me with a much faster way to load the data into HBase?

I am currently using a map-only MapReduce job (no reduce phase). I use FileInputFormat to read the text file so that each line is passed to the map method of my Mapper class. There, the line is parsed into a Put object, which is written to the context. TableOutputFormat then takes the Put object and inserts it into the table.

This solution yields an average insertion rate of 1,000 rows per second, which is less than what I expected. My HBase setup is in pseudo-distributed mode on a single server.

One interesting thing is that during insertion of 1,000,000 rows, 25 Mappers (tasks) are spawned but they run serially (one after another); is this normal?

My code is as follows:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // parseLine() and keys are defined elsewhere in my class
        Map<String, String> parsedLine = parseLine(value.toString());

        // Build the row key once and reuse it for the Put and the output key
        byte[] rowKey = Bytes.toBytes(parsedLine.get(keys[1]));
        Put row = new Put(rowKey);
        for (Map.Entry<String, String> entry : parsedLine.entrySet()) {
            // Each parsed field name is used as both column family and qualifier
            byte[] name = Bytes.toBytes(entry.getKey());
            row.add(name, name, Bytes.toBytes(entry.getValue()));
        }

        // Let InterruptedException propagate instead of swallowing it
        context.write(new ImmutableBytesWritable(rowKey), row);
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);

    // Report job failure through the exit code instead of always returning 0
    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime-startTime));

    System.exit(errCode);
}

Can someone help me out on this one?

Aug 8, 2019 in Big Data Hadoop by nitinrawat895
• 11,380 points
1,606 views

1 answer to this question.

0 votes

I've gone through a process that is probably very similar to yours, trying to find an efficient way to load data from a MapReduce job into HBase. What worked for me was using HFileOutputFormat as the job's OutputFormatClass.

Below is the core of the code I used to set up the job, along with the Mapper's map function that writes out the data. This was fast. We don't use it anymore, so I don't have numbers on hand, but it handled around 2.5 million records in under a minute.

Here is the (stripped-down) function I wrote to create the job for the MapReduce process that puts data into HBase:

private Job createCubeJob(...) {
    //Build and Configure Job
    Job job = new Job(conf);
    job.setJobName(jobName);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
    job.setJarByClass(CubeBuilderDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    TextInputFormat.setInputPaths(job, hiveOutputDir);
    HFileOutputFormat.setOutputPath(job, cubeOutputPath);

    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

    HTable hTable = new HTable(hConf, tableName);

    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    return job;
}
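To my understanding, the configureIncrementalLoad call is what makes this approach work: it looks up the table's current region boundaries, sets up one reducer per region with a total-order partitioner, and plugs in a sorting reducer, so that each HFile holds sorted row keys belonging to a single region. The sketch below only mimics that idea with plain Java collections. The class name, the region start keys and the row keys are made up for illustration, and it assumes ASCII keys so that String ordering matches HBase's byte-wise ordering.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Stdlib-only illustration of total-order partitioning by region start key.
// This is NOT HBase code; it only mirrors the grouping and sorting idea.
class RegionPartitionSketch {

    // Assigns each row key to the region whose start key is the greatest one
    // that is <= the row key, and keeps the keys of every region sorted.
    static Map<String, TreeSet<String>> partition(List<String> regionStartKeys,
                                                  List<String> rowKeys) {
        TreeMap<String, TreeSet<String>> regions = new TreeMap<>();
        // The first region of a table always starts at the empty key.
        regions.put("", new TreeSet<>());
        for (String startKey : regionStartKeys) {
            regions.put(startKey, new TreeSet<>());
        }
        for (String rowKey : rowKeys) {
            // floorEntry never returns null here because "" sorts before every key.
            regions.floorEntry(rowKey).getValue().add(rowKey);
        }
        return regions;
    }

    public static void main(String[] args) {
        // Hypothetical table with three regions: ["", "g"), ["g", "p"), ["p", end)
        List<String> startKeys = Arrays.asList("g", "p");
        List<String> rowKeys = Arrays.asList("zebra", "apple", "mango", "kiwi", "grape", "pear");

        for (Map.Entry<String, TreeSet<String>> region : partition(startKeys, rowKeys).entrySet()) {
            System.out.println("region starting at '" + region.getKey() + "': " + region.getValue());
        }
    }
}
```

Keep in mind that the job itself only writes HFiles under the output path. They still have to be handed over to the region servers afterwards, for example with HBase's LoadIncrementalHFiles class or the completebulkload tool.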

This is my map function from the HiveToHBaseMapper class (slightly edited):

public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    try{
        Configuration config = context.getConfiguration();
        String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
        String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
        String column = strs[COLUMN_INDEX];
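        // Parsed as a double because it is compared with 0 and passed to Bytes.toBytes below
        double value = Double.parseDouble(strs[VALUE_INDEX]);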
        String sKey = generateKey(strs, config);
        byte[] bKey = Bytes.toBytes(sKey);
        Put put = new Put(bKey);
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
                        ? Bytes.toBytes(Double.MIN_VALUE)
                        : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);

        context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
    }
    catch(Exception e){
        context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);    
    }

}
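One detail of the ternary in the put.add call is easy to misread: Double.MIN_VALUE is the smallest positive double, not the most negative one, so zero and negative values are replaced by a tiny positive number. The sketch below shows this with the standard library only. The class and method names are made up, and the encode helper assumes that Bytes.toBytes(double) produces the usual 8-byte big-endian IEEE 754 representation, which is my understanding of it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Stdlib-only illustration of the value handling in the map function.
class MinValueSentinel {

    // Mirrors the ternary: non-positive values become Double.MIN_VALUE.
    static double clamp(double value) {
        return (value <= 0) ? Double.MIN_VALUE : value;
    }

    // 8-byte big-endian encoding of a double (assumed to match Bytes.toBytes(double)).
    static byte[] encode(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    public static void main(String[] args) {
        System.out.println("Double.MIN_VALUE > 0 ? " + (Double.MIN_VALUE > 0));
        System.out.println("clamp(-3.5) bytes: " + Arrays.toString(encode(clamp(-3.5))));
        System.out.println("clamp(2.0) bytes:  " + Arrays.toString(encode(clamp(2.0))));
    }
}
```

If a real lower bound is needed rather than a marker for "no positive value", -Double.MAX_VALUE would be the candidate, but that depends on how the stored values are read back.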

I'm pretty sure this isn't going to be a copy-and-paste solution for you. The data I was working with here didn't need any custom processing (that was done in an MR job before this one). The main thing I want you to take away from this is HFileOutputFormat.
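To put the two figures quoted in this thread side by side, here is a quick back-of-the-envelope calculation. It only uses the numbers already mentioned (1,000 rows per second in the question, about 2.5 million records in under a minute here), and the class and method names are made up. The two setups ran on different hardware and data, so treat the ratio as a rough indication rather than a benchmark.

```java
// Back-of-the-envelope comparison of the two load rates quoted in this thread.
class LoadRateComparison {

    // Average rows per second for a given row count and elapsed time.
    static double rowsPerSecond(long rows, double seconds) {
        return rows / seconds;
    }

    // Seconds needed to load the given number of rows at a steady rate.
    static double secondsToLoad(long rows, double rowsPerSecond) {
        return rows / rowsPerSecond;
    }

    public static void main(String[] args) {
        double putRate = 1_000.0;                             // rate reported in the question
        double hfileRate = rowsPerSecond(2_500_000L, 60.0);   // lower bound: "under a minute"

        System.out.printf("TableOutputFormat: %.0f s for 1,000,000 rows%n",
                secondsToLoad(1_000_000L, putRate));
        System.out.printf("HFileOutputFormat: at least %.0f rows/s%n", hfileRate);
        System.out.printf("Speed-up: at least %.1fx%n", hfileRate / putRate);
    }
}
```

In other words, the million-row file takes roughly 1,000 seconds (about 17 minutes) at the rate from the question, while the HFile-based job ran at something above 41,000 rows per second.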

answered Aug 8, 2019 by ravikiran
• 4,620 points
