In the HDFS architecture there is a concept of blocks. A typical block size used by HDFS is 64 MB. When we place a large file into HDFS it is chopped up into 64 MB chunks (based on the default block configuration). Suppose you have a file of 1 GB and you want to place that file in HDFS; then there will be 1 GB/64 MB = 16 splits/blocks, and these blocks will be distributed across the DataNodes. Each block/chunk will reside on a different DataNode based on your cluster configuration.

Data splitting happens based on file offsets. The goal of splitting the file and storing it in different blocks is parallel processing and failover of data.

Difference between block size and split size.

A split is a logical split of your data, basically used during data processing by a Map/Reduce program or other processing techniques. Split size is user defined, and you can choose your split size based on your data (how much data you are processing).

Split is basically used to control the number of Mappers in a Map/Reduce program. If you have not defined any input split size in the Map/Reduce program then the default HDFS block size will be taken as the input split.

Example:
Suppose you have a file of 100 MB and the HDFS default block size is 64 MB; then it will be chopped into 2 splits and occupy 2 blocks. Now if you have a Map/Reduce program to process this data but you have not specified an input split, then the number of blocks (2 blocks) will be taken as the input split for the Map/Reduce processing and 2 Mappers will get assigned for this job.

But suppose you have specified the split size (say 100 MB) in your Map/Reduce program; then both blocks (2 blocks) will be considered as a single split for the Map/Reduce processing and 1 Mapper will get assigned for this job.

Suppose you have specified the split size (say 25 MB) in your Map/Reduce program; then there will be 4 input splits for the Map/Reduce program and 4 Mappers will get assigned for the job.

Conclusion:

Split is a logical division of the input data while block is a physical division of the data.
The HDFS default block size is the default split size if an input split is not specified.
Split is user defined and the user can control the split size in his Map/Reduce program.
One split can map to multiple blocks and there can be multiple splits of one block.
The number of map tasks (Mappers) is equal to the number of splits.

If your resources are limited and you want to limit the number of maps, you can increase the split size. For example: if we have 640 MB in 10 blocks, i.e. each block of 64 MB, and resources are limited, then you can set the split size to 128 MB; a logical grouping of 128 MB is formed and only 5 maps will be executed, each processing 128 MB.
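
For example, the split size can be raised above the block size in the driver. A minimal sketch, assuming the new MapReduce API and a Job object named job as in the driver shown later (the property name is the Hadoop 2.x one; older releases use mapred.min.split.size):

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

//Group two 64 MB blocks into one 128 MB logical split so fewer Mappers run
FileInputFormat.setMinInputSplitSize(job, 128 * 1024 * 1024L);

//Equivalent to setting the configuration property directly
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 128 * 1024 * 1024L);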

If we make the input non-splittable (i.e. isSplitable() returns false) then the whole file will form one input split and be processed by one map, which takes more time when the file is big.
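
A minimal sketch of one way to do this, assuming a custom input format class (the name NonSplittableTextInputFormat is only illustrative) registered in the driver with job.setInputFormatClass(NonSplittableTextInputFormat.class):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

//Returning false from isSplitable() makes every file a single input split,
//so one Mapper processes the whole file regardless of how many blocks it spans.
public class NonSplittableTextInputFormat extends TextInputFormat
{
	@Override
	protected boolean isSplitable(JobContext context, Path file)
	{
		return false;
	}
}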

The default Mapper and Reducer come from

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

When the Mapper and Reducer are not set using job.setMapperClass()
and job.setReducerClass(), the default Mapper.class and Reducer.class will be used.

The default Mapper.class is an identity mapper: it writes each input (byte offset, line) pair straight to the output, and the default Reducer.class likewise passes its input through unchanged. The input and output of the default mapper and reducer are as shown.

Input

Test
Test
Test

Output

0	Test
5	Test
10	Test

Each key is the byte offset at which the line starts: the word Test is 4 bytes + 1 for the line ending = 5, so the offsets are 0, 5 and 10.
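
Under the hood the default Mapper's map() method just forwards the pair it receives, which is why the offsets show up as keys. It is effectively:

//What org.apache.hadoop.mapreduce.Mapper does for every record: the byte
//offset arrives as the key, the line as the value, and both are written out unchanged.
protected void map(KEYIN key, VALUEIN value, Context context)
		throws IOException, InterruptedException {
	context.write(key, value);
}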

package com.mugilmapred;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Test extends Configured implements Tool{

	public static void main(String[] args) throws Exception {
		Test objTest = new Test();
		int result = ToolRunner.run(objTest, args);
		System.exit(result);
	}

	public int run(String[] args) throws Exception {
		Job job = new Job(getConf());
		job.setJarByClass(Test.class);

		Path inputFilepath = new Path(args[0]);
		Path outputFilepath = new Path(args[1]);

		FileInputFormat.addInputPath(job, inputFilepath);
		FileOutputFormat.setOutputPath(job, outputFilepath);

		//Delete the output path if it already exists so the job does not fail
		FileSystem fs = FileSystem.newInstance(getConf());

		if(fs.exists(outputFilepath))
		{
			fs.delete(outputFilepath, true);
		}

		//No Mapper or Reducer is set, so the default Mapper.class and Reducer.class run
		return job.waitForCompletion(true)? 0:1;
	}
}

When you don't add setJarByClass it will throw:

Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.mugilmapred.Test$Map not found

If you run locally this is not required, but when you run on a cluster the class which contains the Mapper should be specified; otherwise the system does not know in which jar file the Mapper is located.

job.setJarByClass(Test.class);

You can also use setJar as below:

job.setJar("Test-0.0.1.jar");

Using a Predefined Reducer in a Program

.
.
.
job.setMapperClass(WordMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
    
job.setReducerClass(LongSumReducer.class);
job.setNumReduceTasks(1);
.
.
.
.

LongSumReducer.class takes the mapper output ([count,1] [count,1] [count,1] [count,1]) and groups it together as ([count,4]) by summing the values for each key.
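
WordMapper here is not a Hadoop class; a minimal sketch of a mapper that would feed LongSumReducer, emitting (word, 1) pairs with the Text/LongWritable output types set above:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable>
{
	private final static LongWritable ONE = new LongWritable(1);
	private Text word = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
	{
		//Emit (word, 1) for every word in the line; LongSumReducer adds up the 1s per word
		for (String token : value.toString().split("\\s+"))
		{
			if (!token.isEmpty())
			{
				word.set(token);
				context.write(word, ONE);
			}
		}
	}
}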

Table Creation

CREATE TABLE HomeNeeds(Type STRING, Product STRING, No INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TextFile;

Insertion

LOAD DATA LOCAL INPATH '/home/turbo/workspace/Sample Datas/Test.csv'
OVERWRITE INTO TABLE HomeNeeds;

Create Table with Partition

CREATE TABLE HomeNeeds(Type String, Product String, No Int)
PARTITIONED BY (Date String, Country String)  
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','; 

The partition columns and the table columns have no relation to one another; the partition values are stored as directory names, not inside the data files.

Inserting into Partitioned Table

LOAD DATA LOCAL INPATH '/home/turbo/workspace/Sample Datas/Test.csv' 
INTO TABLE HomeNeeds
PARTITION (Date='2001-01-25', Country='India');

Partition and Bucketing

CREATE TABLE HomeNeeds(Type String, Item String, No Int)
PARTITIONED BY (Area String)
CLUSTERED BY (Type) INTO 4 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

Writing a Filter UDF Function

package com.mugil.pig;

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class FilterType  extends FilterFunc {
	
	@Override
	public Boolean exec(Tuple tuple) throws IOException {
		
		if(tuple == null || tuple.size() == 0)		
		 return false;
		
		try {
			Object obj = tuple.get(0);
			
			if(obj == null)			
			 return false;
			
			String type = (String) obj;
			
			//Keep only records whose first field is Kitchen
			if(type.equals("Kitchen"))
			 return true;
			
		} catch (Exception e) {
			throw new IOException("Caught exception processing input row " + e.getMessage(), e);
		}
			
		return false;
	}
}

Registering UDF Function

grunt> REGISTER  /usr/local/pig-0.15.0/FilterByType3.jar;                  
grunt> DEFINE FilterType com.mugil.pig.FilterType();         
grunt> filtered_records = FILTER records BY FilterType(Type);
grunt> DUMP filtered_records;

Binary search is faster than linear search if the collection is sorted and does not contain duplicate values.

public static void BinarySearch(int searchVal)
{
  int lowerIndex = 0;
  int higherIndex = arrNumbers.length - 1;
  
  while(lowerIndex <= higherIndex)
  {
	int middleIndex = (lowerIndex + higherIndex)/2;
	
	if(searchVal < arrNumbers[middleIndex])
	{
		//Search value lies in the lower half
		higherIndex = middleIndex - 1; 
	}
	else if(searchVal > arrNumbers[middleIndex])
	{
		//Search value lies in the upper half
		lowerIndex = middleIndex + 1;
	}  
	else
	{
		System.out.println("The element is Found at Index " + middleIndex);
		return;
	}
  }
  
  System.out.println("The element is not Found");
}

Bubble Sort

public void bubbleSort()
{
	for (int i = arrNumbers.length-1; i > 0; i--) 
	{	
		for (int j = 0; j < i; j++) 
		{			
			//Bubble the larger of the pair towards the end of the array
			if(arrNumbers[j] > arrNumbers[j+1])
			{
				swapValuesAtIndex(j, j+1);					
			}
		}
	}
}
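
swapValuesAtIndex() used above (and in the selection sort below) is not shown in the snippets; a minimal sketch, assuming arrNumbers is the int[] field being sorted:

//Swaps the values at the two given positions of arrNumbers
private void swapValuesAtIndex(int first, int second)
{
	int temp = arrNumbers[first];
	arrNumbers[first] = arrNumbers[second];
	arrNumbers[second] = temp;
}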

Selection Sort
Selection sort works by dividing the list into two parts, sorted and unsorted. It takes the first unsorted element as the current minimum, compares it with the remaining elements to find the true minimum, and then moves that minimum to the end of the sorted part.

public void selectionSort()
{
	int minElement = 0;
	
	for (int i = 0; i < arrNumbers.length; i++) 
	{
		//Assume the first unsorted element is the minimum
		minElement = i;
		
		for (int j = i; j < arrNumbers.length; j++) 
		{			
			if(arrNumbers[minElement] > arrNumbers[j])
			{						
				minElement = j;
			}
		}
		
		//Move the minimum to the end of the sorted portion
		swapValuesAtIndex(minElement, i);
	}
}

Insertion Sort
Insertion sort generally performs the best of these simple sorting methods. The list is divided into a sorted and an unsorted portion; once a number is selected for comparison, the pass does not end until that number is placed at its correct location.

public void insertionSort()
{	
	for (int i = 1; i < arrNumbers.length; i++) 
	{
		int j = i;
		int toCompare = arrNumbers[i];
		
		//Shift elements greater than toCompare one position to the right
		while((j>0) && (arrNumbers[j-1] > toCompare))
		{
			arrNumbers[j] = arrNumbers[j-1];
			j--;
		}
		
		arrNumbers[j] =  toCompare;		
	}
}
Linear search is faster when searching for an element in a collection where the elements are duplicated and occur multiple times; binary search is efficient when the collection elements are unique.

Removing Elements from an Array

import java.util.LinkedList;
import java.util.List;

public static String[] removeElements(String[] input, String deleteMe) 
{
    List<String> result = new LinkedList<String>();

    for(String item : input)
        if(!deleteMe.equals(item))
            result.add(item);

    //Copy into a right-sized array instead of reusing the input array
    return result.toArray(new String[result.size()]);
}
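
A quick usage sketch of the method above (the array contents are made up for illustration), assuming it sits in the same class:

import java.util.Arrays;

public static void main(String[] args)
{
	String[] input = {"Soap", "Oil", "Soap", "Rice"};
	String[] withoutSoap = removeElements(input, "Soap");

	//Prints [Oil, Rice]
	System.out.println(Arrays.toString(withoutSoap));
}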

Loading CSV File using Pig Script

A = LOAD '...' USING PigStorage(',') AS (...);

Filtering NULL values in chararray
Example1: null as chararray
input.txt

1,2014-04-08 12:09:23.0
2,2014-04-08 12:09:23.0
3,null
4,null

Pig:

A = LOAD 'input.txt' USING PigStorage(',') AS (f1:int,f2:chararray);
B = FILTER A BY f2!='null';
DUMP B;

Example2: Real null value
input.txt

1,2014-04-08 12:09:23.0
2,2014-04-08 12:09:23.0
3,
4,

Pig:

A = LOAD 'input.txt' USING PigStorage(',') AS (f1:int,f2:chararray);
B = FILTER A BY f2 is not null;
DUMP B;

Output:

(1,2014-04-08 12:09:23.0)
(2,2014-04-08 12:09:23.0)

Finding Max from CSV File

Test.csv

Maruthi,10
Maruthi,55
Suziki,50
Honda,4
Maruthi,40
Suziki,60
Honda,14
BMW,140
Benz,5

a1 = LOAD 'Test.csv' USING PigStorage(',') AS (Car:chararray, No:int);
DESCRIBE a1;

Output

a1: {Car: chararray,No: int}

b1 = GROUP a1 BY Car;
DESCRIBE b1;

Output

b1: {group: chararray,a1: {(Car: chararray,No: int)}}

DUMP b1;

Output

(BMW,{(BMW,140)})
(Benz,{(Benz,5)})
(Honda,{(Honda,4),(Honda,14)})
(Suziki,{(Suziki,50),(Suziki,60)})
(Maruthi,{(Maruthi,10),(Maruthi,55),(Maruthi,40)})
(,{(,)})

c1 = FOREACH b1 GENERATE group, MAX(a1.No);
DUMP c1;

Output

(BMW,140)
(Benz,5)
(Honda,14)
(Suziki,60)
(Maruthi,55)

Filtering Empty Records
Corrupted record CSV content

HouseHold,Soap,2
Kitchen,Oil,2
HouseHold,Sweeper,2
PoojaItems,Sandal
Kitchen,Rice,30
HouseHold,,1
Kitchen,Sugar,5
HouseHold,Shampoo,2
PoojaItems,Champor,10
HouseHold,Soap,2

filtered_records = FILTER records BY Item is null OR No is null;

Getting Count of Corrupted Records

records = LOAD 'Test.csv' USING PigStorage(',') AS (Type:chararray, Item:chararray, No:int);
filtered_records = FILTER records BY Item is null OR No is null;
grouped_records = GROUP filtered_records BY Type;
DESCRIBE grouped_records;

grouped_records: {group: chararray,filtered_records: {(Type: chararray,Item: chararray,No: int)}}

corrupt_records = FOREACH grouped_records GENERATE group, COUNT(filtered_records);
DUMP corrupt_records;

Output

(HouseHold,1)
(PoojaItems,1)

Writing macros to find Maximum Item Sold

DEFINE max_item_sold(Records, Type, No) RETURNS c 
{ 
 b = GROUP $Records BY $Type;                        
 $c = FOREACH b GENERATE group, MAX($Records.$No);  
}; 

max_type_sold = max_item_sold(records, Type, No);