Software Development, Product to Market

ForkJoinPool Example (Java Concurrency - Part 2)

7/10/2013

In Java concurrency, ForkJoinPool is a work-stealing thread pool that works well for executing tasks whose chunk sizes are unevenly distributed. Before you read on, you should be familiar with Java Multithreading Approaches (Part 1 of this series).

A ForkJoinPool implements ExecutorService, but it differs from other (I'll refer to them as 'traditional') ExecutorService implementations mainly by employing work-stealing: all threads in the pool attempt to find and execute subtasks created by other active tasks. It automatically balances the task load between threads, whereas a traditional ThreadPoolExecutor has no such load-balancing mechanism. If every worker thread is busy, newly submitted tasks wait in a queue until a worker becomes free, and a worker that runs out of its own work steals queued tasks from workers that are still busy.
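
To make the "implements ExecutorService" point concrete, here is a minimal sketch in which a ForkJoinPool is handed to code that only knows about ExecutorService. The class name PoolAsExecutorDemo, the helper sumOfSquares() and the workload are made up for illustration, and the lambda syntax assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical demo class; its name and workload are illustrative only.
class PoolAsExecutorDemo {

  // Submits one Callable per number to any ExecutorService and adds up the results.
  static long sumOfSquares(ExecutorService executor, int count) {
    List<Future<Long>> futures = new ArrayList<>();
    for (int i = 1; i <= count; i++) {
      final long n = i;
      Callable<Long> job = () -> n * n;
      futures.add(executor.submit(job));
    }
    long total = 0;
    try {
      for (Future<Long> future : futures) {
        total += future.get(); // blocks until that job has finished
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
    return total;
  }

  public static void main(String[] args) {
    // A ForkJoinPool can be passed wherever an ExecutorService is expected.
    ExecutorService executor = new ForkJoinPool(4);
    try {
      System.out.println(sumOfSquares(executor, 10)); // 1 + 4 + ... + 100 = 385
    } finally {
      executor.shutdown();
    }
  }
}
```

The helper never mentions ForkJoinPool, so the same method would also accept a fixed thread pool. Plain Callables like these do not fork anything, so this sketch shows only the shared interface, not work-stealing itself.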

ForkJoinPool is built for divide-and-conquer algorithms: a central ForkJoinPool executes branching ForkJoinTasks. A ForkJoinTask is a thread-like entity but is much lighter weight than a normal thread. Huge numbers of tasks and subtasks may be hosted by a small number of actual threads in a ForkJoinPool. Because ForkJoinPool is an ExecutorService, it follows the 'submit a callable' approach to multithreaded programming. It:
  1. separates (forks) each large task into smaller tasks;
  2. processes each smaller task on a worker thread of the pool (separating it into even smaller tasks if necessary);
  3. joins the results.
If so, how does ForkJoinPool differ from the traditional 'submit a callable' approach introduced in Java 5?

  • Fork/Join does work-stealing, whereas traditional ExecutorService does not.
  • ForkJoinTasks in ForkJoinPool are lighter than threads in traditional ExecutorService (thread pool). In Fork/Join, a large number of tasks can be hosted by a smaller number of threads because of work-stealing.
  • Think of a ForkJoinPool as a pool of small tasks, and of a traditional ExecutorService as a pool of threads.
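
The "many tasks, few threads" point can be observed with a rough sketch like the one below. The classes ManyTasksFewThreadsDemo and CountingTask are hypothetical and exist only to count how many task objects a two-thread pool ends up hosting; they are not part of the Fork/Join library.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: all names here are illustrative only.
class ManyTasksFewThreadsDemo {

  // Counts the leaves of a full binary tree of the given depth,
  // creating one ForkJoinTask per tree node.
  static class CountingTask extends RecursiveTask<Integer> {
    private final int depth;
    private final AtomicInteger created;

    CountingTask(int depth, AtomicInteger created) {
      this.depth = depth;
      this.created = created;
      created.incrementAndGet(); // one more task object exists
    }

    @Override
    protected Integer compute() {
      if (depth == 0) {
        return 1; // a leaf
      }
      CountingTask left = new CountingTask(depth - 1, created);
      CountingTask right = new CountingTask(depth - 1, created);
      left.fork();                          // queue the left half for any worker
      return right.compute() + left.join(); // do the right half here, then combine
    }
  }

  // Runs the tree on a pool with the given number of worker threads.
  static int countLeaves(int depth, int threads, AtomicInteger created) {
    ForkJoinPool pool = new ForkJoinPool(threads);
    try {
      return pool.invoke(new CountingTask(depth, created));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    AtomicInteger created = new AtomicInteger();
    int leaves = countLeaves(10, 2, created); // only two worker threads
    System.out.println("leaves = " + leaves);
    System.out.println("tasks created = " + created.get());
  }
}
```

With depth 10 the tree has 1024 leaves and 2047 nodes, so two worker threads end up hosting 2047 task objects. A design with one thread per unit of work would need far more threads for the same decomposition.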

To divvy up a bigger task into smaller ones, extend RecursiveTask and implement its compute() method. Inside compute(), you divide and conquer, then return the result after the join. In RecursiveTask, compute() plays a role similar to the run() method of Thread/Runnable and the call() method of the Callable interface. For example, the following code recursively executes subtasks to calculate a Fibonacci number:
import java.util.concurrent.*;

class FibonacciSeriesGeneratorTask extends RecursiveTask<Integer> {
  private final int index;

  public FibonacciSeriesGeneratorTask(int index) {
    this.index = index;
  }

  @Override
  protected Integer compute() { // 1
    if (index == 0) {
      return 0;
    }
    if (index < 2) {
      return 1;
    }
    // Fork the first subtask so that any worker can run (or steal) it.
    final FibonacciSeriesGeneratorTask worker1 = new FibonacciSeriesGeneratorTask(index - 1);
    worker1.fork();
    // Compute the second subtask in the current thread, then wait for the first.
    final FibonacciSeriesGeneratorTask worker2 = new FibonacciSeriesGeneratorTask(index - 2);

    return worker2.compute() + worker1.join();
  }
}


public class ThreadDemo {
  public static void main(String[] args) {
    // Kept small on purpose: this naive recursion takes exponential time,
    // and the 100th Fibonacci number does not fit into an Integer.
    int index = 20;
    int poolSize = Runtime.getRuntime().availableProcessors();
    ForkJoinPool pool = new ForkJoinPool(poolSize);

    Integer result = pool.invoke(new FibonacciSeriesGeneratorTask(index)); // 2
    System.out.println("Fibonacci(" + index + ") = " + result);
    pool.shutdown();
  }
}

[1] The main computation performed by this task. You must define this method, but in general you should not call it directly (invoking it on a sibling subtask from inside compute(), as above, is the usual exception). Implement compute() as a recursive function that has an ending condition. The compute() of a RecursiveTask returns the V declared in the generic form RecursiveTask<V>.

[2] invoke() performs the given ForkJoinTask and returns its result, a V, upon completion. This V is what the compute() method of the RecursiveTask returns. Usually, more tasks are forked from within compute(). A ForkJoinTask is a thread-like entity that implements Future, so it can be thought of as a lightweight form of Future.
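
Since a ForkJoinTask is also a Future, the blocking invoke() call can be swapped for submit() followed by get(). The sketch below shows both styles side by side. TaskAsFutureDemo, RangeSum and the cut-off of 1,000 elements are assumptions made for this illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo: all names here are illustrative only.
class TaskAsFutureDemo {

  // Sums the integers in [from, to] by splitting the range in half.
  static class RangeSum extends RecursiveTask<Long> {
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
      this.from = from;
      this.to = to;
    }

    @Override
    protected Long compute() {
      if (to - from < 1_000) { // small enough: sum sequentially
        long sum = 0;
        for (int i = from; i <= to; i++) {
          sum += i;
        }
        return sum;
      }
      int mid = from + (to - from) / 2;
      RangeSum left = new RangeSum(from, mid);
      RangeSum right = new RangeSum(mid + 1, to);
      left.fork();
      return right.compute() + left.join();
    }
  }

  // Style 1: invoke() blocks the caller and returns the result directly.
  static long sumWithInvoke(ForkJoinPool pool, int n) {
    return pool.invoke(new RangeSum(1, n));
  }

  // Style 2: submit() returns at once; the task handle doubles as a Future.
  static long sumWithSubmit(ForkJoinPool pool, int n) {
    ForkJoinTask<Long> task = pool.submit(new RangeSum(1, n));
    Future<Long> future = task; // no cast needed: ForkJoinTask implements Future
    try {
      return future.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    System.out.println(sumWithInvoke(pool, 100_000));
    System.out.println(sumWithSubmit(pool, 100_000));
    pool.shutdown();
  }
}
```

Both calls compute the same sum. The submit() style is the one to reach for when the caller has other work to do before it needs the result.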

Let's zero in on the compute() method. It is where you divvy up bigger tasks into smaller ones and invoke each of them:
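  @Override
  protected Integer compute() {
    ...
    // Declared as the concrete subclass (assumed to extend RecursiveTask<Integer>)
    // so that the protected compute() is accessible and the results can be added.
    ConcreteRecursiveTask worker1 = new ConcreteRecursiveTask();
    worker1.fork();                               // 1
    ConcreteRecursiveTask worker2 = new ConcreteRecursiveTask();
    return worker2.compute() + worker1.join();    // 2
  }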
[1] fork() allows a new ForkJoinTask (worker1) to be launched from an existing one.
[2] join() allows a ForkJoinTask (existing one) to wait for the completion of another one (worker1).

or,
  @Override
  protected Integer compute() {

    List<RecursiveTask<Integer>> actions = new ArrayList<>();
    ...
    for (int i = 0; i < ...; i++) {
      ...
      actions.add(new ConcreteRecursiveTask());
    }
    invokeAll(actions);      // 1

    Integer sum = 0;
    try {
      for (RecursiveTask<Integer> action : actions) {
        sum += action.get(); // 2
      }
    } catch (Exception ex) {
      ex.printStackTrace();
    }
    return sum;
  }
[1] When a task calls invokeAll(), it waits until all the tasks passed to the method have finished.
[2] The value of each subtask is obtained with the get() method of the Future interface.
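
For reference, a complete version of the invokeAll() pattern might look like the sketch below. It makes a few assumptions of its own: the names InvokeAllDemo, SliceSum and ChunkedSum are invented, the work is a plain array sum, and the results are collected with join() rather than get() (join() returns the same value without the checked exceptions).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo: all names here are illustrative only.
class InvokeAllDemo {

  // Leaf task: sums one slice [start, end) of the array sequentially.
  static class SliceSum extends RecursiveTask<Long> {
    private final int[] data;
    private final int start;
    private final int end;

    SliceSum(int[] data, int start, int end) {
      this.data = data;
      this.start = start;
      this.end = end;
    }

    @Override
    protected Long compute() {
      long sum = 0;
      for (int i = start; i < end; i++) {
        sum += data[i];
      }
      return sum;
    }
  }

  // Parent task: cuts the array into fixed-size slices and runs them with invokeAll().
  static class ChunkedSum extends RecursiveTask<Long> {
    private final int[] data;
    private final int chunkSize;

    ChunkedSum(int[] data, int chunkSize) {
      this.data = data;
      this.chunkSize = Math.max(1, chunkSize); // guard against a non-positive step
    }

    @Override
    protected Long compute() {
      List<SliceSum> slices = new ArrayList<>();
      for (int start = 0; start < data.length; start += chunkSize) {
        slices.add(new SliceSum(data, start, Math.min(start + chunkSize, data.length)));
      }
      invokeAll(slices); // returns once every slice has completed
      long total = 0;
      for (SliceSum slice : slices) {
        total += slice.join(); // already finished, so this does not wait
      }
      return total;
    }
  }

  static long sum(int[] data, int chunkSize) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new ChunkedSum(data, chunkSize));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    int[] data = new int[1_000];
    for (int i = 0; i < data.length; i++) {
      data[i] = i + 1;
    }
    System.out.println(sum(data, 64)); // sum of 1..1000
  }
}
```

Unlike the fork()/join() pair, this version splits the work only once, into a flat list of slices, so it suits problems where a sensible chunk size is known up front.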

performance comparison - ForkJoinPool vs. ThreadPoolExecutor

With a workload that is unevenly distributed among tasks/threads, ForkJoinPool achieves better results, while a traditional ExecutorService suffers under the uneven distribution. However, if the tasks are broken up into subtasks that are too small, ForkJoinPool performance suffers as well, because the overhead of creating and scheduling the tasks outweighs the work they do. (see this benchmark)
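
One common way to keep subtasks from getting too small is a sequential threshold: below a certain size the task stops splitting and does the work in a plain loop. The sketch below does not measure time. It only counts how many task objects are created for two thresholds, as a rough proxy for scheduling overhead. ThresholdDemo, SumTask and the threshold values are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: all names and threshold values here are illustrative only.
class ThresholdDemo {

  // Sums data[lo, hi), splitting in half until a piece is at most `threshold` long.
  static class SumTask extends RecursiveTask<Long> {
    private final int[] data;
    private final int lo;
    private final int hi;
    private final int threshold;
    private final AtomicInteger created;

    SumTask(int[] data, int lo, int hi, int threshold, AtomicInteger created) {
      this.data = data;
      this.lo = lo;
      this.hi = hi;
      this.threshold = Math.max(1, threshold); // a threshold below 1 would never stop splitting
      this.created = created;
      created.incrementAndGet();
    }

    @Override
    protected Long compute() {
      if (hi - lo <= threshold) { // small enough: no more forking
        long sum = 0;
        for (int i = lo; i < hi; i++) {
          sum += data[i];
        }
        return sum;
      }
      int mid = lo + (hi - lo) / 2;
      SumTask left = new SumTask(data, lo, mid, threshold, created);
      SumTask right = new SumTask(data, mid, hi, threshold, created);
      left.fork();
      return right.compute() + left.join();
    }
  }

  // Returns the sum and records in `created` how many tasks were needed.
  static long sum(int[] data, int threshold, AtomicInteger created) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new SumTask(data, 0, data.length, threshold, created));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    int[] data = new int[1024];
    Arrays.fill(data, 1);
    for (int threshold : new int[] {1, 256}) {
      AtomicInteger created = new AtomicInteger();
      long total = sum(data, threshold, created);
      System.out.println("threshold=" + threshold + " sum=" + total + " tasks=" + created.get());
    }
  }
}
```

For 1024 elements, a threshold of 1 needs 2047 tasks while a threshold of 256 needs only 7, and both produce the same sum. What threshold is actually fastest depends on the workload and the machine, so treat these numbers as a way to see the effect rather than as tuning advice.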

conclusion

The Fork/Join library introduced in Java 7 extends the Java concurrency package (ExecutorService) introduced in Java 5 with support for multicore hardware parallelism. Class ForkJoinPool implements the Executor and ExecutorService interfaces. It isn't intended to replace the older Java concurrency classes (e.g. ThreadPoolExecutor); instead, it complements them for dealing with uneven task distributions.

what to read next

  • Java Multithreading Approaches (Java Concurrency - Part 1)
  • Non-Blocking Function (Java Concurrency - Part 3)

references

  • When to use ForkJoinPool vs. ExecutorService?
  • The Fork/Join Framework in Java 7
  • Parallel Processing with ForkJoinPool
  • Java Concurrency Part 6
in loving memory of my mother  and my 4th aunt
Photos used under Creative Commons from net_efekt, schani, visnup, Dan Zen, gadl, bobbigmac, Susana López-Urrutia, jwalsh, Philippe Put, michael pollak, oskay, Creative Tools, Violentz, Kyknoord, mobilyazilar