转载

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

Flink的网络堆栈是组成flink-runtime模块的核心组件之一,是每个Flink Job的核心。 它连接所有TaskManagers的各个工作单元(子任务)。 这是流式传输数据流经的地方,因此,它对于Flink作业的性能(吞吐量和观察到的延迟)至关重要。 与通过Akka使用RPC的TaskManagers和JobManagers之间的协调通道相比,TaskManagers之间的网络堆栈依赖于更底层的Netty API。

这篇博文是关于网络堆栈的一系列帖子中的第一篇。 在下面的部分中,我们将首先深入了解流操作符所呈现的抽象,然后详细介绍Flink的物理实现和各种优化。 我们将简要介绍这些优化的结果以及Flink在吞吐量和延迟之间的权衡。 本系列中的未来博客文章将详细介绍监控和指标,调整参数和常见的反模式。

Logical View

Flink的网络堆栈在相互通信时为子任务提供以下逻辑视图,例如在keyBy()要求的网络混洗期间:

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

它抽象了以下三个概念的不同设置:

  • Subtask output type ( ResultPartitionType ):    

    • pipelined (bounded or unbounded): 一旦产生数据,就可以一个接一个地向下游发送数据,作为有界或无限的记录流。

    • blocking: 仅在生成完整结果时向下游发送数据。

  • Scheduling type:

    • all at once (eager): 同时部署作业的所有子任务(对于流应用程序).

    • next stage on first output (lazy): 一旦任何生产者生成输出,就立即部署下游任务。

    • next stage on complete output: 当任何或所有生产者生成完整的输出集时,部署下游任务。

  • Transport:

    • high throughput: Flink不是一个一个地发送每个记录,而是将一堆记录缓冲到其网络缓冲区中并完全发送它们。 这降低了每个记录的成本并导致更高的吞吐量。

    • low latency via buffer timeout: 通过减少发送未完全填充的缓冲区的超时,您可能会牺牲吞吐量来延迟。

我们将在下面的部分中查看吞吐量和低延迟优化,这些部分将查看网络堆栈的物理层。 对于这一部分,让我们详细说明输出和调度类型。 首先,重要的是要知道子任务输出类型和调度类型是紧密交织在一起的,只能使两者的特定组合有效。

流水线结果分区是流式输出,需要实时目标子任务才能发送数据。 可以在生成结果之前或首次输出时安排目标。 批处理作业生成有界结果分区,而流式处理作业产生无限结果。

批处理作业也可能以阻塞方式产生结果,具体取决于所使用的运算符和连接模式。 在这种情况下,必须先生成完整的结果,然后才能安排接收任务。 这允许批处理作业更有效地工作并且资源使用更少。

下表总结了有效组合:

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

1 目前Flink尚未使用。

2 批量/流式统一完成后,这可能适用于流式作业。

此外,对于具有多个输入的子任务,调度以两种方式启动:毕竟或在任何输入生成器生成记录/其完整数据集之后。要调整批处理作业中的输出类型和调度决策,请查看ExecutionConfig #setExecutionMode() - 特别是ExecutionMode - 以及ExecutionConfig #setDefaultInputDependencyConstraint()。

Physical Transport

为了理解物理数据连接,请回想一下,在Flink中,不同的任务可以通过插槽共享组共享相同的插槽。 TaskManagers还可以提供多个插槽,以允许将同一任务的多个子任务安排到同一个TaskManager上。

对于下图所示的示例,我们将假设4的并行性和具有两个任务管理器的部署,每个任务管理器提供2个插槽。 TaskManager 1执行子任务A.1,A.2,B.1和B.2,TaskManager 2执行子任务A.3,A.4,B.3和B.4。在任务A和任务B之间的随机类型连接中,例如从keyBy(),在每个TaskManager上有2x4个逻辑连接,其中一些是本地的,一些是远程的:

B.1 B.2 B.3 B.4
A.1 local remote
A.2
A.3 remote local
A.4

不同任务之间的每个(远程)网络连接将在Flink的网络堆栈中获得自己的TCP通道。 但是,如果同一任务的不同子任务被安排到同一个TaskManager,则它们与同一个TaskManager的网络连接将被多路复用并共享一个TCP信道以减少资源使用。 在我们的例子中,这适用于A.1→B.3,A.1→B.4,以及A.2→B.3和A.2→B.4,如下图所示:

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

每个子任务的结果称为ResultPartition,每个子结果分成单独的ResultSubpartitions - 每个逻辑通道一个。 在堆栈的这一点上,Flink不再处理单个记录,而是将一组序列化记录组装到网络缓冲区中。 每个子任务可用于其自己的本地缓冲池中的缓冲区数量(每个发送方和接收方各一个)最多限制为一个

单个TaskManager上的缓冲区总数通常不需要配置。 有关如何在需要时执行此操作的详细信息,请参阅配置网络缓冲区文档。

Inflicting Backpressure (1)

每当子任务的发送缓冲池耗尽时 - 缓冲区驻留在结果子分区的缓冲区队列中或位于较低的Netty支持的网络堆栈内 - 生产者被阻止,无法继续,并且经历反压。 接收器以类似的方式工作:较低网络堆栈中的任何传入Netty缓冲区需要通过网络缓冲区提供给Flink。 如果相应子任务的缓冲池中没有可用的网络缓冲区,Flink将停止从该通道读取,直到缓冲区可用。 这将有效地反压该多路复用上的所有发送子任务,因此也限制其他接收子任务。 下图说明了过载的子任务B.4,它会导致多路复用的背压,并且即使它仍然具有容量,也会阻止子任务B.3接收和处理其他缓冲区。

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

为了防止这种情况发生,Flink 1.5引入了自己的流量控制机制。

Credit-based Flow Control

基于信用的流量控制可确保“线上”的任何内容都具有接收器处理能力。它基于网络缓冲区的可用性,作为Flink之前机制的自然延伸。每个远程输入通道现在都有自己的一组独占缓冲区,而不是只有一个共享的本地缓冲池。相反,本地缓冲池中的缓冲区称为浮动缓冲区,因为它们会浮动并且可用于每个输入通道。

接收方将宣布缓冲区的可用性作为发送方的信用(1缓冲区= 1个信用)。每个结果子分区将跟踪其渠道信用。如果信用可用,则缓冲区仅转发到较低的网络堆栈,并且每个发送的缓冲区将信用分数降低一。除了缓冲区之外,我们还发送有关当前积压大小的信息,该大小指定在此子分区的队列中等待的缓冲区数量。接收器将使用它来请求适当数量的浮动缓冲区,以便更快地进行积压处理。它将尝试获取与积压大小一样多的浮动缓冲区,但这可能并不总是可行的,我们可能会得到一些或根本没有缓冲区。接收器将使用检索到的缓冲区,并将监听可用于继续的其他缓冲区。

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

基于信用的流控制将使用每通道缓冲区来指定本地缓冲池(可选3)的独占(强制)缓冲区数和每个浮动缓冲区数,从而实现与没有流控制相同的缓冲区限制。 选择这两个参数的默认值,使得流量控制的最大(理论)吞吐量至少与没有流量控制一样好,给定具有通常延迟的健康网络。 您可能需要根据实际的往返时间和带宽来调整这些。

3如果没有足够的缓冲区,每个缓冲池将获得全局可用缓冲池的相同份额(±1)。

Inflicting Backpressure (2)

与没有流量控制的接收器的背压机制相反,信用提供了更直接的控制:如果接收器无法跟上,其可用信用最终将达到0并阻止发送方将缓冲区转发到较低网络堆栈。 仅在此逻辑信道上存在背压,并且不需要阻止从多路复用TCP信道读取。 因此,其他接收器在处理可用缓冲器时不受影响。

What do we Gain? Where is the Catch?

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

由于通过流控制,多路复用中的信道不能阻塞其另一个逻辑信道,因此整体资源利用率应该增加。此外,通过完全控制“在线”数据的数量,我们还能够改进检查点对齐:如果没有流量控制,通道需要一段时间才能填满网络堆栈的内部缓冲区并传播接收器不再读了。在那段时间里,可以坐着很多缓冲区。任何检查点障碍都必须在这些缓冲区后排队,因此必须等到所有这些缓冲区都可以启动之前(“障碍永远不会超过记录!”)。

但是,来自接收方的附加通告消息可能会产生一些额外费用,尤其是在使用SSL加密通道的设置中。此外,单个输入通道不能使用缓冲池中的所有缓冲区,因为不共享独占缓冲区。它也不能立即开始发送尽可能多的数据,以便在加速期间(如果您生成数据的速度快于宣布信用额),则可能需要更长时间才能发送数据。虽然这可能会影响您的工作绩效,但由于其所有优点,通常更好地进行流量控制。您可能希望通过每个通道的缓冲区增加独占缓冲区的数量,但代价是使用更多内存。然而,与先前实现相比,总体内存使用可能仍然较低,因为较低的网络堆栈不再需要缓冲大量数据,因为我们总是可以立即将其传输到Flink。

在使用基于信用的流量控制时,您还可以注意到另一件事:由于我们在发送方和接收方之间缓冲较少的数据,您可能会更早地遇到背压。但是,这是期望的,并且您通过缓冲更多数据并没有真正获得任何优势。如果要缓冲更多但保持流量控制,可以考虑通过浮动缓冲区每个门增加浮动缓冲区的数量。

Advantages Disadvantages

• 多路复用连接中的数据偏差可以提高资源利用率

• 改进了检查点对齐

• 减少内存使用(较低网络层中的数据较少)

• 额外的信用通知消息

• 额外的积压 - 宣布消息(捎带缓冲消息,几乎没有开销)

• 潜在的往返延迟

• 背压出现得更早

NOTE: 如果您需要关闭基于信用的流量控制,可以将其添加到flink-conf.yaml:taskmanager.network.credit-model:false。 但是,此参数已弃用,最终将与非基于信用的流控制代码一起删除。

Writing Records into Network Buffers and Reading them again

下面的图片从上面扩展了更高级别的视图,其中包含网络堆栈及其周围组件的更多详细信息,从发送操作员中的记录集合到接收操作员获取它:

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

在创建记录并传递它之后,例如通过Collector #colle(),它被赋予RecordWriter,它将来自Java对象的记录序列化为一个字节序列,最终在一个网络缓冲区中被传递。如上所述。 RecordWriter首先使用SpanningRecordSerializer将记录序列化为灵活的堆上字节数组。然后,它尝试将这些字节写入目标网络通道的关联网络缓冲区。我们将在下面的部分回到最后一部分。

在接收方,下层网络堆栈(netty)将接收到的缓冲区写入适当的输入通道。 (stream)任务的线程最终从这些队列中读取并尝试在RecordReader的帮助下将累积的字节反序列化为Java对象并通过SpillingAdaptiveSpanningRecordDeserializer。与序列化器类似,此解串器还必须处理特殊情况,例如跨越多个网络缓冲区的记录,因为记录只是大于网络缓冲区(默认为32KiB,通过taskmanager.memory.segment-size设置)或者因为序列化记录被添加到没有足够剩余字节的网络缓冲区。然而,Flink将使用这些字节并继续将其余字节写入新的网络缓冲区。

Flushing Buffers to Netty

在上图中,基于信用的流控制机制实际上位于“Netty Server”(和“Netty Client”)组件内部,RecordWriter写入的缓冲区始终以空状态添加到结果子分区中,然后逐渐 填写(序列化)记录。 但是什么时候Netty真的得到了缓冲区呢? 显然,只要它们变得可用就不能占用字节,因为这不仅会因跨线程通信和同步而增加大量成本,而且还会使整个缓冲过时。

在Flink中,有三种情况使Netty服务器可以使用缓冲区:

  • 写入记录时缓冲区变满,或者

  • 缓冲区超时命中,或

  • 发送特殊事件,例如检查点障碍。

Flush after Buffer Full

RecordWriter与本地序列化缓冲区一起使用当前记录,并将这些字节逐渐写入位于相应结果子分区队列的一个或多个网络缓冲区。虽然RecordWriter可以处理多个子分区,但每个子分区只有一个RecordWriter向其写入数据。另一方面,Netty服务器正在从多个结果子分区读取并将适当的分区复用到单个信道中,如上所述。这是一个典型的生产者 - 消费者模式,网络缓冲区位于中间,如下图所示。在(1)序列化和(2)将数据写入缓冲区之后,RecordWriter相应地更新缓冲区的写入器索引。一旦缓冲区被完全填满,记录编写器将(3)从其本地缓冲池中获取当前记录的任何剩余字节 - 或下一个 - 的新缓冲区 - 并将新的缓冲区添加到子分区队列。这将(4)通知Netty服务器可用的数据,如果它还不知道4。每当Netty有能力处理此通知时,它将(5)获取缓冲区并沿适当的TCP通道发送它。 深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

4如果队列中有更多已完成的缓冲区,我们可以假设它已经收到通知。

Flush after Buffer Timeout

为了支持低延迟用例,我们不仅可以依赖缓冲区已满,以便向下游发送数据。可能存在某种通信信道没有太多记录流过并且不必要地增加实际拥有的少量记录的延迟的情况。因此,定期进程将刷新堆栈中可用的任何数据:输出刷新器。可以通过StreamExecutionEnvironment#setBufferTimeout配置周期性间隔,并充当延迟5的上限(对于低吞吐量通道)。下图显示了它与其他组件的交互方式:RecordWriter如前所述串行化并写入网络缓冲区,但同时,如果Netty尚未知晓,输出刷新器可以(3,4)通知Netty服务器可用的数据(类似)到上面的“缓冲区已满”场景)。当Netty处理此通知(5)时,它将使用缓冲区中的可用数据并更新缓冲区的读取器索引。缓冲区保留在队列中 - 从Netty服务器端对此缓冲区的任何进一步操作将在下次继续读取读取器索引。

深入理解Flink的网络栈:A Deep-Dive into Flink's Network Stack

5严格来说,输出刷新器不提供任何保证 - 它只向Netty发送通知,可以随意接收它。 这也意味着如果通道是背压,则输出冲洗器无效。

Flush after special event

如果通过RecordWriter发送,某些特殊事件也会触发立即刷新。 最重要的是检查点障碍或分区结束事件,显然应该快速进行,而不是等待输出冲洗器启动。

与Flink <1.5相反,请注意(a)网络缓冲区现在直接放在子分区队列中,(b)我们没有在每次刷新时关闭缓冲区。 这给我们带来了一些好处:

  • 同步开销较少(输出刷新和RecordWriter是独立的)

  • 在高负荷情况下,Netty是瓶颈(通过背压或直接),我们仍然可以在不完整的缓冲区中累积数据

  • Netty通知显着减少

但是,在低负载情况下,您可能会注意到CPU使用率和TCP数据包速率的增加。 这是因为,通过更改,Flink将使用任何可用的CPU周期来尝试维持所需的延迟。 一旦负载增加,这将通过填充更多的缓冲区进行自我调整。 由于同步开销减少,高负载方案不会受到影响,甚至可以获得更好的吞吐量。

深入理解Flink的网络栈:A Deep-Dive into Flink&#39;s Network Stack

Buffer Builder & Buffer Consumer

如果您想更深入地了解如何在Flink中实现生产者 - 消费者机制,请仔细查看Flink 1.5中引入的BufferBuilder和BufferConsumer类。虽然可能只对每个缓冲区进行读取,但写入它是按记录进行的,因此是Flink中所有网络通信的热路径。因此,我们非常清楚我们需要在任务的线程和Netty线程之间建立轻量级连接,这并不意味着过多的同步开销。有关详细信息,我们建议您查看源代码。

Latency vs. Throughput

引入网络缓冲区以获得更高的资源利用率和更高的吞吐量,代价是让一些记录在缓冲区中等待一段时间。虽然可以通过缓冲区超时给出此等待时间的上限,但您可能很想知道有关这两个维度之间权衡的更多信息:延迟和吞吐量,显然,您无法同时获得这两者。下图显示了缓冲区超时的各种值,从0开始(每个记录刷新)到100毫秒(默认值),并显示在具有100个节点和8个插槽的群集上生成的吞吐率,每个节点运行没有业务逻辑的作业因此只测试网络堆栈。为了进行比较,我们还在添加低延迟改进(如上所述)之前绘制Flink 1.4。

深入理解Flink的网络栈:A Deep-Dive into Flink&#39;s Network Stack

正如您所看到的,使用Flink 1.5+,即使是非常低的缓冲区超时(例如1ms)(对于低延迟方案)也提供高达默认超时的75%的最大吞吐量,其中在通过线路发送之前缓冲更多数据。

Conclusion

现在您了解结果分区,批处理和流式传输的不同网络连接和调度类型。 您还了解基于信用的流量控制以及网络堆栈如何在内部工作,以便推断与网络相关的调整参数和某些工作行为。 本系列中的未来博客文章将基于这些知识,并进入更多操作细节,包括要查看的相关指标,进一步的网络堆栈调整以及要避免的常见反模式。 请继续关注系列后续文章。

深入理解Flink的网络栈:A Deep-Dive into Flink&#39;s Network Stack

深入理解Flink的网络栈:A Deep-Dive into Flink&#39;s Network Stack

深入理解Flink的网络栈:A Deep-Dive into Flink&#39;s Network Stack

原文  http://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng==&mid=2651217420&idx=1&sn=44decfce5ed1ba02a62f0e320f4a3a41
正文到此结束
Loading...