Saturday, November 01, 2008

My First MapReduce with Hadoop

Last week, I described a Phrase Spelling corrector that depended on Word Collocation Probabilities. The probabilities (from the occur_a and occur_ab tables) were based on data that were in the ballpark of what they should be, but the fact remains that I cooked them up. So this week, I describe a Hadoop Map-Reduce class that pulls this information out of real user-entered search terms from Apache access logs.

Background

I learned about the existence of the Map-Reduce programming style about a year ago, when a colleague told us about a free Hadoop seminar hosted by Yahoo!. Unfortunately, by the time I got around to enrolling, all the seats were taken. I did read the original Map-Reduce paper by Jeffrey Dean and Sanjay Ghemawat of Google at the time, and while it seemed like a neat idea, I wasn't sure where or how I would be able to use it in my work. So not being able to go to the seminar didn't seem like a huge loss.

More recently, however, as I spend more time working in and around text mining, I find myself spending more time waiting for programs to finish executing than I spend writing them. I had made a few half-hearted attempts to pick up Hadoop on my own, but it has a fairly steep learning curve, and I was never able to find the time to get to the point where I could model an existing algorithm as an equivalent Map-Reduce program before having to move on to other things.

On a somewhat unrelated note, I also recently joined the East Bay IT Group (EBIG) in an attempt to meet other like-minded tech professionals and to enhance my chances of landing a job closer to home. Just kidding about that last one, since nobody at EBIG seems to be working anywhere east of Oakland. In any case, the first Java SIG talk since I joined was on Hadoop, by Owen O'Malley of Yahoo!, so I figured that attending it would be a good way to quickly ramp up on Hadoop. I am happy to say that the talk was very informative (thanks Owen!) and I got enough out of it to be able to write my own code soon after.

Specifications

The structure of our search URL is as follows:

  http://.../search?q1=term[&name=value...]
  where term: a single- or multi-word query term

The idea is to run through all the access logs and count unique occurrences of single words and unique word pairs from the q1 values. These counts will later be fed into the occur_a and occur_ab tables described in my previous post.
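
For example, a hypothetical query of q1=breast+cancer would contribute a count of 1 each to the singleton keys breast and cancer, and to the pair key breast,cancer.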

For development, I just use the access_log files that are in my /var/log/httpd directory (just five of them), but the final run on a cluster (not described in this post) will use a year's worth of log files.

The MapReduce Class

Here's the code for the MapReduce class. As you can see, the Map and Reduce classes are written as inner classes of the main class. This seems to be the preferred style, so I went with it, but it may be more unit-testable to put each job into its own package and make the Map and Reduce classes top-level classes in that package. I did test the supporting classes and did a quick test run, and things came out ok, so...

The Map class is called MapClass and the Reduce class is called ReduceClass. There is also a PartitionerClass that attempts to send the pair output from the Map to one Reducer and the singleton output to another, so that they come out "sorted" into separate files in the final output. However, you apparently cannot have more than one Reducer in a non-clustered environment, so the Partitioner has no second Reducer to send output to. That is why the PartitionerClass is defined but commented out in the JobConf settings.

Once the Map and Reduce classes are defined, the main method sets up a JobConf and sets the Map and Reduce classes into it. The framework takes care of the rest: the input is read line by line and passed to a bank of available Mappers, whose outputs are accumulated into temporary files. Once all the Mappers are done, the key/value pairs they wrote are sent to a bank of Reducers, which aggregate them. Once all the Reducers are done, the program ends.

// Source: src/main/java/com/mycompany/accessloganalyzer/AccessLogAnalyzer.java
package com.mycompany.accessloganalyzer;

import java.io.IOException;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import com.mycompany.accessloganalyzer.NcsaLogParser.NcsaLog;

public class AccessLogAnalyzer {

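  // Mapper: parses one access log line, pulls the q1 search terms out of
  // /search requests, and emits a count of 1 for every word and for every
  // unordered word pair in the query.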
  private static class MapClass extends MapReduceBase 
    implements Mapper<WritableComparable<Text>,Writable,
                      WritableComparable<Text>,Writable> {

    public void map(WritableComparable<Text> key,Writable value,
        OutputCollector<WritableComparable<Text>,Writable> output,
        Reporter reporter) throws IOException {
      String line = ((Text) value).toString();
      EnumMap<NcsaLog,String> values = NcsaLogParser.parse(line);
      String url = values.get(NcsaLog.REQUEST_URL);
      if (url != null && url.startsWith("/search")) {
        Map<String,String> parameters = 
          NcsaLogParser.getUrlParameters(url);
        String searchTerms = parameters.get("q1");
        if (StringUtils.isBlank(searchTerms)) {
          // skip /search requests that do not have a q1 parameter
          return;
        }
        String[] terms = StringUtils.split(searchTerms, " ");
        for (String term : terms) {
          output.collect(new Text(term), new LongWritable(1));
        }
        if (terms.length > 1) {
          // need to have at least 2 words to generate pair-wise combinations
          CombinationGenerator combinationGenerator = 
            new CombinationGenerator(terms.length, 2);
          Set<Pair> combinations = new HashSet<Pair>();
          while (combinationGenerator.hasMore()) {
            int[] indices = combinationGenerator.getNext();
            combinations.add(new Pair(terms[indices[0]], terms[indices[1]]));
          }
          for (Pair combination : combinations) {
            output.collect(new Text(combination.toString()), 
              new LongWritable(1));
          }
        }
      }
    }
  }
  
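  // Reducer (also configured as the Combiner): sums the counts emitted for
  // each word or word-pair key.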
  private static class ReduceClass extends MapReduceBase 
    implements Reducer<WritableComparable<Text>,Writable,
                       WritableComparable<Text>,Writable> {

    public void reduce(WritableComparable<Text> key, 
        Iterator<Writable> values,
        OutputCollector<WritableComparable<Text>,Writable> output,
        Reporter reporter) throws IOException {
      long occurs = 0;
      while (values.hasNext()) {
        occurs += ((LongWritable) values.next()).get();
      }
      output.collect(key, new LongWritable(occurs));
    }
  }

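  // Partitioner: with more than one Reducer available, routes pair keys
  // (which contain a comma) to a different Reducer than single-word keys.
  // It goes unused in the local run, which effectively has a single Reducer.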
  private static class PartitionerClass  
    implements Partitioner<WritableComparable<Text>,Writable> {

    public void configure(JobConf conf) { /* NOOP */ }

    public int getPartition(WritableComparable<Text> key, Writable value, 
        int numReduceTasks) {
      if (numReduceTasks > 1) {
        String k = ((Text) key).toString();
        return (k.contains(",") ? 1 : 0);
      }
      return 0;
    }
  }
  
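  // Order-insensitive word pair: the constructor sorts the two words, so
  // ("cancer", "breast") and ("breast", "cancer") produce the same key.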
  static class Pair {
    public String first;
    public String second;

    public Pair(String first, String second) {
      String[] pair = new String[] {first, second};
      Arrays.sort(pair);
      this.first = pair[0];
      this.second = pair[1];
    }
    
    @Override
    public int hashCode() {
      return toString().hashCode();
    }
    
    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof Pair)) {
        return false;
      }
      Pair that = (Pair) obj;
      return (this.first.equals(that.first) &&
        this.second.equals(that.second));
    }
    
    @Override
    public String toString() {
      return StringUtils.join(new String[] {first, second}, ",");
    }
  }

  public static void main(String[] argv) throws IOException {
    if (argv.length != 2) {
      System.err.println("Usage: AccessLogAnalyzer input_path output_path");
      System.exit(-1);
    }
    
    JobConf jobConf = new JobConf(AccessLogAnalyzer.class);
    
    FileInputFormat.addInputPath(jobConf, new Path(argv[0]));
    FileOutputFormat.setOutputPath(jobConf, new Path(argv[1]));
    
    jobConf.setOutputKeyClass(Text.class);
    jobConf.setOutputValueClass(LongWritable.class);
    
    jobConf.setMapperClass(MapClass.class);
    jobConf.setCombinerClass(ReduceClass.class);
    jobConf.setReducerClass(ReduceClass.class);
//    jobConf.setPartitionerClass(PartitionerClass.class);
    
    jobConf.setNumReduceTasks(2);
    
    JobClient.runJob(jobConf);
  }
}

Supporting Classes

NCSA Log Parser

I went looking for an NCSA log parser but couldn't find one, so I wrote my own. I tried to make it generic, since I will probably be re-using this parser to pull other stuff out of the logs in the future. The parser described below handles the NCSA Common Log format, which is the smallest of the three NCSA log formats. Here is the code:

// Source: src/main/java/com/mycompany/accessloganalyzer/NcsaLogParser.java
package com.mycompany.accessloganalyzer;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.commons.lang.StringUtils;

public class NcsaLogParser {
  
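  // Fields of an NCSA Common Log Format line, in the order they are parsed
  // out below.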
  public enum NcsaLog {
    HOST,
    PROTOCOL,
    USERNAME,
    DATE,
    TIME,
    TIMEZONE,
    REQUEST_METHOD,
    REQUEST_URL,
    REQUEST_PROTOCOL,
    STATUS_CODE,
    BYTE_COUNT
  };
  
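  // Tokenizes a single log line on spaces and maps each piece to its field.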
  public static EnumMap<NcsaLog,String> parse(String logline) {
    EnumMap<NcsaLog,String> values = 
      new EnumMap<NcsaLog, String>(NcsaLog.class);
    StringTokenizer tok = new StringTokenizer(logline, " ");
    if (tok.hasMoreTokens()) {
      values.put(NcsaLog.HOST, tok.nextToken());
      values.put(NcsaLog.PROTOCOL, tok.nextToken());
      values.put(NcsaLog.USERNAME, tok.nextToken());
      String dttm = tok.nextToken();
      values.put(NcsaLog.DATE, dttm.substring(1, dttm.indexOf(':')));
      values.put(NcsaLog.TIME, dttm.substring(dttm.indexOf(':') + 1));
      String tz = tok.nextToken();
      values.put(NcsaLog.TIMEZONE, tz.substring(0, tz.length() - 1));
      String requestMethod = tok.nextToken();
      values.put(NcsaLog.REQUEST_METHOD, requestMethod.substring(1));
      values.put(NcsaLog.REQUEST_URL, tok.nextToken());
      String requestProtocol = tok.nextToken();
      values.put(NcsaLog.REQUEST_PROTOCOL, 
        requestProtocol.substring(0, requestProtocol.length() - 1));
      values.put(NcsaLog.STATUS_CODE, tok.nextToken());
      values.put(NcsaLog.BYTE_COUNT, tok.nextToken());
    }
    return values;
  }
  
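  // Splits the query string portion of a URL into name/value pairs; each
  // value is URL-decoded, stripped of punctuation and lowercased.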
  public static Map<String,String> getUrlParameters(String url) throws IOException {
    Map<String,String> parameters = new HashMap<String,String>();
    int pos = url.indexOf('?');
    if (pos == -1) {
      return parameters;
    }
    String queryString = url.substring(pos + 1);
    String[] nvps = queryString.split("&");
    for (String nvp : nvps) {
      String[] pair = nvp.split("=");
      if (pair.length != 2) {
        continue;
      }
      String key = pair[0];
      String value = pair[1];
      // URL decode the value, replacing + and %20 etc chars with their
      // non-encoded equivalents.
      try {
        value = URLDecoder.decode(value, "UTF-8");
      } catch (UnsupportedEncodingException e) {
        throw new IOException("Unsupported encoding", e);
      }
      // replace all punctuation by space
      value = value.replaceAll("\\p{Punct}", " ");
      // lowercase it
      value = StringUtils.lowerCase(value);
      parameters.put(key, value); 
    }
    return parameters;
  }
}
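
To make the parser's behavior concrete, here is a quick usage sketch. The NcsaLogParserExample class and the log line inside it are made up for illustration, but the parse() and getUrlParameters() calls are exactly the ones the Mapper above relies on.

// A hypothetical usage example for NcsaLogParser (not part of the job jar)
package com.mycompany.accessloganalyzer;

import java.util.EnumMap;
import java.util.Map;

import com.mycompany.accessloganalyzer.NcsaLogParser.NcsaLog;

public class NcsaLogParserExample {
  public static void main(String[] args) throws Exception {
    // a made-up NCSA Common Log Format line for a search request
    String line = "127.0.0.1 - - [01/Nov/2008:10:15:32 -0800] " +
      "\"GET /search?q1=breast+cancer HTTP/1.1\" 200 5120";
    EnumMap<NcsaLog,String> fields = NcsaLogParser.parse(line);
    String url = fields.get(NcsaLog.REQUEST_URL);
    System.out.println(url);                 // prints: /search?q1=breast+cancer
    Map<String,String> params = NcsaLogParser.getUrlParameters(url);
    System.out.println(params.get("q1"));    // prints: breast cancer
  }
}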

Combination Generator

I needed a way to enumerate all pairs of words I find in a multi-word phrase. Michael Gilleland has already written one that works great, so all I did was copy it into my own package structure and use it. You can read/snag the code from Michael's site.
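
If you would rather not depend on an external class, the Mapper above only needs the (n choose 2) case, so a minimal stand-in with the same constructor and hasMore()/getNext() contract could look like the sketch below. To be clear, this is not Michael's implementation, just the smallest thing that satisfies the calls made from MapClass.

// A minimal, hypothetical stand-in for the CombinationGenerator used by
// MapClass above; it only supports choosing pairs (r = 2).
package com.mycompany.accessloganalyzer;

public class CombinationGenerator {

  private final int n;
  private int i = 0;
  private int j = 1;

  public CombinationGenerator(int n, int r) {
    if (r != 2) {
      throw new IllegalArgumentException("this sketch only supports r = 2");
    }
    this.n = n;
  }

  public boolean hasMore() {
    return (i < n - 1 && j < n);
  }

  public int[] getNext() {
    int[] next = new int[] {i, j};
    // advance to the next index pair in lexicographic order
    j++;
    if (j == n) {
      i++;
      j = i + 1;
    }
    return next;
  }
}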

Packaging

Hadoop needs the classes packaged a certain way. Along with the compiled classes, you also want to add any runtime dependency JAR files in a lib/ directory inside the job JAR. You can optionally supply a MANIFEST.MF file specifying the Main-Class if you want to use the java -jar calling style. Since I use Maven, but don't really know of an easy way to write ad-hoc scripts for new Maven goals, I decided to generate an Ant build.xml using Maven and then write a new target.

  1. To generate the Ant build.xml, run mvn ant:ant.
  2. Add the Hadoop JAR to build.classpath.
  3. Add definitions for the input and output directories for the job.
  4. Add the hadoop build-and-run target (shown below).

My target to package and run the jar is shown below. In the future, when I run this on a remote cluster, I will decouple the two steps.

  <target name="hadoop-log-analyzer" 
      description="Launch AccessLogAnalyzer job on Hadoop" depends="compile">
    <!-- create new directory target/jars/lib and copy required runtime
         dependencies for the hadoop job into it -->
    <delete dir="${maven.build.directory}/jars/lib"/>
    <mkdir dir="${maven.build.directory}/jars/lib"/>
    <copy todir="${maven.build.directory}/jars/lib" flatten="true">
      <fileset dir="${maven.repo.local}">
        <include name="commons-lang/commons-lang/2.1/commons-lang-2.1.jar"/>
      </fileset>
    </copy>
    <!-- create jar file with classes and libraries -->
    <jar jarfile="${maven.build.directory}/log-analyzer.jar">
      <fileset dir="${maven.build.directory}/classes"/>
      <fileset dir="${maven.build.directory}/jars"/>
      <manifest>
        <attribute name="Main-Class"
          value="com.mycompany.accessloganalyzer.AccessLogAnalyzer"/>
      </manifest>
    </jar>
    <!-- clean up output directory -->
    <delete dir="${basedir}/src/main/resources/access_log_outputs"/>
    <!-- run jar in hadoop -->
    <exec executable="bin/hadoop" dir="/opt/hadoop-0.18.1">
      <arg value="jar"/>
      <arg value="${basedir}/target/log-analyzer.jar"/>
      <arg value="${basedir}/src/main/resources/access_logs"/>
      <arg value="${basedir}/src/main/resources/access_log_outputs"/>
    </exec>
  </target>

My only runtime dependency was commons-lang-2.1.jar, which the target above copies into the JAR's lib/ directory.

(Local) Dev Testing

Unlike Owen's, my other computer is not a data center. In fact, unless you count my work desktop, my laptop is the only computer I have. So I need to be able to test the job on my laptop first. Here is what I had to do.

  1. Explode the Hadoop tarball into a local directory. My Hadoop directory (HADOOP_HOME) is /opt/hadoop-0.18.1.
  2. In the $HADOOP_HOME/conf/hadoop-env.sh file, update JAVA_HOME to point to whatever it is on your machine. The export is commented out; uncomment and update it.
  3. Run the job using the Ant target above.

Here is the output of running ant hadoop-log-analyzer from the command line (edited for readability by removing dates from the logs).

hadoop-log-analyzer:
     [exec] jvm.JvmMetrics: Initializing JVM Metrics with \
       processName=JobTracker, sessionId=
     [exec] mapred.FileInputFormat: Total input paths to process : 5
     [exec] mapred.JobClient: Running job: job_local_1
     [exec] mapred.MapTask: numReduceTasks: 1
     [exec] mapred.LocalJobRunner: \
       file:~/myapp/src/main/resources/access_logs/access_log.3:0+118794
     [exec] mapred.TaskRunner: Task 'map_0000' done.
     [exec] mapred.MapTask: numReduceTasks: 1
     [exec] mapred.LocalJobRunner: \
       file:~/myapp/src/main/resources/access_logs/access_log.4:0+3811
     [exec] mapred.TaskRunner: Task 'map_0001' done.
     [exec] mapred.MapTask: numReduceTasks: 1
     [exec] mapred.LocalJobRunner: \
       file:~/myapp/src/main/resources/access_logs/access_log:0+446816
     [exec] mapred.TaskRunner: Task 'map_0002' done.
     [exec] mapred.MapTask: numReduceTasks: 1
     [exec] mapred.LocalJobRunner: \
       file:~/myapp/src/main/resources/access_logs/access_log.2:0+99752
     [exec] mapred.TaskRunner: Task 'map_0003' done.
     [exec] mapred.MapTask: numReduceTasks: 1
     [exec] mapred.LocalJobRunner: \
       file:~/myapp/src/main/resources/access_logs/access_log.1:0+36810
     [exec] mapred.TaskRunner: Task 'map_0004' done.
     [exec] mapred.LocalJobRunner: reduce > reduce
     [exec] mapred.TaskRunner: Task 'reduce_mglk8q' done.
     [exec] mapred.TaskRunner: Saved output of task \
       'reduce_mglk8q' to file:~/myapp/src/main/resources/access_log_outputs
     [exec] mapred.JobClient: Job complete: job_local_1
     [exec] mapred.JobClient: Counters: 9
     [exec] mapred.JobClient:   Map-Reduce Framework
     [exec] mapred.JobClient:     Map input records=2661
     [exec] mapred.JobClient:     Map output records=186
     [exec] mapred.JobClient:     Map input bytes=705983
     [exec] mapred.JobClient:     Map output bytes=3381
     [exec] mapred.JobClient:     Combine input records=186
     [exec] mapred.JobClient:     Combine output records=34
     [exec] mapred.JobClient:     Reduce input groups=31
     [exec] mapred.JobClient:     Reduce input records=34
     [exec] mapred.JobClient:     Reduce output records=31

Output lands in the output directory in a file called part-00000; here are a few lines from it to show what it looks like. On a fully clustered system, we would be able to use the Partitioner to send the pairs and the singletons to two different Reducers, so they would end up in distinct output files.

asthma  6
breast  7
breast,cancer   7
cancer  18
diabetes        3
diaries 1
diaries,headache        1
disease 6
...

So that's pretty much it. The MapReduce style is a very powerful mechanism that allows average developers with domain expertise to write code that can be run within a framework such as Hadoop on large clusters, parallelizing the computation. So it's worth knowing, especially if you need to write batch programs that run over large data sets. I believe I now understand enough about Hadoop to be able to use it effectively, and have reached a stage where I can pick up what I don't know. I hope this article has helped you in a similar way.

4 comments (moderated to prevent spam):

Luis Fdez. said...

Hi,
What about using Hadoop with a POS tagger (like Stanford or Snowball)? Have you tried it?

Sujit Pal said...

Hi Luis, no I haven't, sorry. But your comment gave me an idea (of using Stanford and Hadoop together), thanks for that :-).

aisha said...

hiiiii...

I did Hadoop with Stanford lemmatization. It runs fine with small inputs, but as soon as I give it 1GB of data it throws some kind of child error: task exits with non-zero status 255.

I guess it has to do with a memory issue. I tried adding -Xmx4096m in mapred-site.xml, but it's not working. Can you suggest a way to solve this issue?

Sujit Pal said...

Hi Aisha, if you are using the standard approach of sending the entire text to CoreNLP, letting it annotate the whole thing and then querying the annotations, that is probably the problem you are seeing. One way to work around it would be to use a streaming sentence splitter like Stanford's DocumentPreprocessor to convert your 1GB input into a list of sentences, then run the lemmatizer against each sentence.