
Getting Started With ForkJoinPool - Map & Reduce of Java

By TheDhruv, 25 May 2012
 

ForkJoinPool - Java's Map & Reduce


ForkJoinPool (FJP) is an executor service for fork/join tasks, that is, tasks that can be computed by divide and conquer. In other words, FJP is a built-in Map & Reduce style framework in Java.
FJP implements work stealing: every worker thread in the pool tries to find and execute tasks that were submitted to the pool by an external program or forked by other running tasks. By default, FJP tries to keep as many threads active as there are processors available.
FJP provides the following methods to submit tasks:
Call type                        Call from non-fork/join clients     Call from within fork/join computations
                                 (external clients, e.g. main)       (inner divide and conquer calls)
Arrange async execution          execute(ForkJoinTask)               ForkJoinTask.fork()
Await and obtain result          invoke(ForkJoinTask)                ForkJoinTask.invoke()
Arrange exec and obtain Future   submit(ForkJoinTask)                ForkJoinTask.fork() (ForkJoinTasks are Futures)
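
As a rough illustration of the external-client column, the sketch below submits the same kind of task in the three ways. SquareTask and the helper method names are hypothetical and exist only for this example; the pool and task methods themselves (invoke, submit, execute, join, get) are the standard ones.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class SubmitStylesDemo {

    // Hypothetical task for illustration: returns the square of a number.
    static class SquareTask extends RecursiveTask<Integer> {
        private final int n;

        SquareTask(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    // invoke: blocks the caller until the result is ready.
    static int viaInvoke(ForkJoinPool pool, int n) {
        return pool.invoke(new SquareTask(n));
    }

    // submit: returns immediately; the returned ForkJoinTask is also a Future.
    static int viaSubmit(ForkJoinPool pool, int n)
            throws InterruptedException, ExecutionException {
        ForkJoinTask<Integer> future = pool.submit(new SquareTask(n));
        return future.get();
    }

    // execute: arranges async execution and returns nothing,
    // so keep a reference to the task if you want to join it later.
    static int viaExecute(ForkJoinPool pool, int n) {
        SquareTask task = new SquareTask(n);
        pool.execute(task);
        return task.join();
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println("invoke  == " + viaInvoke(pool, 3));
        System.out.println("submit  == " + viaSubmit(pool, 4));
        System.out.println("execute == " + viaExecute(pool, 5));
        pool.shutdown();
    }
}
```

Inside a compute method you would normally use fork(), join() or invokeAll() on the subtasks instead of going back through the pool.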


Prerequisite :: FJP is built into Java 7. You can download and install the JDK from Oracle.
You can either add Java to your system path and set JAVA_HOME to the home directory of the JDK installation,
or you can use Eclipse and configure the JRE there.

You can also use FJP with Java 6, but you need some extra configuration to make it work.
First, download the JSR166 jar (jsr166.jar).
Then add the JSR166 jar to your classpath, either in Eclipse or with -cp when compiling.
Finally, when running your program, pass the VM argument -Xbootclasspath/p:jsr166.jar, either in Eclipse or on the java command line.

Getting Started ::
Classes used in our example:
ForkJoinPool     :: The executor service that manages the pool of worker threads. You can pass the desired parallelism level (typically the number of processors) as a constructor argument.
RecursiveTask<V> :: A subclass of ForkJoinTask whose compute method computes and returns a result of type V.
RecursiveAction  :: Similar to RecursiveTask, but its compute method returns nothing (void).
ForkJoinTask     :: Abstract superclass of RecursiveAction and RecursiveTask. Provides methods such as fork, join and invokeAll to create subtasks from the current task and submit them to the pool.
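
The example in this article only uses RecursiveTask, so here is a minimal sketch of the RecursiveAction flavour for comparison. It doubles every element of an array in place. DoubleAction, doubleAll and the threshold of 1000 are assumptions made up for this illustration, not part of the article's demo.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class DoubleInPlaceDemo {

    // Hypothetical action: doubles every element in the range [low, high).
    static class DoubleAction extends RecursiveAction {
        private static final int THRESHOLD = 1000; // assumed cut-off, not tuned
        private final long[] data;
        private final int low;
        private final int high;

        DoubleAction(long[] data, int low, int high) {
            this.data = data;
            this.low = low;
            this.high = high;
        }

        @Override
        protected void compute() {
            if (high - low <= THRESHOLD) {
                // Small enough: do the work directly, nothing to return.
                for (int i = low; i < high; i++) {
                    data[i] *= 2;
                }
                return;
            }
            int mid = low + (high - low) / 2;
            // invokeAll forks both halves and waits until both are done.
            invokeAll(new DoubleAction(data, low, mid),
                      new DoubleAction(data, mid, high));
        }
    }

    static void doubleAll(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAction(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[5000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        doubleAll(data);
        System.out.println("data[10] == " + data[10]);
    }
}
```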

Creating ForkJoinPool

You can create a ForkJoinPool as
ForkJoinPool fjp = new ForkJoinPool(); // Parallelism defaults to the number of available processors
or
ForkJoinPool fjp = new ForkJoinPool(numberOfProcessors); // Use the parallelism level passed in
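
To see what either constructor actually gives you, a small sketch like the following can print the parallelism of each pool. The class and method names are hypothetical; getParallelism() and Runtime.availableProcessors() are standard.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolParallelismDemo {

    // Parallelism of a pool built with the no-argument constructor.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Parallelism of a pool built with an explicit level.
    static int explicitParallelism(int level) {
        ForkJoinPool pool = new ForkJoinPool(level);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Available processors == "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("Default parallelism  == " + defaultParallelism());
        System.out.println("Explicit parallelism == " + explicitParallelism(2));
    }
}
```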

Creating RecursiveTask<V>
The compute method of RecursiveTask plays the same role as the run method of Thread/Runnable and the call method of Callable.
So we do not call compute on the submitted task ourselves; the pool does that. All the logic that divides a big task into smaller tasks lives in the compute method.
To get going, you need to create a subclass of RecursiveTask<V>. The new class also needs the data it is going to work on.
The best way is to add a parameterized constructor and store the data in instance fields, so the compute method can access the data and work on it.
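
Before the full listing, here is a stripped-down sketch of that pattern: a constructor that stores the data, and a compute method that either solves a small range directly or splits it. It sums an int array instead of finding a max; SumTask, parallelSum and the threshold value are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTaskDemo {

    // Hypothetical task: sums the elements in the range [low, high).
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1000; // assumed cut-off, not tuned
        private final int[] data;
        private final int low;
        private final int high;

        // Parameterized constructor: the data is stored at instance level.
        SumTask(int[] data, int low, int high) {
            this.data = data;
            this.low = low;
            this.high = high;
        }

        @Override
        protected Long compute() {
            if (high - low <= THRESHOLD) {
                long sum = 0;
                for (int i = low; i < high; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = low + (high - low) / 2;
            SumTask left = new SumTask(data, low, mid);
            SumTask right = new SumTask(data, mid, high);
            left.fork();                        // run the left half asynchronously
            long rightSum = right.compute();    // work on the right half in this thread
            return left.join() + rightSum;      // wait for the left half and merge
        }
    }

    static long parallelSum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println("Sum == " + parallelSum(data));
    }
}
```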
 
Code With Comments 

package com.thekarna.fjp;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/**
 * This class glues together all the code required for the FJP demo.
 * In many cases this implementation can perform worse than a single-threaded max finder,
 * so don't use it as a benchmark.
 * It is only a Hello World kind of program to get started with FJP.
 * @author DVG 
 *
 */
public class MaxFinderFJP {
	
	/**
	 * This class extends RecursiveTask.
	 * RecursiveTask plays a role similar to Runnable/Callable in the context of ExecutorService,
	 * and its compute method is similar to the run/call method of Runnable/Callable.
	 * The compute method is where you write your problem solution and the divide & conquer logic.
	 * You can also extend RecursiveAction, but its compute method returns void.
	 * @author DVG
	 *
	 * @param <T>
	 */
	@SuppressWarnings("all")
	private static class MaxFinderTask<T> extends RecursiveTask<T> {
		private static final int THRESHOLD = 10000;
		private T targetArray[] = null;
		private int low;
		private int high;
		
		/**
		 * Parameterized so the caller can pass data
		 * @param arr
		 */
		public MaxFinderTask(T arr[]) {
			targetArray = arr;
			low = 0;
			high = targetArray.length;
		}
		
		private MaxFinderTask(T arr[], int low, int high) {
			targetArray = arr;
			this.low = low;
			this.high = high;
		}

		/**
		 * Bread & Butter for ForkJoinPool 
		 * All Logic Go Below
		 * @return
		 */
		@Override
		protected T compute() {
			
			/*
				If the task is small, compute it directly, since dividing more than required
				only adds overhead
			 */
			if (high - low <= MaxFinderTask.THRESHOLD) {
				return this.computeNormal();
			}
			int mid = low + (high - low) / 2;
			T result = null;
			T result2 = null;
			//Divide the big task into smaller tasks so all processors can get their share
			MaxFinderTask<T> leftTask = new MaxFinderTask<>(this.targetArray, low, mid);
			MaxFinderTask<T> rightTask = new MaxFinderTask<>(this.targetArray, mid, high);
			//Fork the left task; fork is a non-blocking call, so the current thread moves ahead
			leftTask.fork();
			//Call compute directly on the right task, which divides & conquers again in this thread
			result2 = rightTask.compute();
			//join blocks until leftTask is done; meanwhile its divide & conquer goes ahead in the pool
			result = leftTask.join();
			//Merge results and return; compareTo may return any non-negative value, so test >= 0
			return ((Comparable) result).compareTo(((Comparable) result2)) >= 0 ? result
					: result2;
		}
		
		/**
		 * This method finds the max of the range [low, high) in the normal (sequential) way
		 * @return
		 */
		protected T computeNormal() {
			//Start from the first element of this task's own range, not of the whole array
			Comparable cMax =  (Comparable)this.targetArray[low];
			for(int i=low+1; i<high; i++) {
				Comparable obj =  (Comparable)this.targetArray[i];
				if(obj.compareTo(cMax) > 0) {
					cMax = obj;
				}
			}
			return (T)cMax;
		}
	}

	/**
	 * Entry point: builds the data, runs the task in a ForkJoinPool and prints the result
	 * @param args
	 */
	public static void main(String[] args) {
		//Create an array of one million entries
		Integer d[] = new Integer[1_000_000];
		
		//Fill it with random data; one Random instance is enough for the whole loop
		Random r = new Random(System.nanoTime());
		for (int i = 0; i < d.length; i++) {
			d[i] = r.nextInt();
		}
		final long startTime = System.currentTimeMillis();
		//Create a ForkJoinPool
		final ForkJoinPool forkJoinPool = new ForkJoinPool();
		
		//Create a MaxFinderTask and call invoke, which is a blocking call
		//You can also use submit, which returns a Future:
		//final Future<Integer> futureResult = forkJoinPool.submit(new MaxFinderTask<Integer>(d));
		final Integer result = forkJoinPool.invoke(new MaxFinderTask<Integer>(d));
		forkJoinPool.shutdown();
		
		System.out.println("Max == " + result);
		System.out.println("Time Taken FJP == " + (System.currentTimeMillis() - startTime));

	}
}				
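
The class comment above warns that this demo can be slower than a single-threaded max finder. To check that on your own machine, a sequential baseline along the following lines could be timed next to it. This is only a sketch: SequentialMaxBaseline and sequentialMax are hypothetical names, and a single wall-clock measurement like this is a rough indication, not a benchmark.

```java
import java.util.Random;

public class SequentialMaxBaseline {

    // Plain single-threaded scan; assumes a non-empty array.
    static <T extends Comparable<? super T>> T sequentialMax(T[] values) {
        T max = values[0];
        for (int i = 1; i < values.length; i++) {
            if (values[i].compareTo(max) > 0) {
                max = values[i];
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Same array size and kind of data as the FJP demo
        Integer[] d = new Integer[1_000_000];
        Random r = new Random();
        for (int i = 0; i < d.length; i++) {
            d[i] = r.nextInt();
        }

        final long startTime = System.currentTimeMillis();
        final Integer result = sequentialMax(d);

        System.out.println("Max == " + result);
        System.out.println("Time Taken Sequential == "
                + (System.currentTimeMillis() - startTime));
    }
}
```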

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

TheDhruv

United States

Article Copyright 2012 by TheDhruv