Wednesday, June 6, 2012

Hadoop Simple Example

Hadoop Example
This post walks through installing a single-node (standalone) Hadoop setup and running a sample map-reduce job on it. Although the example does not reflect the most typical real-world use of map-reduce, it is a good starting point for learning and coding Hadoop.



Setting up Hadoop on Ubuntu (or any other Linux)
  • Download the Debian package "hadoop_1.0.3-1_i386.deb" from the Apache Hadoop site
  • Create a group named hadoop. On Ubuntu you need to create the group explicitly, because the Debian package tries to create a group with a fixed id (123) and that id is usually already taken, so pre-create the group yourself with a free id:
                    sudo groupadd -g 142 -r hadoop 
  • Install Hadoop from the Debian package
                    sudo dpkg -i hadoop_1.0.3-1_i386.deb
  • Set up passphraseless SSH (this is, I suppose, a limitation of the framework, since it requires passwordless SSH to be enabled; maybe in an actual multi-node setup this is not needed):
       sudo su -
       ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
       cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
  • Create an HDFS and format it:
      The namenode has to be initialized (formatted) before its first use.
                     hadoop namenode -format
  • Start the Hadoop daemons. Starting the DFS brings up the distributed file system; we also need to start the map-reduce daemons (jobtracker and tasktracker), which will run the map-reduce programs. A small Java sanity check for HDFS is sketched after the commands.
                     start-dfs.sh
                     start-mapred.sh
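
To confirm from code that HDFS is reachable once the daemons are up, a minimal sketch like the one below lists the root of the file system. The class name HdfsCheck is purely illustrative, and it assumes the client picks up fs.default.name from the core-site.xml installed by the Debian package.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCheck {
 public static void main(String[] args) throws Exception {
  // Picks up fs.default.name from the installed core-site.xml
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  // List whatever is sitting at the root of the (pseudo-)distributed file system
  for (FileStatus status : fs.listStatus(new Path("/"))) {
   System.out.println(status.getPath());
  }
 }
}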

  • Java Code
This Java code prepends the text "Pudo" to each comma-separated element in the CSV file.

package com.sanket;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class PudoFile {

 public static class Map extends Mapper<Text, Text, Text, Text> {
  private Text word = new Text();

  public void map(Text key, Text value, Context context)
    throws IOException, InterruptedException {
   // With KeyValueTextInputFormat and no tab separator in the line, the whole
   // line arrives as the key and the value is empty, so we parse the key instead.
   String line = key.toString();
   StringTokenizer tokenizer = new StringTokenizer(line, ",");
   String strKey = tokenizer.nextToken();
   key.set(strKey);
   word.set(line);
   context.write(key, word);
  }
 }

 public static class Reduce extends Reducer<Text, Text, Text, Text> {
  public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
   for (Text val : values) {
    StringTokenizer st = new StringTokenizer(val.toString(), ",");
    StringBuffer sb = new StringBuffer();
    while (st.hasMoreTokens()) {
     // prefix every element of the line with "Pudo"
     String entity = st.nextToken();
     sb.append("Pudo" + entity + ",");
    }
    // drop the trailing comma before writing the line out
    context.write(new Text(), new Text(sb.substring(0, sb.length() - 1)));
   }
  }
 }

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job reverse = new Job(conf, "ReadCSV");
  // ship the job jar so the tasks can find the Mapper/Reducer classes
  reverse.setJarByClass(PudoFile.class);
  reverse.setOutputKeyClass(Text.class);
  reverse.setOutputValueClass(Text.class);
  reverse.setMapOutputKeyClass(Text.class);
  reverse.setMapOutputValueClass(Text.class);
  reverse.setMapperClass(Map.class);
  reverse.setReducerClass(Reduce.class);
  // The regular TextInputFormat gives a class cast exception here:
  // java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
  reverse.setInputFormatClass(KeyValueTextInputFormat.class);
  reverse.setOutputFormatClass(TextOutputFormat.class);
  FileInputFormat.addInputPath(reverse, new Path(args[0]));
  FileOutputFormat.setOutputPath(reverse, new Path(args[1]));
  reverse.waitForCompletion(true);
 }
}
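
To see what the reducer produces for a single CSV line, here is a tiny standalone sketch (plain Java, no Hadoop needed) that mirrors the tokenize-and-prefix logic; the line "a,b,c" is just a made-up example.

import java.util.StringTokenizer;

public class PudoDemo {
 public static void main(String[] args) {
  String line = "a,b,c"; // hypothetical CSV input line
  StringTokenizer st = new StringTokenizer(line, ",");
  StringBuffer sb = new StringBuffer();
  while (st.hasMoreTokens()) {
   // prefix every element, exactly as the reducer does
   sb.append("Pudo" + st.nextToken() + ",");
  }
  // drop the trailing comma and print: Pudoa,Pudob,Pudoc
  System.out.println(sb.substring(0, sb.length() - 1));
 }
}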


  • Compile the class and jar it. We will use the jar to run the map-reduce program.
  javac -classpath hadoop-core-1.0.3.jar -d bin PudoFile.java
  jar cvf FirstProgram.jar -C bin/ .
  • Add an input file to HDFS from the local file system
       hadoop fs -mkdir inputfile
       hadoop fs -put export.csv inputfile
  • Run the program using the following command
            hadoop jar FirstProgram.jar com.sanket.PudoFile inputfile/export.csv outputcsv


  • Check the output and delete the existing output directory. The output format gives an error if the directory already exists.
      NOW=$(date +"%b-%d-%s")
      LOGFILE="log-$NOW.log"
      hadoop fs -cat outputcsv/part-r-00000 > $LOGFILE
      hadoop fs -rmr outputcsv
  • The output will be present in the log-$NOW.log file


The output will not be in the same sequence as the input because of the internal sort done by map-reduce. A workaround is to implement your own key class and override the compare method, which ensures the output follows the input order; a sketch is given below. Ideally, though, the data being analyzed does not need to stay in the same sequence, since the point is to derive meaning out of a huge chunk of data.
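
As a rough idea of that workaround, the sketch below shows a custom key that carries the record's original position and sorts on it. The class and field names (OrderedKey, seq) are my own illustration, not part of the job above; in a real job the mapper would have to emit this key, and with more than one reducer you would also need a matching partitioner so the order survives across output files.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class OrderedKey implements WritableComparable<OrderedKey> {
 private long seq;                 // position of the record in the input
 private Text field = new Text();  // the original key text

 public void set(long seq, String field) {
  this.seq = seq;
  this.field.set(field);
 }

 public void write(DataOutput out) throws IOException {
  out.writeLong(seq);
  field.write(out);
 }

 public void readFields(DataInput in) throws IOException {
  seq = in.readLong();
  field.readFields(in);
 }

 public int compareTo(OrderedKey other) {
  // sort on the original position so the reduce output follows the input order
  return seq < other.seq ? -1 : (seq == other.seq ? 0 : 1);
 }
}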

A very practical example is:
http://online.wsj.com/article/SB10001424052702303444204577460552615646874.html


Peace.
Sanket Raut
