Passing Parameters in Command Context

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// NYSEParser, TextPair and LongPair are the author's own helper classes (TextPair and LongPair live in com.mugil.avg)
public class AvgStockVolPerMonthMapper extends Mapper<LongWritable, Text, TextPair, LongPair> {

	private final NYSEParser parser = new NYSEParser();
	private final TextPair mapOutputKey = new TextPair();
	private final LongPair mapOutputValue = new LongPair();

	// Instance field (not static) so the filter is not shared by tasks that reuse the same JVM
	private final Set<String> stockTickers = new HashSet<String>();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Comma-separated list passed on the command line, e.g. -Dfilter.by.stock=AAN,AEB,TCB,XAA
		String stockTicker = context.getConfiguration().get("filter.by.stock");
		if (stockTicker != null) {
			for (String ticker : stockTicker.split(",")) {
				String trimmed = ticker.trim();
				if (!trimmed.isEmpty()) {
					stockTickers.add(trimmed);
				}
			}
		}
	}

	@Override
	public void map(LongWritable lineOffset, Text record, Context context) throws IOException, InterruptedException {
		parser.parse(record.toString());

		// No filter passed: emit every record; otherwise emit only the requested tickers.
		// (The tickers are chosen through filter.by.stock instead of being hard-coded here.)
		if (stockTickers.isEmpty() || stockTickers.contains(parser.getStockTicker())) {
			mapOutputKey.setFirst(new Text(parser.getTradeMonth()));
			mapOutputKey.setSecond(new Text(parser.getStockTicker()));

			mapOutputValue.setFirst(new LongWritable(parser.getVolume()));
			mapOutputValue.setSecond(new LongWritable(1));

			context.write(mapOutputKey, mapOutputValue);
		}
	}

}
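The filtering in setup() and map() does not depend on Hadoop, so it can be tried on its own. The plain-Java sketch below parses a comma-separated parameter the way setup() does and applies the same "empty filter accepts everything" test that map() uses. In the real mapper the string would come from context.getConfiguration().get("filter.by.stock"). The class and method names are made up for illustration, and "IBM" is just an example ticker that is not in the list.

```java
import java.util.HashSet;
import java.util.Set;

public class TickerFilterSketch {

    // Turns a comma-separated value such as "AAN,AEB,TCB,XAA" into a set of tickers
    static Set<String> parseTickers(String configValue) {
        Set<String> tickers = new HashSet<>();
        if (configValue != null) {
            for (String ticker : configValue.split(",")) {
                String trimmed = ticker.trim();
                if (!trimmed.isEmpty()) {
                    tickers.add(trimmed);
                }
            }
        }
        return tickers;
    }

    // An empty filter (parameter not passed) accepts every ticker
    static boolean accept(Set<String> tickers, String ticker) {
        return tickers.isEmpty() || tickers.contains(ticker);
    }

    public static void main(String[] args) {
        Set<String> filter = parseTickers("AAN,AEB,TCB,XAA");
        System.out.println(accept(filter, "AEB"));              // true
        System.out.println(accept(filter, "IBM"));              // false
        System.out.println(accept(parseTickers(null), "IBM"));  // true: no filter configured
    }
}
```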

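A Partitioner controls which reducer receives each map output record. You can use it to group the output by a specific column: the column you want to group on is the one used for partitioning. In the case below, I have used the second value of the TextPair key (the stock ticker) for grouping.

A record is sent to the reducer whose number equals the key's hash modulo the number of reducers.

The custom partitioner below also uses hashCode(). It takes the remainder after dividing by the total number of reducers:

PartitionValue = (HashCode of Key Field & Integer.MAX_VALUE) % Total No of Reducers

The bitwise AND with Integer.MAX_VALUE clears the sign bit. As a result, a negative hash code still gives a partition number between 0 and (Total No of Reducers - 1).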

package com.mugil.part;

import org.apache.hadoop.mapreduce.Partitioner;

import com.mugil.avg.LongPair;
import com.mugil.avg.TextPair;

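// Register in the driver with job.setPartitionerClass(FirstPartioner.class)
public class FirstPartioner extends Partitioner<TextPair, LongPair>
{
   // All records with the same second key field (the stock ticker) go to the same reducer
   @Override
   public int getPartition(TextPair key, LongPair value, int noOfReducers)
   {
	// & Integer.MAX_VALUE clears the sign bit, so a negative hashCode cannot give a negative partition
	return (key.getSecond().hashCode() & Integer.MAX_VALUE) % noOfReducers;
   }
}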
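The partition arithmetic can be checked outside Hadoop. This sketch applies the same formula to an arbitrary int hash. It uses String.hashCode() as a stand-in for Hadoop's Text.hashCode(), and the two may give different numbers for the same ticker. The reducer count of 3 is only an example. The last line shows why the mask is needed: Java's % operator keeps the sign of a negative operand.

```java
public class PartitionSketch {

    // Same arithmetic as FirstPartioner.getPartition(), applied to any int hash code
    static int partitionFor(int hashCode, int noOfReducers) {
        return (hashCode & Integer.MAX_VALUE) % noOfReducers;
    }

    public static void main(String[] args) {
        int reducers = 3;
        for (String ticker : new String[] {"AAN", "AEB", "TCB", "XAA"}) {
            // String.hashCode() stands in for Text.hashCode() here
            System.out.println(ticker + " -> reducer " + partitionFor(ticker.hashCode(), reducers));
        }

        // Without the mask a negative hash would produce a negative (invalid) partition
        System.out.println("-7 % 3 = " + (-7 % reducers) + ", masked: " + partitionFor(-7, reducers));
        // -7 % 3 = -1, masked: 1
    }
}
```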

A binary file is a file whose content must be interpreted by a program or hardware processor that knows in advance exactly how it is formatted. Unlike a text file, it is not in a self-describing format in which any program could look for certain data at a certain place. A program (or hardware processor) has to know exactly how the data inside the file is laid out in order to use the file.

Hadoop does not work well with a large number of small files, meaning files smaller than the HDFS block size. There are two reasons:

  1. The NameNode keeps the metadata for every file and block in memory, so huge numbers of small files cause a large memory overhead.
  2. Each map task processes one block of data at a time. A map task with too little data to process is inefficient, because starting up many such short-lived map tasks is itself an overhead.

To solve this problem, Sequence files are used as containers for the small files. Sequence files are flat files containing key/value pairs. A very common pattern in ingestion systems is to use a Sequence file as the container: any file-related metadata (filename, path, creation time, etc.) is stored as the key and the file contents as the value.
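To make the "metadata as key, contents as value" idea concrete, here is a toy container written with plain java.io. This is only an illustration of the key/value layout and is not the real SequenceFile on-disk format: there is no header, no sync markers and no compression. Writing a real Sequence file would go through Hadoop's SequenceFile.Writer. The class name and the sample file names are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy key/value container: key = file name, value = file contents (NOT the SequenceFile format)
public class SmallFileContainer {

    // Writes each pair as: name, length of contents, contents
    static byte[] pack(Map<String, byte[]> files) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (Map.Entry<String, byte[]> e : files.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue().length);
                out.write(e.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    // Reads the pairs back in the order they were written
    static Map<String, byte[]> unpack(byte[] container) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(container));
            Map<String, byte[]> files = new LinkedHashMap<>();
            while (in.available() > 0) {
                String name = in.readUTF();
                byte[] data = new byte[in.readInt()];
                in.readFully(data);
                files.put(name, data);
            }
            return files;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> files = new LinkedHashMap<>();
        files.put("/logs/a.txt", "hello".getBytes(StandardCharsets.UTF_8));
        files.put("/logs/b.txt", "world".getBytes(StandardCharsets.UTF_8));

        for (Map.Entry<String, byte[]> e : unpack(pack(files)).entrySet()) {
            System.out.println(e.getKey() + " -> " + new String(e.getValue(), StandardCharsets.UTF_8));
        }
    }
}
```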

A Sequence file can use one of three formats:

  1. Uncompressed format.
  2. Record-compressed format, where only the values are compressed.
  3. Block-compressed format, where keys and values are collected into blocks and compressed together.

Sync markers, written every few hundred bytes (approximately), mark record boundaries.

Advantages of Sequence Files

  1. As binary files, these are more compact than text files
  2. Provides optional support for compression at different levels – record, block.
  3. Files can be split and processed in parallel
  4. HDFS and MapReduce are optimized for large files, so Sequence Files can be used as containers for a large number of small files. This addresses Hadoop's weakness at processing huge numbers of small files.
  5. Extensively used in MapReduce jobs as input and output formats. Internally, the temporary outputs of maps are also stored using Sequence File format.

A sequence file consists of a header followed by one or more records. All three formats use the same header structure.

  1. Uncompressed format
  2. Record Compressed format
  3. Block-Compressed format

Header Structure of Sequence Files

Record Structure of Sequence Files

Block Structure of Sequence Files


For example, assume you are uploading images to Facebook and have to remove duplicate images. You can't store an image in text format. Instead, compute the MD5 checksum (md5sum) of each image file. If that checksum already exists in the system, simply discard the duplicate upload. Your text file then only needs records such as "Date" and "Number of images uploaded". The images themselves can be stored outside HDFS, for example on a CDN or another web server.
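The checksum-based duplicate check described above can be sketched with java.security.MessageDigest. The in-memory set of seen digests is an assumption for illustration only; a real system would keep the digests in a persistent store. The class and method names are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

public class ImageDeduplicator {

    // Digests of every image accepted so far (in-memory only, for illustration)
    private final Set<String> seenDigests = new HashSet<>();

    // MD5 of the content as a 32-character lowercase hex string, like the md5sum tool prints
    static String md5Hex(byte[] content) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(content);
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // true if the image is new and should be stored; false if it is a duplicate to discard
    boolean accept(byte[] imageBytes) {
        return seenDigests.add(md5Hex(imageBytes));
    }

    public static void main(String[] args) {
        ImageDeduplicator dedup = new ImageDeduplicator();
        byte[] photo = "pretend these are JPEG bytes".getBytes(StandardCharsets.UTF_8);

        System.out.println(dedup.accept(photo));          // true: first upload is stored
        System.out.println(dedup.accept(photo.clone()));  // false: same content, discarded
    }
}
```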


Listing Files in Directory

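import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceDriver extends Configured implements Tool
{
   public static void main(String[] args) throws Exception
   {
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);

	// Assumes args[0] is the input directory (main() sees the raw arguments, before ToolRunner strips -D options)
	Path path = new Path(args[0]);

	// One FileStatus per entry in the directory, converted to Paths
	FileStatus[] status = fs.listStatus(path);
	Path[] paths = FileUtil.stat2Paths(status);

	for (Path path2 : paths)
	{
	  System.out.println(path2.toString());
	}

	int res = ToolRunner.run(new MapReduceDriver(), args);
	System.exit(res);
   }

   @Override
   public int run(String[] args) throws Exception
   {
	// Job setup (input/output paths, mapper, reducer) goes here; not shown in this snippet
	return 0;
   }
}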
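To pass every listed file to the job as input, collect the paths into one comma-separated string (fs and objJob come from the surrounding driver code):

import java.util.ArrayList;
import java.util.List;

Path path = new Path(args[0]);
FileStatus[] status = fs.listStatus(path);
Path[] paths = FileUtil.stat2Paths(status);

// Collect all paths first; assigning csvPaths inside the loop would keep only the last one
List<String> pathNames = new ArrayList<String>();
for (Path path2 : paths)
  pathNames.add(path2.toString());

String csvPaths = String.join(",", pathNames);   // String.join needs Java 8+

FileInputFormat.setInputPaths(objJob, csvPaths);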

Merging Files in a Folder
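copyMerge – Parameters

  1. Source FileSystem object
  2. Input path (the source directory)
  3. Destination FileSystem object
  4. Output path (the merged file)
  5. Delete original files (true/false)
  6. Configuration object
  7. String appended after each merged file (null for none)

FileUtil.copyMerge is available in Hadoop 1.x and 2.x but was removed in Hadoop 3.

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

Path inputPath = new Path(args[0]);
Path outPath = new Path(args[2]);

// Concatenate every file under inputPath into the single file outPath, keeping the originals
boolean merged = FileUtil.copyMerge(fs, inputPath, fs, outPath, false, conf, null);

if (merged)
  System.out.println("Merge Successful");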
		

Unlike listStatus, globStatus accepts glob patterns, so you can select only the files whose names match:

Path path = new Path(args[0] + "/Inputs/Input*");
FileStatus[] status = fs.globStatus(path);
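To get a feel for which names a pattern like Input* selects, here is a small sketch using java.nio's PathMatcher. This is the JDK's glob support, not Hadoop's globStatus. The wildcard syntax (*, ?, [a-b]) is similar, but the two are separate implementations. The file names are made-up examples.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GlobSketch {

    // Keeps the names that match a java.nio glob pattern such as "Input*" or "Input[1-2]"
    static List<String> matching(String glob, List<String> names) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (matcher.matches(Paths.get(name))) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Input1", "Input2", "Inputs3", "Output1");
        System.out.println(matching("Input*", names));      // [Input1, Input2, Inputs3]
        System.out.println(matching("Input[1-2]", names));  // [Input1, Input2]
    }
}
```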

Merging Multiple Paths

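 import org.apache.commons.lang.StringUtils;

 csvPaths = StringUtils.join(paths, ",");
 String[] arrcsvPaths = csvPaths.split(",");

 // setInputPaths() replaces any inputs set earlier, so calling it in a loop keeps only the last path.
 // addInputPath() appends each path (alternatively, call setInputPaths(objJob, csvPaths) once).
 for (int i = 0; i < arrcsvPaths.length; i++)
   FileInputFormat.addInputPath(objJob, new Path(arrcsvPaths[i]));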

Passing Arguments in Command Context and Fetching It

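// Value passed on the command line as -DWord.Name=Tests (null if it was not passed)
String filterWord = context.getConfiguration().get("Word.Name");

// arrString holds the words of the current input line, e.g. value.toString().split(" ")
for (int i = 0; i < arrString.length; i++)
{
  if (filterWord != null && filterWord.equals(arrString[i]))
    context.write(new Text(arrString[i]), new IntWritable(1));
}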

Input

 -DWord.Name=Tests /home/turbo/workspace/MapReduce5/src/Inputs/Inputs[1-2] /home/turbo/workspace/MapReduce5/src/Outputs/

Word.Name is the parameter passed on the command line. Generic options such as -D must always come first, before the input and output paths.

ToolRunner (through GenericOptionsParser) removes the -D option from the arguments before it calls run(). So args.length is 3 in main() but 2 in run().
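The effect on args.length can be modelled in a few lines. The sketch below is a simplified, hypothetical stand-in for what happens before run() is called, not Hadoop's GenericOptionsParser. It recognizes only the "-Dkey=value" form, and it stops looking for options at the first plain argument, which matches the "parameters first" rule above. The class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Simplified model of -D handling; not Hadoop's GenericOptionsParser
public class DOptionSketch {

    // Moves leading "-Dkey=value" tokens into conf and returns the remaining arguments
    static String[] stripDOptions(String[] args, Properties conf) {
        List<String> remaining = new ArrayList<>();
        boolean optionsDone = false;
        for (String arg : args) {
            if (!optionsDone && arg.startsWith("-D") && arg.indexOf('=') > 2) {
                int eq = arg.indexOf('=');
                conf.setProperty(arg.substring(2, eq), arg.substring(eq + 1));
            } else {
                optionsDone = true;   // options are only recognized before the first plain argument
                remaining.add(arg);
            }
        }
        return remaining.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] mainArgs = {
            "-DWord.Name=Tests",
            "/home/turbo/workspace/MapReduce5/src/Inputs/Inputs[1-2]",
            "/home/turbo/workspace/MapReduce5/src/Outputs/"
        };
        Properties conf = new Properties();
        String[] runArgs = stripDOptions(mainArgs, conf);

        System.out.println("main() args: " + mainArgs.length + ", run() args: " + runArgs.length);
        System.out.println("Word.Name = " + conf.getProperty("Word.Name"));
    }
}
```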

In MapReduce v1, mapreduce.tasktracker.map.tasks.maximum and mapreduce.tasktracker.reduce.tasks.maximum are set in mapred-site.xml to configure the number of map slots and reduce slots, respectively.

Starting with MapReduce v2 (YARN), the more generic term "container" is used instead of "slot". A container can run any kind of task: a map task, a reduce task, or the application master. How many containers can run in parallel on a node depends on the resources (memory and CPU vcores) that the node's NodeManager makes available, not on a fixed number of map and reduce slots.

Suppose you have a TaskTracker with 32 GB of memory, 16 map slots, and 8 reduce slots. If all task JVMs use 1 GB of memory and all slots are filled, you have 24 Java processes with 1 GB each, for a total of 24 GB. Because you have 32 GB of physical memory, there is probably enough memory for all 24 processes. On the other hand, if your average map and reduce tasks need 2 GB of memory and all slots are full, the 24 tasks could need up to 48 GB of memory, more than is available. To avoid over-committing TaskTracker node memory, reduce the number of slots.
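The arithmetic in the paragraph above, written out as a tiny check. Like the paragraph, it only counts task JVMs. It ignores the memory that the TaskTracker and DataNode daemons and the operating system also need, which in practice must fit too. The class name is illustrative.

```java
public class SlotMemorySketch {

    // Worst case: every map and reduce slot is busy with a task JVM of the given size
    static int worstCaseMemoryGb(int mapSlots, int reduceSlots, int gbPerTask) {
        return (mapSlots + reduceSlots) * gbPerTask;
    }

    public static void main(String[] args) {
        int physicalGb = 32;
        for (int gbPerTask : new int[] {1, 2}) {
            int needed = worstCaseMemoryGb(16, 8, gbPerTask);
            String verdict = needed <= physicalGb ? "fits" : "over-committed, reduce the slots";
            System.out.println(gbPerTask + " GB per task -> " + needed + " GB needed: " + verdict);
        }
    }
}
```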

Question: I have a 10-node cluster, and each node has a quad-core processor, so in total there are 80 slots (8 × 10). It is said that one map task is allotted per slot (map slot). Suppose 60 slots are usable (say the other slots are busy handling other daemons). If I have a file with 1000 blocks, and 40 slots are for map tasks and 20 for reduce tasks, how will the blocks be processed? The number of map tasks depends on the number of blocks (1000 here), but only 40 map slots are available.

Answer : The number of map tasks for a given job is driven by the number of input splits and not by the mapred.map.tasks parameter. For each input split a map task is spawned. So, over the lifetime of a mapreduce job the number of map tasks is equal to the number of input splits. mapred.map.tasks is just a hint to the InputFormat for the number of maps.

Assume your Hadoop input file is 2 GB and the block size is 64 MB. Then 32 map tasks are set to run (2048 MB / 64 MB = 32), and each mapper processes one 64 MB block to complete the map phase of your Hadoop job.

==> The number of mappers set to run depends entirely on 1) the file size and 2) the block size. More precisely, it depends on the input split size, which defaults to the block size.

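Assume you are running Hadoop on a cluster of 4 nodes. You set the per-node limits mapreduce.tasktracker.map.tasks.maximum and mapreduce.tasktracker.reduce.tasks.maximum in each node's configuration file as follows:

Node 1: mapreduce.tasktracker.map.tasks.maximum = 4 and mapreduce.tasktracker.reduce.tasks.maximum = 4
Node 2: mapreduce.tasktracker.map.tasks.maximum = 2 and mapreduce.tasktracker.reduce.tasks.maximum = 2
Node 3: mapreduce.tasktracker.map.tasks.maximum = 4 and mapreduce.tasktracker.reduce.tasks.maximum = 4
Node 4: mapreduce.tasktracker.map.tasks.maximum = 1 and mapreduce.tasktracker.reduce.tasks.maximum = 1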

These parameters are set on the 4 nodes of this cluster. Node 2 is set to only 2 and 2, possibly because it has fewer processing resources, e.g. 2 processors with 2 cores. Node 4 is set even lower, to just 1 and 1, perhaps because it has only 1 processor with 2 cores, so it can't run more than 1 mapper and 1 reducer task.

When you run the job, Nodes 1 to 4 are configured to run at most 11 (4 + 2 + 4 + 1) map tasks simultaneously. The job needs 32 map tasks in total. Each time a node finishes a map task, it picks up one of the remaining tasks, until all 32 are done.
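Putting the two numbers together: the job needs 32 map tasks, and the cluster runs at most 11 at a time, so the map phase runs in roughly ceil(32 / 11) = 3 "waves". The sketch assumes every map task takes about the same time. In reality, as described above, nodes simply pick up the next pending task as soon as a slot frees up.

```java
public class MapWavesSketch {

    // Integer ceiling of a / b for positive values
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        long fileSizeMb = 2 * 1024;   // 2 GB input
        long blockSizeMb = 64;
        long mapTasks = ceilDiv(fileSizeMb, blockSizeMb);

        int[] mapSlotsPerNode = {4, 2, 4, 1};   // Node 1..4
        int concurrentMaps = 0;
        for (int slots : mapSlotsPerNode) {
            concurrentMaps += slots;
        }

        // Assumes every map task takes about the same time
        long waves = ceilDiv(mapTasks, concurrentMaps);
        System.out.println(mapTasks + " map tasks, " + concurrentMaps + " at a time, about " + waves + " waves");
    }
}
```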

A processor is the entire chip, including all of its cores. Cores (2, 4, 6 or more) are the parts of the processor that process in parallel, handling different data simultaneously in separate units. This helps with multitasking without overloading the processor. Technically, each core is itself a processor, but the chip is built so that the cores work in coordination rather than individually. An analogy is dividing a large hall into several identical bedrooms so that there is no overcrowding: each bedroom is like a core, performing the same function of housing guests while being physically separate.

Mapper
The map function in Mapper is called once for every record of the input file. With the default TextInputFormat, that means once per line.

Combiner
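The Combiner runs only when the job has a reduce phase. In a map-only job (job.setNumReduceTasks(0)), the map output is written directly to the output without being sorted and shuffled, so the combiner is never called.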

Reducer
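The Reducer and Combiner need not do the same thing. For example, when computing an average (say, of the numbers 0 to 100), the combiner can only add up the values and counts. The reducer then divides the total by the count.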

Input to Mapper

1/1/09 1:26,Product2,1200,Nikki,United States
1/1/09 1:51,Product2,1200,Si,Denmark
1/1/09 10:06,Product2,3600,Irene,Germany
1/1/09 11:05,Product2,1200,Janis,Ireland
1/1/09 12:19,Product2,1200,Marlene,United States
1/1/09 12:20,Product2,3600,seemab,Malta
1/1/09 12:25,Product2,3600,Anne-line,Switzerland
1/1/09 12:42,Product1,1200,ashton,United Kingdom
1/1/09 14:19,Product2,1200,Gabriel,Canada
1/1/09 14:22,Product1,1200,Liban,Norway
1/1/09 16:00,Product2,1200,Toni,United Kingdom
1/1/09 16:44,Product2,1200,Julie,United States
1/1/09 18:32,Product1,1200,Andrea,United States

Output of Mapper

Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {3600} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {3600} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {3600} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product1}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product1}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product1}
Value = Product Price {1200} 	  Product No {1}
----------------------------

The framework's magic happens here: the map output is sorted and grouped by key before the combiner sees it.
Input of Combiner

----------------------------
Key = Product Date {2009-01} 	 Product Name {Product1}
Values
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
----------------------------
Key = Product Date {2009-01} 	 Product Name {Product2}
Values
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 3600	
Product No 1
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 3600	
Product No 1
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 3600	
Product No 1
----------------------------

Values added together in Combiner based on Key

key 2009-01	Product1
productPrice 1200
productNos 1
----------------------------
productPrice 2400
productNos 2
----------------------------
productPrice 3600
productNos 3
----------------------------
key 2009-01	Product2
productPrice 1200
productNos 1
----------------------------
productPrice 2400
productNos 2
----------------------------
productPrice 6000
productNos 3
----------------------------
productPrice 7200
productNos 4
----------------------------
productPrice 8400
productNos 5
----------------------------
productPrice 12000
productNos 6
----------------------------
productPrice 13200
productNos 7
----------------------------
productPrice 14400
productNos 8
----------------------------
productPrice 15600
productNos 9
----------------------------
productPrice 19200
productNos 10
----------------------------

Output of Combiner and Input to reducer

Key = Product Date {2009-01} 	 Product Name {Product1}
Value = Product Price {3600}	Product Nos {3}
----------------------------
Key = Product Date {2009-01} 	 Product Name {Product2}
Value = Product Price {19200} Product Nos {10}

Output of Reducer

Key = Product Date {2009-01} 	 Product Name {Product1}
Value = AvgVolume {1200}	NoOfRecords {3}
----------------------------
Key = Product Date {2009-01} 	 Product Name {Product2}
Value = AvgVolume {1920}	NoOfRecords {10}
----------------------------
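The whole averaging pipeline above can be replayed in plain Java with the same prices. The combiner step only adds up (price, count) pairs, so its output has the same shape as its input. The division happens only in the reducer step. The grouping map stands in for what the framework does between the two steps. Class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AveragePipelineSketch {

    // A mapper value: {price, 1}
    static long[] pair(long price) {
        return new long[] {price, 1};
    }

    // Combiner: add up prices and record counts for one key; output has the same shape as its input
    static long[] combine(List<long[]> values) {
        long sum = 0;
        long count = 0;
        for (long[] v : values) {
            sum += v[0];
            count += v[1];
        }
        return new long[] {sum, count};
    }

    // Reducer: only here is the total divided by the count
    static long average(long[] sumAndCount) {
        return sumAndCount[0] / sumAndCount[1];
    }

    public static void main(String[] args) {
        // Map output after the framework has grouped it by key (prices from the input above)
        Map<String, List<long[]>> grouped = new LinkedHashMap<>();
        grouped.put("2009-01 Product1", Arrays.asList(pair(1200), pair(1200), pair(1200)));
        grouped.put("2009-01 Product2", Arrays.asList(
                pair(1200), pair(1200), pair(3600), pair(1200), pair(1200),
                pair(3600), pair(3600), pair(1200), pair(1200), pair(1200)));

        for (Map.Entry<String, List<long[]>> e : grouped.entrySet()) {
            long[] combined = combine(e.getValue());
            System.out.println(e.getKey() + " -> sum " + combined[0] + ", count " + combined[1]
                    + ", average " + average(combined));
        }
    }
}
```

Because combine() emits the same {sum, count} shape it consumes, the framework could apply it zero, one or several times without changing the reducer's result.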

CompareTo is used for Object Comparison

The compareTo logic tells the framework how to sort the dataset. It also tells the reducer which keys are equal, so that their values can be grouped together.

Compares this object with the specified object for order. Returns a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than the specified object.

Let’s say we would like to compare Jedis by their age:

class Jedi implements Comparable<Jedi> {

    private final String name;
    private final int age;
        //...
}

If our Jedi is older than the provided one, you must return a positive number. If they are the same age, return 0. If our Jedi is younger, return a negative number.

public int compareTo(Jedi jedi){
    return Integer.compare(this.age, jedi.age);
}

By implementing the compareTo method (from the Comparable interface) you are defining what is called the natural order. JDK sorting methods such as Arrays.sort and Collections.sort use this ordering when no Comparator is supplied.

Sometimes you may want to base the comparison on other objects rather than on a primitive type, for instance comparing Jedis by their names. In that case, if the objects being compared already implement Comparable, you can do the comparison using their compareTo method.

public int compareTo(Jedi jedi){
    return this.name.compareTo(jedi.getName());
}

It would be simpler in this case.

Now, if you intend to use both name and age as comparison criteria, you have to decide the order of comparison, that is, which criterion takes precedence. For instance, if two Jedis have the same name, you can use their age to decide which goes first and which goes second.

public int compareTo(Jedi jedi){
    int result = this.name.compareTo(jedi.getName());
    if(result == 0){
        result = Integer.compare(this.age, jedi.age);
    }
    return result;
}

If you had an array of Jedis

Jedi[] jediAcademy = {new Jedi("Obiwan",80), new Jedi("Anakin", 30), ..}

All you have to do is ask the java.util.Arrays class to sort it with its sort method.

Arrays.sort(jediAcademy);

Arrays.sort calls your compareTo method to compare pairs of objects while sorting.
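A self-contained version of the Jedi example follows. The constructor and toString() were elided in the original snippet, so the ones below are filled in for illustration, and "Luke" plus the extra "Anakin" entry are made-up data. The second sort shows how a java.util.Comparator supplies a different order without changing compareTo.

```java
import java.util.Arrays;
import java.util.Comparator;

public class JediSortSketch {

    static class Jedi implements Comparable<Jedi> {
        final String name;
        final int age;

        Jedi(String name, int age) {
            this.name = name;
            this.age = age;
        }

        // Natural order: by name, then by age when the names are equal
        @Override
        public int compareTo(Jedi other) {
            int result = name.compareTo(other.name);
            return result != 0 ? result : Integer.compare(age, other.age);
        }

        @Override
        public String toString() {
            return name + "(" + age + ")";
        }
    }

    public static void main(String[] args) {
        Jedi[] academy = {
            new Jedi("Obiwan", 80), new Jedi("Anakin", 30), new Jedi("Anakin", 20), new Jedi("Luke", 19)
        };

        Arrays.sort(academy);   // uses compareTo (natural order)
        System.out.println(Arrays.toString(academy));
        // [Anakin(20), Anakin(30), Luke(19), Obiwan(80)]

        // A Comparator supplies a different order without touching compareTo: by age only
        Arrays.sort(academy, Comparator.comparingInt((Jedi j) -> j.age));
        System.out.println(Arrays.toString(academy));
        // [Luke(19), Anakin(20), Anakin(30), Obiwan(80)]
    }
}
```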

  1. Yes, a combiner can be different from the Reducer, although your Combiner still extends the Reducer class. Whether a combiner can be used depends on the job. The Combiner operates like a Reducer, but only on the subset of key/value pairs output by each Mapper. Unlike a Reducer, your Combiner's input and output key and value types must match the output types of your Mapper.
  2. The primary goal of combiners is to minimize the number of key/value pairs shuffled across the network between mappers and reducers, and so save as much bandwidth as possible.
  3. The rule of thumb is that a combiner must have the same input and output types. The reason is that running the combiner is not guaranteed: the framework may run it zero, one or several times, depending on the data volume and the number of spills.
  4. The reducer can be used as a combiner when it satisfies this rule, i.e. it has the same input and output types.
  5. The other important rule is that a combiner can only be used when the function you apply is both commutative (a.b = b.a) and associative {a.(b.c) = (a.b).c}, like adding numbers. It does not work for something like an average if you reuse the reducer's code, because an average of partial averages is not the overall average.

HDFS is built around blocks. A typical HDFS block size is 64 MB, the default in Hadoop 1.x (Hadoop 2.x and later default to 128 MB). When you place a large file into HDFS, it is chopped up into 64 MB chunks, based on the default block configuration. For example, a 1 GB file produces 1 GB / 64 MB = 16 blocks (and, by default, 16 splits). These blocks are distributed across the DataNodes, and your cluster configuration determines which DataNodes hold each block and its replicas.

Data splitting happens based on file offsets. The goal of splitting a file and storing it in different blocks is parallel processing and fault tolerance.

Difference between block size and split size.

A split is a logical division of your data, used during data processing by a Map/Reduce program or other processing techniques. The split size is user-defined, and you can choose it based on your data (how much data you are processing).

Splits are mainly used to control the number of mappers in a Map/Reduce program. If you have not defined an input split size in your Map/Reduce program, the HDFS block size is used as the input split size.

Example:
Suppose you have a 100 MB file and the default HDFS block size is 64 MB, so the file is stored as 2 blocks. You write a Map/Reduce program to process this data without specifying any input split size. The input splits then follow the 2 blocks, and 2 mappers are assigned to this job.

But suppose you specify a split size of 100 MB in your Map/Reduce program, for example with FileInputFormat.setMinInputSplitSize. Then both blocks are treated as a single split, and 1 mapper is assigned to this job.

Now suppose you specify a split size of 25 MB, for example with FileInputFormat.setMaxInputSplitSize. Then there are 4 input splits, and 4 mappers are assigned to the job.

Conclusion:

  1. A split is a logical division of the input data, while a block is a physical division of the data.
  2. The HDFS block size is the default split size if no input split size is specified.
  3. The split size is user-defined, and the user can control it in the Map/Reduce program.
  4. One split can span multiple blocks, and one block can be divided into multiple splits.
  5. The number of map tasks (mappers) equals the number of splits.

If your resources are limited and you want to reduce the number of maps, you can increase the split size. For example, suppose you have 640 MB stored in 10 blocks of 64 MB each. If you set the split size to 128 MB, the blocks are logically grouped into 128 MB splits and only 5 maps are executed.
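The split counts in the examples above can be reproduced with a small sketch. It follows the split-size arithmetic in Hadoop's FileInputFormat as I understand it: split size = max(minSize, min(maxSize, blockSize)), and the last split may be up to 10% larger than the split size (SPLIT_SLOP = 1.1). Treat it as an approximation. Real split computation works per file and also records block locations.

```java
public class SplitSizeSketch {

    // The last split may be up to 10% larger than splitSize
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    static int countSplits(long fileSize, long splitSize) {
        int splits = 0;
        long remaining = fileSize;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++;   // whatever is left becomes the last split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long block = 64 * mb;

        // No min/max set: split size = block size
        System.out.println(countSplits(100 * mb, computeSplitSize(block, 1, Long.MAX_VALUE)));        // 2
        // Minimum split size 100 MB
        System.out.println(countSplits(100 * mb, computeSplitSize(block, 100 * mb, Long.MAX_VALUE))); // 1
        // Maximum split size 25 MB
        System.out.println(countSplits(100 * mb, computeSplitSize(block, 1, 25 * mb)));               // 4
        // 640 MB with a 128 MB minimum split size
        System.out.println(countSplits(640 * mb, computeSplitSize(block, 128 * mb, Long.MAX_VALUE))); // 5
    }
}
```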

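If the InputFormat's isSplitable() method returns false, the whole file forms one input split and is processed by a single map task. This happens, for example, with non-splittable compressed files such as gzip, and it takes much longer when the file is big.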


The default Mapper and Reducer classes are:

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

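When the mapper and reducer are not set using job.setMapperClass() and job.setReducerClass(), the default Mapper.class and Reducer.class are used.

Both defaults are identity functions. Mapper.class writes every input (key, value) pair unchanged, and Reducer.class writes every value it receives for each key. With the default TextInputFormat, the key is the byte offset of the line and the value is the line itself. The input and output of the default mapper and reducer are as shown.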

Input

Test
Test
Test

Output

0	Test
5	Test
10	Test

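The key is the byte offset at which each line starts. "Test" is 4 bytes, plus 1 byte for the newline character, so the second line starts at offset 5 and the third at offset 10.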
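The offsets in the output can be recomputed with a few lines of plain Java. The sketch assumes lines end with a single '\n' byte, as in the example input. A file with Windows line endings ("\r\n") would shift the offsets by one extra byte per line.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineOffsetSketch {

    // Byte offset at which each line starts: the key TextInputFormat passes to map()
    static List<Long> lineOffsets(String text) {
        List<Long> offsets = new ArrayList<>();
        long offset = 0;
        for (String line : text.split("\n")) {
            offsets.add(offset);
            // bytes of the line plus 1 byte for the '\n' terminator
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        String input = "Test\nTest\nTest\n";
        List<Long> offsets = lineOffsets(input);
        String[] lines = input.split("\n");
        for (int i = 0; i < lines.length; i++) {
            System.out.println(offsets.get(i) + "\t" + lines[i]);   // 0, 5, 10 as in the job output
        }
    }
}
```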

package com.mugilmapred;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// No setMapperClass()/setReducerClass(): the default (identity) Mapper and Reducer are used
public class Test extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		int result = ToolRunner.run(new Test(), args);
		System.exit(result);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf());   // new Job(getConf()) in older releases
		job.setJarByClass(Test.class);

		Path inputFilepath = new Path(args[0]);
		Path outputFilepath = new Path(args[1]);

		FileInputFormat.addInputPath(job, inputFilepath);
		FileOutputFormat.setOutputPath(job, outputFilepath);

		// Delete an existing output directory; otherwise the job fails at submission
		FileSystem fs = outputFilepath.getFileSystem(getConf());
		if (fs.exists(outputFilepath)) {
			fs.delete(outputFilepath, true);
		}

		return job.waitForCompletion(true) ? 0 : 1;
	}
}

If you do not call setJarByClass, the job throws:

Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.mugilmapred.Test$Map not found

When you run locally, this is not required. When you run on a cluster, however, you must specify a class from the jar that contains the mapper (typically the driver class); otherwise the framework does not know which jar file holds the mapper.

job.setJarByClass(Test.class);

You can also use setJar, as below:

job.setJar("Test-0.0.1.jar");

Using a Predefined Reducer in Program

.
.
.
job.setMapperClass(WordMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
    
job.setReducerClass(LongSumReducer.class);
job.setNumReduceTasks(1);
.
.
.
.

LongSumReducer (org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer) receives the mapper output grouped by key ([count,1] [count,1] [count,1] [count,1]). It sums the values for each key, producing ([count,4]).
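The same flow can be simulated without Hadoop. The sketch below groups (key, 1) pairs by key, which is the framework's shuffle-and-sort step, and then sums each group, which is what LongSumReducer does per key. It is an in-memory illustration only, and the class name is made up.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LongSumSketch {

    static Map<String, Long> reduceAll(List<Map.Entry<String, Long>> mapOutput) {
        // Shuffle/sort: group the values by key (TreeMap keeps keys sorted)
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> kv : mapOutput) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }

        // Reduce: sum all values for each key, as LongSumReducer does
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : grouped.entrySet()) {
            long sum = 0;
            for (long v : e.getValue()) {
                sum += v;
            }
            result.put(e.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Mapper output: ([count,1] [count,1] [count,1] [count,1])
        List<Map.Entry<String, Long>> mapOutput = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            mapOutput.add(new AbstractMap.SimpleEntry<>("count", 1L));
        }
        System.out.println(reduceAll(mapOutput));   // {count=4}
    }
}
```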