在常见的异步执行问题中,往往会出现场景为,需要获取多个异步执行的结果,再进行操作,那么对于线程池这种线程,执行的void execute()是不存在方法返回值的,为了解决这个问题,Java提供了三个Submit()方法和一个FutureTask工具类来获取任务执行的结果
首先是三个submit()方法
// 提交 Runnable 任务
Future<?> submit(Runnable task); // 提交 Callable 任务 <T> Future<T> submit(Callable<T> task); // 提交 Runnable 任务及结果引用 <T> Future<T> submit(Runnable task, T result); |
返回值是一个Future接口
Future中内含的API有
// 取消任务
boolean cancel( boolean mayInterruptIfRunning); // 判断任务是否已取消 boolean isCancelled(); // 判断任务是否已结束 boolean isDone(); // 获得任务执行结果 get(); // 获得任务执行结果,支持超时 get(long timeout, TimeUnit unit); |
然后是Runnable和Callable的区别
1.提交的是一个Runnable接口,Runnable是没有返回值的,所以这个返回的Future只能用于判断任务已经结束
2.提交的是一个Callable接口,只有一个call方法,是有返回值的,get可以直接使用
3.提交的是Runable接口,但是同时提交的是一个result,这个result就是让Runable对其进行操作的,操作后返回这个result,这个result就是两者之间的共享数据
对于Runnable+ result的使用,可以看如下的代码
ExecutorService executor = Executors.newFixedThreadPool(1);
// 创建 Result 对象 r
Result r = new Result();
r.setAAA(a);
// 提交任务
//这是利用的构造函数来传入的结果集
Future<Result> future =
executor.submit(new Task(r), r);
Result fr = future.get();
// 下面等式成立
进行判断地址
fr == = r;
fr.getAAA() == = a;
fr.getXXX() == = x
class Task implements Runnable {
Result r;
// 通过构造函数传入 result
Task(Result r) {
this.r = r;
}
void run() {
// 可以操作 result
a = r.getAAA();
r.setXXX(x);
}
}
然后是关于FutureTask,这个工具类 ,实现了Runnable和Future接口
可以直接将其交给ThreadPoolExector去执行,亦可以直接被Thread执行
又因为实现了Future接口,所以可以获得任务执行接口
这个异步执行的最好方式,既可以运行,也可以获取结果
比如说如下的场景
可以分为两个线程负责执行
对于FutureTask的执行,基本如下
// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
FutureTask<String> ft2;
// T1任务需要T2任务的FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println(“T1:洗水壶…”);
TimeUnit.SECONDS.sleep(1);
System.out.println(“T1:烧开水…”);
TimeUnit.SECONDS.sleep(15);
// 获取T2线程的茶叶
String tf = ft2.get();
System.out.println(“T1:拿到茶叶:”+tf);
System.out.println(“T1:泡茶…”);
return “上茶:” + tf;
}
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println(“T2:洗茶壶…”);
TimeUnit.SECONDS.sleep(1);
System.out.println(“T2:洗茶杯…”);
TimeUnit.SECONDS.sleep(2);
System.out.println(“T2:拿茶叶…”);
TimeUnit.SECONDS.sleep(1);
return “龙井”;
}
}
public static void main(String[] args) {
// 创建任务T2的FutureTask
FutureTask<String> ft2
= new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask<String> ft1
= new FutureTask<>(new T1Task(ft2));
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());
}
/* 一次执行结果:
T1:洗水壶…
T2:洗茶壶…
T1:烧开水…
T2:洗茶杯…
T2:拿茶叶…
T1:拿到茶叶:龙井
T1:泡茶…
上茶:龙井*/
再之后,Java提供了CompletableFuture编程工具类
我们仍然拿着上述的流程举例
对于CompletableFuture来说,可以分为
//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(()->{
System.out.println(“T1:洗水壶…”);
sleep(1, TimeUnit.SECONDS);
System.out.println(“T1:烧开水…”);
sleep(15, TimeUnit.SECONDS);
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
System.out.println(“T2:洗茶壶…”);
sleep(1, TimeUnit.SECONDS);
System.out.println(“T2:洗茶杯…”);
sleep(2, TimeUnit.SECONDS);
System.out.println(“T2:拿茶叶…”);
sleep(1, TimeUnit.SECONDS);
return “龙井”;
});
//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 =
f1.thenCombine(f2, (__, tf)->{
System.out.println(“T1:拿到茶叶:” + tf);
System.out.println(“T1:泡茶…”);
return “上茶:” + tf;
});
//等待任务3执行结果
System.out.println(f3.join());
void sleep(int t, TimeUnit u) {
try {
u.sleep(t);
}catch(InterruptedException e){}
}
/*一次执行结果:
T1:洗水壶…
T2:洗茶壶…
T1:烧开水…
T2:洗茶杯…
T2:拿茶叶…
T1:拿到茶叶:龙井
T1:泡茶…
上茶:龙井*/
代码简单明了,利用f1.thenCombine(f2)说明了,任务3等到1和2任务
代码更专注于业务逻辑
关于CompletableFuture对象
常见方式可以选择runAsync()加入Runnable 和 supplyAsync 加入Supplier
runAsync无返回值,supplyAsync有返回值
再往下还有这可以选择传入线程池的方法,当然可以不传入线程池,这样会使用公共的ForkJoinPool线程池,创建的线程数是CPU的核数
但是注意,如果共享一个线程池,有一个很慢的IO操作,可能拖累整个线程池的运行速度
还是为不同的任务进行分配不同的线程池,会跟家好一点\
// 可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) |
创建完成CompletableFuture对象,会自动执行run方法和get方法来获取到执行结果
对于上面的情况,其还实现了CompletionStage接口
CompletionStage接口中的方法众多,大约有40乃至50个之多
其主要为了工作流的分工问题而准备的
比如说 串行 并行 汇聚这样的问题
串行不必说,就是任务C必须等待任务A完成之后才能开始这就是串行
汇聚则是任务C必须等待任务A和任务B完成后才行,一种And聚合关系,当然还有Or聚合关系,就是任务C等待任务A或者任务B之一完成即可
接下来具体的介绍一下
串行关系
thenApply thenAccept thenRun thenCompose这四个接口
thenApply 系列函数里参数 fn 的类型是接口 Function,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage。
而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage。 thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage。 这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的 |
API:
CompletionStage<R> thenApply(fn); CompletionStage<R> thenApplyAsync(fn); CompletionStage<Void> thenAccept(consumer); CompletionStage<Void> thenAcceptAsync(consumer); CompletionStage<Void> thenRun(action); CompletionStage<Void> thenRunAsync(action); CompletionStage<R> thenCompose(fn); CompletionStage<R> thenComposeAsync(fn); |
总结下来说,虽然返回的都是CompletionStage,但是有些具有返回值,有些不具有返回值,都返回CompletionStage可以保证对同一个对象进行操作
而且可以细分为异步和非异步去执行
一个简答的操作代码可以为
String join = CompletableFuture.supplyAsync(() -> “Hello World”)
.thenApply(s -> s + “,QQ”)
.thenApply(String::toLowerCase).join();
System.out.println(join);
还有着AND方法
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn); CompletionStage<Void> thenAcceptBoth(other, consumer); CompletionStage<Void> thenAcceptBothAsync(other, consumer); CompletionStage<Void> runAfterBoth(other, action); CompletionStage<Void> runAfterBothAsync(other, action); |
以及OR方法
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn); CompletionStage acceptEither(other, consumer); CompletionStage acceptEitherAsync(other, consumer); CompletionStage runAfterEither(other, action); CompletionStage runAfterEitherAsync(other, action); |
我们展示下如何使用OR关系
public static void main(String[] args) {
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
int t = ThreadLocalRandom.current().nextInt(10, 15);
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
return “A”;
});
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
int t = ThreadLocalRandom.current().nextInt(5, 10);
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
return “B”;
});
String s = taskA.applyToEither(taskB, String::toLowerCase).join();
System.out.println(s);
}
最后是关于异常处理
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer); CompletionStage<R> whenCompleteAsync(consumer); CompletionStage<R> handle(fn); CompletionStage<R> handleAsync(fn); |
主要介绍上面的几个方法
exceptionally相当于try()catch()中的catch操作
对于whenComplete和handle操作,相当于try() finally()中的finally操作
但是whenComplete的传入是consumer没有返回值
handler的支持返回值
简单点的使用就是
Integer join = CompletableFuture.supplyAsync(() -> 0 / 7)
.thenApply(r -> r * 7)
.exceptionally(e -> 0).join();
System.out.println(join);
现在对于异步的回调函数,还支持了一个实验室项目RxJava,就是为了异步编程准备的