对于某些时候,如果直接使用Future的get方法的话,可能导致一个问题,只要有一个子线程执行时间过长,就会导致阻塞到了f1.get()操作上

那么该怎么办呢

java中尝试将生产者-消费者模型,将所有线程的返回值保存在一个阻塞队列中,将任务的最终执行结果放在队列中

那么在后来,Java设置精美的CompletionService 来帮你维护这个阻塞队列,不过不同于我们内部维护的一个阻塞队列将任务结果放入,CompletionService将任务放进去了

对应的构造函数为:

ExecutorCompletionService(Executor executor);

ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)

一个简单的使用示例如下:

// 创建线程池

ExecutorService executor =

Executors.newFixedThreadPool(3);

// 创建 CompletionService

CompletionService<Integer> cs = new

ExecutorCompletionService<>(executor);

// 异步向电商 S1 询价

cs.submit(()->getPriceByS1());

// 异步向电商 S2 询价

cs.submit(()->getPriceByS2());

// 异步向电商 S3 询价

cs.submit(()->getPriceByS3());

// 将询价结果异步保存到数据库

for (int i=0; i<3; i++) {

Integer r = cs.take().get();

executor.execute(()->save(r));

}

任务执行完成后通过take获取到对应的Future对象,然后get对应的值

常见的Api如下

Future<V> submit(Callable<V> task);

Future<V> submit(Runnable task, V result);

Future<V> take()

throws InterruptedException;

Future<V> poll();

Future<V> poll(long timeout, TimeUnit unit)

throws InterruptedException;

首先是两个submit

一个传入的Callable接口,可以获取到返回值

另一个传入一个Runnable task和result 进行加工

剩下的take和poll都是从队列中获取并移除一个元素

take会直接阻塞,poll拿不到会直接返回null

在Dubbo中,有一种叫做Forking的集群,也就是一个服务由多个运营商提供,只要有一个运营商提供了服务,就将其他所有的查询都停下

在CompletionService中可以快速实现这种模式,就是一个线程Executor,一个CompletionService对象cs,和一个返回值futures,Future的集合

在cs中创建多个Future,并保存在futures中,然后调用cs.take().get()方法来获取执行结果

基本的代码如下

// 创建线程池

ExecutorService executor =

Executors.newFixedThreadPool(3);

// 创建 CompletionService

CompletionService<Integer> cs =

new ExecutorCompletionService<>(executor);

// 用于保存 Future 对象

List<Future<Integer>> futures =

new ArrayList<>(3);

// 提交异步任务,并保存 future 到 futures

futures.add(

cs.submit(()->geocoderByS1()));

futures.add(

cs.submit(()->geocoderByS2()));

futures.add(

cs.submit(()->geocoderByS3()));

// 获取最快返回的任务执行结果

Integer r = 0;

try {

// 只要有一个成功返回,则 break

for (int i = 0; i < 3; ++i) {

r = cs.take().get();

// 简单地通过判空来检查是否成功返回

if (r != null) {

break;

}

}

} finally {

// 取消所有任务

for(Future<Integer> f : futures)

f.cancel(true);

}

// 返回结果

return r;

发表评论

邮箱地址不会被公开。 必填项已用*标注