在常见的异步执行问题中,往往会出现场景为,需要获取多个异步执行的结果,再进行操作,那么对于线程池这种线程,执行的void execute()是不存在方法返回值的,为了解决这个问题,Java提供了三个Submit()方法和一个FutureTask工具类来获取任务执行的结果

首先是三个submit()方法

// 提交 Runnable 任务

Future<?> submit(Runnable task);

// 提交 Callable 任务

<T> Future<T> submit(Callable<T> task);

// 提交 Runnable 任务及结果引用

<T> Future<T> submit(Runnable task, T result);

返回值是一个Future接口

Future中内含的API有

// 取消任务

boolean cancel(

boolean mayInterruptIfRunning);

// 判断任务是否已取消

boolean isCancelled();

// 判断任务是否已结束

boolean isDone();

// 获得任务执行结果

get();

// 获得任务执行结果,支持超时

get(long timeout, TimeUnit unit);

然后是Runnable和Callable的区别

1.提交的是一个Runnable接口,Runnable是没有返回值的,所以这个返回的Future只能用于判断任务已经结束

2.提交的是一个Callable接口,只有一个call方法,是有返回值的,get可以直接使用

3.提交的是Runable接口,但是同时提交的是一个result,这个result就是让Runable对其进行操作的,操作后返回这个result,这个result就是两者之间的共享数据

对于Runnable+ result的使用,可以看如下的代码

ExecutorService executor = Executors.newFixedThreadPool(1);

// 创建 Result 对象 r

Result r = new Result();

r.setAAA(a);

// 提交任务

//这是利用的构造函数来传入的结果集

Future<Result> future =

executor.submit(new Task(r), r);

Result fr = future.get();

// 下面等式成立

进行判断地址

fr == = r;

fr.getAAA() == = a;

fr.getXXX() == = x

class Task implements Runnable {

Result r;

// 通过构造函数传入 result

Task(Result r) {

this.r = r;

}

void run() {

// 可以操作 result

a = r.getAAA();

r.setXXX(x);

}

}

然后是关于FutureTask,这个工具类 ,实现了Runnable和Future接口

可以直接将其交给ThreadPoolExector去执行,亦可以直接被Thread执行

又因为实现了Future接口,所以可以获得任务执行接口

这个异步执行的最好方式,既可以运行,也可以获取结果

比如说如下的场景

图片

可以分为两个线程负责执行

图片 图片

对于FutureTask的执行,基本如下

// T1Task需要执行的任务:

// 洗水壶、烧开水、泡茶

class T1Task implements Callable<String> {

FutureTask<String> ft2;

// T1任务需要T2任务的FutureTask

T1Task(FutureTask<String> ft2){

this.ft2 = ft2;

}

@Override

String call() throws Exception {

System.out.println(“T1:洗水壶…”);

TimeUnit.SECONDS.sleep(1);

System.out.println(“T1:烧开水…”);

TimeUnit.SECONDS.sleep(15);

// 获取T2线程的茶叶

String tf = ft2.get();

System.out.println(“T1:拿到茶叶:”+tf);

System.out.println(“T1:泡茶…”);

return “上茶:” + tf;

}

}

// T2Task需要执行的任务:

// 洗茶壶、洗茶杯、拿茶叶

class T2Task implements Callable<String> {

@Override

String call() throws Exception {

System.out.println(“T2:洗茶壶…”);

TimeUnit.SECONDS.sleep(1);

System.out.println(“T2:洗茶杯…”);

TimeUnit.SECONDS.sleep(2);

System.out.println(“T2:拿茶叶…”);

TimeUnit.SECONDS.sleep(1);

return “龙井”;

}

}

public static void main(String[] args) {

// 创建任务T2的FutureTask

FutureTask<String> ft2

= new FutureTask<>(new T2Task());

// 创建任务T1的FutureTask

FutureTask<String> ft1

= new FutureTask<>(new T1Task(ft2));

// 线程T1执行任务ft1

Thread T1 = new Thread(ft1);

T1.start();

// 线程T2执行任务ft2

Thread T2 = new Thread(ft2);

T2.start();

// 等待线程T1执行结果

System.out.println(ft1.get());

}

/* 一次执行结果:

T1:洗水壶…

T2:洗茶壶…

T1:烧开水…

T2:洗茶杯…

T2:拿茶叶…

T1:拿到茶叶:龙井

T1:泡茶…

上茶:龙井*/

再之后,Java提供了CompletableFuture编程工具类

我们仍然拿着上述的流程举例

图片

对于CompletableFuture来说,可以分为

图片

//任务1:洗水壶->烧开水

CompletableFuture<Void> f1 =

CompletableFuture.runAsync(()->{

System.out.println(“T1:洗水壶…”);

sleep(1, TimeUnit.SECONDS);

System.out.println(“T1:烧开水…”);

sleep(15, TimeUnit.SECONDS);

});

//任务2:洗茶壶->洗茶杯->拿茶叶

CompletableFuture<String> f2 =

CompletableFuture.supplyAsync(()->{

System.out.println(“T2:洗茶壶…”);

sleep(1, TimeUnit.SECONDS);

System.out.println(“T2:洗茶杯…”);

sleep(2, TimeUnit.SECONDS);

System.out.println(“T2:拿茶叶…”);

sleep(1, TimeUnit.SECONDS);

return “龙井”;

});

//任务3:任务1和任务2完成后执行:泡茶

CompletableFuture<String> f3 =

f1.thenCombine(f2, (__, tf)->{

System.out.println(“T1:拿到茶叶:” + tf);

System.out.println(“T1:泡茶…”);

return “上茶:” + tf;

});

//等待任务3执行结果

System.out.println(f3.join());

void sleep(int t, TimeUnit u) {

try {

u.sleep(t);

}catch(InterruptedException e){}

}

/*一次执行结果:

T1:洗水壶…

T2:洗茶壶…

T1:烧开水…

T2:洗茶杯…

T2:拿茶叶…

T1:拿到茶叶:龙井

T1:泡茶…

上茶:龙井*/

代码简单明了,利用f1.thenCombine(f2)说明了,任务3等到1和2任务

代码更专注于业务逻辑

关于CompletableFuture对象

常见方式可以选择runAsync()加入Runnable 和 supplyAsync 加入Supplier

runAsync无返回值,supplyAsync有返回值

再往下还有这可以选择传入线程池的方法,当然可以不传入线程池,这样会使用公共的ForkJoinPool线程池,创建的线程数是CPU的核数

但是注意,如果共享一个线程池,有一个很慢的IO操作,可能拖累整个线程池的运行速度

还是为不同的任务进行分配不同的线程池,会跟家好一点\

// 可以指定线程池

static CompletableFuture<Void>

runAsync(Runnable runnable, Executor executor)

static <U> CompletableFuture<U>

supplyAsync(Supplier<U> supplier, Executor executor)

创建完成CompletableFuture对象,会自动执行run方法和get方法来获取到执行结果

对于上面的情况,其还实现了CompletionStage接口

CompletionStage接口中的方法众多,大约有40乃至50个之多

其主要为了工作流的分工问题而准备的

比如说 串行 并行 汇聚这样的问题

串行不必说,就是任务C必须等待任务A完成之后才能开始这就是串行

汇聚则是任务C必须等待任务A和任务B完成后才行,一种And聚合关系,当然还有Or聚合关系,就是任务C等待任务A或者任务B之一完成即可

接下来具体的介绍一下

串行关系

thenApply thenAccept thenRun thenCompose这四个接口

thenApply 系列函数里参数 fn 的类型是接口 Function,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage。

而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage。

thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage。

这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的

API:

CompletionStage<R> thenApply(fn);

CompletionStage<R> thenApplyAsync(fn);

CompletionStage<Void> thenAccept(consumer);

CompletionStage<Void> thenAcceptAsync(consumer);

CompletionStage<Void> thenRun(action);

CompletionStage<Void> thenRunAsync(action);

CompletionStage<R> thenCompose(fn);

CompletionStage<R> thenComposeAsync(fn);

总结下来说,虽然返回的都是CompletionStage,但是有些具有返回值,有些不具有返回值,都返回CompletionStage可以保证对同一个对象进行操作

而且可以细分为异步和非异步去执行

一个简答的操作代码可以为

String join = CompletableFuture.supplyAsync(() -> “Hello World”)

.thenApply(s -> s + “,QQ”)

.thenApply(String::toLowerCase).join();

System.out.println(join);

还有着AND方法

CompletionStage<R> thenCombine(other, fn);

CompletionStage<R> thenCombineAsync(other, fn);

CompletionStage<Void> thenAcceptBoth(other, consumer);

CompletionStage<Void> thenAcceptBothAsync(other, consumer);

CompletionStage<Void> runAfterBoth(other, action);

CompletionStage<Void> runAfterBothAsync(other, action);

以及OR方法

CompletionStage applyToEither(other, fn);

CompletionStage applyToEitherAsync(other, fn);

CompletionStage acceptEither(other, consumer);

CompletionStage acceptEitherAsync(other, consumer);

CompletionStage runAfterEither(other, action);

CompletionStage runAfterEitherAsync(other, action);

我们展示下如何使用OR关系

public static void main(String[] args) {

CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {

int t = ThreadLocalRandom.current().nextInt(10, 15);

try {

Thread.sleep(t);

} catch (InterruptedException e) {

e.printStackTrace();

}

return “A”;

});

CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {

int t = ThreadLocalRandom.current().nextInt(5, 10);

try {

Thread.sleep(t);

} catch (InterruptedException e) {

e.printStackTrace();

}

return “B”;

});

String s = taskA.applyToEither(taskB, String::toLowerCase).join();

System.out.println(s);

}

最后是关于异常处理

CompletionStage exceptionally(fn);

CompletionStage<R> whenComplete(consumer);

CompletionStage<R> whenCompleteAsync(consumer);

CompletionStage<R> handle(fn);

CompletionStage<R> handleAsync(fn);

主要介绍上面的几个方法

exceptionally相当于try()catch()中的catch操作

对于whenComplete和handle操作,相当于try() finally()中的finally操作

但是whenComplete的传入是consumer没有返回值

handler的支持返回值

简单点的使用就是

Integer join = CompletableFuture.supplyAsync(() -> 0 / 7)

.thenApply(r -> r * 7)

.exceptionally(e -> 0).join();

System.out.println(join);

现在对于异步的回调函数,还支持了一个实验室项目RxJava,就是为了异步编程准备的

发表评论

邮箱地址不会被公开。 必填项已用*标注