转载

StreamOperator源码简析

点击上方蓝

字关注~

StreamOperator是任务执行过程中实际处理类,上层由StreamTask调用,下层调用UserFunction,列举一些常见的StreamOperator

  • env.addSource对应StreamSource

  • dataStream.map 对应StreamMap

  • dataStrem.window对应WindowOperator

  • dataStream.addSink对应StreamSink

  • dataStream.keyBy(..).process对应KeyedProcessOperator

StreamOperator涉及数据处理、checkpoint、状态存储、定时调用等,本篇幅将从源码角度分析StreamOperator所涉及的核心调用流程。

StreamOperator层级结构

StreamOperator源码简析

最顶层是一个StreamOperator的接口,定义了其生命周期一些方法,继承接口如下:

  • CheckpointListener接口,notifyCheckpointComplete表示checkpoint完成后的回调方法

  • KeyContext接口,用于当前key的切换,使用在KeyedStream中state的key设置

  • Disposable接口,dispose方法定义了资源释放

  • Serializable序列化接口

AbstractStreamOperator是StreamOperator的基础抽象实现类,所有的operator都必须继承该抽象类;

AbstractUdfStreamOperator 是继承AbstractStreamOperator的抽象实现类,其内部包含了userFunction, 在Task的生命周期都会调用userFunction中对应的方法;

OneInputStreamOperator、TwoInputStreamOperator都是继承StreamOperator的接口,分别表示处理一个输入、两个输入的Operator,包含了processElement/processWatermark/processLatencyMarker方法;

  • OneInputStreamOperator实现类StreamMap、WindowOperator、KeyedProcessOperator等单流入处理operator

  • TwoInputStreamOperator实现类CoStreamMap、KeyedCoProcessOperator、IntervalJoinOperator等多流处理operator

StreamSource表示的source端的operator,其既没有实现OneInputStreamOperator接口也没有实现TwoInputStreamOperator接口,由于其为流处理的源头,不需要接受输入

AbstractStreamOperator/AbstractUdfStreamOperator分析

AbstractStreamOperator是所有operator的基础抽象类,而AbstractUdfStreamOperator则是面向userFunction调用,接下来分析一下这两个类,其大部分方法都是由StreamTask触发调用,用于初始化或者资源释放等操作,以StreamTask.invoke方法为入口来分析里面的方法:

  • initializeState状态初始化,会调用到StreamOperator的initializeState方法,初始化operatorStateBackend/keyedStateBackend状态后端,定时器恢复初始化,对于KeyedState来说会自动初始化恢复,但是operatorState则需要手动初始化恢复,所以在其继承的AbstractUdfStreamOperator会调用userFunction的initializeState方法,前提是该userFunction需要实现CheckpointedFunction接口;

  • open初始化方法,在AbstractStreamOperator中是一个空的实现,通常可以在userFunction重写open方法完成一些用户初始化工作,例如创建资源链接

  • run方法,在任务正常情况下一直执行的方法,根据收到的不同数据类型调用AbstractStreamOperator不同的方法

    1. 如果是watermark,会调用其processWatermark方法,在该方法里面做一些定时触发的判断与调用

    2. 如果是LatencyMarker,其表示的是一个延时标记,同于统计数据从source到下游operator耗时,会调用 processLatencyMarker方法,在该方法里面会上报Histogram类型的metric, 在默认情况下该功能是关闭的

    3. 如果是StreamRecord,也就是处理的业务数据,首先会调用setKeyContextElement方法,用于切换 KeyedStream类型的的statebackend的当前key, 然后调用processElement具体的数据处理流程

    4. 如果是CheckpointBarrier,表示的是需要checkpoint,首先会调用prepareSnapshotPreBarrier方法,在AbstractStreamOperator中是一个空实现doNothing,然后调用snapshotState方法,在AbstractUdfStreamOperator会调用userFunction的snapshotState方法,前提是该userFunction需要实现CheckpointedFunction接口;

  • close方法,任务正常结束调用方法,在AbstractStreamOperator中是一个空的实现,通常可以在userFunction重写close方法完成一些资源释放;

  • dispose方法,任务正常结束或者异常结束调用的方法,如果是异常结束那么就会调用到close方法,正常结束不会重复调用,在dispose里面完成一些状态最终资源的释放;

其他方法:

  • setup方法,初始化做一些参数配置

  • notifyCheckpointComplete方法,在checkpoint完成时调用的方法,面向用户实现的userFunction需要实现CheckpointListener接口

推荐阅读

1.  Flink中延时调用设计与实现

2. F link维表关联系列之Hbase维表关联:LRU策略

3.  你应该了解的Watermark

4. Flink exactly-once系列之事务性输出实现

5. F link时间系统系列之实例讲解:如何做定时输出

6.  Flink实战:全局TopN分析与实现

7.  Flink per-Job模式InfluxdbReporter上报JobName

原文  http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA==&mid=2247484052&idx=1&sn=33f53ec7f42e3d89e0a354e40d39b306
正文到此结束
Loading...