一般来说对于整个项目,或者说整个任务的层面去看待问题,但是其实具体来说,所有的异步编程,就是基于了一个分工的思路来描述问题的

图片

基本上,异步编程就是采用了一个分治的任务模型

分治,简单来说,就是分而治之,一个解决复杂问题的思维方式

将一个复杂的问题分为多个相似的小问题,然后把子问题进一步的进行划分,直到分不可分

分治思想在很多领域都有所应用,例如算法领域的归并 快速排序 二分查找

Java领域,提供了Fork/Join框架去支持分治任务

分治模型可以概括为两个阶段,第一各阶段是任务分解,将任务迭代的分解为了子任务,直到分不可分,第二个阶段是合并,将子任务的结果合并,获取到最终结果

图片

对于每个线程来说,只是规模变小了,但是算法是相同的

在Java汇总Fork/Join的具体实现是ForkJoinPool线程池和分治任务ForkjoinTask,就是普通的线程池和Runnbale接口的关系

ForkJoinTask是一个抽象类,其中分为两个核心的任务fork() 和 join()

ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,它们都是用递归的方式来处理分治任务的。

这两个子类都定义了抽象方法 compute(),

不过区别是 RecursiveAction 定义的 compute() 没有返回值

而 RecursiveTask 定义的 compute() 方法是有返回值的

简单的操作,我们可以利用这个框架去实现计算斐波那契数列

分为了两步

分别是创建分治任务线程池和计算斐波那契数列的分治任务

分治任务需要实现compute()方法

public static void main(String[] args) {

// 创建分治任务线程池

ForkJoinPool fjp = new ForkJoinPool(4);

// 创建分治任务

Fibonacci fib = new Fibonacci(30);

// 启动分治任务

Integer result = fjp.invoke(fib);

// 输出结果

System.out.println(result);

}

// 递归任务

static class Fibonacci extends

RecursiveTask<Integer> {

final int n;

Fibonacci(int n) {

this.n = n;

}

@Override

protected Integer compute() {

if (n <= 1)

return n;

Fibonacci f1 = new Fibonacci(n – 1);

// 创建子任务

f1.fork();

Fibonacci f2 = new Fibonacci(n – 2);

// 等待子任务结果,并合并结果

return f2.compute() + f1.join();

}

}

其工作原理很简单

是一个生产者-消费者的实现,内部有多个任务队列,还维护着多个工作线程去对应着队列

当我们提交一个任务时候,会创建出子任务,子任务会提交到对应的任务队列中

但是一个工作线程的任务队列空了,不会没活干了,还有一种任务窃取的机制,如果一个线程空了之后,那么他会尝试着从其他的线程的队列中拿去队列,为了保证拿取任务的线程安全

我们将整个任务队列设计为双端的,线程本身从队首拿取任务,窃取者从队尾拿取任务

图片

那么就可以将一个文件拆分更小的文件,然后一直拆分下去,直到只有一行,然后统计这一行中单词数量,最后join进去

static void main(String[] args) {

String[] fc = {“hello world”,

“hello me”,

“hello fork”,

“hello join”,

“fork join in world”};

// 创建 ForkJoin 线程池

ForkJoinPool fjp = new ForkJoinPool(3);

// 创建任务

MR mr = new MR(fc, 0, fc.length);

// 启动任务

Map<String, Long> result = fjp.invoke(mr);

// 输出结果

result.forEach((k, v) -> System.out.println(k + “:” + v));

}

//MR 模拟类

static class MR extends RecursiveTask<Map<String, Long>> {

private String[] fc;

private int start, end;

// 构造函数

MR(String[] fc, int fr, int to) {

this.fc = fc;

this.start = fr;

this.end = to;

}

@Override

protected Map<String, Long> compute() {

if (end – start == 1) {

return calc(fc[start]);

} else {

int mid = (start + end) / 2;

MR mr1 = new MR(fc, start, mid);

mr1.fork();

MR mr2 = new MR(fc, mid, end);

// 计算子任务,并返回合并的结果

return merge(mr2.compute(), mr1.join());

}

}

// 合并结果

private Map<String, Long> merge(

Map<String, Long> r1,

Map<String, Long> r2) {

Map<String, Long> result =

new HashMap<>();

result.putAll(r1);

// 合并结果

r2.forEach((k, v) -> {

Long c = result.get(k);

if (c != null) {

result.put(k, c + v);

}else {

result.put(k, v);

}

});

return result;

}

// 统计单词数量

private Map<String, Long> calc(String line) {

Map<String, Long> result = new HashMap<>();

// 分割单词

String[] words = line.split(“\\s+”);

// 统计单词数量

for (String w : words) {

Long v = result.get(w);

if (v != null) {

result.put(w, v + 1);

}else {

result.put(w, 1L);

}

}

return result;

}

}

最后要提一嘴,其核心组件是ForkJoinPool,支持任务窃取机制,所有的线程工作量均衡,但是一般来说一个线程池同时面对CPU操作,IO操作的话

可能会导致因为一个很慢的IO操作拖累整个任务池的性能,建议采用不同的ForkJoinPool来执行不同的类型计算任务

发表评论

邮箱地址不会被公开。 必填项已用*标注