Hadoop Word Count Streaming Example with Node.js

This is a Hadoop streaming MapReduce program implemented with Node.js.  It is based on Michael G. Noll's Python example and was tested with his single-node and multi-node setups on Ubuntu 11.10.  It assumes that the Hadoop environment has been set up according to one of those tutorials; the only other prerequisite is Node.js itself.

Most of this example mimics Michael's Python example.  The programs and command lines have been updated for the new language and file names and, in a couple of places, made consistent with his preferred Hadoop user name (hduser) and file paths.

When running this type of streaming example on a multi-node Hadoop cluster, make sure that Node.js is installed on every node, that the scripts have been copied to the same path on every node, and that execute permissions have been added to the scripts (see below).

Note:  Installing Node.js on Ubuntu 11.10


sudo apt-get install nodejs npm

Map:  mapper.js

Save the following code in the file /home/hduser/mapper.js.  It reads lines from process.stdin (standard input), splits them into words, and writes one line per word to process.stdout (standard output), mapping each word to its (intermediate) count of 1.  Like the original Python example, this is not an optimized, production-ready program.

Make sure to add execute permissions to the file (chmod +x /home/hduser/mapper.js) or the examples will fail to run on any nodes where the script does not have execute permissions.


#!/usr/bin/node

var stdin = process.stdin;
var stdout = process.stdout;

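// Emit one "<word>\t1" line for every whitespace-separated word in the line.
function processLine(line) {
	if(line && line.trim().length > 0) {
		// Split on runs of whitespace so repeated spaces or tabs do not produce empty words.
		var s = line.trim().split(/\s+/);
		for(var i = 0; i < s.length; ++i) {
			stdout.write(s[i] + '\t1\n');
		}
	}
}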

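// Holds input that has not yet been split into complete lines.
var data = '';
stdin.setEncoding('utf8');
stdin.resume();
stdin.on('data', function(chunk) {
	data += chunk;
	// Normalize Windows line endings before looking for line breaks.
	data = data.replace(/\r\n/g, '\n');
	// Process every complete line; a partial line stays buffered for the next chunk.
	while(data.indexOf('\n') > -1) {
		var i = data.indexOf('\n') + 1;
		processLine(data.slice(0,i));
		data = data.slice(i);
	}
});
stdin.on('end', function() {
	// Flush the last line if the input did not end with a newline.
	processLine(data);
});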
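
The 'data' handler matters because stdin arrives in arbitrary chunks, so a chunk can end in the middle of a line.  The following is a minimal sketch of that buffering idea on its own, assuming a hypothetical helper named createLineSplitter that is not part of the downloadable scripts:

```javascript
// Hypothetical helper for illustration; the scripts inline this logic in their 'data' handlers.
function createLineSplitter(onLine) {
	var pending = '';
	return {
		// Accept one chunk and emit every complete line it finishes.
		feed: function (chunk) {
			pending += chunk;
			var i;
			while ((i = pending.indexOf('\n')) > -1) {
				// Drop a trailing carriage return so Windows line endings are handled too.
				onLine(pending.slice(0, i).replace(/\r$/, ''));
				pending = pending.slice(i + 1);
			}
		},
		// Emit whatever is left when the input has no trailing newline.
		end: function () {
			if (pending.length > 0) {
				onLine(pending);
				pending = '';
			}
		}
	};
}

var lines = [];
var splitter = createLineSplitter(function (line) { lines.push(line); });
splitter.feed('foo ba');          // the chunk boundary falls inside a line
splitter.feed('r\nquux\r\nla');
splitter.feed('bs');
splitter.end();
console.log(lines);               // three lines: 'foo bar', 'quux', 'labs'
```

The partial text 'foo ba' is held back until the rest of the line arrives, which is the same reason the script keeps its leftover input in the data variable between chunks.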

Reduce:  reducer.js

Save the following code in the file /home/hduser/reducer.js.  It will read the results of mapper.js from process.stdin (standard input), sum the occurrences of each word to a final count, and output its results to process.stdout (standard output).

Make sure to add execute permissions to the file (chmod +x /home/hduser/reducer.js) or the examples will fail to run on any nodes where the script does not have execute permissions.


#!/usr/bin/node

var stdin = process.stdin;
var stdout = process.stdout;

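// Accumulate counts for runs of identical words; the input must arrive sorted by word.
function processLine(line) {
	if(line && line.trim().length > 0) {
		var s = line.trim().split('\t');
		if(s.length === 2) {
			var w = s[0].trim();
			var c = parseInt(s[1], 10);
			if(!isNaN(c)) {
				if(cw === w) {
					cc += c;
				}
				else {
					// The word changed, so emit the finished total for the previous word.
					if(cw !== null) {
						stdout.write(cw + '\t' + cc + '\n');
					}
					cw = w;
					cc = c;
				}
			}
		}
	}
}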

var data = '';
var cw = null;	// current word
var cc = 0;	// running count for the current word

stdin.setEncoding('utf8');
stdin.resume();
stdin.on('data', function(chunk) {
	data += chunk;
	data = data.replace(/\r\n/g, '\n');
	while(data.indexOf('\n') > -1) {
		var i = data.indexOf('\n') + 1;
		processLine(data.slice(0,i));
		data = data.slice(i);
	}
});
stdin.on('end', function() {
	processLine(data);
	// Emit the final word, unless no valid input was read at all.
	if(cw !== null) {
		stdout.write(cw + '\t' + cc + '\n');
	}
});
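
The reducer only compares each word with the one immediately before it, so it depends on equal words being adjacent in its input.  Here is a rough sketch of that grouping logic as a plain function, assuming a hypothetical helper named reduceSortedLines that works on an array of lines instead of stdin:

```javascript
// Hypothetical helper for illustration; reducer.js itself streams its input from stdin.
function reduceSortedLines(lines) {
	var out = [];
	var currentWord = null;
	var currentCount = 0;
	for (var i = 0; i < lines.length; ++i) {
		var parts = lines[i].split('\t');
		var count = parseInt(parts[1], 10);
		if (parts.length !== 2 || isNaN(count)) {
			continue; // skip malformed lines
		}
		if (parts[0] === currentWord) {
			currentCount += count;
		} else {
			// A new word starts, so the previous word's total is complete.
			if (currentWord !== null) {
				out.push(currentWord + '\t' + currentCount);
			}
			currentWord = parts[0];
			currentCount = count;
		}
	}
	if (currentWord !== null) {
		out.push(currentWord + '\t' + currentCount);
	}
	return out;
}

var sorted = ['bar\t1', 'foo\t1', 'foo\t1', 'foo\t1', 'labs\t1', 'quux\t1', 'quux\t1'];
var unsorted = ['foo\t1', 'foo\t1', 'quux\t1', 'labs\t1', 'foo\t1', 'bar\t1', 'quux\t1'];
console.log(reduceSortedLines(sorted));   // one total per word
console.log(reduceSortedLines(unsorted)); // 'foo' and 'quux' each appear twice
```

With unsorted input the same word is reported more than once with partial counts, which is the kind of unexpected output to watch for when testing.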

Download Example Input Data

We will use three eBooks from Project Gutenberg for this example (saved as pg20417.txt, pg4300.txt, and pg5000.txt in the listing below).

Download each eBook as a text file in Plain Text UTF-8 encoding and store the files in a temporary directory of your choice, such as /tmp/gutenberg.


hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$

Copy local example data to HDFS

Before we run the actual MapReduce job, we first have to copy the files from the local file system to Hadoop's HDFS.


hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Test the scripts (cat data | map | sort | reduce)

I recommend testing your mapper.js and reducer.js scripts on the command line before trying to use them in a MapReduce job.  Because of how some error conditions are tracked and reported in Hadoop, the scripts can contain errors even though the Hadoop job completes successfully.  A key indicator is a job that completes without complaint but produces no output, or output that you did not expect.

Here are some ideas on testing the functionality of the scripts.

Very Basic Map Test


 hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.js
 foo     1
 foo     1
 quux    1
 labs    1
 foo     1
 bar     1
 quux    1
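
The echo test only covers words separated by single spaces.  As a small sketch of how the splitting step could be checked against messier input, the following assumes a hypothetical helper named mapLine that returns the output lines instead of writing them to stdout, and that splits on any run of whitespace:

```javascript
// Hypothetical helper for illustration; mapper.js itself writes directly to stdout.
function mapLine(line) {
	var out = [];
	var trimmed = (line || '').trim();
	if (trimmed.length > 0) {
		// Split on any run of whitespace so repeated spaces and tabs do not yield empty words.
		var words = trimmed.split(/\s+/);
		for (var i = 0; i < words.length; ++i) {
			out.push(words[i] + '\t1');
		}
	}
	return out;
}

// Two spaces and a tab between words still produce exactly three pairs.
console.log(mapLine('foo  foo\tquux').join('\n'));
// A whitespace-only line produces no output at all.
console.log(mapLine('   ').length);
```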

Very Basic Reduce Test

Note:  The sort in the middle is important.  It simulates the sort that Hadoop does to the resulting keys of the mapping stage before passing the results to the reducer.


hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.js | sort -k1,1 | /home/hduser/reducer.js
 bar     1
 foo     3
 labs    1
 quux    2
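
To make the role of the sort step concrete, here is a rough in-process stand-in for sort -k1,1.  It assumes a hypothetical helper named sortByKey that orders lines by the text before the first tab, using plain JavaScript string comparison (the real sort command may order some characters differently depending on locale):

```javascript
// Hypothetical helper for illustration; on the command line this job is done by sort -k1,1.
function sortByKey(lines) {
	function keyOf(line) {
		return line.split('\t')[0];
	}
	// Copy first so the caller's array is left untouched.
	return lines.slice().sort(function (a, b) {
		var ka = keyOf(a);
		var kb = keyOf(b);
		return ka < kb ? -1 : (ka > kb ? 1 : 0);
	});
}

var mapped = ['foo\t1', 'foo\t1', 'quux\t1', 'labs\t1', 'foo\t1', 'bar\t1', 'quux\t1'];
console.log(sortByKey(mapped).join('\n'));
```

After sorting, all lines for the same word sit next to each other, which is what lets the reducer total a word by looking only at consecutive lines.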

Run the MapReduce Job

Now that everything is ready, we can run the Node.js MapReduce job on the Hadoop cluster.  We use Hadoop Streaming to pass data between our Map and Reduce code via process.stdin (standard input) and process.stdout (standard output).


hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /home/hduser/mapper.js -mapper /home/hduser/mapper.js -file /home/hduser/reducer.js -reducer /home/hduser/reducer.js -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output

The job will read all of the files in the HDFS directory /user/hduser/gutenberg, process them, and store the results in the HDFS directory /user/hduser/gutenberg-output.  In general, Hadoop will create one output file per reducer; in our case, it will create only a single file because the input files were very small.
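
As a rough illustration of why there is one output file per reducer: Hadoop's default partitioner assigns each key to a reducer by taking a hash of the key modulo the number of reduce tasks, and each reducer writes its own file.  The sketch below shows only that idea.  The function names are hypothetical, and the hash is a simple stand-in, not the hash Hadoop actually applies to its keys:

```javascript
// Stand-in string hash (assumption): NOT the hash Hadoop applies to its keys.
function simpleHash(key) {
	var h = 0;
	for (var i = 0; i < key.length; ++i) {
		// Math.imul and "| 0" keep the value in 32-bit integer range.
		h = (Math.imul(31, h) + key.charCodeAt(i)) | 0;
	}
	return h;
}

// Map a key to a reducer index in the range [0, numReducers).
function partitionFor(key, numReducers) {
	// Clear the sign bit so the modulo result is never negative.
	return (simpleHash(key) & 0x7fffffff) % numReducers;
}

var words = ['bar', 'foo', 'labs', 'quux'];
for (var n = 0; n < words.length; ++n) {
	// With a single reducer every word is assigned to reducer 0.
	console.log(words[n] + ' -> reducer ' + partitionFor(words[n], 1));
}
```

With one reducer, every key maps to the same partition and therefore ends up in the same output file.  The same word always maps to the same reducer, so its counts are never split across files.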

Job Monitoring and Working with the Output

Michael's original posts have more information on monitoring the job's status and reading the output, all of which also applies to this example.

Download Files

The mapper and reducer scripts can be downloaded from GitHub.

Posted by Richard on Sunday, February 19, 2012 9:15:00 PM.  Categories: hadoop, mapreduce, nodejs