Does the MapReduce framework (for example, the Hadoop implementation) assign inputs to mappers before the map phase starts, or is this done at runtime?
In the MapReduce framework, mapper tasks are assigned to machines based on data locality. This means the scheduler tries, where possible, to run the mapper task for a block of data on a data node that stores that block.

The data is split into blocks when you store it in HDFS, using the configured block size and replication factor. So if the original file is, say, 128 MB and the block size is 64 MB, the file will be split into two blocks. These blocks will be stored on two different machines.

A typical block size used by HDFS is 64 MB (the default in Hadoop 1.x; Hadoop 2.x and later default to 128 MB). Thus, an HDFS file is chopped up into 64 MB chunks, and if possible, each chunk will reside on a different DataNode.

Now, when you run a MapReduce job on this file, two mapper tasks will be launched, one on each of these two machines.
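The block arithmetic in the example above can be checked with a few lines of plain Java. This is only a sketch of the calculation, not a Hadoop API call; the class and method names are made up for illustration.

```java
// Sketch only: plain arithmetic, no Hadoop classes involved.
// BlockCountSketch and blockCount are hypothetical names used for illustration.
class BlockCountSketch {
    static final long MB = 1024L * 1024L;

    // Number of blocks needed to hold a file: ceil(fileSize / blockSize).
    static long blockCount(long fileSizeBytes, long blockSizeBytes) {
        if (fileSizeBytes <= 0) {
            return 0;
        }
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        // The example from the text: a 128 MB file with 64 MB blocks.
        System.out.println(blockCount(128 * MB, 64 * MB)); // 2
        // A slightly larger file needs a third, mostly empty block.
        System.out.println(blockCount(130 * MB, 64 * MB)); // 3
    }
}
```

With the default settings, each of these blocks would normally become one input split and therefore one mapper task.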

So splitting the data and launching the mappers are two completely independent things. The first is handled by HDFS and the second by the MapReduce framework.

The inputs for the map tasks are prepared before the map phase starts in Hadoop. The number of mappers is decided by the number of input splits calculated for the given input file before the map phase starts.

Here an input split is a logical block of the given input file. By default, one input split is prepared for every block of the file, and one mapper task is dispatched for every input split.

You can control the number of InputSplits by controlling the mapreduce.input.fileinputformat.split.maxsize and mapreduce.input.fileinputformat.split.minsize properties.
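To see how these two properties interact with the block size, here is a small stand-alone sketch. It assumes the split size is chosen as max(minSize, min(maxSize, blockSize)), which is how FileInputFormat in the new API computes it as far as I know; the class name and the simplified split count (which ignores the small slack Hadoop allows on the last split) are my own illustration.

```java
// Sketch only: mirrors the split-size rule assumed above, without Hadoop classes.
// SplitSizeSketch and approxSplitCount are hypothetical names.
class SplitSizeSketch {
    static final long MB = 1024L * 1024L;

    // Assumed rule: the block size, clamped between the min and max split size.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Simplified split count: ceil(fileSize / splitSize).
    static long approxSplitCount(long fileSize, long splitSize) {
        return (fileSize + splitSize - 1) / splitSize;
    }

    public static void main(String[] args) {
        long block = 64 * MB;
        long file = 128 * MB;

        // No limits set: split size equals block size, so two splits.
        long byDefault = computeSplitSize(block, 1L, Long.MAX_VALUE);
        System.out.println(approxSplitCount(file, byDefault)); // 2

        // split.maxsize = 32 MB: smaller splits, so more mappers.
        long smaller = computeSplitSize(block, 1L, 32 * MB);
        System.out.println(approxSplitCount(file, smaller)); // 4

        // split.minsize = 128 MB: larger splits, so fewer mappers.
        long larger = computeSplitSize(block, 128 * MB, Long.MAX_VALUE);
        System.out.println(approxSplitCount(file, larger)); // 1
    }
}
```

In short, lowering the maximum below the block size increases the number of mappers, and raising the minimum above the block size decreases it.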

How many of the calculated map tasks can run at the same time depends on the capacity of your cluster.

For example, say your input file is about 100 GB (102400 MB) in size, the block size is 100 MB, and the input split size equals the block size (the default); then 1024 map tasks will be calculated. Now assume the maximum number of containers available to execute map/reduce tasks across your cluster is 500; then in the best case only 500 mappers will execute in parallel. Whichever machine finishes its map task container first picks the next map task from the queue, and so on until all mappers have completed.
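The numbers in this example can be reproduced with a short calculation. The sketch below is plain arithmetic under the assumptions stated in the example (one split per block, 500 containers all available for map tasks); the names are hypothetical, and the "waves" figure is a simplification, since in practice tasks are picked up one by one as containers free up.

```java
// Sketch only: reproduces the arithmetic of the example, no Hadoop classes.
// MapWaveSketch, mapTasks and waves are hypothetical names.
class MapWaveSketch {
    // Number of map tasks when the split size equals the block size.
    static long mapTasks(long inputMb, long splitMb) {
        return (inputMb + splitMb - 1) / splitMb;
    }

    // Minimum number of "waves" needed if at most `containers` tasks run at once.
    static long waves(long tasks, long containers) {
        return (tasks + containers - 1) / containers;
    }

    public static void main(String[] args) {
        long tasks = mapTasks(102400, 100);
        System.out.println(tasks);             // 1024
        System.out.println(waves(tasks, 500)); // 3
    }
}
```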

Do map and reduce tasks run on the same thread?

map() is called sequentially, not in parallel.

On a high-level, you absolutely cannot expect that these run in the same thread. They actually often run on separate machines, which is what makes MapReduce attractive (ability to run the job on lots of hardware in parallel).

Even if you have a single-machine Hadoop cluster, or if your map and reduce tasks happen to run on the same node, you still won’t share threads, because the task node daemon will, generally speaking, create a new Java VM for each new task (unless JVM reuse has been configured).

So in general you have to expect that your map and reduce functions run in isolation from each other, with any data exchange occurring only through input and output values.

The second piece of the puzzle is thread safety between different invocations within a single task. There is always a single Mapper or Reducer instance in existence for each task, so there is no complexity there to think about. Within a single instance, execution is controlled by the run() method that is part of the Mapper/Reducer API.

  • By default, map() calls are made in sequence on a single thread.
  • Your implementation of Mapper is free to introduce multithreading to enable fancy execution orders.
  • You are free to introduce shared state on a single instance of Mapper if it helps you process the batch of maps that run within a single task.

How is the run() method of mapper or reducer class called by the Hadoop framework?

The run() method is invoked through Java run-time polymorphism (i.e. method overriding). As you can see at line 569 of MapTask.java, the extended mapper/reducer is instantiated using the Java Reflection API. The MapTask class gets the name of the extended mapper/reducer class from the job configuration object, where the client program set it using job.setMapperClass().

The following code is taken from the Hadoop source file MapTask.java:

    mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                   input, output, committer,
                                                   reporter, split);

    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    input.close();

Line 621 is an example of run-time polymorphism. On this line, the MapTask calls the run() method of the configured mapper with the mapper context as its parameter. If run() is not overridden, this calls the run() method of org.apache.hadoop.mapreduce.Mapper, which in turn calls the map() method of the configured mapper.

On line 616 of the same source, MapTask creates the context object with all the details of the job configuration, etc., as mentioned by @harpun, and then passes it to the run() method on line 621.

The above explanation holds for the reduce task as well, with the ReduceTask class being the main entry class.
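The mechanism described here (instantiate the configured class by reflection, then call run() and let overriding decide which map() executes) can be imitated without Hadoop. The sketch below uses a made-up BaseMapper class as a stand-in for org.apache.hadoop.mapreduce.Mapper and a runTask() helper as a stand-in for what MapTask does; none of these names come from the Hadoop source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: hypothetical stand-ins for Mapper and MapTask, no Hadoop classes.
class RunDispatchSketch {
    // Stand-in for the framework's base mapper class.
    static class BaseMapper {
        final List<String> out = new ArrayList<>();

        protected void setup() { }

        // Default map(): pass the record through unchanged.
        protected void map(String record) {
            out.add(record);
        }

        protected void cleanup() { }

        // Default run(): setup once, map() per record in sequence, cleanup once.
        public void run(List<String> records) {
            setup();
            try {
                for (String record : records) {
                    map(record);
                }
            } finally {
                cleanup();
            }
        }
    }

    // A user-defined mapper that overrides only map().
    static class UpperCaseMapper extends BaseMapper {
        @Override
        protected void map(String record) {
            out.add(record.toUpperCase());
        }
    }

    // Stand-in for the task: create the configured class reflectively and run it.
    static BaseMapper runTask(Class<? extends BaseMapper> mapperClass, List<String> records) {
        try {
            BaseMapper mapper = mapperClass.getDeclaredConstructor().newInstance();
            mapper.run(records); // resolved at run time to the subclass's map()
            return mapper;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + mapperClass, e);
        }
    }

    public static void main(String[] args) {
        BaseMapper mapper = runTask(UpperCaseMapper.class, Arrays.asList("a", "b"));
        System.out.println(mapper.out); // [A, B]
    }
}
```

Because runTask() only knows the base type, the call to run() stays the same no matter which mapper class the job configured; the subclass's map() is picked at run time.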

When configuring a Hadoop cluster, how do you set the number of mappers/reducers for the cluster?

It depends on how many cores and how much memory you have. In general, the number of mappers plus the number of reducers on a machine should not exceed its number of cores. Keep in mind that the machine is also running the Task Tracker and Data Node daemons. One general suggestion is to configure more mappers than reducers.

How to Chain multiple MapReduce jobs in Hadoop?

Use of setup() and cleanup() methods
setup() and cleanup() are methods you can override, if you choose, and they are there for you to initialize and clean up your map/reduce tasks. You don’t have direct access to any data from the input split during these phases. The lifecycle of a map/reduce task is (from a programmer’s point of view):

setup -> map -> cleanup

setup -> reduce -> cleanup

What typically happens during setup() is that you may read parameters from the configuration object to customize your processing logic.

What typically happens during cleanup() is that you clean up any resources you may have allocated. There are other uses too, which is to flush out any accumulation of aggregate results.

The setup() and cleanup() methods are simply “hooks” for you, the developer/programmer, to have a chance to do something before and after your map/reduce tasks.
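The "flush out accumulated results" use of cleanup() is easiest to see in a toy version of the lifecycle. The sketch below is not Hadoop code: CountingTask and runTask() are hypothetical stand-ins that follow the setup -> map -> cleanup order, accumulate counts in memory during map(), and emit them only in cleanup().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: a toy task lifecycle with hypothetical names, no Hadoop classes.
class LifecycleSketch {
    static class CountingTask {
        private Map<String, Integer> counts;
        final List<String> emitted = new ArrayList<>();

        // Called once before any record: allocate per-task state.
        void setup() {
            counts = new TreeMap<>();
        }

        // Called once per record: accumulate instead of emitting immediately.
        void map(String line) {
            for (String token : line.split("\\s+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }

        // Called once after the last record: flush the aggregate and release state.
        void cleanup() {
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                emitted.add(entry.getKey() + "=" + entry.getValue());
            }
            counts.clear();
        }
    }

    // Drives the lifecycle in the order setup -> map (per record) -> cleanup.
    static List<String> runTask(List<String> lines) {
        CountingTask task = new CountingTask();
        task.setup();
        try {
            for (String line : lines) {
                task.map(line);
            }
        } finally {
            task.cleanup();
        }
        return task.emitted;
    }

    public static void main(String[] args) {
        System.out.println(runTask(Arrays.asList("a b a", "b c"))); // [a=2, b=2, c=1]
    }
}
```

Nothing is emitted until cleanup() runs, which is why a task that aggregates this way produces one output record per key rather than one per occurrence.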

For example, in the canonical word count example, let’s say you want to exclude certain words from being counted (e.g. stop words such as “the”, “a”, “be”, etc.). When you configure your MapReduce job, you can pass a comma-delimited list of these words as a parameter (key-value pair) into the configuration object. Then in your map code, during setup(), you can read the stop words, store them in a field of the mapper (effectively a variable global to the map task), and skip these words in your map logic.

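import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {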

 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private Set<String> stopWords;

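    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();

        // Load the comma-delimited stop words once per task.
        // Trim each entry because the list may contain spaces after the commas.
        stopWords = new HashSet<String>();
        for (String stopWord : conf.get("stop.words", "").split(",")) {
            stopWords.add(stopWord.trim());
        }
    }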

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            // Skip the stop words loaded in setup()
            if (stopWords.contains(token)) {
                continue;
            }
            // Emit (token, 1) for the token that was just read
            word.set(token);
            context.write(word, one);
        }
    }
 } 

 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("stop.words", "the, a, an, be, but, can");

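    Job job = new Job(conf, "wordcount");
    // Tell Hadoop which jar contains the mapper and reducer classes
    job.setJarByClass(WordCount.class);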

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Exit with a non-zero status if the job fails
    System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
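
Since the stop-word list in the driver has a space after each comma, it is worth seeing what String.split(",") returns with and without trimming. The following stand-alone sketch uses only the JDK; the class and method names are made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only: shows how a "word, word" list parses with and without trim().
// StopWordParsingSketch and parse are hypothetical names.
class StopWordParsingSketch {
    static Set<String> parse(String commaDelimited, boolean trim) {
        Set<String> words = new HashSet<>();
        for (String word : commaDelimited.split(",")) {
            words.add(trim ? word.trim() : word);
        }
        return words;
    }

    public static void main(String[] args) {
        String stopWords = "the, a, an, be, but, can";

        // Without trimming, every entry after the first keeps its leading space.
        System.out.println(parse(stopWords, false).contains("a"));   // false
        System.out.println(parse(stopWords, false).contains("the")); // true

        // With trimming, all six words are matched.
        System.out.println(parse(stopWords, true).contains("a"));    // true
    }
}
```

Without trimming, only the first word in such a list would ever be filtered, because tokens produced by StringTokenizer never carry leading spaces.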

Different modes of Hadoop
Standalone Mode
This is the default mode of Hadoop. HDFS is not used in this mode; the local file system is used for input and output. It is used for debugging. No custom configuration is required in the three Hadoop files (mapred-site.xml, core-site.xml, hdfs-site.xml). Standalone mode is much faster than pseudo-distributed mode.

Pseudo-Distributed Mode (Single-Node Cluster)
Configuration is required in the three files listed above for this mode. The replication factor for HDFS is one.
Here one node is used as Master Node / Data Node / Job Tracker / Task Tracker.
It is used to test real code against HDFS. A pseudo-distributed cluster is a cluster where all daemons run on a single node.

Fully Distributed Mode (Multi-Node Cluster)
This is the production mode. Data is distributed across many nodes. Different nodes are used as Master Node / Data Node / Job Tracker / Task Tracker.

How many datanodes can run on a single Hadoop cluster?
Each Hadoop slave node runs exactly one datanode process, so a cluster has as many datanodes as it has slave nodes.

How many job tracker processes can run on a single Hadoop cluster?
Unlike datanodes, which run one per slave node, there can be only one job tracker process running on a single Hadoop cluster. The job tracker runs in its own Java virtual machine process. If the job tracker goes down, all currently active jobs stop.

What sorts of actions does the job tracker process perform?

Client applications submit jobs to the job tracker.
The job tracker determines the location of the data by communicating with the Namenode.
The job tracker finds task tracker nodes that have open slots at or near the data.
The job tracker submits the work to the chosen task tracker nodes.
The job tracker monitors the task tracker nodes for heartbeat signals. If a task tracker does not send heartbeats often enough, the job tracker reschedules its work on a different task tracker node.
The job tracker receives a notification from a task tracker when a task has failed. From there, the job tracker might resubmit the task elsewhere, as described above, or it might blacklist the task tracker as unreliable.

How does the job tracker schedule a task on a task tracker?

When a client application submits a job to the job tracker, the job tracker searches for a task tracker with an empty slot, preferably on the server that hosts the datanode holding the data, and schedules the task there.
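
The selection rule described above can be sketched as a small function. This is a simplified illustration, not the job tracker's actual scheduler: it only distinguishes "node holds the block" from "any other node" and leaves out rack awareness; all names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: a simplified locality-first choice, with hypothetical names.
class LocalitySchedulerSketch {
    // Prefer a node that stores the block and has a free slot;
    // otherwise fall back to any node with a free slot; null if none is free.
    static String pickTracker(Set<String> blockHosts, Map<String, Integer> freeSlots) {
        for (String host : blockHosts) {
            if (freeSlots.getOrDefault(host, 0) > 0) {
                return host;
            }
        }
        for (Map.Entry<String, Integer> entry : freeSlots.entrySet()) {
            if (entry.getValue() > 0) {
                return entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Integer> freeSlots = new LinkedHashMap<>();
        freeSlots.put("node1", 0);
        freeSlots.put("node2", 1);
        freeSlots.put("node3", 2);

        // The block lives on node1 and node2; node1 is full, so node2 is chosen.
        Set<String> replicas = new LinkedHashSet<>(Arrays.asList("node1", "node2"));
        System.out.println(pickTracker(replicas, freeSlots)); // node2

        // The block lives only on node1 (full): fall back to the first free node.
        Set<String> single = new LinkedHashSet<>(Arrays.asList("node1"));
        System.out.println(pickTracker(single, freeSlots)); // node2
    }
}
```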

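What does the mapred.job.tracker property do?

mapred.job.tracker is a configuration property (set in mapred-site.xml), not a command. It specifies the host and port at which the job tracker runs. If it is set to "local", jobs run in-process as a single map and reduce task.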

What is “jps”?
jps – Java Virtual Machine Process Status
jps is similar to the ps command. The ps command on Linux is one of the most basic commands for viewing the processes running on the system. jps is a standard command-line utility that comes with the JDK, and it is a useful tool for viewing information about running Java processes. It’s a little annoying that jps itself is included in the output.
