Hadoop is a framework that allows you to process large sets of unstructured or semi-structured data. The unstructured/semi-structured nature of the data and its sheer size (terabytes or petabytes) make current RDBMS offerings fall short. Enter Apache Hadoop.
Note: Updated to Hadoop 2.4.1 and re-published from the original September 28th, 2011 blog post.
Excerpt from the Hadoop website: “The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using a simple programming model.”
Let's dissect this a little bit.
- framework – That is fairly self-explanatory.
- distributed processing – Data sets are distributed across multiple machines (with HDFS) and processing logic travels to where the data resides.
- large data sets – We are talking about use cases with hundreds of MBs, GBs, or more of data.
- clusters of computers – Hadoop runs on commodity hardware.
- simple programming model – MapReduce is the simple programming model that lets you perform distributed processing on your already-distributed data (in HDFS).
Let's say we run a large web-hosting firm and have traffic coming to us from all over the world. The log files capture, among other things, the page name and the source location of the user.
….., USA, page1,….other data….
We would like to analyze, by country, the most sought-after page. Pretty straightforward until I throw this wrench in: my log files are terabytes in size. Running the search on a single machine would be extremely time consuming, and that is assuming you can even fit the entire data set on one machine. You could manually split the files across multiple machines, but you would still need a framework to split the data sets into smaller pieces, distribute them among the nodes, perform the search on each of those machines, bring the results back, and aggregate them into the final answer. If one of the machines stops responding, you also have to worry about failover and retries. Pretty daunting if you have to roll your own framework. This is where Hadoop steps in.
At its core, Hadoop is made up of HDFS (the Hadoop Distributed File System) and the MapReduce data processing model. In this article I will focus on MapReduce.
HDFS is a file system that can store large files across a cluster of machines. Excerpt from the Hadoop site: “Hadoop Distributed File System (HDFS™) is the primary storage system used by Hadoop applications. HDFS creates multiple replicas of data blocks and distributes them on compute nodes throughout a cluster to enable reliable, extremely rapid computations.”
MapReduce, as mentioned earlier, is a simple programming model. It consists of two phases – the Map phase and the Reduce phase. Each phase takes key-value pairs as input and produces key-value pairs as output. We supply a map function and a reduce function. Map takes as input a line of text from our data file and emits a set of key-value pairs. The framework then sorts and groups this result and passes it to the reduce function. The reduce function applies the aggregation logic we specify to generate the final results. The output from reduce is again a set of key-value pairs.
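To make that key-value flow concrete, here is a minimal plain-Java sketch (no Hadoop involved, and with made-up keys and values) of the same idea: map each input record to a (key, number) pair, let the framework group the pairs by key, and reduce each group with some aggregation – the maximum in this case, which is exactly the aggregation we will build with Hadoop later in this post.

```java
import java.util.HashMap;
import java.util.Map;

public class MapReduceIdea {

    public static void main(String[] args) {
        // Output of the "map" phase: one (key, value) pair per input record.
        // The keys and numbers here are made up purely for illustration.
        String[][] mapped = {
                {"VA", "10"}, {"PA", "22"}, {"VA", "45"}, {"PA", "33"}, {"VA", "12"}
        };

        // The framework groups pairs by key between the two phases; the
        // "reduce" phase then aggregates each group. Here we keep the maximum.
        Map<String, Double> reduced = new HashMap<String, Double>();
        for (String[] pair : mapped) {
            String key = pair[0];
            double value = Double.parseDouble(pair[1]);
            Double current = reduced.get(key);
            if (current == null || value > current) {
                reduced.put(key, value);
            }
        }

        System.out.println(reduced); // contains VA=45.0 and PA=33.0 (ordering not guaranteed)
    }
}
```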
Hadoop can run in three modes:
- Local Mode: Does not rely on HDFS and runs everything locally. Great for development purposes. This is the default mode.
- Pseudo-Distributed Mode: Runs with HDFS, but on a single machine (by default the file system data is stored under /tmp).
- Fully Distributed Mode: Runs Hadoop over multiple machines (HDFS and MapReduce).
To change the mode you will need to update the configuration XML files in the ${HADOOP}/etc/hadoop folder. For this particular blog we will run in the default local mode, so no changes are required. The data set we will use is freely available from data.medicare.gov. The data is a survey of patients' hospital experiences. It is not a large data set – only about 4k lines – but it is enough to illustrate the use of MapReduce. I want to find, by state, the highest percentage of "doctors that did not communicate well". The data is by hospital; for the sake of this example I ignore the hospital name. Take a look at the data: column #7 contains the state and column #14 contains the doctor communication percentage.
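Before writing the mapper it can help to sanity-check those column positions. Below is a small standalone sketch using the same commons-lang StrTokenizer the mapper will use; the input line and its values are made up, shaped only to match the column layout described above (state in column 7, percentage in column 14):

```java
import org.apache.commons.lang.text.StrTokenizer;

public class ColumnCheck {

    public static void main(String[] args) {
        // A made-up CSV line with the same column layout as the survey file:
        // column 7 (index 6) holds the state, column 14 (index 13) the percentage.
        String line = "\"10018\",\"SOME HOSPITAL\",\"1 MAIN ST\",\"x\",\"x\",\"SOMECITY\",\"VA\","
                + "\"23220\",\"SOME COUNTY\",\"8005551234\",\"x\",\"x\",\"x\",\"25%\"";

        StrTokenizer tokenizer = StrTokenizer.getCSVInstance(line);
        String[] tokens = tokenizer.getTokenArray();

        System.out.println("state = " + tokens[6]);             // VA
        System.out.println("doctor feedback = " + tokens[13]);  // 25%
    }
}
```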
Let's start with our map function:
```java
package doop1;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.lang.text.StrTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class DoctorCommunication {

    public static class InsufficientDoctorCommunicationMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, DoubleWritable> {

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            // Parse the CSV line; column 7 (index 6) is the state,
            // column 14 (index 13) the doctor communication percentage.
            StrTokenizer line = StrTokenizer.getCSVInstance(value.toString());
            String[] tokens = line.getTokenArray();
            String state = tokens[6];
            String docFb = tokens[13];
            double docFbPercent = 0d;
            try {
                docFbPercent = Double.parseDouble(docFb.substring(0, docFb.lastIndexOf('%')));
            } catch (Exception ex) {
                docFbPercent = 0d;
            }
            // Emit (state, percentage) as the intermediate key-value pair.
            output.collect(new Text(state), new DoubleWritable(docFbPercent));
        }
    }

    // .....MORE CODE HERE....
}
```
The map function gets a line of text from the sample data file. It parses out the state and the percentage and emits the resulting key (state) – value (percentage) pair via the OutputCollector. Once the map phase has run through all of the data, the framework sorts and groups the intermediate output by key, so it looks like:
VA {10,12,33,45}
PA {22,33,12,24}
This output is then fed to the reduce function, which further "reduces" it by picking only the maximum percentage value for each state.
The reduce function is:
```java
package doop1;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.lang.text.StrTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class DoctorCommunication {

    // ....other code....

    public static class InsufficientDoctorCommunicationReducer extends MapReduceBase
            implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        public void reduce(Text key, Iterator<DoubleWritable> values,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            // Walk the grouped percentages for this state and keep the maximum.
            double maxValue = Double.MIN_VALUE;
            while (values.hasNext()) {
                maxValue = Math.max(maxValue, values.next().get());
            }
            output.collect(key, new DoubleWritable(maxValue));
        }
    }
}
```
As explained earlier, the reduce function picks the maximum percentage for each state and produces output that looks like:
VA {45}
PA {33}
Now to test this we need a driver class. Here is the code.
```java
package doop1;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class InsufficientDoctorCommunication {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        if (args.length != 2) {
            System.err.println("Usage: hadoop "
                    + InsufficientDoctorCommunication.class.getName()
                    + " (input_path) (output_path)");
            System.exit(-1);
        }

        JobConf conf = new JobConf(DoctorCommunication.class);
        conf.setJobName("doctorcommunication");

        // Types of the key-value pairs emitted by the job.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(DoctorCommunication.InsufficientDoctorCommunicationMapper.class);
        // The reducer doubles as a combiner; taking the max map-side first is
        // safe because max is associative and commutative.
        conf.setCombinerClass(DoctorCommunication.InsufficientDoctorCommunicationReducer.class);
        conf.setReducerClass(DoctorCommunication.InsufficientDoctorCommunicationReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
```
To execute the processing, follow these steps:
- Download Hadoop (if you have not already done so). Hereafter the root hadoop folder will be called simply ${HADOOP} (your folder might be named hadoop-2.4.1).
- Change location to the project root folder
- Make sure you have JAVA_HOME set. I have mine set to JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/
- Set the JAVA_HOME in ${HADOOP}/etc/hadoop/hadoop-env.sh
- Run the command >> hadoop jar target/hadoop-patient-experience-1.0.jar doop1.InsufficientDoctorCommunication src/main/resources/PatientExperiences.csv output
- Once processing is done you should see a file named output/part-00000. It will contain the results of your processing by state (a small sketch for reading it from code follows this list).
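If you prefer to inspect the results from code rather than opening the file, here is a minimal sketch that reads the output; it assumes the default behaviour of TextOutputFormat, which writes one tab-separated key-value pair per line:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class PrintResults {

    public static void main(String[] args) throws IOException {
        // TextOutputFormat writes one "key<TAB>value" line per state.
        try (BufferedReader reader = new BufferedReader(new FileReader("output/part-00000"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t");
                System.out.println("state=" + parts[0] + ", max percentage=" + parts[1]);
            }
        }
    }
}
```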
You can get the code from GitHub at – https://github.com/thomasma/hadoop-patient-experience. Run ‘mvn package’ to generate the hadoop-patient-experience-1.0.jar file. The data file is also part of the project (src/main/resources).
That is it for now. In the next blog we will look at how to run the same example on HDFS in pseudo-distributed mode.