// 递归任务
@AllArgsConstructor
class Fibonacci extends RecursiveTask<Integer> {
private final int n;
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
// 创建子任务
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
// 等待子任务结果并合并
return f1.join() + f2.join();
}
}
// 创建分治任务线程池
ForkJoinPool pool = new ForkJoinPool(4);
// 创建分治任务
Fibonacci fibonacci = new Fibonacci(30);
// 启动分治任务
System.out.println(pool.invoke(fibonacci)); // 832040
@AllArgsConstructor
class MapReduce extends RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start;
private int end;
@Override
protected Map<String, Long> compute() {
if (end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start + end) / 2;
// 前半部分数据fork一个递归任务
MapReduce mr1 = new MapReduce(fc, start, mid);
mr1.fork();
// 后半部分数据在当前任务中递归处理
MapReduce mr2 = new MapReduce(fc, mid, end);
// 计算子任务,返回合并的结果
return merge(mr2.compute(), mr1.join());
}
}
// 统计单词数量
private Map<String, Long> calc(String line) {
Map<String, Long> result = new HashMap<>();
String[] words = line.split("//s+");
for (String word : words) {
if (result.containsKey(word)) {
result.put(word, result.get(word) + 1);
} else {
result.put(word, 1L);
}
}
return result;
}
// 合并结果
private Map<String, Long> merge(Map<String, Long> r1, Map<String, Long> r2) {
Map<String, Long> result = new HashMap<>(r1);
r2.forEach((word, count) -> {
if (result.containsKey(word)) {
result.put(word, result.get(word) + count);
} else {
result.put(word, count);
}
});
return result;
}
}
String[] fc = {"hello world",
"hello me",
"hello fork",
"hello join",
"fork join in world"};
ForkJoinPool pool = new ForkJoinPool(3);
MapReduce mapReduce = new MapReduce(fc, 0, fc.length);
Map<String, Long> result = pool.invoke(mapReduce);
result.forEach((word, count) -> System.out.println(word + " : " + count));
转载请注明出处:http://zhongmingmao.me/2019/05/17/java-concurrent-fork-join/
访问原文「 Java并发 -- Fork + Join 」获取最佳阅读体验并参与讨论