Finding Friends via map reduce

Link

Difference between combiner and Reducer

  • One constraint that a Combiner will have, unlike a Reducer, is that the input/output key and value types must match the output types of your Mapper.
  • Combiners can only be used on the functions that are commutative(a.b = b.a) and associative {a.(b.c) = (a.b).c} . This also means that combiners may operate only on a subset of your keys and values or may not execute at all, still you want the output of the program to remain same.
  • Reducers can get data from multiple Mappers as part of the partitioning process. Combiners can only get its input from one Mapper.

difference between Hdfs block and input split

  • Block is the physical part of disk which has minimum amount of data that can be read or write. The actual size of block is decided during the design phase.For example, the block size of HDFS can be 128MB/256MB though the default HDFS block size is 64 MB.
  • HDFS block are physically entity while Input split is logical partition.
  • What is logical partition –> Logical partition means it will has just the information about blocks address or location. In the case where last record (value) in the block is incomplete,the input split includes location information for the next block and byte offset of the data needed to complete the record.

Counters Example Link

User Defined Counters Example

What is GenericOptionsParser
GenericOptionsParser is a utility to parse command line arguments generic to the Hadoop framework. GenericOptionsParser recognizes several standard command line arguments, enabling applications to easily specify a namenode, a ResourceManager, additional configuration resources etc.

The usage of GenericOptionsParser enables to specify Generic option in the command line itself

Eg: With Genericoption you can specify the following

>>hadoop jar /home/hduser/WordCount/wordcount.jar WordCount -Dmapred.reduce.tasks=20 input output

GenericOptionsParser vs ToolRunner

There’s no extra privileges, but your Command line options get run via the GenericOptionsParser, which will allow you extract certain configuration properties and configure a Configuration object from it

By using ToolRunner.run(), any hadoop application can handle standard command line options supported by hadoop. ToolRunner uses GenericOptionsParser internally. In short, the hadoop specific options which are provided command line are parsed and set into the Configuration object of the application.

eg. If you say:

>>hadoop MyHadoopApp -D mapred.reduce.tasks=3

Then ToolRunner.run(new MyHadoopApp(), args) will automatically set the value parameter mapred.reduce.tasks to 3 in the Configuration object.

Basically rather that parsing some options yourself (using the index of the argument in the list), you can explicitly configure Configuration properties from the command line:

hadoop jar myJar.jar com.Main prop1value prop2value
public static void main(String args[]) {
    Configuration conf = new Configuration();
    conf.set("prop1", args[0]);
    conf.set("prop2", args[1]);

    conf.get("prop1"); // will resolve to "prop1Value"
    conf.get("prop2"); // will resolve to "prop2Value"
}

Becomes much more condensed with ToolRunner:

hadoop jar myJar.jar com.Main -Dprop1=prop1value -Dprop2=prop2value

public int run(String args[]) {
    Configuration conf = getConf();

    conf.get("prop1"); // will resolve to "prop1Value"
    conf.get("prop2"); // will resolve to "prop2Value"
}

GenericOptionsParser, Tool, and ToolRunner for running Hadoop Job

Hadoop comes with a few helper classes for making it easier to run jobs from the command line. GenericOptionsParser is a class that interprets common Hadoop command-line options and sets them on a Configuration object for your application to use as desired. You don’t usually use GenericOptionsParser directly, as it’s more convenient to implement the Tool interface and run your application with the ToolRunner, which uses GenericOptionsParser internally:

public interface Tool extends Configurable {
        int run(String [] args) throws Exception;
    }

Below example shows a very simple implementation of Tool, for running the Hadoop Map Reduce Job.

public class WordCountConfigured extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        return 0;
        }        
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCountConfigured(), args);
        System.exit(exitCode);
    }

We make WordCountConfigured a subclass of Configured, which is an implementation of the Configurable interface. All implementations of Tool need to implement Configurable (since Tool extends it), and subclassing Configured is often the easiest way to achieve this. The run() method obtains the Configuration using Configurable’s getConf() method, and then iterates over it, printing each property to standard output.

WordCountConfigured’s main() method does not invoke its own run() method directly. Instead, we call ToolRunner’s static run() method, which takes care of creating a Configuration object for the Tool, before calling its run() method. ToolRunner also uses a GenericOptionsParser to pick up any standard options specified on the command line, and set them on the Configuration instance. We can see the effect of picking up the properties specified in conf/hadoop-localhost.xml by running the following command:

    >>hadoop WordCountConfigured -conf conf/hadoop-localhost.xml -D mapred.job.tracker=localhost:10011 -D mapred.reduce.tasks=n

Options specified with -D take priority over properties from the configuration files. This is very useful: you can put defaults into configuration files, and then override them with the -D option as needed. A common example of this is setting the number of reducers for a MapReduce job via -D mapred.reduce.tasks=n. This will override the number of reducers set on the cluster, or if set in any client-side configuration files. The other options that GenericOptionsParser and ToolRunner support are listed in Table.

Does the MapReduce framework (for example Hadoop implementation), assign the inputs for mappers before mapper job starts or it is done at runtime?
In MapReduce framework, the Mapper tasks are assigned to the machines based on the Data Locality Concept. This means, data nodes which are storing the block of the data, will be assigned to execute the mapper task for that block of data.

The data splits (blocks) happen when you store the data into HDFS using configuration defined for data replication and block size. So if the original file is let say 128MB and block size is 64MB then file will be split into two blocks. These blocks will be store on two different machines.

A typical block size used by HDFS is 64 MB. Thus, an HDFS file is chopped up into 64 MB chunks, and if possible, each chunk will reside on a different DataNode.

Now when run the MapReduce job for a particular file then two Mapper tasks will be launched on these two machines.

So the data split and launching of mappers are completely two independent things. The first is handled by HDFS framework and second is by MapReduce framework.

the inputs for the Map tasks are prepared before the Mapper phase starts in Hadoop. The number of mappers is decided by the number of Input Splits calculated for the given input file before the Mapper phase starts.

Here the Input Split is the logic blocks of the given input file, where by default for every block of the file , one Input Split will be prepared and for every input split one mapper task will be dispatched.

You can control the number of InputSplits by controlling the mapreduce.input.fileinputformat.split.maxsize and mapreduce.input.fileinputformat.split.minsize properties.

The number of nodes available to execute the calculated number of map tasks is depends on the capacity of your cluster.

For example , say your input file is about 100GB(102400 MB) in size and block size 100MB, and Input split size is block size (by default), then 1024 Map tasks will be calculated. In this case assume that you cluster’s maximum containers available to execute map/reduce tasks across the cluster is 500, then at the best case only 500 mappers will be executed in Parallel. The machines whichever executes the Map task container sooner will pick the next Map task from the queue and continue so on until all mappers were completed.

Does map & reduce tasks run on Same Thread?

map() is called sequential and not in parallel.

On a high-level, you absolutely cannot expect that these run in the same thread. They actually often run on separate machines, which is what makes MapReduce attractive (ability to run the job on lots of hardware in parallel).

Even if you have a single-machine hadoop cluster or if your map & reduce tasks happen to run on the same node, you still won’t share threads, because the task node daemon will generally speaking create a new Java VM for each new task (unless JVM reuse has been configured).

So in general you have to expect that your map and reduce functions are running in isolation from each other, with the any data exchange only occurring through input and output values.

The second piece of the puzzle is thread safety between different invocations within a single task. There is always a single Mapper or Reducer instance in existence for each task, so there is no complexity there to think about. Within a single instance, execution is controlled by the run() method that is part of the Mapper/Reducer API.

  • By default, map() calls are made in sequence on a single thread.
  • Your implementation of Mapper is free to introduce multithreading to enable fancy execution orders.
  • You are free to introduce shared state on a single instance of Mapper if it helps you process the batch of maps that run within a single task.

How is the run() method of mapper or reducer class called by the Hadoop framework?

The run() method will be called using the Java Run Time Polymorphism (i.e method overriding). As you can see the line# 569 on the link below, extended mapper/reducer will get instantiated using the Java Reflection APIs. The MapTask class gets the name of extended mapper/reducer from the Job configuration object which the client program would have been configured extended mapper/reducer class using job.setMapperClass()

The following is the code taken from the Hadoop Source MapTask.java

mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                  input, output, committer,
                                                  reporter, split);

   input.initialize(split, mapperContext);
   mapper.run(mapperContext);
   input.close();` 

The line# 621 is an example of run time polymorphism. On this line, the MapTask calls the run() method of configured mapper with ‘Mapper Context’ as parameter. If the run() is not extended, it calls the run() method on the org.apache.hadoop.mapreduce.Mapper which again calls the map() method on configured mapper.

On the line# 616 of the above link, MapTask creates the context object with all the details of job configuration, etc. as mentioned by @harpun and then passes onto the run() method on line # 621.

The above explanation holds good for reduce task as well with appropriate ReduceTask class being the main entry class.

When configuring a Hadoop Clusterhow to set the number of mappers/reducers for the cluster?

It depends on how many cores and how much memory do you have. The number of mapper + number of reducer should not exceed the number of cores in general. Keep in mind that the machine is also running Task Tracker and Data Node daemons. One of the general suggestion is more mappers than reducers.

How to Chain multiple MapReduce jobs in Hadoop?
Answer

Use of setup() and cleanup() methods
As already mentioned, setup() and cleanup() are methods you can override, if you choose, and they are there for you to initialize and clean up your map/reduce tasks. You actually don’t have access to any data from the input split directly during these phases. The lifecycle of a map/reduce task is (from a programmer’s point of view):

setup -> map -> cleanup

setup -> reduce -> cleanup

What typically happens during setup() is that you may read parameters from the configuration object to customize your processing logic.

What typically happens during cleanup() is that you clean up any resources you may have allocated. There are other uses too, which is to flush out any accumulation of aggregate results.

The setup() and cleanup() methods are simply “hooks” for you, the developer/programmer, to have a chance to do something before and after your map/reduce tasks.

For example, in the canonical word count example, let’s say you want to exclude certain words from being counted (e.g. stop words such as “the”, “a”, “be”, etc…). When you configure your MapReduce Job, you can pass a list (comma-delimited) of these words as a parameter (key-value pair) into the configuration object. Then in your map code, during setup(), you can acquire the stop words and store them in some global variable (global variable to the map task) and exclude counting these words during your map logic

public class WordCount {

 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private Set<String> stopWords;

    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();

        stopWords = new HashSet<String>();
        for(String word : conf.get("stop.words").split(",")) {
            stopWords.add(word);
        }
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            if(stopWords.contains(token)) {
                continue;
            }
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
 } 

 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("stop.words", "the, a, an, be, but, can");

    Job job = new Job(conf, "wordcount");

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
 }
}

Different modes of hadoop
Standalone Mode
Default mode of Hadoop,HDFS is not utilized in this mode.Local file system is used for input and output, Used for debugging purpose, No Custom Configuration is required in 3 hadoop(mapred-site.xml,core-site.xml, hdfs-site.xml) files.Standalone mode is much faster than Pseudo-distributed mode.

Pseudo Distributed Mode(Single Node Cluster)
Configuration is required in given 3 files for this mode.Replication factory is one for HDFS.
Here one node will be used as Master Node / Data Node / Job Tracker / Task Tracker
Used for Real Code to test in HDFS.Pseudo distributed cluster is a cluster where all daemons are running on one node itself.

Fully distributed mode (or multiple node cluster)
This is a Production Phase.Data are used and distributed across many nodes.Different Nodes will be used as Master Node / Data Node / Job Tracker / Task Tracker

How many datanodes can run on a single Hadoop cluster?
Hadoop slave nodes contain only one datanode process each.

How many job tracker processes can run on a single Hadoop cluster?
Like datanodes, there can only be one job tracker process running on a single Hadoop cluster. Job tracker processes run on their own Java virtual machine process. If job tracker goes down, all currently active jobs stop.

What sorts of actions does the job tracker process perform?

Client applications send the job tracker jobs.
Job tracker determines the location of data by communicating with Namenode.
Job tracker finds nodes in task tracker that has open slots for the data.
Job tracker submits the job to task tracker nodes.
Job tracker monitors the task tracker nodes for signs of activity. If there is not enough activity, job tracker transfers the job to a different task tracker node.
Job tracker receives a notification from task tracker if the job has failed. From there, job tracker might submit the job elsewhere, as described above. If it doesn’t do this, it might blacklist either the job or the task tracker.

How does job tracker schedule a job for the task tracker?

When a client application submits a job to the job tracker, job tracker searches for an empty node to schedule the task on the server that contains the assigned datanode.

What does the mapred.job.tracker command do?

The mapred.job.tracker command will provide a list of nodes that are currently acting as a job tracker process.

What is “jps”?
jps – Java Virtual Machine Process Status
jps is similar to ps command.ps command on linux is one of the most basic commands for viewing the processes running on the system.jps is standard command-line utility which comes with JDK.jps is useful tools for viewing information about running java processes.It’s a little annoying to see jps itself is included in the output.

public class MapReduce
{
public static void main(String[] args)
{
File f = new File(“/usr/lib/hadoop/etc/hadoop/core-site.xml”);
Configuration conf = new Configuration();
conf.addResource(new Path(“/usr/lib/hadoop/etc/hadoop/hdfs-site.xml”));

for (Entry entry : conf)
{
System.out.println(“Key ” + entry.getKey());
System.out.println(“Value ” + entry.getValue());
}
}
}

CreationScript.java

	sessionFactory = createSessionFactory();
	Session objSession = sessionFactory.openSession();
	objSession.beginTransaction();

	Criteria crt = objSession.createCriteria(Users.class);
	crt.add(Restrictions.eq("UserName", "UserName 9"));


	List<Users> arrUsers = (List<Users>)crt.list();

	for (Users users : arrUsers) {
		System.out.println(users.getUserName());
	}

AND Restrictions

Criteria crt = objSession.createCriteria(Users.class);
	crt.add(Restrictions.eq("UserName", "UserName 9")).
		add(Restrictions.gt("UserId", 5));
Criteria crt = objSession.createCriteria(Users.class);
	crt.add(Restrictions.eq("UserName", "UserName 9")).
		add(Restrictions.gt("UserId", 5));
Criteria crt = objSession.createCriteria(Users.class);
		crt.add(Restrictions.eq("UserName", "UserName 9")).
			add(Restrictions.between("UserId", 5, 10));
Criteria crt = objSession.createCriteria(Users.class);
		crt.add(Restrictions.eq("UserName", "UserName 9")).
			add(Restrictions.between("UserId", 5, 10));

OR Restrictions

Criteria crt = objSession.createCriteria(Users.class);
		crt.add(Restrictions.or(Restrictions.between("UserId", 0, 5), Restrictions.like("UserName", "Updated %")));

Getting list of Users from users Table

     sessionFactory = createSessionFactory();
     Session objSession = sessionFactory.openSession();
     objSession.beginTransaction();
		
     Query objQuery = objSession.createQuery("from Users");
     List<Users> arrUsers = objQuery.list();
		
     objSession.getTransaction().commit();
     objSession.close();
		
     System.out.println(arrUsers.size());

     for (Users users : arrUsers) {
	System.out.println(users.getUserName());
     }

Pagination Using HQL

    Query objQuery = objSession.createQuery("from Users");		
    objQuery.setFirstResult(5);
    objQuery.setMaxResults(2);
    List<Users> arrUsers = objQuery.list();
				
    objSession.getTransaction().commit();
    objSession.close();
		
    System.out.println(arrUsers.size());
		
    for (Users users : arrUsers) {
	System.out.println(users.getUserName());
    }

Note: In Pagination the Starting record is specified by setFirstResult and ending record is specified by setMaxResults.

Taking a Specific Column for Entity

	Query objQuery = objSession.createQuery("select UserName from Users");		
	objQuery.setFirstResult(5);
	objQuery.setMaxResults(2);
	List<String> arrUsers = (List<String>)objQuery.list();

	objSession.getTransaction().commit();
	objSession.close();

	System.out.println(arrUsers.size());

	for (String users : arrUsers) {
		System.out.println(users);
	}

Note :
The Object Name in entity should be same as specified in class including Case. username will not work in select query but UserName does.

Parameter Binding in Hibernate
Method 1

  Query objQuery = objSession.createQuery("from Users where UserId >?");
  objQuery.setParameter(0, 5);		
  List<Users> arrUsers = (List<Users>)objQuery.list();

  for (Users users : arrUsers) {
	System.out.println(users.getUserName());
  }

Method 2

     Query objQuery = objSession.createQuery("from Users where UserId > :limit");
     objQuery.setInteger("limit", 5);
		
     List<Users> arrUsers = (List<Users>)objQuery.list();
				
     objSession.getTransaction().commit();
     objSession.close();
	
     for (Users users : arrUsers) {
	System.out.println(users.getUserName());
     }

NamedQuery vs NamedNativeQuery
NamedQuery helps to consolidate all query at particular place.

users.java

@Entity
@NamedQuery(name="Users.byUserId", query="from Users where UserId=?")
public class Users {
	@Id @GeneratedValue(strategy=GenerationType.IDENTITY)
	private int UserId;
	private String UserName;
	
	public int getUserId() {
		return UserId;
	}
	public void setUserId(int userId) {
		UserId = userId;
	}
	public String getUserName() {
		return UserName;
	}
	public void setUserName(String userName) {
		UserName = userName;
	}	
}

CreationScript.java

	sessionFactory = createSessionFactory();
	Session objSession = sessionFactory.openSession();
	objSession.beginTransaction();
		
	Query objQuery = objSession.getNamedQuery("Users.byUserId");
        objQuery.setInteger(0, 5);
		
	List<Users> arrUsers = (List<Users>)objQuery.list();
	
	for (Users users : arrUsers) {
	   System.out.println(users.getUserName());
	}

NativeQueries helps us to query the table directly by using table name instead of querying through Entity like one in NamedQuery.This is useful when we use stored procedure to take our resultSets.

users.java

@Entity
@NamedNativeQuery(name="Users.byUserId", query="SELECT * from Users where UserId=?", resultClass=Users.class)
public class Users {
	@Id @GeneratedValue(strategy=GenerationType.IDENTITY)
	private int UserId;
	private String UserName;
	
	public int getUserId() {
		return UserId;
	}
	public void setUserId(int userId) {
		UserId = userId;
	}
	public String getUserName() {
		return UserName;
	}
	public void setUserName(String userName) {
		UserName = userName;
	}	
} 

Note: resultClass=Users.class should be specified or else object class cast exception would be thrown.

There may be times where you want to retrieve data from DB, do some changes in it and save back the data.In such a time the connection could not be kept open for the whole period of time since db connections are resource intensive.

So I will fetch the object -> close the Session -> Do some operations -> Open new session again -> Update the Object -> Close the Session.

	sessionFactory = createSessionFactory();
	Session objSession = sessionFactory.openSession();
	objSession.beginTransaction();		
	Users objUser = objSession.get(com.mugil.user.Users.class, 11);
	objSession.getTransaction().commit();
	objSession.close();

	objUser.setUserName("Updated");

	objSession = sessionFactory.openSession();
	objSession.beginTransaction();
	objSession.update(objUser);
	objSession.getTransaction().commit();
	objSession.close();

Now the way Hibernate works is it first runs the select query for the value which should be changed and updates the value.So 2 queries for the whole process.Now there are chance’s the value fetched from DB may or may not be changed.So we can do a check which checks the fetched and updated value for uniquess before running the update query.If the fetched data is not changed th update query wont be run.

The Annotation for that is as below.

 @Entity
 @org.hibernate.annotations.Entity(selectBeforeUpdate=true)

 	objSession.beginTransaction();		
	Users objUser = new Users();
	objUser.setUserName("Max");
	.
	.
	.
	----Transient State----
	.
	.		
	objSession.save(objUser);
	.
	.
	.
	----Persistent State----
	.
	.		
	objUser.setUserName("Max Muller");
	.
	.
	.
	----Persistent State----
	.
	.
	objSession.getTransaction().commit();
	objSession.close();
	.
	.
	.
	----Detached State----
	.
	.
	//Doesnot get reflected
	objUser.setUserName("Max Muller");
	.
	.
  • The object will be in Transient State until it is saved.An object is in transient state if it just has been instantiated using the new operator and there is no reference of it in the database i.e it does not represent any row in the database.
  • The object will be in Persistent State until the session is closed.An object is in the persistent state if it has some reference in the database i.e it represent some row in the database and identifier value is assigned to it. If any changes are made to the object then hibernate will detect those changes and effects will be there in the database that is why the name Persistent. These changes are made when session is closed. A persistent object is in the session scope.
  • The object will be in Detached State once the session is closed.An object that has been persistent and is no longer in the session scope. The hibernate will not detect any changes made to this object. It can be connected to the session again to make it persistent again.

The changes done after the session close will not get reflected

Image Showing Values in Table with and Without DiscriminatorColumn defined

  • By Default Hibernate follows Single Table Inheritance
  • DiscriminatorColumn tells hibernate the name in which DiscriminatorColumn should be save or else it would be saved as DType
@Inheritance(strategy=InheritanceType.SINGLE_TABLE)
@DiscriminatorColumn(name="VEHICLE_TYPE",
		     discriminatorType=DiscriminatorType.STRING)

Vehicles.java

@Entity
@Inheritance(strategy=InheritanceType.SINGLE_TABLE)
@DiscriminatorColumn(name="VEHICLE_TYPE",
		     discriminatorType=DiscriminatorType.STRING)
public class Vehicles {
	@Id @GeneratedValue(strategy=GenerationType.IDENTITY)
	private int VehicleId;	
	private String name;
		
	public int getVehicleId() {
		return VehicleId;
	}
	public void setVehicleId(int vehicleId) {
		VehicleId = vehicleId;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}	
}

TwoWheelers.java

@Entity
@DiscriminatorValue("Bike")
public class TwoWheelers extends Vehicles{
	private String steeringHolder;

	public String getSteeringHolder() {
		return steeringHolder;
	}

	public void setSteeringHolder(String steeringHolder) {
		this.steeringHolder = steeringHolder;
	}
}

FourWheelers.java

@Entity
@DiscriminatorValue("Car")
public class FourWheelers extends Vehicles{
	
	private String steeringWheel;

	public String getSteeringWheel() {
		return steeringWheel;
	}

	public void setSteeringWheel(String steeringWheel) {
		this.steeringWheel = steeringWheel;
	}
	
}

By Using the below annotation individual tables would be created for all the subclasses instead of placing all the values getting placed in a single class.

InheritanceType.TABLE_PER_CLASS

@Inheritance(strategy=InheritanceType.TABLE_PER_CLASS)

InheritanceType.JOINED

@Inheritance(strategy=InheritanceType.JOINED)

Using InheritanceType.JOINED gets more normalized table compared to InheritanceType.TABLE_PER_CLASS