Java ForkJoin

在大数据的 Mapreduce 中,其运算方式就是分而治之,把一个复杂的大的运算任务分成若干个小任务,单独计算后进行汇总得到结果,JDK1.7 的 ForkJoin(Fork 拆分,Join 合并)就是这种思想,分而治之,是一个**多线程并行(不是并发)**处理框架

并发和并行

ForkJoin 特点

forkjoin 和多个 thread 有啥区别 ?

  • ForkJoin 是分而治之,把大任务递归分解成多个小任务,能高效利用多个 CPU,而多改个 thread 则没有递归分解,线程独立。
  • ForkJoin 可以进行任务窃取,即当某个线程完成了自己的任务后,它会去窃取其他线程的任务来执行,自动负载均衡,而多个 thread 就做不到,需要手动平衡。
  • ForkJoin 有ForkJoinPool 线程池,可以自适应,会根据 cpu 核心动态调整,而多 thread 设置线程池的话就要手动设置参数,并且要处理线程生命周期。

总之 ForkJoin 是基于任务分解的并行框架,在处理任务时有一定的自动平衡,相对多 thread 不需要很多手动控制管理。

Forkjoin 原理

ForkJoin 框架是从 jdk1.7 中引入的新特性,和 ThreadPoolExecutor 一样,也实现了 Executor 和 ExecutorService 接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,默认值是计算机可用的 CPU 数量。

ForkJoinPool 能够使用相对较少的线程来处理大量的任务。比如要对 1000 万个数据进行排序,那么会将这个任务分割成两个 500 万的排序任务和一个针对这两组 500 万数据的合并任务。以此类推,对于 500 万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于 10 时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概 200 万+个。问题的关键在于,对于一个父任务而言,只有当它所有的子任务完成之后,父任务才能够被执行

所以如果使用 ThreadPoolExecutor+分治法会存在问题,因为 ThreadPoolExecutor 中的线程做不到父子间的任务关系,而使用 ForkJoinPool 就能够解决这个问题,它就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

那么使用 ThreadPoolExecutor 或者 ForkJoinPool,性能上会有什么差异呢?

首先,使用 ForkJoinPool 能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用 4 个线程来完成超过 200 万个任务。但是,使用 ThreadPoolExecutor 时,是不可能完成的,因为 ThreadPoolExecutor 中的 Thread 无法选择优先执行子任务,需要完成 200 万个具有父子关系的任务时,也需要 200 万个线程,很显然这是不可行的,也是很不合理的!!

ForkJoin 局限性  

  • 任务只能使用 Fork 和 Join 操作来进行同步机制,如果使用了其他同步机制,则在同步操作时,工作线程就不能执行其他任务了。比如,在 Fork/Join 框架中,使任务进行了睡眠,那么,在睡眠期间内,正在执行这个任务的工作线程将不会执行其他任务了
  • 在 Fork/Join 框架中,所拆分的任务适合能快速执行并且不会被长时间阻塞的计算操作,不适合执行 IO 操作,比如:读写数据文件。因为ForkJoinPool 依赖于工作窃取机制来保持线程繁忙,当线程在执行 I/O 操作时被阻塞,它无法被其它线程窃取任务,从而导致资源利用不充分,降低整体效率。
  • 任务不能抛出检查异常,必须通过必要的代码来处理这些异常。

Demo 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
            // 执行子任务
            leftTask.fork();
            rightTask.fork();
            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();
        //生成一个计算任务,计算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

CompletableFuture+ThreadPoolExecutor VS  ForkJoin

首先 ForkJoinPool 的特点是分而治之、父子任务、工作窃取,适用于大量计算密集型的任务,例如分治算法、图像处理、排序、递归计算等

我们知道 CompletableFuture 的 api 如thenApplythenCombinethenCompose可以将多个异步任务进行组合和联动,从而实现任务分解和组合。但是需要手动分解,并且没有显示的父子关系,不能做到自动的任务拆分调度,适用于异步编程和需要串联多个异步任务的场景,例如处理 IO 密集型任务、异步 API 调用、任务链等。

所以说CompletableFuture 更适合异步任务编排,没有强烈的递归性质的任务。ForkJoin 更适合 CPU 计算的任务,如果需要大量 io 读写,那么不建议 forkJoin。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计