Hi All,

I am new to MapReduce and am trying to explore it a little. I took the basic WordCount example and ran it over data stored in a MySQL table, but it reports a count of 34 for every individual record in the table. I assume the map function is being called 34 times for each record in the table. Is there a way to control the number of times the map function is called? Please let me know if I am missing something. Any help is appreciated.

Here is the code that I am using:

Java
public class WordCount1 {

 	public static Connection Con;
	public static Statement statement = null;
	public static PreparedStatement preparedStatement = null;
	public static ResultSet resultSet = null;

 	   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
 	     private final static IntWritable one = new IntWritable(1);
		
 	     public static Text word = new Text();
 	
 	     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException  {
	
		try
		{
			Class.forName("com.mysql.jdbc.Driver");
			Con= DriverManager.getConnection("jdbc:mysql://<<ip_add>>:3306/test","user","mysql");
				 statement = Con.createStatement();
			
			     resultSet = statement.executeQuery("select * from test.a1");
			     
			    
			     while(resultSet.next())
			     {
			        word.set(resultSet.getString("aname"));
				output.collect(word,one);
			     }
		
		}
	catch (ClassNotFoundException e)
		{
		System.out.println("no MYSQL Driver found");
		 word.set(e.toString());
		 output.collect(word,one);
		}
	catch (SQLException e)
	{
		 word.set(e.toString());
		 output.collect(word,one);
	}
 	      
 	     }
 	   }
 	
 	   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
 	     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
 	       int sum = 0;
 	       while (values.hasNext()) {
 	         sum += values.next().get();
 	       }

 	       output.collect(key, new IntWritable(sum));
 	     }
 	   }
 	
 	   public static void main(String[] args) throws Exception {
 	     JobConf conf = new JobConf(WordCount1.class);
 	     conf.setJobName("wordcount");
 	
 	     conf.setOutputKeyClass(Text.class);
 	     conf.setOutputValueClass(IntWritable.class);
 	
 	     conf.setMapperClass(Map.class);
 	     conf.setCombinerClass(Reduce.class);
 	     conf.setReducerClass(Reduce.class);
 	
 	     conf.setInputFormat(TextInputFormat.class);
 	     conf.setOutputFormat(TextOutputFormat.class);
 	
 	     FileInputFormat.setInputPaths(conf, new Path(args[0]));
 	     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
 	
 	     JobClient.runJob(conf);
 	   }
 	}
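
To check my assumption without a cluster, here is a minimal sketch of what I think is happening. With TextInputFormat, the framework calls map once per line of the input file, and my map ignores that line and emits every row of the table each time. The class MapCallSketch, its methods, the row values "alice" and "bob", and the figure of 34 input lines are all hypothetical stand-ins; I have not verified how many lines my actual input file contains.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the job above: no Hadoop or MySQL involved.
class MapCallSketch {

    // Plays the role of Map.map(): ignores its input line and emits every table row.
    static void map(String ignoredInputLine, List<String> tableRows, List<String> emitted) {
        for (String aname : tableRows) {
            emitted.add(aname); // corresponds to output.collect(word, one)
        }
    }

    // Plays the role of the framework plus Reduce:
    // one map() call per input line, then a sum per key.
    static Map<String, Integer> run(int inputLines, List<String> tableRows) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < inputLines; i++) {
            map("line " + i, tableRows, emitted);
        }
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : emitted) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 34 input lines is an assumption chosen to match the observed count.
        System.out.println(run(34, List.of("alice", "bob"))); // prints {alice=34, bob=34}
    }
}
```

If this sketch matches the real job, the count per record is simply the number of lines in the file passed as args[0], so an input file with a single line would produce a count of 1 per record.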

Thanks in advance,

Praveen K Bandi
Posted
Updated 12-Feb-13 22:54pm
v2

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


