In MapReduce v1, mapreduce.tasktracker.map.tasks.maximum and mapreduce.tasktracker.reduce.tasks.maximum are used in mapred-site.xml to configure the number of map slots and reduce slots respectively.

Starting from MapReduce v2 (YARN), the more generic term container is used instead of slot. The number of containers represents the maximum number of tasks that can run in parallel on a node, regardless of whether each task is a map task, a reduce task, or the application master (in YARN).

Suppose you have a TaskTracker with 32 GB of memory, 16 map slots, and 8 reduce slots. If all task JVMs use 1 GB of memory and all slots are filled, you have 24 Java processes with 1 GB each, for a total of 24 GB. Because you have 32 GB of physical memory, there is probably enough memory for all 24 processes. On the other hand, if your average map and reduce tasks need 2 GB of memory and all slots are full, the 24 tasks could need up to 48 GB of memory, more than is available. To avoid over-committing TaskTracker node memory, reduce the number of slots.
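
As an illustrative sketch (the property names are the Hadoop 2.x ones; the numeric values just mirror the example above), the MRv1 slot settings and their YARN memory-based counterparts can also be set programmatically instead of in mapred-site.xml / yarn-site.xml:

import org.apache.hadoop.conf.Configuration;

public class SlotVsContainerConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // MRv1: a fixed number of map/reduce slots per TaskTracker (mapred-site.xml)
        conf.setInt("mapreduce.tasktracker.map.tasks.maximum", 16);
        conf.setInt("mapreduce.tasktracker.reduce.tasks.maximum", 8);

        // MRv2/YARN: no slots; the NodeManager advertises a memory pool and each task
        // requests a container of a given size (yarn-site.xml / mapred-site.xml)
        conf.setInt("yarn.nodemanager.resource.memory-mb", 32768); // per-node memory pool (32 GB)
        conf.setInt("mapreduce.map.memory.mb", 2048);              // container size per map task
        conf.setInt("mapreduce.reduce.memory.mb", 4096);           // container size per reduce task
    }
}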

Question: I have a 10-node cluster and each node has a quad-core processor, so in total there are 80 slots (8 × 10). It is said that one map task is allotted per map slot. Suppose 60 slots are usable (say the other slots are busy handling other daemons). If I have a file that has 1000 blocks, and say 40 slots are for map tasks and 20 for reduce tasks, how are the blocks going to be processed? The number of map tasks depends on the number of blocks available (1000 here), but the available map slots are only 40.

Answer: The number of map tasks for a given job is driven by the number of input splits, not by the mapred.map.tasks parameter. For each input split a map task is spawned, so over the lifetime of a MapReduce job the number of map tasks equals the number of input splits. mapred.map.tasks is just a hint to the InputFormat about the number of maps.

Assume your Hadoop input file size is 2 GB and you set the block size to 64 MB: 32 map tasks will be launched, and each mapper will process one 64 MB block to complete the map phase of your Hadoop job.

==> The number of mappers set to run depends entirely on 1) the file size and 2) the block size.
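
A minimal sketch of that arithmetic in plain Java (class name made up): the number of map tasks is the file size divided by the split size, rounded up.

public class MapperCountSketch {
    public static void main(String[] args) {
        long fileSizeMb = 2 * 1024;   // 2 GB input file
        long blockSizeMb = 64;        // default split size = HDFS block size

        // Number of input splits = ceil(fileSize / splitSize); one mapper per split.
        long numMappers = (fileSizeMb + blockSizeMb - 1) / blockSizeMb;
        System.out.println("Mappers: " + numMappers);  // prints 32
    }
}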

Assume you are running Hadoop on a cluster of 4 nodes, and you set the mapred.map.tasks and mapred.reduce.tasks parameters in the conf file of each node as follows:

Node 1: mapred.map.tasks = 4 and mapred.reduce.tasks = 4
Node 2: mapred.map.tasks = 2 and mapred.reduce.tasks = 2
Node 3: mapred.map.tasks = 4 and mapred.reduce.tasks = 4
Node 4: mapred.map.tasks = 1 and mapred.reduce.tasks = 1

Assume you set the above parameters for the 4 nodes in this cluster. Notice that Node 2 is set to only 2 and 2 respectively, because its processing resources might be smaller (e.g. 2 processors, 2 cores), and Node 4 is set even lower, to just 1 and 1, perhaps because that node has 1 processor with 2 cores and so cannot run more than 1 mapper and 1 reducer task.

So when you run the job, Node 1, Node 2, Node 3 and Node 4 are configured to run a maximum total of (4 + 2 + 4 + 1) = 11 mapper tasks simultaneously, out of the 42 mapper tasks that need to be completed by the job. After each node completes its map tasks it will take up the remaining mapper tasks left from the 42.

A processor is the entire chipset, including all the cores. Cores are the 2 (or more, like 4 or 6) parts of the processor that do parallel processing (processing two different pieces of data simultaneously in different units), which helps multitasking without putting much strain on the processor. Each core is technically a processor in itself, but the chipset is manufactured in such a way that the different cores work in coordination rather than individually. An analogy is dividing a large hall into several “identical” bedrooms so that there is no overcrowding: each bedroom is like a core that does the same function of housing the guests, but they are physically separate.

Mapper
The map() function of the Mapper reads the input file row by row.

Combiner
The Combiner will not be called if no Reducer class is set in the Driver class.

Reducer
The Reducer and the Combiner need not do the same thing, as in the case of computing an average (e.g. of the numbers 0 to 100); the worked example below shows how.

Input to Mapper

1/1/09 1:26,Product2,1200,Nikki,United States
1/1/09 1:51,Product2,1200,Si,Denmark
1/1/09 10:06,Product2,3600,Irene,Germany
1/1/09 11:05,Product2,1200,Janis,Ireland
1/1/09 12:19,Product2,1200,Marlene,United States
1/1/09 12:20,Product2,3600,seemab,Malta
1/1/09 12:25,Product2,3600,Anne-line,Switzerland
1/1/09 12:42,Product1,1200,ashton,United Kingdom
1/1/09 14:19,Product2,1200,Gabriel,Canada
1/1/09 14:22,Product1,1200,Liban,Norway
1/1/09 16:00,Product2,1200,Toni,United Kingdom
1/1/09 16:44,Product2,1200,Julie,United States
1/1/09 18:32,Product1,1200,Andrea,United States

Output of Mapper

Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {3600} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {3600} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {3600} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product1}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product1}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product2}
Value = Product Price {1200} 	  Product No {1}
----------------------------
Key = Product Date {2009-01} Product Name {Product1}
Value = Product Price {1200} 	  Product No {1}
----------------------------

Magic of the framework happens here: the values are grouped by key before being handed to the Combiner.
Input of Combiner

----------------------------
Key = Product Date {2009-01} 	 Product Name {Product1}
Values
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
----------------------------
Key = Product Date {2009-01} 	 Product Name {Product2}
Values
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 3600	
Product No 1
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 3600	
Product No 1
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 1200	
Product No 1
Product Price 3600	
Product No 1
----------------------------

Values added together in the Combiner based on the key (running totals):

key 2009-01	Product1
productPrice 1200
productNos 1
----------------------------
productPrice 2400
productNos 2
----------------------------
productPrice 3600
productNos 3
----------------------------
key 2009-01	Product2
productPrice 1200
productNos 1
----------------------------
productPrice 2400
productNos 2
----------------------------
productPrice 6000
productNos 3
----------------------------
productPrice 7200
productNos 4
----------------------------
productPrice 8400
productNos 5
----------------------------
productPrice 12000
productNos 6
----------------------------
productPrice 13200
productNos 7
----------------------------
productPrice 14400
productNos 8
----------------------------
productPrice 15600
productNos 9
----------------------------
productPrice 19200
productNos 10
----------------------------

Output of Combiner and Input to reducer

Key = Product Date {2009-01} 	 Product Name {Product1}
Value = Product Price {3600}	Product Nos {3}
----------------------------
Key = Product Date {2009-01} 	 Product Name {Product2}
Value = Product Price {19200} Product Nos {10}

Output of Reducer

Key = Product Date {2009-01} 	 Product Name {Product1}
Value = AvgVolume {1200}	NoOfRecords {3}
----------------------------
Key = Product Date {2009-01} 	 Product Name {Product2}
Value = AvgVolume {1920}	NoOfRecords {10}
----------------------------
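
A condensed sketch of the pattern shown above (class names are made up, and the values are encoded as comma-separated Text to keep it short): the combiner always emits a partial (sum, count) pair rather than an average, so it can safely run zero or more times, and only the reducer divides at the end.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Combiner: pre-aggregates (price, count) pairs per "date + product" key.
// It emits a partial sum, NOT an average, so it may run any number of times.
public class AvgVolumeCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long priceSum = 0, count = 0;
        for (Text v : values) {                  // each value looks like "1200,1"
            String[] parts = v.toString().split(",");
            priceSum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new Text(priceSum + "," + count));   // e.g. "19200,10"
    }
}

// Reducer: the same summation, but it divides exactly once at the very end.
class AvgVolumeReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long priceSum = 0, count = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            priceSum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new Text("AvgVolume=" + (priceSum / count) + " NoOfRecords=" + count));
    }
}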

CompareTo is used for Object Comparison

The compareTo() logic obviously tells how to sort the dataset, and it also tells the reducer which elements are equal so that they can be grouped.

Compares this object with the specified object for order. Returns a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than the specified object.

Let’s say we would like to compare Jedis by their age:

class Jedi implements Comparable<Jedi> {

    private final String name;
    private final int age;
        //...
}

Then if our Jedi is older than the provided one, you must return a positive integer; if they are the same age, you return 0; and if our Jedi is younger, you return a negative integer.

public int compareTo(Jedi jedi){
    return this.age > jedi.age ? 1 : this.age < jedi.age ? -1 : 0;
}

By implementing the compareTo method (coming from the Comparable interface) you are defining what is called a natural order. All sorting methods in the JDK will use this ordering by default.

There are occasions in which you may want to base your comparison on other objects, and not on a primitive type. For instance, compare Jedis based on their names. In this case, if the objects being compared already implement Comparable, then you can do the comparison using their compareTo method.

public int compareTo(Jedi jedi){
    return this.name.compareTo(jedi.getName());
}

It would be simpler in this case.

Now, if you intend to use both name and age as the comparison criteria, then you have to decide your order of comparison, i.e. which has precedence. For instance, if two Jedis have the same name, then you can use their age to decide which goes first and which goes second.

public int compareTo(Jedi jedi){
    int result = this.name.compareTo(jedi.getName());
    if(result == 0){
        result = this.age > jedi.age ? 1 : this.age < jedi.age ? -1 : 0;
    }
    return result;
}

If you had an array of Jedis

Jedi[] jediAcademy = {new Jedi("Obiwan",80), new Jedi("Anakin", 30), ..}

All you have to do is ask the java.util.Arrays class to use its sort method.

Arrays.sort(jediAcademy);

This Arrays.sort method will use your compareTo method to sort the objects one by one.

  1. Yes, a Combiner can be different from the Reducer, although your Combiner will still implement the Reducer interface. Combiners can only be used in specific cases, which are going to be job dependent. The Combiner operates like a Reducer, but only on the subset of key/value pairs output from each Mapper. One constraint that your Combiner has, unlike a Reducer, is that its input/output key and value types must match the output types of your Mapper.
  2. The primary goal of combiners is to optimize/minimize the number of key/value pairs that will be shuffled across the network between mappers and reducers, and thus to save as much bandwidth as possible.
  3. The rule of thumb for a combiner is that it must have the same input and output types. The reason for this is that combiner use is not guaranteed: it may or may not run, depending on the volume and number of spills.
  4. The reducer can be used as a combiner when it satisfies this rule, i.e. same input and output types (see the wiring sketch after this list).
  5. The other most important rule for a combiner is that it can only be used when the function you want to apply is both commutative and associative, like adding numbers, but not in a case like computing an average (if you are using the same code as the reducer). Combiners can only be used on functions that are commutative (a.b = b.a) and associative (a.(b.c) = (a.b).c).
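
For the commutative/associative case (a plain sum), wiring the reducer in as the combiner is a one-liner. A minimal sketch against the standard Job API, assuming a hypothetical SumReducer class of your own whose reduce() just adds its values:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class CombinerWiring {
    // SumReducer is hypothetical: any Reducer whose logic is a plain, order-independent sum.
    public static void configure(Job job, Class<? extends Reducer> sumReducer) {
        job.setReducerClass(sumReducer);
        // Summation is commutative and associative, so the same class can also run as the
        // combiner on each mapper's local output; the final result is the same whether or not
        // (or how many times) the combiner actually runs.
        job.setCombinerClass(sumReducer);
    }
}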

In the HDFS architecture there is a concept of blocks. A typical block size used by HDFS is 64 MB. When we place a large file into HDFS it is chopped up into 64 MB chunks (based on the default block configuration). Suppose you have a file of 1 GB and you want to place that file in HDFS: there will be 1 GB / 64 MB = 16 splits/blocks, and these blocks will be distributed across the DataNodes. Each block/chunk will reside on a different DataNode based on your cluster configuration.

Data splitting happens based on file offsets. The goal of splitting the file and storing it in different blocks is parallel processing and failover of data.

Difference between block size and split size.

A split is a logical split of your data, basically used during data processing with a Map/Reduce program or other processing techniques. Split size is user defined, and you can choose your split size based on your data (how much data you are processing).

Splits are basically used to control the number of mappers in a Map/Reduce program. If you have not defined any input split size in your Map/Reduce program, then the default HDFS block size is taken as the input split size.

Example:
Suppose you have a file of 100 MB and the HDFS default block configuration is 64 MB; then it will be chopped into 2 splits and occupy 2 blocks. Now you have a Map/Reduce program to process this data, but you have not specified any input split; then, based on the number of blocks (2 blocks), 2 input splits will be considered for the Map/Reduce processing and 2 mappers will get assigned to this job.

But suppose you have specified a split size of, say, 100 MB in your Map/Reduce program; then both blocks (2 blocks) will be considered as a single split for the Map/Reduce processing and 1 mapper will get assigned to this job.

Suppose you have specified a split size of, say, 25 MB in your Map/Reduce program; then there will be 4 input splits for the Map/Reduce program and 4 mappers will get assigned to the job.

Conclusion:

A split is a logical division of the input data, while a block is a physical division of the data.
The HDFS default block size is the default split size if no input split is specified.
Split size is user defined, and the user can control it in his Map/Reduce program.
One split can map to multiple blocks, and there can be multiple splits of one block.
The number of map tasks (mappers) is equal to the number of splits.

If your resources are limited and you want to limit the number of maps, you can increase the split size. For example: if we have 640 MB in 10 blocks, i.e. each block of 64 MB, and resources are limited, then you can set the split size to 128 MB; a logical grouping of 128 MB is then formed and only 5 maps will be executed, each over 128 MB.

If we make the input non-splittable (for example, isSplitable() returns false), then the whole file forms one input split and is processed by one map, which takes more time when the file is big.
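
A hedged sketch of controlling the split size through the new-API FileInputFormat helpers (the 128 MB and 25 MB figures just mirror the examples above; the class name is made up):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeSketch {
    public static void configure(Job job) {
        // Raise the minimum split size to 128 MB so that two 64 MB blocks are grouped
        // into one logical split -> a 640 MB input yields only 5 map tasks.
        FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);

        // Or cap the maximum split size (e.g. 25 MB) to get more, smaller map tasks.
        // FileInputFormat.setMaxInputSplitSize(job, 25L * 1024 * 1024);
    }
}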

Reference

Finding Friends via map reduce

Link

Difference between combiner and Reducer

  • One constraint that a Combiner has, unlike a Reducer, is that its input/output key and value types must match the output types of your Mapper.
  • Combiners can only be used on functions that are commutative (a.b = b.a) and associative (a.(b.c) = (a.b).c). This also means that combiners may operate only on a subset of your keys and values, or may not execute at all; still, you want the output of the program to remain the same.
  • Reducers can get data from multiple mappers as part of the partitioning process. A combiner can only get its input from one mapper.

Difference between HDFS block and input split

  • A block is the physical part of the disk that holds the minimum amount of data that can be read or written. The actual block size is decided during the design phase. For example, the block size of HDFS can be 128 MB/256 MB, though the default HDFS block size is 64 MB.
  • HDFS blocks are physical entities, while an input split is a logical partition.
  • What is a logical partition? It means the split just holds information about block addresses or locations. In the case where the last record (value) in a block is incomplete, the input split includes location information for the next block and the byte offset of the data needed to complete the record.

Counters Example Link

User Defined Counters Example

What is GenericOptionsParser
GenericOptionsParser is a utility to parse command line arguments generic to the Hadoop framework. GenericOptionsParser recognizes several standard command line arguments, enabling applications to easily specify a namenode, a ResourceManager, additional configuration resources etc.

Using GenericOptionsParser enables you to specify the generic options on the command line itself.

E.g., with generic options you can specify the following:

>>hadoop jar /home/hduser/WordCount/wordcount.jar WordCount -Dmapred.reduce.tasks=20 input output
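
A minimal sketch of using GenericOptionsParser directly (class name made up); normally you let ToolRunner do this for you, as discussed next:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class GopExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Strips the generic options (-D, -conf, -fs, -jt, -files, ...) out of args,
        // applies them to conf, and returns whatever is left (e.g. input/output paths).
        String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        System.out.println("mapred.reduce.tasks = " + conf.get("mapred.reduce.tasks"));
        System.out.println("remaining args: " + remainingArgs.length);
    }
}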

GenericOptionsParser vs ToolRunner

There are no extra privileges, but your command line options get run through the GenericOptionsParser, which allows you to extract certain configuration properties and configure a Configuration object from them.

By using ToolRunner.run(), any Hadoop application can handle the standard command line options supported by Hadoop. ToolRunner uses GenericOptionsParser internally. In short, the Hadoop-specific options provided on the command line are parsed and set into the Configuration object of the application.

E.g., if you say:

>>hadoop MyHadoopApp -D mapred.reduce.tasks=3

Then ToolRunner.run(new MyHadoopApp(), args) will automatically set the value parameter mapred.reduce.tasks to 3 in the Configuration object.

Basically, rather than parsing some options yourself (using the index of the argument in the list), you can explicitly configure Configuration properties from the command line:

hadoop jar myJar.jar com.Main prop1value prop2value

public static void main(String args[]) {
    Configuration conf = new Configuration();
    conf.set("prop1", args[0]);
    conf.set("prop2", args[1]);

    conf.get("prop1"); // will resolve to "prop1value"
    conf.get("prop2"); // will resolve to "prop2value"
}

Becomes much more condensed with ToolRunner:

hadoop jar myJar.jar com.Main -Dprop1=prop1value -Dprop2=prop2value

public int run(String args[]) {
    Configuration conf = getConf();

    conf.get("prop1"); // will resolve to "prop1value"
    conf.get("prop2"); // will resolve to "prop2value"

    return 0;
}

GenericOptionsParser, Tool, and ToolRunner for running Hadoop Job

Hadoop comes with a few helper classes for making it easier to run jobs from the command line. GenericOptionsParser is a class that interprets common Hadoop command-line options and sets them on a Configuration object for your application to use as desired. You don’t usually use GenericOptionsParser directly, as it’s more convenient to implement the Tool interface and run your application with the ToolRunner, which uses GenericOptionsParser internally:

public interface Tool extends Configurable {
    int run(String[] args) throws Exception;
}

The example below shows a very simple implementation of Tool for running a Hadoop MapReduce job.

public class WordCountConfigured extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Iterate over the configuration and print each property to standard output.
        for (java.util.Map.Entry<String, String> entry : conf) {
            System.out.println(entry.getKey() + "=" + entry.getValue());
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCountConfigured(), args);
        System.exit(exitCode);
    }
}

We make WordCountConfigured a subclass of Configured, which is an implementation of the Configurable interface. All implementations of Tool need to implement Configurable (since Tool extends it), and subclassing Configured is often the easiest way to achieve this. The run() method obtains the Configuration using Configurable’s getConf() method, and then iterates over it, printing each property to standard output.

WordCountConfigured’s main() method does not invoke its own run() method directly. Instead, we call ToolRunner’s static run() method, which takes care of creating a Configuration object for the Tool, before calling its run() method. ToolRunner also uses a GenericOptionsParser to pick up any standard options specified on the command line, and set them on the Configuration instance. We can see the effect of picking up the properties specified in conf/hadoop-localhost.xml by running the following command:

    >>hadoop WordCountConfigured -conf conf/hadoop-localhost.xml -D mapred.job.tracker=localhost:10011 -D mapred.reduce.tasks=n

Options specified with -D take priority over properties from the configuration files. This is very useful: you can put defaults into configuration files and then override them with the -D option as needed. A common example of this is setting the number of reducers for a MapReduce job via -D mapred.reduce.tasks=n. This will override the number of reducers set on the cluster or in any client-side configuration files. The other options that GenericOptionsParser and ToolRunner support are listed in Table.

Does the MapReduce framework (for example the Hadoop implementation) assign the inputs to mappers before the mapper job starts, or is it done at runtime?
In the MapReduce framework, mapper tasks are assigned to machines based on the data locality concept. This means that the data nodes storing the block of data will be assigned to execute the mapper task for that block of data.

The data splits (blocks) happen when you store the data into HDFS, using the configuration defined for data replication and block size. So if the original file is, say, 128 MB and the block size is 64 MB, then the file will be split into two blocks, and these blocks will be stored on two different machines.

A typical block size used by HDFS is 64 MB. Thus, an HDFS file is chopped up into 64 MB chunks, and if possible, each chunk will reside on a different DataNode.

Now when you run the MapReduce job for that particular file, two mapper tasks will be launched on these two machines.

So the data split and the launching of mappers are two completely independent things. The first is handled by the HDFS framework and the second by the MapReduce framework.

The inputs for the map tasks are prepared before the mapper phase starts in Hadoop. The number of mappers is decided by the number of input splits calculated for the given input file before the mapper phase starts.

Here the input split is a logical block of the given input file: by default one input split is prepared for every block of the file, and one mapper task is dispatched for every input split.

You can control the number of InputSplits by controlling the mapreduce.input.fileinputformat.split.maxsize and mapreduce.input.fileinputformat.split.minsize properties.
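
Equivalently, a small sketch (class name made up) of setting the same limits as raw configuration properties:

import org.apache.hadoop.conf.Configuration;

public class SplitProperties {
    public static Configuration withSplitLimits() {
        Configuration conf = new Configuration();
        // Same effect as the FileInputFormat helpers shown earlier, expressed as properties.
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 64L * 1024 * 1024);
        return conf;
    }
}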

The number of nodes available to execute the calculated number of map tasks depends on the capacity of your cluster.

For example, say your input file is about 100 GB (102,400 MB) in size, the block size is 100 MB, and the input split size equals the block size (the default); then 1024 map tasks will be calculated. Now assume that your cluster's maximum number of containers available to execute map/reduce tasks across the cluster is 500; then at best only 500 mappers will execute in parallel. The machines that finish a map task container sooner will pick the next map task from the queue, and so on until all mappers are completed.

Do map & reduce tasks run on the same thread?

map() is called sequentially, not in parallel.

On a high level, you absolutely cannot expect these to run in the same thread. They often actually run on separate machines, which is what makes MapReduce attractive (the ability to run the job on lots of hardware in parallel).

Even if you have a single-machine Hadoop cluster, or if your map & reduce tasks happen to run on the same node, you still won't share threads, because the task node daemon will generally create a new Java VM for each new task (unless JVM reuse has been configured).

So in general you have to expect that your map and reduce functions run in isolation from each other, with any data exchange occurring only through input and output values.

The second piece of the puzzle is thread safety between different invocations within a single task. There is always a single Mapper or Reducer instance in existence for each task, so there is no complexity there to think about. Within a single instance, execution is controlled by the run() method that is part of the Mapper/Reducer API.

  • By default, map() calls are made in sequence on a single thread.
  • Your implementation of Mapper is free to introduce multithreading to enable fancier execution orders (see the MultithreadedMapper sketch after this list).
  • You are free to introduce shared state on a single instance of Mapper if it helps you process the batch of maps that run within a single task.
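
If you do want parallel map() calls inside a single task, Hadoop ships a MultithreadedMapper wrapper. A hedged sketch, where MyMapper stands for your own (thread-safe) Mapper subclass:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

public class MultithreadedSetup {
    public static void configure(Job job) {
        // The task runs MultithreadedMapper, which fans incoming records out to a
        // pool of threads, each invoking your real mapper's map() method.
        job.setMapperClass(MultithreadedMapper.class);
        MultithreadedMapper.setMapperClass(job, MyMapper.class);   // MyMapper: your own Mapper subclass (hypothetical)
        MultithreadedMapper.setNumberOfThreads(job, 4);
        // Only safe if MyMapper.map() is thread-safe (no unsynchronized shared state).
    }
}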

How is the run() method of mapper or reducer class called by the Hadoop framework?

The run() method is called using Java run-time polymorphism (i.e. method overriding). As you can see at line# 569 in the link below, the extended mapper/reducer is instantiated using the Java Reflection API. The MapTask class gets the name of the extended mapper/reducer from the Job configuration object, into which the client program would have set the extended mapper/reducer class using job.setMapperClass().

The following code is taken from the Hadoop source MapTask.java:

mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                               input, output, committer,
                                               reporter, split);

input.initialize(split, mapperContext);
mapper.run(mapperContext);
input.close();

Line# 621 is an example of run-time polymorphism. On this line, the MapTask calls the run() method of the configured mapper with the 'Mapper Context' as a parameter. If run() is not overridden, it calls the run() method of org.apache.hadoop.mapreduce.Mapper, which in turn calls the map() method of the configured mapper.

On line# 616 of the above link, MapTask creates the context object with all the details of the job configuration etc., and then passes it to the run() method on line# 621.

The above explanation holds for the reduce task as well, with the corresponding ReduceTask class being the main entry class.
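
For reference, the default org.apache.hadoop.mapreduce.Mapper.run() looks roughly like the following (paraphrased from the framework source; newer versions wrap the loop in try/finally). This is what gets invoked when you do not override run():

// Simplified shape of org.apache.hadoop.mapreduce.Mapper.run()
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}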

When configuring a Hadoop cluster, how do you set the number of mappers/reducers for the cluster?

It depends on how many cores and how much memory you have. The number of mappers plus the number of reducers should not, in general, exceed the number of cores. Keep in mind that the machine is also running the Task Tracker and Data Node daemons. One general suggestion is to have more mappers than reducers.

How to Chain multiple MapReduce jobs in Hadoop?
Answer
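
A hedged sketch of the most common chaining pattern (class and path names are made up): run the jobs back to back and feed the first job's output directory to the second job as its input.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobChain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path(args[0]);
        Path intermediate = new Path(args[1]);   // output of job1, input of job2
        Path output = new Path(args[2]);

        Job job1 = new Job(conf, "stage-1");
        // job1.setMapperClass(...); job1.setReducerClass(...); etc.
        FileInputFormat.addInputPath(job1, input);
        FileOutputFormat.setOutputPath(job1, intermediate);
        if (!job1.waitForCompletion(true)) {     // block until stage 1 finishes
            System.exit(1);
        }

        Job job2 = new Job(conf, "stage-2");
        // job2.setMapperClass(...); job2.setReducerClass(...); etc.
        FileInputFormat.addInputPath(job2, intermediate);
        FileOutputFormat.setOutputPath(job2, output);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}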

Use of setup() and cleanup() methods
As already mentioned, setup() and cleanup() are methods you can override, if you choose, and they are there for you to initialize and clean up your map/reduce tasks. You actually don’t have access to any data from the input split directly during these phases. The lifecycle of a map/reduce task is (from a programmer’s point of view):

setup -> map -> cleanup

setup -> reduce -> cleanup

What typically happens during setup() is that you may read parameters from the configuration object to customize your processing logic.

What typically happens during cleanup() is that you clean up any resources you may have allocated. There are other uses too, such as flushing out any accumulated aggregate results.

The setup() and cleanup() methods are simply “hooks” for you, the developer/programmer, to have a chance to do something before and after your map/reduce tasks.

For example, in the canonical word count example, let's say you want to exclude certain words from being counted (e.g. stop words such as "the", "a", "be", etc.). When you configure your MapReduce job, you can pass a (comma-delimited) list of these words as a parameter (key-value pair) into the configuration object. Then, in your map code, during setup(), you can acquire the stop words and store them in some global variable (global to the map task) and exclude these words from counting in your map logic:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private Set<String> stopWords;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();

        // The stop-word list is passed in through the configuration; trim each entry
        // so a list like "the, a, an" (with spaces after the commas) still matches.
        stopWords = new HashSet<String>();
        for (String word : conf.get("stop.words").split(",")) {
            stopWords.add(word.trim());
        }
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            if (stopWords.contains(token)) {
                continue;          // skip stop words
            }
            word.set(token);       // reuse the token already read instead of calling nextToken() again
            context.write(word, one);
        }
    }
 } 

 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("stop.words", "the, a, an, be, but, can");

    Job job = new Job(conf, "wordcount");

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
 }
}

Different modes of Hadoop
Standalone Mode
This is the default mode of Hadoop. HDFS is not utilized in this mode; the local file system is used for input and output. It is used for debugging purposes, and no custom configuration is required in the 3 Hadoop configuration files (mapred-site.xml, core-site.xml, hdfs-site.xml). Standalone mode is much faster than pseudo-distributed mode.

Pseudo-Distributed Mode (Single-Node Cluster)
Configuration is required in the 3 files given above for this mode. The replication factor is one for HDFS.
Here one node is used as Master Node / Data Node / Job Tracker / Task Tracker.
It is used to test real code against HDFS. A pseudo-distributed cluster is a cluster where all daemons run on one node.

Fully Distributed Mode (Multi-Node Cluster)
This is the production phase. Data is used and distributed across many nodes, and different nodes are used as Master Node / Data Node / Job Tracker / Task Tracker.
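
As a rough illustration (MRv1-era property names, placeholder values), the settings that separate standalone from pseudo-distributed mode normally live in core-site.xml, hdfs-site.xml and mapred-site.xml; expressed through the Java Configuration API they would look something like:

import org.apache.hadoop.conf.Configuration;

public class PseudoDistributedSketch {
    public static Configuration pseudoDistributed() {
        Configuration conf = new Configuration();
        // Standalone mode leaves fs.default.name at the default file:/// (local filesystem).
        // Pseudo-distributed mode points everything at daemons on localhost:
        conf.set("fs.default.name", "hdfs://localhost:9000");   // NameNode
        conf.set("mapred.job.tracker", "localhost:9001");       // JobTracker (MRv1)
        conf.setInt("dfs.replication", 1);                      // single node -> replication factor 1
        return conf;
    }
}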

How many datanodes can run on a single Hadoop cluster?
Each Hadoop slave node contains only one datanode process.

How many job tracker processes can run on a single Hadoop cluster?
Like datanodes, there can be only one job tracker process running on a single Hadoop cluster. The job tracker process runs in its own Java virtual machine process. If the job tracker goes down, all currently active jobs stop.

What sorts of actions does the job tracker process perform?

Client applications send jobs to the job tracker.
The job tracker determines the location of the data by communicating with the Namenode.
The job tracker finds task tracker nodes that have open slots for the data.
The job tracker submits the job to the task tracker nodes.
The job tracker monitors the task tracker nodes for signs of activity. If there is not enough activity, the job tracker transfers the job to a different task tracker node.
The job tracker receives a notification from the task tracker if the job has failed. From there, the job tracker might submit the job elsewhere, as described above. If it doesn't do this, it might blacklist either the job or the task tracker.

How does job tracker schedule a job for the task tracker?

When a client application submits a job to the job tracker, the job tracker searches for an empty node to schedule the task on, preferring the server that contains the assigned datanode.

What does the mapred.job.tracker command do?

The mapred.job.tracker command provides a list of nodes that are currently acting as the job tracker process.

What is “jps”?
jps – Java Virtual Machine Process Status
jps is similar to the ps command. The ps command on Linux is one of the most basic commands for viewing the processes running on the system. jps is a standard command-line utility that comes with the JDK and is a useful tool for viewing information about running Java processes. It's a little annoying that jps itself is included in the output.