对于某些时候,如果直接使用Future的get方法的话,可能导致一个问题,只要有一个子线程执行时间过长,就会导致阻塞到了f1.get()操作上
那么该怎么办呢
java中尝试将生产者-消费者模型,将所有线程的返回值保存在一个阻塞队列中,将任务的最终执行结果放在队列中
那么在后来,Java设置精美的CompletionService 来帮你维护这个阻塞队列,不过不同于我们内部维护的一个阻塞队列将任务结果放入,CompletionService将任务放进去了
对应的构造函数为:
ExecutorCompletionService(Executor executor);
ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)
一个简单的使用示例如下:
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3); // 创建 CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); // 异步向电商 S1 询价 cs.submit(()->getPriceByS1()); // 异步向电商 S2 询价 cs.submit(()->getPriceByS2()); // 异步向电商 S3 询价 cs.submit(()->getPriceByS3()); // 将询价结果异步保存到数据库 for (int i=0; i<3; i++) { Integer r = cs.take().get(); executor.execute(()->save(r)); } |
任务执行完成后通过take获取到对应的Future对象,然后get对应的值
常见的Api如下
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; |
首先是两个submit
一个传入的Callable接口,可以获取到返回值
另一个传入一个Runnable task和result 进行加工
剩下的take和poll都是从队列中获取并移除一个元素
take会直接阻塞,poll拿不到会直接返回null
在Dubbo中,有一种叫做Forking的集群,也就是一个服务由多个运营商提供,只要有一个运营商提供了服务,就将其他所有的查询都停下
在CompletionService中可以快速实现这种模式,就是一个线程Executor,一个CompletionService对象cs,和一个返回值futures,Future的集合
在cs中创建多个Future,并保存在futures中,然后调用cs.take().get()方法来获取执行结果
基本的代码如下
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs =
new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures =
new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures
futures.add(
cs.submit(()->geocoderByS1()));
futures.add(
cs.submit(()->geocoderByS2()));
futures.add(
cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则 break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
// 简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
// 取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;