Multiple input for MapReduce job


While running a job using

hadoop jar jarfile.jar mainClass i/p_file o/p_dir

can we specify more than one input file? If yes, how?

Jun 20 in Big Data Hadoop by Kunal

1 answer to this question.


Yes. Use the MultipleInputs class, which supports MapReduce jobs that read from multiple input paths and lets you specify a different InputFormat and Mapper for each path. To make the concept concrete, consider a case where we want to read two input files with a similar structure: each has two tab-separated columns, the first holding "Name" and the second "Age". We simply want to combine the data and sort it by "Name". Only two things are needed:

  1. Write two mapper classes (here they share the same logic).
  2. Register each mapper and its input path with the MultipleInputs class in the run/main method.
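As a side note: when all inputs share the same format and mapping logic, MultipleInputs is not strictly required. FileInputFormat.addInputPath() can simply be called once per input path; the sketch below shows the idea (the paths and job name are hypothetical placeholders, and the fragment assumes a Hadoop classpath):

Job job = Job.getInstance(new Configuration(), "multi-input");
// Hypothetical paths: each call adds one more input to the same mapper.
FileInputFormat.addInputPath(job, new Path("/user/input1"));
FileInputFormat.addInputPath(job, new Path("/user/input2"));
FileOutputFormat.setOutputPath(job, new Path("/user/output"));

MultipleInputs is needed only when the inputs require different mappers or input formats, as in the example below.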
File 1          File 2

Aman 19         Ash 12
Tom 20          James 21
Tony 15         Punk 21
John 18         Frank 20
Johnny 19
Hugh 17

Here is the code. Notice the two mapper classes with identical logic and the single reducer.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class multiInputFile extends Configured implements Tool {

  // Mapper for the first input path: emits (name, age) from tab-separated lines.
  public static class CounterMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Mapper for the second input path: same logic as CounterMapper.
  public static class CountertwoMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Single reducer: keeps the last value seen for each name.
  public static class CounterReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      String line = null;
      for (Text value : values) {
        line = value.toString();
      }
      context.write(key, new Text(line));
    }
  }

  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "aggprog");
    job.setJarByClass(multiInputFile.class);

    // One addInputPath call per input, each with its own mapper class.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CounterMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CountertwoMapper.class);

    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.setReducerClass(CounterReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int ecode = ToolRunner.run(new multiInputFile(), args);
    System.exit(ecode);
  }
}
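Assuming the class above is packaged into jarfile.jar (the jar name from the question; the input and output directory names below are hypothetical), the job is launched with two input paths and one output path, matching args[0], args[1], and args[2] in run():

hadoop jar jarfile.jar multiInputFile input1 input2 output

The single reduce task (setNumReduceTasks(1)) is what produces one output file sorted by name, since keys are sorted before they reach the reducer.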

Here is the output.

Ash 12
Tony 15
Hugh 17
John 18
Aman 19
Johnny 19
Frank 20
Tom 20
James 21
Punk 21
answered Jun 20 by Jim
