// Environment (printenv):
//   JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera
//   PATH=/usr/java/jdk1.7.0_67-cloudera/bin
//
// Compile, package, and run (e.g. on big.txt or t8.shakespeare.txt):
//   export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
//   hadoop com.sun.tools.javac.Main WordCount2.java
//   jar cf WordCount2.jar WordCount2*.class
//   hadoop jar WordCount2.jar WordCount2 big.txt outWCBig
//   hadoop jar WordCount2.jar WordCount2 t8.shakespeare.txt outWCShake
//
// Remove the previous results before re-running:
//   hadoop fs -rm -r -f /user/cloudera/wordcount/output
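//
// A quick way to inspect a run's results (sketch; the directory name is
// whatever was passed as the second argument above):
//   hadoop fs -cat outWCShake/part-r-00000 | head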
import java.io.*;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration ;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.log4j.Logger;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
public class WordCount2 extends Configured implements Tool
{
private static final Logger LOG = Logger.getLogger(WordCount2.class);
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new WordCount2(), args);
System.exit(exitCode);
}
public static boolean isNullOrEmpty(String str) {
return str == null || str.trim().isEmpty();
}
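// Stage 1 of the ChainMapper pipeline: replaces punctuation characters with
// spaces and forwards each cleaned line keyed by a constant IntWritable(1).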
public static class PunctuationMapper
extends Mapper<Object, Text, IntWritable, Text> {
private Text punctd = new Text();
// Apostrophes are deliberately left out of PunctuationMarks so that
// contractions such as DON'T survive the cleanup.
private static final String PunctuationMarks="\"\\[\\]\\\\!$&@~#%:;`<>(){}/!|?*-+=^,.";
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String word= value.toString();
word=word.replaceAll("-", "");
//word=word.replaceAll("'", "");
word=word.replaceAll("["+PunctuationMarks+"]", " ");
//word = word.replaceAll("[^a-zA-Z\\s+]", " ").toLowerCase();
//word = curr.trim();
//punctd.set(Regex.Replace(word.toString(),[^\w\d\s-]," "));
//String word_punc=word.replaceAll("[^\\w\\d\\s-]", " ");
//word=word.replace( "/\s\s+/g", "" );// -> 390ms
punctd.set(word);
context.write(new IntWritable(1), punctd);
}
}
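// Stage 2 of the chain: trims leading/trailing blanks and collapses runs of
// spaces inside the line to a single space.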
public static class TrimMapper
extends Mapper<Object, Text, IntWritable, Text> {
private Text trimd = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String word= value.toString().trim();
//word=word.replaceAll( "/\s\s+/g", "" );// -> 390ms
word=word.replaceAll("^ +| +$|( )+", "$1");
trimd.set(word);
context.write(new IntWritable(1), trimd);
}
}
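// Stage 3 of the chain: lower-cases the whole line.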
public static class LowerCaseMapper
extends Mapper<Object, Text, IntWritable, Text> {
private Text lowercased = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
lowercased.set(value.toString().toLowerCase());
context.write(new IntWritable(1), lowercased);
}
}
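// Stage 4 of the chain: splits each cleaned line on whitespace and emits
// (word, 1) pairs, skipping empty tokens.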
public static class TokenizerMapper
extends Mapper<IntWritable, Text, Text, IntWritable>{
private static final java.util.regex.Pattern WORD_BOUNDARY = java.util.regex.Pattern.compile("\\s");
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(IntWritable key, Text lineText, Context context
) throws IOException, InterruptedException {
String line = lineText.toString();
Text currentWord = new Text();
for (String word : WORD_BOUNDARY.split(line)) {
if (WordCount2.isNullOrEmpty(word)) {
continue;
}
currentWord = new Text(word);
context.write(currentWord,one);
}
}
}
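// Sums the 1s emitted for each word, producing (word, total count).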
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key_word, Iterable<IntWritable> counts,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum += count.get();
}
result.set(sum);
context.write(key_word, result);
}
}
//////////////////////////////////////
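// Second-job mapper: keeps a per-mapper TreeMap of the ten largest counts
// seen so far and emits those candidates (keyed by NullWritable) from cleanup().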
public static class TopTenMapper extends Mapper<Object, Text, NullWritable, Text>
{
private TreeMap<Integer, Text> topN = new TreeMap<Integer, Text>(); //Collections.reverseOrder()
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// (word, count) tuple
String[] words = value.toString().split("\t") ;
if (words.length < 2) {
return;
}
topN.put(Integer.parseInt(words[1]), new Text(value));
if (topN.size() > 10) {
// Evict the smallest count so only the ten largest survive.
topN.remove(topN.firstKey());
}
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
for (Text t : topN.values()) {
context.write(NullWritable.get(), t);
}
}
}
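// Second-job reducer: merges the per-mapper candidates and writes the global
// top ten in descending order of count.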
public static class TopTenReducer extends
Reducer<NullWritable, Text, NullWritable, Text> {
private TreeMap<Integer, Text> topN = new TreeMap<Integer, Text>();
@Override
public void reduce(NullWritable key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
for (Text value : values) {
String[] words = value.toString().split("\t") ;
topN.put(Integer.parseInt(words[1]), new Text(value));
if (topN.size() > 10) {
// Evict the smallest count so only the ten largest survive.
topN.remove(topN.firstKey());
}
}
for (Text word : topN.descendingMap().values()) {
context.write(NullWritable.get(), word);
}
}
}
//////////////////////////////////////
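// Drives two jobs: (1) the chained cleanup mappers plus the word count, which
// writes word<TAB>count lines to a temporary directory, and (2) the top-ten
// job, which reads that output and writes the ten most frequent words to args[1].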
public int run(String[] args) throws Exception {
Configuration conf = getConf();
//////////////
FileSystem fs = FileSystem.get(conf);
Path tmpPath = new Path("/w1/tmp");
fs.delete(tmpPath, true);
Path inputPath = new Path(args[0]);
//Path partitionFile = new Path(args[1] + "_partitions.lst");
//Path outputStage = new Path(args[1] + "_staging");
Path outputStage = new Path("/w1/tmp");
Path outputOrder = new Path(args[1]);
//////////////
args = new GenericOptionsParser(conf, args).getRemainingArgs();
// First job: chained cleanup mappers followed by the word count.
Job jobWordCount = Job.getInstance(conf, "wordcount");
jobWordCount.setJarByClass(this.getClass());
// Use TextInputFormat, the default unless jobWordCount.setInputFormatClass is used
//public static class PunctuationMapper
//extends Mapper<Object, Text, IntWritable, Text>
Configuration punctuationMapperConf = new Configuration(false);
ChainMapper.addMapper(jobWordCount,
PunctuationMapper.class,
Object.class, Text.class,
IntWritable.class, Text.class,
punctuationMapperConf);
//public static class TrimMapper
//extends Mapper<Object, Text, IntWritable, Text>
Configuration trimMapperConf = new Configuration(false);
ChainMapper.addMapper(jobWordCount,
TrimMapper.class,
IntWritable.class, Text.class,
IntWritable.class, Text.class,
trimMapperConf);
//public static class LowerCaseMapper
//extends Mapper<Object, Text, IntWritable, Text>
Configuration lowerCaseMapperConf = new Configuration(false);
ChainMapper.addMapper(jobWordCount,
LowerCaseMapper.class,
IntWritable.class, Text.class,
IntWritable.class, Text.class,
lowerCaseMapperConf);
//public static class TokenizerMapper
// extends Mapper<IntWritable, Text, Text, IntWritable>
Configuration tokenizerConf = new Configuration(false);
ChainMapper.addMapper(jobWordCount,
TokenizerMapper.class,
IntWritable.class,Text.class,
Text.class, IntWritable.class,
tokenizerConf);
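// End of the chain: the last mapper's (Text, IntWritable) output feeds the reducer.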
//public static class IntSumReducer
//extends Reducer<Text,IntWritable,Text,IntWritable>
jobWordCount.setReducerClass(IntSumReducer.class);
jobWordCount.setOutputKeyClass(Text.class);
jobWordCount.setOutputValueClass(IntWritable.class);
//FileInputFormat.addInputPath(jobWordCount, new Path(args[0]));
TextInputFormat.setInputPaths(jobWordCount, inputPath);
//FileOutputFormat.setOutputPath(jobWordCount, new Path(args[1]));
FileOutputFormat.setOutputPath(jobWordCount, tmpPath);
// Set the output format to a sequence file
//jobWordCount.setOutputFormatClass(SequenceFileOutputFormat.class);
//SequenceFileOutputFormat.setOutputPath(jobWordCount, outputStage);
int code = jobWordCount.waitForCompletion(true) ? 0 : 1;
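// Run the top-ten job only if the word-count job succeeded.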
if (code == 0) {
// Second job: read the word<TAB>count lines produced above and keep the top ten.
Job orderJob = Job.getInstance(conf, "TopWords");
orderJob.setJarByClass(WordCount2.class);
orderJob.setMapperClass(TopTenMapper.class);
orderJob.setReducerClass(TopTenReducer.class);
//********************
//public static class TopTenMapper
//extends Mapper<Object, Text, NullWritable, Text>
orderJob.setMapOutputKeyClass(NullWritable.class);
orderJob.setMapOutputValueClass(Text.class);
//********************
// A single reducer yields one global top-ten list; extra reducers would only
// produce empty part files, since every record shares the NullWritable key.
orderJob.setNumReduceTasks(1);
// Use Hadoop's TotalOrderPartitioner class
//orderJob.setPartitionerClass(TotalOrderPartitioner.class);
// Set the partition file
//TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
// partitionFile);
//********************
//public static class TopTenReducer
//extends Reducer<NullWritable, Text, NullWritable, Text>
//orderJob.setOutputKeyClass(Text.class);
//orderJob.setOutputValueClass(IntWritable.class);
//********************
orderJob.setOutputKeyClass(NullWritable.class);
orderJob.setOutputValueClass(Text.class);
//********************
// The previous job's output is plain text; TopTenMapper splits each
// word<TAB>count line itself, so it needs the whole line as the value.
//orderJob.setInputFormatClass(SequenceFileInputFormat.class);
orderJob.setInputFormatClass(TextInputFormat.class);
orderJob.setOutputFormatClass(TextOutputFormat.class);
//SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
// Set the output path to the command line parameter
TextOutputFormat.setOutputPath(orderJob, outputOrder);
// Set the separator to an empty string
//orderJob.getConfiguration().set(
// "mapred.textoutputformat.separator", "");
// Use the InputSampler to go through the output of the previous
// job, sample it, and create the partition file
//InputSampler.writePartitionFile(orderJob,
// new InputSampler.RandomSampler(.1, 10000));
FileInputFormat.setInputPaths(orderJob, tmpPath);
FileOutputFormat.setOutputPath(orderJob, new Path(args[1]));
// Submit the job
code = orderJob.waitForCompletion(true) ? 0 : 2;
}
// Clean up the partition file and the staging directory
// FileSystem.get(new Configuration()).delete(partitionFile, false);
// FileSystem.get(new Configuration()).delete(outputStage, true);
return code;
}
}