转载

Java Streams,第 2 部分: 使用流执行聚合

本系列的 第 1 部分 介绍了 Java SE 8 中添加的 java.util.stream 库。第二期文章将重点介绍 Streams 库的一个最重要的、灵活的方面 — 聚合和汇总数据的能力。

“累加器反模式”

第 1 部分中的第 1 个例子使用 Streams 执行了一次简单的汇总,如清单 1 所示。

清单 1. 使用 Streams 声明性地计算聚合值
int totalSalesFromNY     = txns.stream()           .filter(t -> t.getSeller().getAddr().getState().equals("NY"))           .mapToInt(t -> t.getAmount())           .sum();

清单 2 展示了如何采用 “老方法” 编写这个示例。

清单 2. 通过命令计算同一个聚合值
int sum = 0; for (Txn t : txns) {     if (t.getSeller().getAddr().getState().equals("NY"))         sum += t.getAmount(); }

第 1 部分介绍了尽管新方法比老方法更长,但新方法更可取的一些原因:

关于本系列

借助 java.util.stream 包,您可以简明地、声明性地表达集合、数组和其他数据源上可能的并行批量操作。在 Java 语言架构师 Brian Goetz 编写的这个 系列 中,全面了解 Streams 库,并学习如何最充分地使用它。

  • 代码更加清晰,因为它被简单地构造为一些简单操作的组合。
  • 该代码是通过声明进行表达的(描述想要的结果),而不是通过命令进行表达的(一个计算结果的循序渐进的过程)。
  • 随着表达的查询变得更加复杂,此方法可以更干净地扩展。

应用这个特殊的聚合是有一些额外原因的。清单 2 演示了累加器反模式,其中代码首先声明并初始化一个可变累加器变量 (sum),然后继续在循环中更新累加器。为什么这样做是不正确的?首先,此代码样式难以并行化。没有协调(比如同步),对累加器的每次访问都导致一次数据争用(而借助协调,协调导致的争用会破坏并行性所带来的效率收益)。

学习更多知识。开发更多项目。联系更多同行。

全新的 developerWorks Premium 订阅计划提供了强大的开发工具和资源,包括 500 篇通过 Safari Books Online 提供的顶级技术文章(包含作者的 Java 并发性实战)、最重要开发人员活动的大幅折扣、最新的 O'Reilly 大会的视频录像,等等。立即注册。

累加器方法更不可取的另一个原因是,它在太低的级别上建模计算 — 在各个元素的级别上,而不是在整个数据集的级别上。与 “逐个依次迭代交易金额,将每笔金额添加到一个已初始化为 0 的累加器” 相比,“所有交易金额的总和” 是目标的更抽象、更直接的陈述。

所以,如果命令式累加是错误的工具,那什么才是正确的工具?在这个特定的问题中,您已经看到了答案的线索(sum() 方法),但这只是一个强大的、通用的缩减 技术的一种特殊情况。缩减技术简单、灵活,而且可并行化,还能在比命令式累加更高的抽象级别上操作。


缩减

缩减技术简单、灵活,而且可并行化,还能在比命令式累加更高的抽象级别上操作。

缩减(也称为折叠)是一种来自函数编程的技术,它抽象化了许多不同的累加操作。给定一个类型为 T,包含 x 个元素的非空数列 X1, x2, ..., xn 和 T 上的二元运算符(在这里表示为 *),* 下的 X 的缩减 被定义为:

   (x1 * x2 * ...* xn)

当使用普通的加法作为二元运算符来应用于某个数列时,缩减就是求和。但其他许多操作也可以使用缩减来描述。如果二元运算符是 “获取两个元素中较大的一个”(这在 Java 中可以使用拉姆达表达式 (x,y) -> Math.max(x,y) 来表示,或者更简单地表示为方法引用 Math::max),则缩减对应于查找最大值。

通过将累加描述为缩减,而不使用累加器反模式,可以采用更抽象、更紧凑、更并行化 的方式来描述计算 — 只要您的二元运算符满足一个简单条件:结合性。回想一下,如果 a、b 和 c 元素满足以下条件,二元运算符 * 就是结合性的

   ((a * b) * c) = (a * (b * c))

结合性意味着分组无关紧要。如果二元运算符是结合性的,那么可以按照任何顺序安全地执行缩减。在顺序执行中,执行的自然顺序是从左向右;在并行执行中,数据划分为分段,分别缩减每个分段,然后组合结果。结合性可确保这两种方法得到相同的答案。如果将结合性的定义扩展到 4 项,可能更容易理解:

   (((a * b) * c) * d) = ((a * b) * (b * d))

左侧对应于典型的顺序计算;右侧对应于表示典型的并行执行的分区执行,其中输入序列被分解为几部分,各部分并行缩减,并使用 * 将各部分的结果组合起来。(或许令人惊奇的是,* 不需要是可交换的,但许多运算符通常都可用于缩减,比如相加和求最大值等。具有结合性但没有可交换性的二元运算符的一个例子是字符串串联。)

Streams 库有多种缩减方法,包括:

Optional<T> reduce(BinaryOperator<T> op) T reduce(T identity, BinaryOperator<T> op)

在这些方法中,最简单的方法仅获得一个结合性二元运算符,在该运算符下计算流元素的缩减结果。结果被描述为 Optional;如果输入流是空的,则缩减结果也是空的。(如果输入只有一个元素,那么缩减结果就是该元素。)如果您有一个字符串集合,您可以将这些元素的串联计算为:

String concatenated = strings.stream().reduce("", String::concat);

对于这两种方法中的第二种方法,您需要提供一个身份值,在字符串为空时,还可以将该值用作结果。身份值必须满足所有 x 的限制:

   身份 * x = x
   x * 身份 = x

不是所有二元运算符都有身份值,但当它们拥有身份值时,它们可能不会得到您想要的结果。例如,计算最大值时,您可能倾向于使用值 Integer.MIN_VALUE 作为身份(它确实满足要求)。但在空流上使用该身份时,结果可能不是您想要的;您无法确定空输入和仅包含 Integer.MIN_VALUE 的非空输入之间的区别。(有时这不是问题,但有时会导致问题 — 因此 Streams 库将留给客户,由客户决定是否指定身份。)

对于字符串串联,身份是空字符串,所以您可以将前面的示例重写为:

String concatenated = strings.stream().reduce("", String::concat);

类似地,您可以将数组上的整数总和描述为:

int sum = Stream.of(ints).reduce(0, (x,y) -> x+y);

(但实际上,您使用了 IntStream.sum() 便捷方法。)

缩减不需要仅应用于整数和字符串,它可以应用于您想要将元素序列缩减为该类型的单个元素的任何情形。例如,您可以通过缩减来计算最高的人:

Comparator<Person> byHeight = Comparators.comparingInt(Person::getHeight); BinaryOperator<Person> tallerOf = BinaryOperator.maxBy(byHeight); Optional<Person> tallest = people.stream().reduce(tallerOf);

如果提供的二元运算符不是结合性的,或者提供的身份值实际上不是该二元运算符的身份,那么在并行执行该操作时,结果可能是错的,而且同一个数据集上的不同执行过程可能会生成不同的结果。


可变缩减

缩减获取一个值序列并将它缩减为单个值,比如数列的和或它的最大值。但是有时您不想要单个汇总值;您想将结果组织为类似 ListMap 的数据结构,或者将它缩减为多个汇总值。在这种情况下,您应该使用缩减 的可变类似方法,也称为收集

考虑将元素累积到一个 List 中的简单情况。使用累加器反模式,您可以这样编写它:

ArrayList<String> list = new ArrayList<>(); for (Person p : people)     list.add(p.toString());

当累加器变量是一个简单值时,缩减是累加的更好替代方法,与此类似,在累加器结果是更复杂的数据结构时,也有一种更好的替代方法。缩减的构建块是一个身份值和一种将两个值组合成新值的途径;可变缩减的类似方法包括:

  • 一种生成空结果容器的途径
  • 一种将新元素合并到结果容器中的途径
  • 一种合并两个结果容器的途径

这些构建块可以轻松地表达为函数。这些函数中的第 3 个支持并行执行可变缩减:您可以对数据集进行分区,为每一部分生成一个中间累加结果,然后合并中间结果。Streams 库有一个 collect() 方法,它接受以下 3 个函数:

<R> collect(Supplier<R> resultSupplier,             BiConsumer<R, T> accumulator,              BiConsumer<R, R> combiner)

在前一节中,您看到了一个使用缩减来计算字符串串联的示例。该代码会生成正确的结果,但是,因为 Java 中的字符串是不可变的,而且串联要求复制整个字符串,所以它还有 O(n2) 运行时(一些字符串将复制多次)。您可以通过将结果收集到 StringBuilder 中,更高效地表达字符串串联:

StringBuilder concat = strings.stream()                               .collect(() -> new StringBuilder(),                                        (sb, s) -> sb.append(s),                                        (sb, sb2) -> sb.append(sb2));

此方法使用 StringBuilder 作为结果容器。传递给 collect() 的 3 个函数使用默认构造函数创建了一个空容器,append(String) 方法将一个元素添加到容器中,append(StringBuilder) 方法将一个容器合并到另一个容器中。使用方法引用可能可以比拉姆达表达式更好地表达此代码:

StringBuilder concat = strings.stream()                               .collect(StringBuilder::new,                                        StringBuilder::append,                                        StringBuilder::append);

类似地,要将一个流收集到一个 HashSet 中,您可以这样做:

Set<String> uniqueStrings = strings.stream()                                    .collect(HashSet::new,                                             HashSet::add,                                             HashSet::addAll);

在这个版本中,结果容器是一个 HashSet 而不是 StringBuilder,但方法是一样的:使用默认构造函数创建一个新的结果容器,使用 add() 方法将一个新元素合并到结果集中,使用 addAll() 方法合并两个结果集。很容易看到如何将此代码调整为其他任何类型的集合。

您可能会想,因为使用了可变结果容器(StringBuilderHashSet),所以这也是累加器反模式的一个例子。但其实不然。累加器反模式在这种情况下采用的类似方法是:

Set<String> set = new HashSet<>(); strings.stream().forEach(s -> set.add(s));

可将收集器组合到一起来形成更复杂的聚合。

就像只要组合函数是结合性的,且没有相互干扰的副作用,就可以安全地并行化缩减一样,如果满足一些简单的一致性要求(在 collect() 的规范中列出),就可以安全地并行化使用了 Stream.collect() 的可变缩减。关键区别在于,对于 forEach() 版本,多个线程会同时尝试访问一个结果容器,而对于并行 collect(),每个线程拥有自己的本地结果容器,会在以后合并其中的结果。


收集器

传递给 collect() 的 3 个函数(创建、填充和合并结果容器)之间的关系非常重要,所以有必要提供它自己的抽象 Collectorcollect() 的相应简化版本。字符串串联示例可重写为:

String concat = strings.stream().collect(Collectors.joining());

收集到结果集的示例可重写为:

Set<String> uniqueStrings = strings.stream().collect(Collectors.toSet());

Collectors 类包含许多常见聚合操作的因素,比如累加到集合中、字符串串联、缩减和其他汇总计算,以及创建汇总表(通过 groupingBy())。表 1 包含部分内置收集器的列表,而且如果它们不够用,编写您自己的收集器也很容易(请参阅 “自定义收集器” 部分)。

表 1. 内置收集器
收集器行为
toList() 将元素收集到一个 List 中。
toSet() 将元素收集到一个 Set 中。
toCollection(Supplier<Collection>)将元素收集到一个特定类型的 Collection 中。
toMap(Function<T, K>, Function<T, V>)将元素收集到一个 Map 中,依据提供的映射函数将元素转换为键值。
summingInt(ToIntFunction<T>)计算将提供的 int 值映射函数应用于每个元素(以及 longdouble 版本)的结果的总和。
summarizingInt(ToIntFunction<T>)计算将提供的 int 值映射函数应用于每个元素(以及 longdouble 版本)的结果的 summinmaxcountaverage
reducing() 向元素应用缩减(通常用作下游收集器,比如用于 groupingBy)(各种版本)。
partitioningBy(Predicate<T>)将元素分为两组:为其保留了提供的预期的组和未保留预期的组。
partitioningBy(Predicate<T>, Collector)将元素分区,使用指定的下游收集器处理每个分区。
groupingBy(Function<T,U>) 将元素分组到一个 Map 中,其中的键是所提供的应用于流元素的函数,值是共享该键的元素列表。
groupingBy(Function<T,U>, Collector)将元素分组,使用指定的下游收集器来处理与每个组有关联的值。
minBy(BinaryOperator<T>) 计算元素的最小值(与 maxBy() 相同)。
mapping(Function<T,U>, Collector)将提供的映射函数应用于每个元素,并使用指定的下游收集器(通常用作下游收集器本身,比如用于 groupingBy)进行处理。
joining() 假设元素为 String 类型,将这些元素联结到一个字符串中(或许使用分隔符、前缀和后缀)。
counting() 计算元素数量。(通常用作下游收集器。)

将收集器函数分组到 Collector 抽象中在语法上更简单,但实际收益来自您开始将收集器组合在一起时,比如您想要创建复杂的汇总结果(比如 groupingBy() 收集器创建的摘要)的时候,该收集器依据来自元素的一个键将元素收集到 Map 中。例如,要创建超过 1000 美元的交易的 Map,可以使用卖家作为键:

Map<Seller, List<Txn>> bigTxnsBySeller =     txns.stream()         .filter(t -> t.getAmount() > 1000)         .collect(groupingBy(Txn::getSeller));

但是,假设您不想要每个卖家的交易 List,而想要来自每个卖家的最大交易。您仍希望使用卖家作为结果的键,但您希望进一步处理与卖家关联的交易,以便将它缩减为最大的交易。可以使用 groupingBy() 的替代版本,无需将每个键的元素收集到列表中,而是将它们提供给另一个收集器(downstream 收集器)。对于下游收集器,您可以选择 maxBy() 等缩减方法:

Map<Seller, Txn> biggestTxnBySeller =     txns.stream()         .collect(groupingBy(Txn::getSeller,                             maxBy(comparing(Txn::getAmount))));

在这里,您将交易分组到以卖家作为键的映射中,但该映射的值是使用 maxBy() 收集器收集该卖家的所有销售的结果。如果您不想要该卖家的最大交易,而想要总和,可以使用 summingInt() 收集器:

Map<Seller, Integer> salesBySeller =     txns.stream()          .collect(groupingBy(Txn::getSeller,                             summingInt(Txn::getAmount)));

要获得多级汇总结果,比如每个区域和卖家的销售,可以使用另一个 groupingBy 收集器作为下游收集器:

Map<Region, Map<Seller, Integer>> salesByRegionAndSeller =     txns.stream()         .collect(groupingBy(Txn::getRegion,                             groupingBy(Txn::getSeller,                                         summingInt(Txn::getAmount))));

举一个不同领域的例子:要计算一个文档中的词频直方图,可以使用 BufferedReader.lines() 将文档拆分为行,使用 Pattern.splitAsStream() 将它分解为一个单词流,然后使用 collect()groupingBy() 创建一个 Map,后者的键是单词,值是这些单词的数量,如清单 3 所示。

清单 3. 使用 Streams 计算单词数量直方图
    Pattern whitespace = Pattern.compile("//s+");     Map<String, Integer> wordFrequencies =         reader.lines()               .flatMap(s -> whitespace.splitAsStream())               .collect(groupingBy(String::toLowerCase),                                   Collectors.counting());

自定义收集器

尽管 JDK 提供的标准的收集器集合非常大,但编写您自己的收集器非常容易。Collector 接口(如清单 4 所示)非常简单。该接口通过 3 种类型来实现参数化:输入类型 T、累加器类型 A 和最终的返回类型 RAR 通常是相同的),这些方法返回的函数与之前演示的 collect() 3 参数版本所接受的函数类似。

清单 4. Collector 接口
public interface Collector<T, A, R> {     /** Return a function that creates a new empty result container */     Supplier<A> supplier();      /** Return a function that incorporates an element into a container */     BiConsumer<A, T> accumulator();      /** Return a function that merges two result containers */     BinaryOperator<A> combiner();      /** Return a function that converts the intermediate result container         into the final representation */     Function<A, R> finisher();      /** Special characteristics of this collector */     Set<Characteristics> characteristics(); }

Collectors 中的大部分收集器工厂的实现都很简单。例如,toList() 的实现是:

return new CollectorImpl<>(ArrayList::new,                            List::add,                            (left, right) -> { left.addAll(right); return left; },                            CH_ID);

此实现使用 ArrayList 作为结果容器,使用 add() 合并一个元素,并使用 addAll() 将一个列表合并到另一个中,通过这些特征表明它的完成函数是身份函数(这使得流框架可以优化执行)。

与之前看到的一样,一些一致性要求与缩减中的身份和累加器函数之间的限制类似。这些要求已在 Collector 的规范中列出。

作为一个更复杂的示例,可以考虑在数据集上创建汇总统计数据的问题。很容易使用缩减来计算数字数据集的总和、最小值、最大值或数量(而且您可以使用总和和数量来计算平均值)。在数据上,使用缩减在一轮计算中一次性计算所有这些结果更加困难。但您可以轻松地编写一个 Collector 来高效地(如果愿意,还可并行地)执行此计算。

Collectors 类包含一个 collectingInt() 工厂方法,该方法返回一个 IntSummaryStatistics,后者会执行您想要的准确操作,比如在一轮计算中计算 summinmaxcountaverageIntSummaryStatistics 的实现很简单,而且您可以轻松地编写自己的类似收集器来计算任意数据汇总结果(或扩展此结果)。

清单 5 显示了 IntSummaryStatistics 类。实际实现包含更多细节(包含用于获取汇总统计数据的 getter),但它的核心是简单的 accept()combine() 方法。

清单 5. summarizingInt() 收集器使用的 IntSummaryStatistics
    public class IntSummaryStatistics implements IntConsumer {         private long count;         private long sum;         private int min = Integer.MAX_VALUE;         private int max = Integer.MIN_VALUE;          public void accept(int value) {             ++count;             sum += value;             min = Math.min(min, value);             max = Math.max(max, value);         }          public void combine(IntSummaryStatistics other) {             count += other.count;             sum += other.sum;             min = Math.min(min, other.min);             max = Math.max(max, other.max);         }          // plus getters for count, sum, min, max, and average     }

如您所见,这是一个非常简单的类。在观察每个新数据元素时,会以各种方式更新各种汇总结果,而且会以各种方式组合两个 IntSummaryStatistics 持有者。Collectors.summarizingInt() 的实现(如清单 6 所示)同样很简单;它创建了一个 Collector,以便通过应用一个整数值来提取器函数,并将结果传递给 IntSummaryStatistics.accept() 来合并一个元素。

清单 6. summarizingInt() 收集器工厂
    public static <T>     Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) {         return new CollectorImpl<T, IntSummaryStatistics, IntSummaryStatistics>(                 IntSummaryStatistics::new,                 (r, t) -> r.accept(mapper.applyAsInt(t)),                 (l, r) -> { l.combine(r); return l; },                 CH_ID);     }

组合收集器的容易性(您在 groupingBy() 示例中已看到)和创建新收集器的容易性相结合,可以创建流数据的几乎任何汇总结果,同时保持您的代码紧凑而又清晰。


第 2 部分的小结

聚合工具是 Streams 库的最有用和灵活的部分之一。可以使用缩减操作来轻松地按顺序或并行聚合简单的值;更复杂的汇总结果可通过 collect() 创建。该库附带了一组简单的基本收集器,可以组合它们来执行更复杂的聚合,而且您可以轻松地将自己的收集器添加到组合中。

第 3 部分 中,将深入剖析 Streams 的内部结构,以便了解在性能至关重要时如何最高效地使用该库。


正文到此结束
Loading...