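An asynchronous task forks off a branch task so that the main thread is not blocked. Tip: creating an asynchronous thread is cheap (on the order of microseconds on typical hardware), so it adds little overhead.
How to create one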
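// Create the task object
FutureTask<Integer> task3 = new FutureTask<>(() -> {
    return 100;
});
// Argument 1 is the task object; argument 2 is the thread name (recommended)
new Thread(task3, "t3").start();
// The main thread blocks, waiting synchronously for the task's result
Integer result = task3.get();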
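Drawbacks
1. get() blocks until the value is available
FutureTask<Integer> task = new FutureTask<>(() -> {
    TimeUnit.SECONDS.sleep(10);
    return 100;
});
new Thread(task, "t3").start();
// Blocks the calling thread for about 10 seconds
System.out.println(task.get());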
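2. Polling for the value with isDone()
FutureTask<Integer> task = new FutureTask<>(() -> {
    TimeUnit.SECONDS.sleep(10);
    return 100;
});
new Thread(task, "t3").start();
// Spin until the task is done; this loop burns CPU the whole time
while (!task.isDone()) {
    System.out.println("still running, checking again...");
}
// Only now is get() guaranteed to return immediately
System.out.println(task.get());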
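With the original FutureTask class, get() blocks the caller, polling isDone() wastes CPU, and the API offers few other operations. To address these drawbacks, JDK 8 introduced CompletableFuture.
Example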
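To make the contrast concrete, here is a minimal sketch of the callback style that CompletableFuture allows. The class name CallbackSketch and the values are illustrative assumptions, not part of the original notes. Instead of blocking on get() or polling isDone(), the caller registers what should happen next and the result is pushed to the callback when it is ready.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CallbackSketch {
    // Returns the value that the callback received.
    static int runWithCallback() {
        AtomicInteger received = new AtomicInteger(-1);
        CompletableFuture<Void> pipeline = CompletableFuture
                .supplyAsync(() -> 100)      // runs on the default async pool
                .thenApply(v -> v + 1)       // transform the value once it is ready
                .thenAccept(received::set);  // consume it; nobody polls or calls get()
        // The caller is free to do other work here.
        // join() is used only so the demo does not finish before the callback fires.
        pipeline.join();
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(runWithCallback());
    }
}
```

In real code the final join() is usually unnecessary, because the pipeline itself carries the follow-up work.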
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "----come in");
    int result = ThreadLocalRandom.current().nextInt(10);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("-----result after 1 second: " + result);
    return result;
});
System.out.println(Thread.currentThread().getName() + " goes off to do other work first");
System.out.println(completableFuture.get());
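CompletableFuture provides four static methods for creating an asynchronous operation.
For example, if one method calls six interfaces and the result of interface A is needed as the input of interface B, the variants that return a value (supplyAsync) are the right choice.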
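The "result of A feeds B" case can be sketched as follows. The methods callA and callB are hypothetical stand-ins for the two interfaces, and the returned strings are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ChainSketch {
    // Hypothetical stand-in for interface A.
    static String callA() {
        return "user-42";
    }

    // Hypothetical stand-in for interface B, which needs A's result as input.
    static String callB(String resultOfA) {
        return "orders-of-" + resultOfA;
    }

    static String chain() {
        return CompletableFuture
                .supplyAsync(ChainSketch::callA)  // supplyAsync because A returns a value
                .thenApply(ChainSketch::callB)    // B runs only after A has produced its result
                .join();
    }

    public static void main(String[] args) {
        System.out.println(chain());
    }
}
```

runAsync would not work here, because its CompletableFuture<Void> carries no value to pass on to B.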
// runAsync does not support a return value
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync supports a return value
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
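Methods that do not take an Executor run the asynchronous code on ForkJoinPool.commonPool(). If an Executor is specified, that thread pool is used instead. The same applies to all of the methods below.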
public class CompletableFutureTest {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));

        // (1) CompletableFuture.runAsync(Runnable runnable)
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            // Default pool, e.g. ForkJoinPool.commonPool-worker-9
            System.out.println(Thread.currentThread().getName() + "*********future1 coming in");
        });
        // The value obtained here is null
        System.out.println(future1.get());

        // (2) CompletableFuture.runAsync(Runnable runnable, Executor executor)
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            // Custom pool, e.g. pool-1-thread-1
            System.out.println(Thread.currentThread().getName() + "\t" + "*********future2 coming in");
        }, executor);

        // (3) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            // Default pool again, e.g. ForkJoinPool.commonPool-worker-9
            System.out.println(Thread.currentThread().getName() + "\t" + "future3 has a return value");
            return 1024;
        });
        System.out.println(future3.get());

        // (4) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
        CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "future4 has a return value");
            return 1025;
        }, executor);
        System.out.println(future4.get());

        // Shut down the thread pool
        executor.shutdown();
    }
}
private static void group1() throws InterruptedException, ExecutionException {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        // Pause the thread for a moment
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "abc";
    });

    // System.out.println(completableFuture.get());
    // System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
    // System.out.println(completableFuture.join());

    // Pause the thread for a moment
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

    // System.out.println(completableFuture.getNow("xxx"));
    // The task finished during the 2-second pause, so complete() returns false and get() yields "abc"
    System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.get());
}
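The calls that group1 toggles between (get, join, getNow, complete) differ mainly in how they treat a future that is not finished yet. The sketch below uses a future that is completed by hand, so the behaviour does not depend on timing; the class name and the string values are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

class ReadResultSketch {
    static String demo() {
        // No task is attached, so this future stays pending until someone completes it.
        CompletableFuture<String> pending = new CompletableFuture<>();

        String beforeDone = pending.getNow("fallback");  // not done yet: returns the fallback, never blocks
        boolean first = pending.complete("manual");      // true: this call set the result
        boolean second = pending.complete("again");      // false: already completed, value is unchanged
        String joined = pending.join();                  // like get(), but throws only unchecked exceptions
        String afterDone = pending.getNow("fallback");   // done now: returns the real value

        return beforeDone + "," + first + "," + second + "," + joined + "," + afterDone;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // fallback,true,false,manual,manual
    }
}
```

get() declares InterruptedException and ExecutionException, while join() wraps failures in an unchecked CompletionException, which is why join() is the one that fits inside stream pipelines.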
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
    // Pause the thread for a moment
    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    System.out.println("111");
    return 1;
}, threadPool).handle((f, e) -> {
    int i = 10 / 0; // throws ArithmeticException, so "222" is never printed
    System.out.println("222");
    return f + 2;
}).handle((f, e) -> {
    // handle still runs after a failure: here f is null and e holds the exception
    System.out.println("333");
    return f + 3;
}).whenComplete((v, e) -> {
    if (e == null) {
        System.out.println("----result: " + v);
    }
}).exceptionally(e -> {
    e.printStackTrace();
    System.out.println(e.getMessage());
    return null;
});
System.out.println(Thread.currentThread().getName() + "----main thread goes off to do other work first");
threadPool.shutdown();
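The difference between whenComplete and handle is that whenComplete does not take part in producing the result; treat it as a listener.
Even if the exception is dealt with inside whenComplete, it resurfaces at the outer level: the stage returned by whenComplete still completes exceptionally.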
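A small side-by-side sketch of that difference, using a deliberately failing task (the class name, the "boom" message, and the -1 fallback are illustrative assumptions):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteVsHandleSketch {
    // whenComplete only observes: the exception is still thrown to whoever joins.
    static String viaWhenComplete() {
        CompletableFuture<Integer> f = CompletableFuture
                .<Integer>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                .whenComplete((v, e) -> { /* look at v or e here, but cannot replace them */ });
        try {
            return "value=" + f.join();
        } catch (CompletionException ex) {
            return "rethrown:" + ex.getCause().getMessage();
        }
    }

    // handle produces the stage's result, so it can turn a failure into a value.
    static String viaHandle() {
        Integer v = CompletableFuture
                .<Integer>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                .handle((val, e) -> e == null ? val : Integer.valueOf(-1))
                .join();
        return "value=" + v;
    }

    public static void main(String[] args) {
        System.out.println(viaWhenComplete());  // rethrown:boom
        System.out.println(viaHandle());        // value=-1
    }
}
```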
CompletableFuture.supplyAsync(() -> {
    return 1;
}).thenApply(f -> {
    return f + 2;
}).thenApply(f -> {
    return f + 3;
}).thenAccept(r -> System.out.println(r));

// Task A finishes, then B runs; B does not need A's result
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
// Task A finishes, then B runs; B needs A's result but returns nothing
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
// Task A finishes, then B runs; B needs A's result and also returns a value
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
This method uses whichever result arrives first, much like a card game or mahjong where whoever wins first is the one returned.
// The first task sleeps for 1s and the second for 2s, so the returned result is 1
System.out.println(CompletableFuture.supplyAsync(() -> {
    // Pause the thread for a moment
    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    return 1;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    return 2;
}), r -> {
    return r;
}).join());

// Pause the thread for a moment
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
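applyToEither: when either of the two tasks completes, take its return value, process it, and produce a new return value
acceptEither: when either of the two tasks completes, take its return value and process it, with no new return value
runAfterEither: when either of the two tasks completes, run an action that does not need the future's result and has no return value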
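The three variants can be compared in one sketch. To keep the outcome independent of timing, the "fast" side is an already completed future and the "slow" side is a future that is never completed; both, like the class name and log strings, are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class EitherSketch {
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> never = new CompletableFuture<>();  // stands in for a slow task

        // applyToEither: receives the winner's value and returns a new value.
        String applied = fast.applyToEither(never, r -> r + "!").join();
        log.add("applyToEither -> " + applied);

        // acceptEither: receives the winner's value, returns nothing.
        fast.acceptEither(never, r -> log.add("acceptEither saw " + r)).join();

        // runAfterEither: receives no value and returns nothing.
        fast.runAfterEither(never, () -> log.add("runAfterEither ran")).join();

        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```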
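public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
Once both CompletionStage tasks have completed, their two results are handed to thenCombine together for processing.
Whichever finishes first waits for the other branch task.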
// Chain two thenCombine calls: (10 + 20) + 30
System.out.println(CompletableFuture.supplyAsync(() -> {
    return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    return 20;
}), (r1, r2) -> {
    return r1 + r2;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    return 30;
}), (r3, r4) -> {
    return r3 + r4;
}).join());

// A single thenCombine: 10 + 20
System.out.println(CompletableFuture.supplyAsync(() -> {
    return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
    return 20;
}), (r1, r2) -> {
    return r1 + r2;
}).join());
Combining two tasks where both must complete
CompletableFuture.supplyAsync(() -> {
    return 10;
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
    return 20;
}), (r1, r2) -> {
    System.out.println(r1); // 10
    System.out.println(r2); // 20
});
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
allOf: waits for all tasks to complete
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
anyOf: completes as soon as any one task completes
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
    System.out.println("Querying the product's image info");
    return "hello.jpg";
});
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
    System.out.println("Querying the product's attributes");
    return "black+256G";
});
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
    try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
    System.out.println("Querying the product description");
    return "Huawei";
});
// All of them need to complete
// futureImg.get();
// futureAttr.get();
// futureDesc.get();
// CompletableFuture<Void> all = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
// all.get();
CompletableFuture
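As a self-contained sketch of both methods, assuming the same three product queries as above (the class name, the "cached" value, and the never-completed future are illustrative assumptions): allOf yields a CompletableFuture<Void>, so the values are read from the original futures afterwards, while anyOf yields the value of whichever future finishes first, typed as Object.

```java
import java.util.concurrent.CompletableFuture;

class AllOfAnyOfSketch {
    static String waitForAll() {
        CompletableFuture<String> img = CompletableFuture.supplyAsync(() -> "hello.jpg");
        CompletableFuture<String> attr = CompletableFuture.supplyAsync(() -> "black+256G");
        CompletableFuture<String> desc = CompletableFuture.supplyAsync(() -> "Huawei");

        // Blocks until all three are done; the combined future itself carries no value.
        CompletableFuture.allOf(img, attr, desc).join();

        // These joins return immediately because every future has already completed.
        return img.join() + "|" + attr.join() + "|" + desc.join();
    }

    static Object waitForAny() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("cached");
        CompletableFuture<String> pending = new CompletableFuture<>();  // never completes in this demo

        // Completes with the first available result, regardless of argument order.
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(waitForAll());
        System.out.println(waitForAny());
    }
}
```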
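E-commerce price comparison requirements
For one product, search its selling price on all the major e-commerce sites at the same time;
For one product, search the selling price offered by each merchant on a single e-commerce platform at the same time.
The result should be a price list for the same product across the different places, returned as a List<String>: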
in jd price is 88.05
in pdd price is 86.11
in taobao price is 90.43
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Data;

public class CompletableFutureNetMallDemo {
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("pdd"),
            new NetMall("taobao"),
            new NetMall("dangdangwang"),
            new NetMall("tmall"));

    // Synchronous, step by step
    /**
     * List<NetMall> ----> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByStep(List<NetMall> list, String productName) {
        return list.stream()
                .map(netMall -> String.format(productName + " in %s price is %.2f",
                        netMall.getMallName(), netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }

    // Asynchronous, all requests fired at once
    /**
     * List<NetMall> ----> List<CompletableFuture<String>> ----> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByASync(List<NetMall> list, String productName) {
        return list.stream()
                .map(netMall -> CompletableFuture.supplyAsync(() ->
                        String.format(productName + " is %s price is %.2f",
                                netMall.getMallName(), netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPriceByStep(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " ms");

        System.out.println();

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByASync(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime2 - startTime2) + " ms");
    }
}

@Data
@AllArgsConstructor
class NetMall {
    private String mallName;

    public double calcPrice(String productName) {
        // The lookup takes 1 second
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}
/*
mysql in jd price is 110.59
mysql in pdd price is 110.23
mysql in taobao price is 110.04
mysql in dangdangwang price is 110.08
mysql in tmall price is 109.91
----costTime: 5030 ms

mysql is jd price is 109.07
mysql is pdd price is 109.47
mysql is taobao price is 109.04
mysql is dangdangwang price is 110.09
mysql is tmall price is 110.72
----costTime: 1021 ms
*/
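One detail in getPriceByASync is easy to miss: the futures are first collected into a list and only then joined in a second stream. The sketch below records the order of events to show why. Streams are lazy and push each element through the whole pipeline, so joining inside the same pipeline submits the next task only after the previous one has finished. The class name and the helpers submit and await are hypothetical and exist only for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectThenJoinSketch {
    // Hypothetical helper: logs the submission, then starts the async task.
    static CompletableFuture<String> submit(String s, List<String> events) {
        events.add("submit " + s);
        return CompletableFuture.supplyAsync(() -> s);
    }

    // Hypothetical helper: waits for the result, then logs it.
    static String await(CompletableFuture<String> f, List<String> events) {
        String r = f.join();
        events.add("join " + r);
        return r;
    }

    // Submitting and joining in ONE pipeline: each task is awaited before the next is submitted.
    static List<String> singlePipeline() {
        List<String> events = new CopyOnWriteArrayList<>();
        Stream.of("a", "b")
                .map(s -> submit(s, events))
                .map(f -> await(f, events))
                .collect(Collectors.toList());
        return events;
    }

    // Collecting the futures first: every task is submitted before any join happens.
    static List<String> collectFirst() {
        List<String> events = new CopyOnWriteArrayList<>();
        List<CompletableFuture<String>> futures = Stream.of("a", "b")
                .map(s -> submit(s, events))
                .collect(Collectors.toList());
        futures.stream().map(f -> await(f, events)).collect(Collectors.toList());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(singlePipeline());  // [submit a, join a, submit b, join b]
        System.out.println(collectFirst());    // [submit a, submit b, join a, join b]
    }
}
```

With the single pipeline the five one-second lookups would run one after another, which is exactly the cost the asynchronous version is meant to avoid.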
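There are 100 tasks, each returning a number and each taking time to run; use multithreading to sum them quickly.
Three ways to write it; try them and compare for yourself.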
package com.bilibili.juc.study;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * CompletableFuture: merging multiple tasks
 */
public class Test {
    // Approach 1: parallel stream, with the common pool's parallelism raised to 100
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ArrayList<Integer> lists = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            lists.add(1);
        }
        // Must be set before the common pool is first used
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
        int sum = lists.stream().parallel().mapToInt(integer -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return integer;
        }).sum();
        System.out.println(sum);
        long end = System.currentTimeMillis();
        System.out.println("Elapsed ms: " + (end - start));
    }

    // Approach 2: supplyAsync on a custom pool, collect the futures, then join each one
    public static void fangshi1() {
        long start = System.currentTimeMillis();
        ExecutorService threadPool = Executors.newFixedThreadPool(50);
        ArrayList<Integer> lists = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            lists.add(1);
        }
        int sum = lists.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return integer;
                }, threadPool))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .mapToInt(value -> value)
                .sum();
        System.out.println(sum);
        long end = System.currentTimeMillis();
        System.out.println("Elapsed ms: " + (end - start));
        threadPool.shutdown();
    }

    // Approach 3: allOf to wait for every future, then gather and sum the results
    public static void fangshi2() {
        long start = System.currentTimeMillis();
        ExecutorService threadPool = Executors.newFixedThreadPool(100);
        ArrayList<Integer> lists = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            lists.add(1);
        }
        List<CompletableFuture<Integer>> completableFutureList = lists.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return integer;
                }, threadPool))
                .collect(Collectors.toList());
        CompletableFuture<Void> completableFuture = CompletableFuture.allOf(
                completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
        CompletableFuture<List<Integer>> listCompletableFuture = completableFuture.thenApply(v -> {
            return completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
        });
        // Sum the values (the original used count(), which only matches the sum because every value is 1)
        CompletableFuture<Integer> sumCompletableFuture = listCompletableFuture.thenApply(list -> {
            return list.stream().mapToInt(Integer::intValue).sum();
        });
        System.out.println(sumCompletableFuture.join());
        long end = System.currentTimeMillis();
        System.out.println("Elapsed ms: " + (end - start));
        threadPool.shutdown();
    }
}