Hi All,
I am new to MapReduce and am trying to explore it a little. I took the basic WordCount example and ran it over data stored in a MySQL table, but it reports a count of 34 for every individual record in the table. I assume the map function is being called 34 times for each record in the table. Is there a way to control the number of times the map function is called? Please let me know if I am missing something. Any help on this is appreciated.
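The symptom can be reproduced with a small plain-Java simulation (no Hadoop dependency). It assumes two things. First, `TextInputFormat` hands `map()` one record per line of the input file in `args[0]`, so `map()` runs once per line. Second, the mapper ignores that line and emits every row of the table. The row values and the class name `MapCallSimulation` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapCallSimulation {

    // Hypothetical stand-in for the rows of test.a1.
    static final List<String> TABLE_ROWS = List.of("alice", "bob", "carol");

    // Mirrors the posted map(): ignores the input line and emits every table row.
    static void map(String inputLine, List<String> emitted) {
        for (String name : TABLE_ROWS) {
            emitted.add(name);
        }
    }

    // Calls map() once per input line, as TextInputFormat would, then sums per key.
    static Map<String, Integer> run(int inputLines) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < inputLines; i++) {
            map("line " + i, emitted);
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (String key : emitted) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(run(34)); // prints {alice=34, bob=34, carol=34}
    }
}
```

With a 34-line input file, every row ends up with a count of 34. This matches the output described above. The count depends on the number of input lines, not on the table.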
Here is the code that I am using:
```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCount1 {
    public static Connection Con;
    public static Statement statement = null;
    public static PreparedStatement preparedStatement = null;
    public static ResultSet resultSet = null;

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        public static Text word = new Text();

        // Queries the whole test.a1 table on every call; key and value are not used.
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            try {
                Class.forName("com.mysql.jdbc.Driver");
                Con = DriverManager.getConnection("jdbc:mysql://<<ip_add>>:3306/test", "user", "mysql");
                statement = Con.createStatement();
                resultSet = statement.executeQuery("select * from test.a1");
                // Emit (aname, 1) for every row in the table.
                while (resultSet.next()) {
                    word.set(resultSet.getString("aname"));
                    output.collect(word, one);
                }
            } catch (ClassNotFoundException e) {
                System.out.println("no MYSQL Driver found");
                word.set(e.toString());
                output.collect(word, one);
            } catch (SQLException e) {
                word.set(e.toString());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        // Sums all the 1s emitted for a given key.
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount1.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        // The job input is still the text file(s) at args[0], not the MySQL table.
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
```
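For comparison, the stock WordCount mapper builds its output from the `value` argument (the current input line) instead of running an external query. Each word is therefore counted once per occurrence, however many times `map()` runs. The following is a plain-Java sketch of that pattern, with no Hadoop dependency. The class name `LineWordCountSimulation` and the input lines are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LineWordCountSimulation {

    // Like the stock WordCount mapper: one (word, 1) pair per token of `value`.
    static List<String> map(String value) {
        List<String> words = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(value);
        while (tokenizer.hasMoreTokens()) {
            words.add(tokenizer.nextToken());
        }
        return words;
    }

    // Calls map() once per input line, then sums the 1s per word (the reduce step).
    static Map<String, Integer> run(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : map(line)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("alice bob", "bob carol"))); // prints {alice=1, bob=2, carol=1}
    }
}
```

Because each call only looks at its own line, the totals reflect the data itself rather than the number of `map()` invocations.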
Thanks in advance,
Praveen K Bandi