多线程进阶学习02------Future异步任务
迪丽瓦拉
2025-05-31 12:47:32
0

异步任务即开辟分支任务,不阻塞主线程。Tips:异步线程的创建是纳秒级别

FutureTask

创建方式

// 创建任务对象
FutureTask task3 = new FutureTask<>(() -> {return 100;
});
// 参数1 是任务对象; 参数2 是线程名字,推荐
new Thread(task3, "t3").start();
// 主线程阻塞,同步等待 task 执行完毕的结果
Integer result = task3.get();

缺点

1、获取值get阻塞

  • futureTask.get():get方法会阻塞主线程,一般要把get方法放到最后
  • futureTask.get(3,TimeUnit.SECONDS):假如我不愿意等待很长时间,过时不候直接抛出TimeOutException。
    其他程序调用的时候可以捕获超时异常,相当于变相的做了止损操作
FutureTask task = new FutureTask<>(() -> {TimeUnit.SECONDS.sleep(10);return 100;
});
new Thread(task, "t3").start();
System.out.println(task.get());

2、轮训获取值isDone

  • 不断给主线程续命以等待异步线程的返回
  • 徒劳消耗cpu资源
FutureTask task = new FutureTask<>(() -> {TimeUnit.SECONDS.sleep(10);return 100;
});
new Thread(task, "t3").start();
while (!task.isDone()) {System.out.println(task.get());
}

CompletableFuture

原有的FutureTask类,get()方法会导致阻塞,isDone()轮询也占用cpu,并且能用的api较少,对于以上缺点,jdk8推出了CompletableFuture。

  1. 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法
  2. 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作

在这里插入图片描述

案例展示

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("-----1秒钟后出结果:" + result);return result;
});
System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
System.out.println(completableFuture.get());

四种创建方式

CompletableFuture 提供了四个静态方法来创建一个异步操作

  • runAsync方法不支持返回值.适用于多个接口之间没有任何先后关系
  • supplyAsync可以支持返回值,我们一般用supplyAsync来创建

比如在一个方法中,调用6个接口,接口A的结果需要作为接口B的入参,这个时候适合用带返回值的构造

//runAsync方法不支持返回值
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
//supplyAsync可以支持返回值
public static  CompletableFuture supplyAsync(Supplier supplier)
public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同

public class CompletableFutureTest {public static void main(String[] args) throws Exception{ThreadPoolExecutor executor = new ThreadPoolExecutor(2,5,2L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3));//(1). CompletableFuture.runAsync(Runnable runnable);CompletableFuture future1=CompletableFuture.runAsync(()->{System.out.println(Thread.currentThread().getName()+"*********future1 coming in");});//这里获取到的值是nullSystem.out.println(future1.get());//(2). CompletableFuture.runAsync(Runnable runnable,Executor executor);CompletableFuture future2 = CompletableFuture.runAsync(() -> {//ForkJoinPool.commonPool-worker-9 System.out.println(Thread.currentThread().getName() + "\t" + "*********future2 coming in");}, executor);//(3).public static  CompletableFuture supplyAsync(Supplier supplier)CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {//pool-1-thread-1System.out.println(Thread.currentThread().getName() + "\t" + "future3带有返回值");return 1024;});System.out.println(future3.get());//(4).public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)CompletableFuture future4 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "future4带有返回值");return 1025;}, executor);System.out.println(future4.get());//关闭线程池executor.shutdown();}
}

获取结果

  1. public T get( ) 不见不散(会抛出异常) 只要调用了get( )方法,不管是否计算完成都会导致阻塞
  2. public T get(long timeout, TimeUnit unit) 过时不候
  3. public T getNow(T valuelfAbsent):没有计算完成的情况下,给我一个替代结果计算完,返回计算完成后的结果、没算完,返回设定的valuelfAbsent
  4. public T join( ):join方法和get( )方法作用一样,不同的是,join方法不抛出异常
  5. public boolean complete(T value) 是否打断get方法立刻返回括号值
private static void group1() throws InterruptedException, ExecutionException{CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "abc";});//System.out.println(completableFuture.get());//System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));//System.out.println(completableFuture.join());//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }//System.out.println(completableFuture.getNow("xxx"));System.out.println(completableFuture.complete("completeValue")+"\t"+completableFuture.get());}

处理计算结果

  1. public CompletableFuture thenApply 计算结果存在依赖关系,这两个线程串行化
    由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
  2. public CompletableFuture handle(BiFunction fn):有异常也可以往下一步走,根据带的异常参数可以进一步处理
  3. public CompletableFuture whenComplete( BiConsumer action); 任务完成或者异常时运行action
  4. public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor); 任务完成或者异常时运行action,j可配置线程池
  5. public CompletableFuture exceptionally(Function fn); 有异常本次就会执行,否则不执行
ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(() ->{//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("111");return 1;},threadPool).handle((f,e) -> {int i=10/0;System.out.println("222");return f + 2;}).handle((f,e) -> {System.out.println("333");return f + 3;}).whenComplete((v,e) -> {if (e == null) {System.out.println("----计算结果: "+v);}}).exceptionally(e -> {e.printStackTrace();System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName()+"----主线程先去忙其它任务");threadPool.shutdown();

whenComplete与handle的区别在于,它不参与返回结果的处理,把它当成监听器即可
即使异常被处理,在CompletableFuture外层,异常也会再次复现

消费计算结果

  1. thenRun(Runnable runnable) 任务A执行完执行B,并且B不需要A的结果
  2. CompletableFuture thenAccept(Consumer action) 任务A执行完成执行B,B需要A的结果,但是任务B无返回值
  3. public CompletableFuture thenApply(Function fn) 任务A执行完成执行B,B需要A的结果,同时任务B有返回值
        CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f+2;}).thenApply(f -> {return f+3;}).thenAccept(r -> System.out.println(r));// 任务A执行完执行B,并且B不需要A的结果System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());// 任务A执行完成执行B,B需要A的结果,但是任务B无返回值System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());// 任务A执行完成执行B,B需要A的结果,同时任务B有返回值System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());

对计算速度进行选用

public CompletableFuture applyToEither(CompletionStage other, Function fn)
这个方法表示的是,谁快就用谁的结果,类似于我们在打跑得快,或者麻将谁赢了就返回给谁

        //这个方法表示的是,谁快就用谁的结果,类似于我们在打跑得快,或者麻将谁赢了就返回给谁//public  CompletableFuture applyToEither(CompletionStage other, Function fn);//下面这个在第一个中停留1s,在第二种停留2s,返回的结果是1System.out.println(CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1);  } catch (InterruptedException e) {e.printStackTrace();}return 1;}).applyToEither(CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(2);  } catch (InterruptedException e) {e.printStackTrace();}return 2;}), r -> {return r;}).join());//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值

对计算结果进行合并

public CompletableFuture thenCombine(CompletionStage other,BiFunction fn)
两个CompletionStage任务都完成后,最终把两个任务的结果一起交给thenCombine来处理
先完成的先等着,等待其他分支任务

        //public  CompletableFuture thenCombine//(CompletionStage other,BiFunction fn)//两个CompletionStage任务都完成后,最终把两个任务的结果一起交给thenCombine来处理//先完成的先等着,等待其他分支任务System.out.println(CompletableFuture.supplyAsync(() -> {return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {return 20;}), (r1, r2) -> {return r1 + r2;}).thenCombine(CompletableFuture.supplyAsync(() -> {return 30;}), (r3, r4) -> {return r3 + r4;}).join());System.out.println(CompletableFuture.supplyAsync(() -> {return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {return 20;}), (r1, r2) -> {return r1 + r2;}).join());

两任务组合,都要完成

        CompletableFuture.supplyAsync(() -> {return 10;}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {return 20;}), (r1, r2) -> {System.out.println(r1);//10System.out.println(r2);//20});

多任务组合

(public static CompletableFuture allOf(CompletableFuture… cfs))
allOf:等待所有任务完成

(public static CompletableFuture anyOf(CompletableFuture… cfs))
anyOf:只要有一个任务完成

 CompletableFuture futureImg = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品的图片信息");return "hello.jpg";});CompletableFuture futureAttr = CompletableFuture.supplyAsync(() -> {System.out.println("查询商品的属性");return "黑色+256G";});CompletableFuture futureDesc = CompletableFuture.supplyAsync(() -> {try { TimeUnit.SECONDS.sleep(3);  } catch (InterruptedException e) {e.printStackTrace();}System.out.println("查询商品介绍");return "华为";});//需要全部完成
//        futureImg.get();
//        futureAttr.get();
//        futureDesc.get();//CompletableFuture all = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);//all.get();CompletableFuture anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);anyOf.get();System.out.println(anyOf.get());System.out.println("main over....."); 

案例演示

电商比价需求
同一款产品,同时搜索出同款产品在各大电商的售价;
同一款产品,同时搜索出本产品在某一个电商平台下,各个入驻门店的售价是多少
出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List
in jd price is 88.05
in pdd price is 86.11
in taobao price is 90.43

public class CompletableFutureNetMallDemo {static List list = Arrays.asList(new NetMall("jd"),new NetMall("pdd"),new NetMall("taobao"),new NetMall("dangdangwang"),new NetMall("tmall"));//同步 ,step by step/*** List  ---->   List* @param list* @param productName* @return*/public static List getPriceByStep(List list,String productName) {return list.stream().map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getMallName(),netMall.calcPrice(productName))).collect(Collectors.toList());}//异步 ,多箭齐发/*** List  ---->List> --->   List* @param list* @param productName* @return*/public static List getPriceByASync(List list,String productName) {return list.stream().map(netMall ->CompletableFuture.supplyAsync(() ->String.format(productName + " is %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());}public static void main(String[] args) {long startTime = System.currentTimeMillis();List list1 = getPriceByStep(list, "mysql");for (String element : list1) {System.out.println(element);}long endTime = System.currentTimeMillis();System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");System.out.println();long startTime2 = System.currentTimeMillis();List list2 = getPriceByASync(list, "mysql");for (String element : list2) {System.out.println(element);}long endTime2 = System.currentTimeMillis();System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");}
}@Data
@AllArgsConstructor
class NetMall {private String mallName;public double calcPrice(String productName) {//检索需要1秒钟try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}
}
/*mysql in jd price is 110.59mysql in pdd price is 110.23mysql in taobao price is 110.04mysql in dangdangwang price is 110.08mysql in tmall price is 109.91----costTime: 5030 毫秒mysql is jd price is 109.07mysql is pdd price is 109.47mysql is taobao price is 109.04mysql is dangdangwang price is 110.09mysql is tmall price is 110.72----costTime: 1021 毫秒
**/

100个任务都返回一个数字且运行都要时间,使用多线程快速求和

三种写法,自我感受

package com.bilibili.juc.study;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;/*** CompletableFuture 多任务合并*/
public class Test {public static void main(String[] args) throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();ArrayList lists = new ArrayList<>();for (int i = 1; i <= 100; i++) {lists.add(1);}System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","100");int sum = lists.stream().parallel().mapToInt(integer -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return integer;}).sum();System.out.println(sum);long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start));}public static void fangshi1() {long start = System.currentTimeMillis();ExecutorService threadPool = Executors.newFixedThreadPool(50);ArrayList lists = new ArrayList<>();for (int i = 1; i <= 100; i++) {lists.add(1);}int sum = lists.stream().map(integer -> CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return integer;}, threadPool)).collect(Collectors.toList()).stream().map(CompletableFuture::join).mapToInt(value -> value).sum();System.out.println(sum);long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start));threadPool.shutdown();}public static void fangshi2(){long start = System.currentTimeMillis();ExecutorService threadPool = Executors.newFixedThreadPool(100);ArrayList lists = new ArrayList<>();for (int i = 1; i <= 100; i++) {lists.add(1);}List> completableFutureList = lists.stream().map(integer -> CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return integer;}, threadPool)).collect(Collectors.toList());CompletableFuture completableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));CompletableFuture> listCompletableFuture = completableFuture.thenApply(v -> {return completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());});CompletableFuture longCompletableFuture = listCompletableFuture.thenApply(list -> {return list.stream().count();});System.out.println(longCompletableFuture.join());long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start));threadPool.shutdown();}
}}

相关内容

热门资讯

linux入门---制作进度条 了解缓冲区 我们首先来看看下面的操作: 我们首先创建了一个文件并在这个文件里面添加了...
C++ 机房预约系统(六):学... 8、 学生模块 8.1 学生子菜单、登录和注销 实现步骤: 在Student.cpp的...
A.机器学习入门算法(三):基... 机器学习算法(三):K近邻(k-nearest neigh...
数字温湿度传感器DHT11模块... 模块实例https://blog.csdn.net/qq_38393591/article/deta...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
Redis 所有支持的数据结构... Redis 是一种开源的基于键值对存储的 NoSQL 数据库,支持多种数据结构。以下是...
win下pytorch安装—c... 安装目录一、cuda安装1.1、cuda版本选择1.2、下载安装二、cudnn安装三、pytorch...
MySQL基础-多表查询 文章目录MySQL基础-多表查询一、案例及引入1、基础概念2、笛卡尔积的理解二、多表查询的分类1、等...
keil调试专题篇 调试的前提是需要连接调试器比如STLINK。 然后点击菜单或者快捷图标均可进入调试模式。 如果前面...
MATLAB | 全网最详细网... 一篇超超超长,超超超全面网络图绘制教程,本篇基本能讲清楚所有绘制要点&#...
IHome主页 - 让你的浏览... 随着互联网的发展,人们越来越离不开浏览器了。每天上班、学习、娱乐,浏览器...
TCP 协议 一、TCP 协议概念 TCP即传输控制协议(Transmission Control ...
营业执照的经营范围有哪些 营业执照的经营范围有哪些 经营范围是指企业可以从事的生产经营与服务项目,是进行公司注册...
C++ 可变体(variant... 一、可变体(variant) 基础用法 Union的问题: 无法知道当前使用的类型是什...
血压计语音芯片,电子医疗设备声... 语音电子血压计是带有语音提示功能的电子血压计,测量前至测量结果全程语音播报࿰...
MySQL OCP888题解0... 文章目录1、原题1.1、英文原题1.2、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3...
【2023-Pytorch-检... (肆十二想说的一些话)Yolo这个系列我们已经更新了大概一年的时间,现在基本的流程也走走通了,包含数...
实战项目:保险行业用户分类 这里写目录标题1、项目介绍1.1 行业背景1.2 数据介绍2、代码实现导入数据探索数据处理列标签名异...
记录--我在前端干工地(thr... 这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前段时间接触了Th...
43 openEuler搭建A... 文章目录43 openEuler搭建Apache服务器-配置文件说明和管理模块43.1 配置文件说明...