CodeProject ForkJoinPool - Java's Map & Reduce
ForkJoinPool ::FJP is a executor service for Fork-Join Taks or tasks which can computed using divide and conqor or we can say FJP is inbuild Map & Reduce framework in Java. FJP implements work-steal so all threads try to find and execute task submitted by other tasks in FJP or external program. FJP try to maintain active threads as per number of processor available. FJP provides below methods to submit task ::==><style>.xl1522360 {padding-top:1px; padding-right:1px; padding-left:1px; mso-ignore:padding; color:black; font-size:11.0pt; font-weight:400; font-style:normal; text-decoration:none; font-family:Calibri, sans-serif; mso-font-charset

; mso-number-format:General; text-align:general; vertical-align:bottom; mso-background-source:auto; mso-pattern:auto; white-space:nowrap;} #Book1_22360 td { border:1px solid green; } </style>
| Call Type | External Task Clients i.e. main method | Inner Task Divide and Conquer Calls |
| Call from non-fork/join clients | Call from within fork/join computations |
| Arrange async execution | execute(ForkJoinTask) | ForkJoinTask.fork() |
| Await and obtain result | invoke(ForkJoinTask) | ForkJoinTask.invoke() |
| Arrange exec and obtain Future | submit(ForkJoinTask) | ForkJoinTask.fork() (ForkJoinTasks are Futures) |
| | |
Pre-requiste :: FJP is in-builted in Java 7. You can download and install the same from Oracle. You can either configure Java in your system path and also set JAVA_HOME to home directory of JDK installation. Or you can use Eclipse and configure JRE in eclipse. You can also use FJP with Java 6 but you need to do some configuration to make it work. 1st you need to download JSR166 from here. Add JSR166 in your classpath in eclipse or using -cp when compling And when running your program you need to pass VM Arguments as -Xbootclasspath/p:jsr166.jar in eclipse or using Java command line utility. Getting Started :: Classes which we use in our example ForkJoinPool :: ForkJoinPool is used to create a pool of threads. You can pass number of processers as arguments. RecursiveTask<V> :: RecursiveTask implements ForkJoinTask and its compute method computes the result and return a result of type V. RecursiveAction :: Similar to RecursiveTask but its compute method doesn't return anything means void. ForkJoinTask :: Superclass of RecursiveAction and RecursiveTaks. Provides methods like fork, join, invokeAll to create subtaks from current task and submit to Pool. Creating ForkJoinPool You can create ForkJoinPool as ForkJoinPool fjp = new ForkJoinPool(); // Use all available processors or ForkJoinPool fjp = new ForkJoinPool(numperOfProcessors); // Use number of processors passed Creating RecursiveTask<V> In RecursiveTask compute method is similar to run method of Thread/Runnable and call method of Callable interface. So we will not call compute method ourselves. But all logic to divide big task into smaller task will be in compute method. You need to create a subclass of RecursiveTask<V> to start get going. Also we need to pass data to newly created class. So best way is to create parameterized constuctor. And Store the data at instance level. So compute method can access the data and work on it. Code With Comments
package com.thekarna.fjp;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class MaxFinderFJP {
@SuppressWarnings("all")
private static class MaxFinderTask<T> extends RecursiveTask<T> {
private static final int THRESHOLD = 10000;
private T targetArray[] = null;
private int low;
private int high;
public MaxFinderTask(T arr[]) {
targetArray = arr;
low = 0;
high = targetArray.length;
}
private MaxFinderTask(T arr[], int low, int high) {
targetArray = arr;
this.low = low;
this.high = high;
}
@Override
protected T compute() {
if (high - low <=MaxFinderTask.THRESHOLD) {
return this.computeNormal();
}
int mid = (high - low) / 2;
T result = null;
T result2 = null;
MaxFinderTask<T> leftTask = new MaxFinderTask<>(this.targetArray, low, low+mid);
MaxFinderTask<T> rightTask = new MaxFinderTask<>(this.targetArray, low+mid, high);
leftTask.fork();
result2 = rightTask.compute();
result = leftTask.join();
return ((Comparable) result).compareTo(((Comparable) result2)) > -1 ? result
: result2;
}
protected T computeNormal() {
if (this.targetArray.length == 1) {
return this.targetArray[0];
}
Comparable cMax = (Comparable)this.targetArray[0];
for(int i=low; i<high; i++) {
Comparable obj = (Comparable)this.targetArray[i];
if(obj.compareTo(cMax) > 0) {
cMax = obj;
}
}
return (T)cMax;
}
}
public static void main(String[] args) {
Integer d[] = new Integer[100_000_0];
for (int i = 0; i < d.length; i++) {
Random r = new Random(System.nanoTime());
d[i] = r.nextInt();
}
final long startTime = System.currentTimeMillis();
final ForkJoinPool forkJoinPool = new ForkJoinPool();
final Integer result = forkJoinPool.invoke(new MaxFinderTask<Integer>(d));
forkJoinPool.shutdown();
System.out.println("Max == " + result);
System.out.println("Time Taken FJP == " + (System.currentTimeMillis() - startTime));
}
}