Introduction To Apache Hadoop – HDFS & MapReduce

Let’s get something out of the way quickly: Hadoop is NOT a database. It is NOT a library. In fact, there is NO single product called Hadoop. Hadoop is a collection of stand-alone modules: a distributed file system called HDFS, a distributed database named HBase, a library for parallel processing of large distributed datasets called MapReduce, and the list goes on. An analogy would be Microsoft Office. There is no application called Microsoft Office. It’s the name given to a suite of desktop applications like Word, Excel, etc.

In this post we will focus on the Hadoop Distributed File System (HDFS) and MapReduce. These two are core Hadoop modules and are widely used.

Together, HDFS and MapReduce form a framework for distributed batch processing. HDFS provides the storage, while MapReduce (MR) provides the analysis.

Almost anything you can accomplish with HDFS + MR could also be done with built-in Linux utilities like grep, awk, or bash scripts. Hadoop excels at large-scale, distributed processing where the data to be processed is spread across hundreds of nodes. With the advent of cloud computing, this is quickly becoming the norm. Distributed servers running on multiple nodes produce decentralized logs, which makes it difficult to analyze the data in one central place. Consider Google - it runs on thousands of web servers in multiple data centers around the world. Each web server generates a log file stored on its local disk, so you end up with thousands of log files stored on as many servers. An analytics program should be able to view these dispersed logs as a single logical unit. For example, the following hypothetical queries require processing every single log file to compute a per-server total and then adding up the results from all servers to get the final aggregate:

  • Number of unique users between 12:00-1:00am.
  • Number of users in a day from Chile.

Hadoop’s true power lies in its ability to scale to hundreds or thousands of nodes to distribute large amounts of work across a set of machines to be performed in parallel.


HDFS

Hadoop modules are built upon a distributed file system appropriately named the Hadoop Distributed File System (HDFS). The most famous distributed file system in existence today is the Network File System (NFS). HDFS differs from NFS on many levels, especially with regard to scalability.

Note: The design of HDFS is based on Google File System (GFS) described in this paper.

HDFS is based on a master/slave architecture: the design requires that a single master node keep track of all the files in the file system. This node is called the Name Node. The Name Node stores only the metadata about the files present on the file system; it doesn’t store the actual data. The data is stored on Data Nodes. Files are stored on HDFS in blocks, which are typically 64 MB in size.

Name Node versus Data Node: A Name Node manages the HDFS’s namespace and regulates access to files and directories. It distributes data blocks to Data Nodes and stores this mapping information. Data Nodes are responsible for storing data and serve read/write requests from clients directly.

Let us consider an example. Suppose we are storing a 131 MB file on HDFS. The file will be stored in three blocks (64 MB + 64 MB + 3 MB). The Name Node will distribute the three blocks to Data Nodes and keep track of the mapping. To read a file stored on HDFS, the client must be running the HDFS client software, which obtains the file’s information from the Name Node (such as the number of blocks and their locations) and then downloads those blocks directly from the Data Nodes.
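To make the arithmetic concrete, here is a minimal sketch (plain Java, not part of Hadoop; the class and method names are made up for illustration) of the ceiling division that determines how many blocks a file occupies:

```java
public class BlockMath {

    // Number of fixed-size blocks needed to hold a file (ceiling division).
    static long blocksFor(long fileSizeMb, long blockSizeMb) {
        return (fileSizeMb + blockSizeMb - 1) / blockSizeMb;
    }

    public static void main(String[] args) {
        // A 131 MB file with 64 MB blocks occupies 3 blocks: 64 + 64 + 3.
        System.out.println(blocksFor(131, 64)); // prints 3
    }
}
```

Note that HDFS does not pad the last block: the third block of our 131 MB file occupies only 3 MB on disk.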

For more information on HDFS, I recommend the following link:

MapReduce

MapReduce (MR) is a framework, or a library, for writing applications that process large amounts of distributed data in parallel. Like HDFS, its architecture is also based on the master/slave model. The master is a special node which coordinates activity between several worker nodes.

Here’s how it works: the master receives the input data to be processed. The input is split into smaller chunks, and the chunks are distributed to and processed in parallel on multiple worker nodes. This is called the Map phase. The workers send their results back to the master node, which aggregates them to produce the final result. This is called the Reduce phase.

Note: I’ve oversimplified the inner workings of MapReduce. The Map phase output is written to the local disks of the workers, partitioned into as many regions as there are Reduce workers available. These locations are then passed to the master, which passes them on to the Reduce workers. I recommend this paper on MapReduce by Google, which actually introduced it.
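To see the two phases end to end, here is a single-process sketch in plain Java (no Hadoop involved; the class name and sample data are made up for illustration) that maps lines to <region, magnitude> pairs, groups them by key, and reduces each group to its maximum:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MiniMapReduce {

    // Reduce each region's list of magnitudes down to the maximum value.
    static Map<String, Double> maxByRegion(List<String> lines) {
        // Map phase: each line becomes a <region, magnitude> pair;
        // grouping by key stands in for Hadoop's shuffle step.
        Map<String, List<Double>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            grouped.computeIfAbsent(parts[1], k -> new ArrayList<>())
                   .add(Double.parseDouble(parts[0]));
        }
        // Reduce phase: aggregate each group independently.
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, List<Double>> e : grouped.entrySet()) {
            double max = Double.MIN_VALUE;
            for (double m : e.getValue()) {
                max = Math.max(max, m);
            }
            result.put(e.getKey(), max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("4.5,Chile", "1.5,Oregon", "6.1,Chile");
        System.out.println(maxByRegion(lines)); // {Chile=6.1, Oregon=1.5}
    }
}
```

In real Hadoop, the map calls run on many workers, the grouping happens during the shuffle, and each reduce call receives one key together with all of its values.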

MR applications must provide at least the following three inputs:

  1. The location of the input data (e.g. a directory consisting of a single (rare) or multiple input files).
  2. Programming implementations of the Map and Reduce functions and their configuration (e.g. a Java JAR file which is distributed to the workers).
  3. The location of the output data (e.g. `/tmp/hadoop-output/`).
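These three inputs come together in the job driver. Below is a minimal configuration sketch against Hadoop’s `org.apache.hadoop.mapreduce` API; the class names `EarthquakeJob`, `EarthquakeMapper`, and `EarthquakeReducer` are hypothetical placeholders, not from any official source:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EarthquakeJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "max earthquake magnitude");
        job.setJarByClass(EarthquakeJob.class);                 // input #2: the JAR
        job.setMapperClass(EarthquakeMapper.class);             // Map implementation
        job.setReducerClass(EarthquakeReducer.class);           // Reduce implementation
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input #1: input data
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // input #3: output location
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

This is configuration wiring only; it requires a Hadoop installation and the Mapper/Reducer classes to actually run.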

Always remember: all input and output in MR is based on <key, value> pairs. They are everywhere: the input to the Map function, its output, the input to the Reduce function, and its output are all <key, value> pairs.

MapReduce Example in Java

To wrap our minds around MapReduce, let us consider an example. Suppose you have just become the Development Lead at a company which specializes in reading seismic data that measures earthquake magnitudes around the world. There are thousands of such sensors deployed around the world, recording earthquake data in log files in the following format:

nc,71920701,1,"Saturday, January 12, 2013 19:43:18 UTC",38.7865,-122.7630,1.5,1.10,27,"Northern California"

Each entry consists of a lot of details. The two fields of interest are the magnitude of the earthquake (1.5) and the name of the region where the reading was taken ("Northern California").
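A subtle point about this format: the quoted timestamp itself contains commas, so a naive comma split must cap the number of fields. The sketch below (plain Java; the class and method names are made up for illustration) shows how the magnitude and region can be pulled out of the sample line above:

```java
public class LogLineParse {

    // Split into at most 12 fields: the 3 commas inside the quoted timestamp
    // produce extra tokens, landing the magnitude at index 8 and the region at 11.
    static double magnitude(String entry) {
        return Double.parseDouble(entry.split(",", 12)[8]);
    }

    static String region(String entry) {
        return entry.split(",", 12)[11].replace("\"", "");
    }

    public static void main(String[] args) {
        String entry = "nc,71920701,1,\"Saturday, January 12, 2013 19:43:18 UTC\","
                + "38.7865,-122.7630,1.5,1.10,27,\"Northern California\"";
        System.out.println(region(entry) + " -> " + magnitude(entry));
        // Northern California -> 1.5
    }
}
```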

There are millions of such log files available. In addition, the logs contain erroneous entries, such as when a sensor became faulty and went into an infinite loop, dumping thousands of lines a second. The input data is stored on 50 machines, and all the log files combined are about 10 terabytes in size. Your Director of Software asks you to perform a simple task: for every region where sensors were deployed, find the highest earthquake magnitude recorded.

Now, let’s think about this for a moment. This task sounds rather simple. You could use your trusty Linux tools like `grep`, `sort`, or even `awk` to accomplish this if the log files were available on a single computer. But they are not - they are scattered across 50 computers. Processing the data on each computer manually and combining the results would be too inefficient (for a Lead Developer, that is).

This is the kind of problem where you can use Hadoop. Let us see how you would do it:

  1. First, you will deploy HDFS on the 50 machines where the input data is stored, so that the data on all machines can be viewed as a single logical file system. Let us say you put all the log files in a folder on HDFS called input/.
  2. Next, you will write a Java application providing implementations of Map & Reduce functions.
```java
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EarthquakeMapper
        extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    /**
     * The `Mapper` function. It receives a line of input from the file,
     * extracts `region name` and `earthquake magnitude` from it, and outputs
     * the `region name` and `magnitude` in <key, value> manner.
     *
     * @param key - The line offset in the file - ignored.
     * @param value - This is the line itself.
     * @param context - Provides access to the OutputCollector and Reporter.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] line = value.toString().split(",", 12);
        // Ignore invalid lines
        if (line.length != 12) {
            System.err.println("Skipping invalid line: " + value);
            return;
        }
        // The output `key` is the name of the region
        String outputKey = line[11];
        // The output `value` is the magnitude of the earthquake
        double outputValue = Double.parseDouble(line[8]);
        // Record the output in the Context object
        context.write(new Text(outputKey), new DoubleWritable(outputValue));
    }
}
```

```java
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EarthquakeReducer
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    /**
     * The `Reducer` function. Iterates through all earthquake magnitudes for a
     * region to find the maximum value. The output is the `region name` and the
     * `maximum value of the magnitude`.
     *
     * @param key - The name of the region
     * @param values - Iterator over earthquake magnitudes in the region
     * @param context - Used for collecting output
     */
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Standard algorithm for finding the max value
        double maxMagnitude = Double.MIN_VALUE;
        for (DoubleWritable value : values) {
            maxMagnitude = Math.max(maxMagnitude, value.get());
        }
        context.write(key, new DoubleWritable(maxMagnitude));
    }
}
```
  3. Next, you will configure MapReduce to run the processing on all 50 computers. This achieves data locality: the log files are processed on the same computers where they are stored, and only the results are sent back to be reduced, saving bandwidth.
  4. Run `hadoop`, passing it the location of the input folder on HDFS (input/), your MapReduce program, and the location where the output is to be produced (output/). And that’s all you need to do.

Example Project on GitHub

You can find the complete source code for the above example in a GitHub repository I have created.


I'm Umer Mansoor, the author of this blog.

Posted in Hadoop
5 comments on “Introduction To Apache Hadoop – HDFS & MapReduce”
  1. Michael Bartoschek says:

    Great explanation. I cannot thank you enough for this!

  2. egyptianMan says:

    “Like HDFS, it’s architecture is also based on master/slave model”

    um.. isn’t the master/slave model supposed to be bad? in cloud computing, node failures are common.

    so how can this be good. or am i missing something?

    • Actually, it’s not exactly a master/slave model. You can configure the replication count that you want. For example, if there are 10 data files and the replication factor is 3, then you will have 30 files in total.
      A separate node called the Name Node keeps track of what each Data Node is holding, so if some node goes down, the same data can be fetched from the other replicas.
      The biggest advantage is that data can be served from any of the 3 nodes, which makes load balancing possible, and if a Data Node crashes, a new Data Node can be brought up and repopulated from the nodes holding the replicas.

  3. […] Guide, 3rd Edition by Tom White – An excellent resource for anyone wanting to learn Apache Hadoop. Hard to read cover to cover, the first few chapters are an excellent introduction to Hadoop. […]
