秒懂 Java Fork-Join
admin
2024-05-10 18:37:18
0

fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。

在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork ,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。然后,join 部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待每个子任务执行完毕。

为了提供有效的并行执行,fork/join 框架使用了一个名为 ForkJoinPool 的线程池,用于管理 ForkJoinWorkerThread 类型的工作线程。

ForkJoinPool 线程池

ForkJoinPool 是 fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。

工作线程一次只能执行一个任务。

ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务 ( double-ended queue )( 或 deque,发音 deck )。

这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。

工作窃取( work-stealing )算法

要怎么解释 「 工作窃取算法 」 呢 ?

简单来说,就是 空闲的线程试图从繁忙线程的 deques 中 窃取 工作

默认情况下,每个工作线程从其自己的双端队列中获取任务。但如果自己的双端队列中的任务已经执行完毕,双端队列为空时,工作线程就会从另一个忙线程的双端队列尾部或全局入口队列中获取任务,因为这是最大概率可能找到工作的地方。

这种方法最大限度地减少了线程竞争任务的可能性。它还减少了工作线程寻找任务的次数,因为它首先在最大可用的工作块上工作。

ForkJoinPool 线程池的实例化

Java 8

在Java 8 中,创建 ForkJoinPool 实例的最简单的方式就是使用其静态方法 commonPool()。

commonPool() 静态方法,见名思义,就是提供了对公共池的引用,公共池是每个 ForkJoinTask 的默认线程池。

根据Oracle 的官方文档,使用预定义的公共池可以减少资源消耗,因为它会阻止每个任务创建一个单独的线程池。

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Java 7

如果要在 Java 7 中实现相同的行为,则需要通过创建 ForkJoinPool 的实例并将其赋值给实用程序类的公共静态字段。

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

使用构造函数实例化 ForkJoinPool 时,可以创建具有指定级别的并行性,线程工厂和异常处理程序的自定义线程池。在上面的示例中,线程池的并行度级别为 2 ,意味着线程池将使用 2 个处理器核心。

然后就可以通过这个公共静态字段轻松的访问 ForkJoinPool 的实例

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

ForkJoinTask

ForkJoinTask 是 ForkJoinPool 线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask ,而是扩展它的两个子类中的任意一个

1、 任务不返回结果(返回void)的RecursiveAction;
2、 返回值的任务的RecursiveTask

这两个类都有一个抽象方法 compute() ,用于定义任务的逻辑。

我们所要做的,就是继承任意一个类,然后实现 compute() 方法。

RecursiveAction 使用示例

出于演示目的,示例当然是尽可能的简单,因此,我们的示例,执行了一个比较荒谬的任务:将输入转为大写并记录。

所有的代码如下所示

public class CustomRecursiveAction extends RecursiveAction {private String workload = "";private static final int THRESHOLD = 4;private static Logger logger = Logger.getAnonymousLogger();public CustomRecursiveAction(String workload) {this.workload = workload;}@Overrideprotected void compute() {if (workload.length() > THRESHOLD) {ForkJoinTask.invokeAll(createSubtasks());} else {processing(workload);}}private List createSubtasks() {List subtasks = new ArrayList<>();String partOne = workload.substring(0, workload.length() / 2);String partTwo = workload.substring(workload.length() / 2, workload.length());subtasks.add(new CustomRecursiveAction(partOne));subtasks.add(new CustomRecursiveAction(partTwo));return subtasks;}private void processing(String work) {String result = work.toUpperCase();logger.info("This result - (" + result + ") - was processed by "+ Thread.currentThread().getName());}
}

在这个示例中,我们使用了一个字符串类型 ( String ) 的名为 workload 属性来表示要处理的工作单元。

同时,为了演示 fork/join 框架的 fork 行为,在该示例中,如果 workload.length() 大于指定的阈值,那么就使用 createSubtask() 方法拆分任务。

在createSubtasks() 方法中,输入的字符串被递归地划分为子串,然后创建基于这些子串的 CustomRecursiveTask 实例。

当递归分割字符串完毕时,createSubtasks() 方法返回 List 作为结果。

然后在compute() 方法中使用 invokeAll() 方法将任务列表提交给 ForkJoinPool 线程池。

我们来总结下创建 RecursiveAction 的步骤:

1、 创建一个表示工作总量的对象;
2、 选择合适的阈值;
3、 定义分割工作的方法;
4、 定义执行工作的方法;

类似的,我们可以使用相同的方式开发自己的 RecursiveAction 类。

RecursiveTask 使用示例

对于有返回值的任务,除了将每个子任务的结果在一个结果中合并,其它逻辑和 RecursiveAction 都差不多。

public class CustomRecursiveTask extends RecursiveTask {private int[] arr;private static final int THRESHOLD = 20;public CustomRecursiveTask(int[] arr) {this.arr = arr;}@Overrideprotected Integer compute() {if (arr.length > THRESHOLD) {return ForkJoinTask.invokeAll(createSubtasks()).stream().mapToInt(ForkJoinTask::join).sum();} else {return processing(arr);}}private Collection createSubtasks() {List dividedTasks = new ArrayList<>();dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, 0, arr.length / 2)));dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, arr.length / 2, arr.length)));return dividedTasks;}private Integer processing(int[] arr) {return Arrays.stream(arr).filter(a -> a > 10 && a < 27).map(a -> a * 10).sum();}
}

在上面这个示例中,任务由存储在 CustomRecursiveTask 类的 arr 字段中的数组表示。

createSubtask() 方法递归地将任务划分为较小的工作,直到每个部分小于阈值。然后,invokeAll()方法将子任务提交给公共拉取并返回 Future 列表。

要触发执行,需要为每个子任务调用 join() 方法。

上面这个示例中,我们使用了 Java 8 的流 ( Stream ) API , sum() 方法用于将子结果组合到最终结果中。

将任务提交到 ForkJoinPool 线程池中

只要使用很少的方法,就可以把任务提交到 ForkJoinPool 线程池中。

1、 submit()或execute()方法;

这两个方法的调用方式是相同的

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

2、 使用invoke()方法fork任务并等待结果,不需要任何手动连接(join);

int result = forkJoinPool.invoke(customRecursiveTask);

3、 invokeAll()方法是将ForkJoinTasks序列提交给ForkJoinPool的最方便的方法它将任务作为参数(两个任务,varargs或集合),fork它们,并按照生成它们的顺序返回Future对象的集合;
4、 或者,我们还可以使用单独的fork()和join()方法;

  • fork() 方法将任务提交给线程池,但不会触发任务的执行。
  • join() 方法则用于触发任务的执行。在 RecursiveAction 的情况下,join() 返回 null,但对于 RecursiveTask ,它返回任务执行的结果。

customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();

上面的RecursiveTask 示例中,我们使用 invokeAll() 方法向线程池提交一系列子任务。同样的工作,也可以使用 fork() 和 join() 来完成,但这可能会对结果的排序产生影响。

为了避免混淆,当涉及到多个任务且要保证任务的顺序时,通常都是使用 ForkJoinPool.invokeAll() 。

结束语

使用fork/join 框架可以加速处理大型任务,但要实现这一结果,应遵循一些指导原则:

  • 使用尽可能少的线程池。绝大多数情况下,最好的决定是每个应用程序或系统只使用一个线程池。 (是线程池而不是线程)。
  • 当不需要任何调整时,使用默认的公共线程池。
  • 使用合理的阈值。将 ForkJoingTask 任务拆分为子任务。
  • 避免在 ForkJoingTasks 中出现任何阻塞

相关内容

热门资讯

linux入门---制作进度条 了解缓冲区 我们首先来看看下面的操作: 我们首先创建了一个文件并在这个文件里面添加了...
C++ 机房预约系统(六):学... 8、 学生模块 8.1 学生子菜单、登录和注销 实现步骤: 在Student.cpp的...
A.机器学习入门算法(三):基... 机器学习算法(三):K近邻(k-nearest neigh...
数字温湿度传感器DHT11模块... 模块实例https://blog.csdn.net/qq_38393591/article/deta...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
Redis 所有支持的数据结构... Redis 是一种开源的基于键值对存储的 NoSQL 数据库,支持多种数据结构。以下是...
win下pytorch安装—c... 安装目录一、cuda安装1.1、cuda版本选择1.2、下载安装二、cudnn安装三、pytorch...
MySQL基础-多表查询 文章目录MySQL基础-多表查询一、案例及引入1、基础概念2、笛卡尔积的理解二、多表查询的分类1、等...
keil调试专题篇 调试的前提是需要连接调试器比如STLINK。 然后点击菜单或者快捷图标均可进入调试模式。 如果前面...
MATLAB | 全网最详细网... 一篇超超超长,超超超全面网络图绘制教程,本篇基本能讲清楚所有绘制要点&#...
IHome主页 - 让你的浏览... 随着互联网的发展,人们越来越离不开浏览器了。每天上班、学习、娱乐,浏览器...
TCP 协议 一、TCP 协议概念 TCP即传输控制协议(Transmission Control ...
营业执照的经营范围有哪些 营业执照的经营范围有哪些 经营范围是指企业可以从事的生产经营与服务项目,是进行公司注册...
C++ 可变体(variant... 一、可变体(variant) 基础用法 Union的问题: 无法知道当前使用的类型是什...
血压计语音芯片,电子医疗设备声... 语音电子血压计是带有语音提示功能的电子血压计,测量前至测量结果全程语音播报࿰...
MySQL OCP888题解0... 文章目录1、原题1.1、英文原题1.2、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3...
【2023-Pytorch-检... (肆十二想说的一些话)Yolo这个系列我们已经更新了大概一年的时间,现在基本的流程也走走通了,包含数...
实战项目:保险行业用户分类 这里写目录标题1、项目介绍1.1 行业背景1.2 数据介绍2、代码实现导入数据探索数据处理列标签名异...
记录--我在前端干工地(thr... 这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前段时间接触了Th...
43 openEuler搭建A... 文章目录43 openEuler搭建Apache服务器-配置文件说明和管理模块43.1 配置文件说明...