Flink高级API开发
Flink
高级
API
开发
第三章 Flink 高级API开发
课程目标
l 掌握Flink的Time分类及各自作用
l 掌握Flink的Window操作及原理
l 掌握Flink的State操作及原理
l 掌握Flink的Checkpoint操作及原理
l 了解Flink的任务链
1. Flink的Window操作
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了非常完善的窗口机制。
1.1 为什么需要Window
在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。
在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。
Windows 是处理无限流的核心。Windows 将流拆分为有限大小的“桶”,我们可以对其进行计算。
1.2 Flink窗口应用代码结构
Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理。本文将介绍如何在Flink上进行窗口的计算。
一个Flink窗口应用的大致骨架结构如下所示:
l Keyed Window
// Keyed Window
stream
.keyBy(...) <- 按照一个Key进行分组
.window(...) <- 将数据流中的元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
l Non-Keyed Window
// Non-Keyed Window
stream
.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
在上面,方括号 ([…]) 中的命令是可选的。这表明 Flink 允许您以多种不同的方式自定义窗口逻辑,使其最适合您的需求。
首先:我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。经过keyBy的数据流将形成多组数据,下游算子的多个实例可以并行计算。windowAll不对数据流进行分组,所有数据将发送到下游算子单个实例上。决定是否分组之后,窗口的后续操作基本相同,经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1。
1.3 Window类型和概念
Window可以分成两类:
l CountWindow:按照指定的数据条数生成一个Window,与时间无关。
n 滚动计数窗口,每隔N条数据,统计前N条数据
n 滑动计数窗口,每隔N条数据,统计前M条数据
l TimeWindow:按照时间生成Window。
n 滚动时间窗口,每隔N时间,统计前N时间范围内的数据,窗口长度N,滑动距离N
n 滑动时间窗口,每隔N时间,统计前M时间范围内的数据,窗口长度M,滑动距离N
n 会话窗口,按照会话划定的窗口
1.3.1 滚动窗口 - TumblingWindow概念
流是连续的,无界的(有明确的开始,无明确的结束)
假设有个红绿灯,提出个问题:计算一下通过这个路口的汽车数量
对于这个问题,肯定是无法回答的,为何?
因为,统计是一种对固定数据进行计算的动作。
因为流的数据是源源不断的,无法满足固定数据的要求(因为不知道何时结束)
那么,我们换个问题:统计1分钟内通过的汽车数量
那么,对于这个问题,我们就可以解答了。因为这个问题确定了数据的边界,从无界的流数据中,取出了一部分有边界的数据子集合进行计算。
那么,这个行为或者说这个统计的数据边界,就称之为窗口。
同时,我们的问题,是以时间来划分被处理的数据边界的,那么按照时间划分边界的就称之为:时间窗口
反之,如果换个问题,统计100辆通过的车里面有多少宝马品牌,那么这个边界的划分就是按照数量的,这样的称之为:计数窗口
同时,这样的窗口被称之为 滚动窗口,按照窗口划分依据分为:滚动时间窗口、滚动计数窗口
1.3.2 滑动窗口 – SlidingWindow概念
同样是需求,改为:
每隔1分钟,统计前面2分钟内通过的车辆数
对于这个需求我们可以看出,窗口长度是2分钟,每隔1分钟统计一次
或者:每通过100辆车,统计前面通过的50辆车的品牌占比
对于这个需求可以看出,窗口长度是50辆车,但是每隔100辆车统计一次
对于这样的窗口,我们称之为滑动窗口
那么在这里面,统计多少数据是窗口长度(如统计2分钟内的数据,统计50辆车中的数据)
隔多久统计一次称之为滑动距离(如,每隔1分钟,每隔100辆车)
那么可以看出,滑动窗口,就是滑动距离 不等于 窗口长度的一种窗口
比如,每隔1分钟 统计先前5分钟的数据,窗口长度5分钟,滑动距离1分钟,不相等
比如,每隔100条数据,统计先前50条数据,窗口长度50条,滑动距离100条,不相等
那如果相等呢?相等就是比如:每隔1分钟统计前面1分钟的数据,窗口长度1分钟,滑动距离1分钟,相等。
对于这样的需求可以简化成:每隔1分钟统计一次数据,这就是前面说的滚动窗口咯
那么,我们可以看出:
滚动窗口: 窗口长度 = 滑动距离
滑动窗口: 窗口长度 != 滑动距离
其中可以发现,对于滑动窗口:
滑动距离 > 窗口长度, 会漏掉数据,比如:每隔5分钟,统计前面1分钟的数据(滑动距离5分钟,窗口长度1分钟,漏掉4分钟的数据)
滑动距离 < 窗口长度, 会重复处理数据,比如:每隔1分钟,统计前面5分钟的数据(滑动距离1分钟,窗口长度5分钟,重复处理4分钟的数据)
滑动距离 = 窗口长度, 不漏也不会重复,也就是滚动窗口
1.4 Time - Flink的三种时间语义
我们抛开计数窗口,先看时间窗口
对于时间窗口最主要的就是时间,比如1分钟的窗口长度,那么这个1分钟是如何定义呢?
Flink中针对时间有3种类型
l EventTime[事件时间]
事件发生的时间,例如:点击网站上的某个链接的时间
l IngestionTime[摄入时间]
某个Flink节点的source operator接收到数据的时间,例如:某个source消费到kafka中的数据
l ProcessingTime[处理时间]
某个Flink节点执行某个operation的时间,例如:timeWindow接收到数据的时间
l 事件时间 event time
事件真实发生的时间。Flink1.12版本起默认事件时间。
xxx.window(TumblingEventTimeWindows)
l 处理时间 process time
Flink处理start-log中这条数据时的设备时间。Flink1.12之前默认处理时间。
xxx.window(TumblingProcessingTimeWindows)
Flink1.12版本之前,如何指定为事件时间呢?
// 设置按照事件时间来进行计算env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
1.4.1 Event Time
在大数据领域,日志服务器生成的一条数据也可以称为一个事件。Event Time是指在数据产生时该设备上对应的时间,这个时间在进入Flink之前已经存在于数据记录中了。以后数据被Flink处理数据,如果使用Event Time作为时间标准,那么数据并不是按照Event Time的先后顺序被处理的,由于数据可能产生在多个不同的日志服务器,然后通常是再将数据写入到分布性消息中间件,然后被被Flink拉取进行处理时,处理的实际时间相对于数据产生的实际肯定有一定的延迟,并且Event Time可能也是乱序的。那么为什么还要使用Event Time呢?是因为使用Event Time时,Flink程序可以处理乱序事件和延迟数据。并且最重要的功能就是可以统计在数据产生时,对应时间的数据指标。
总之,使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。
1.4.2 Processing Time
Processing Time是指事件数据被Operator处理时所在机器的系统时间,它提供了最好的性能和最低的延迟。但是,Flink是一个在分布式的计算框架,数据从产生到被处理会有一定的延迟(例如从消息队列拉取数据到Source,Source再到处理的Operator会有一定的延迟),所以Processing Time无法精准的体现出数据在产生的那个时刻的变化情况。
1.4.3 Ingestion Time
Ingestion Time指的是事件数据进入到Flink的时间。每条数据的Ingestion Time就是进入到Source Operator时所在机器的系统时间。比如Flink从Kafka消息中间件消费数据,每一条数据的Ingestion Time就是FlinkKafkaConsumer拉取数据进入到TaskManager对应的时间。Ingestion Time介于Event Time和Processing Time之间,与 Event Time 相比,Ingestion Time程序无法处理任何无序事件或延迟数据,并且程序不必指定如何生成水,Flink会自动分配时间戳和自动生成水位线。
1.5 窗口的使用
1.5.1 滚动窗口
滚动窗口下窗口之间之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time中的seconds、minutes、hours和days来设置。
下面的代码展示了如何使用滚动窗口。代码中最后一个例子,我们在固定长度的基础上设置了偏移(Offset)。默认情况下,时间窗口会做一个对齐,比如设置一个一小时的窗口,那么窗口的起止时间是[0:00:00.000 - 0:59:59.999)。如果设置了Offset,那么窗口的起止时间将变为[0:15:00.000 - 1:14:59.999)。Offset可以用在全球不同时区设置上,如果系统时间基于格林威治标准时间(UTC-0),中国的当地时间可以设置offset为Time.hours(-8)。
DataStream<T> input = ...
// 基于Event Time的滚动窗口
input
.keyBy(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function>(...)
// 基于Processing Time的滚动窗口
input
.keyBy(<KeySelector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function>(...)
// 在小时级滚动窗口上设置15分钟的Offset偏移
input
.keyBy(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.<window function>(...)
注意:
时间窗口使用的是timeWindow()也可以使用window(),比如,input.keyBy(...).timeWindow(Time.seconds(1))。timeWindow()是一种简写。
当我们在执行环境设置了TimeCharacteristic.EventTime时,Flink对应调用TumblingEventTimeWindows;如果我们基于TimeCharacteristic.ProcessingTime,Flink使用TumblingProcessingTimeWindows,但是这种方式被废弃。
1.5.2 滑动窗口
滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多快的速度来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个元素会被分配到多个窗口;Slide大于Size,有些元素可能被丢掉。
跟前面介绍的一样,我们使用Time类中的时间单位来定义Slide和Size,也可以设置offset。同样,timeWindow是一种缩写,根据执行环境中设置的时间语义来选择相应的方法初始化窗口。
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)
// sliding processing-time windows
input
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<window function>(...)
1.5.3 会话窗口
会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。我们可以设置定长的Session gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session gap的长度。
下面的代码展示了如何使用定长和可变的Session gap来建立会话窗口,其中SessionWindowTimeGapExtractor[T]的泛型T为数据流的类型,我们可以根据数据流中的元素来生成Session gap。
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
// event-time session windows with dynamic gap
input
.keyBy(...)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {
// determine and return session gap
}
}))
.<window function>(...)
// processing-time session windows with static gap
input
.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
// processing-time session windows with dynamic gap
input
.keyBy(...)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {
// determine and return session gap
}
}))
.<window function>(...)
1.6 窗口的范围
窗口的判断是按照毫秒为单位
如果窗口长度是5秒
窗口的开始: start
窗口的结束: start + 窗口长度 -1 毫秒
比如窗口长度是5秒, 从0开始
那么窗口结束是: 0 + 5000 -1 = 4999
1.6.1 窗口的关闭和触发
窗口不会一直存在,当达到某些条件后,窗口就会执行触发计算 + 关闭窗口的动作
窗口的关闭和触发是两个步骤
1. 触发窗口计算,对窗口内的数据进行计算
2. 关闭窗口,数据无法进入窗口了
这两者是同步进行的
后面会学习到先触发,然后等待一段时间后才关闭的情况,后面再说
1.6.2 窗口关闭、触发的条件
每个窗口都会有:
- 开始时间
- 结束时间
窗口的时间单位是毫秒
1.6.3 如何确定数据进入哪个窗口
开始时间 和 结束时间两者结合 决定了数据是属于哪个窗口的
数据的时间要满足:
- 大于等于开始时间
- 小于等于结束时间
如 5秒的窗口,假设窗口开始是0,结束是5000(毫秒)
那么时间1000属于这个窗口 时间6000不属于这个窗口
时间4999属于这个窗口,时间5000不属于这个窗口
1.6.4 窗口如何确定执行触发和关闭
结束时间决定了窗口何时关闭和触发计算,规则是:数据的时间 满足 大于等于 结束时间 - 1毫秒
1.6.4.1 使用处理时间(Processing Time)的情况
如果使用处理时间,那么窗口按照系统时间进行判断
如果当前系统时间,大于等于窗口的结束时间,那么这个窗口就会被关闭,并且被触发计算
比如 0 – 5000的窗口
当系统时间走到了: 大于 等于 4999就会触发窗口计算和关闭
1.6.4.2 使用事件时间(Event Time)的情况
如果使用事件时间,那么:
当新进入的一条数据,其事件时间大于等于某个窗口的结束时间,那么这个窗口被关闭并触发计算
比如:两个窗口 窗口A是0-5000,窗口B是5000-10000
当数据事件时间是大于等于4999(5000 – 1)的数据进来,会导致窗口A进行关闭和触发计算。
1.6.4.3 使用水印的情况
如果使用水印,那么:
当新进入的一条数据,其水印时间,大于等于某个窗口的结束时间,那么这个窗口被关闭并触发计算
1.6.4.4 总结
l 处理时间:通过当前系统时间决定窗口触发和关闭
当前系统时间会不停的向前走,所以这样的情况下,窗口的关闭和触发很稳定,比如5秒窗口,就每隔5秒触发一次
l 事件时间:通过进入到Flink的数据,所带的 事件时间来决定是否关闭窗口
数据如果不进入Flink,那么这个窗口就一直不会被关闭。
所以事件时间窗口的开关不稳定,取决于数据
l 水印时间:基于数据的事件时间,同样开闭不稳定,取决于数据是否到来以及到来的数据的事件时间是多少,后面学习水印机制的时候细说
1.7 Window API
1.7.1 Window API的调用方式
我们如果想要对数据加窗口可以调用以下两种方法
1.7.1.1 window方法
仅针对keyby后的流可以使用
对分流后的每个子流加窗口
如图,可见有8个快捷方法可以使用
底层是帮组我们调用的window和windowAll方法
具体根据需要使用即可
1.7.1.2 windowAll方法
使用了keyby分流后的流或者未使用keyby分流后的流,均可使用
作用是:对数据进行加窗口操作,并且会忽略是否进行了keyby分流
区别在于:
l 使用keyby分流后的流如果调用windowAll, 相当于未分流的效果, Flink会忽略分流后的各个子流,而是将全量数据一起进行窗口计算
l 而未使用keyby分流后的数据,只能调用windowAll方法,无法调用window方法
这两个方法均需要传入一个WindowAssigner对象的实例
WindowAssigner对象就是指窗口的类型具体是什么?是时间窗口还是计数窗口还是会话窗口
如图,WindowAssigner是一个抽象类,我们不能直接实例化它,一般使用它的子类
如图,这些是WindowAssigner的一些子类
我们一般常用的有:
l TumblingEventTimeWindows 滚动时间窗口, 以event时间为时间依据
实例化方式:TumblingEventTimeWindows.of(滚动窗口时间)
l TumblingProcessingTimeWindows 滚动时间窗口, 以processing时间为依据
实例化方式:TumblingProcessingTimeWindows.of()
l SlidingEventTimeWindows 滑动时间窗口, 以event时间为依据
实例化方式:SlidingEventTimeWindows of(窗口长度, 滑动距离)
l SlidingProcessingTimeWindows 滑动时间窗口, 以processing时间为依据
实例化方式:SlidingProcessingTimeWindows.of(窗口长度, 滑动距离)
l GlobalWindows 全局窗口, 滚动计数, 滑动计数均使用这个窗口来实现
实例化方式:GlobalWindows.create()
l EventTimeSessionWindows 会话时间窗口, 以event时间为依据
实例化方式:EventTimeSessionWindows.withGap(会话gap时间)
l ProcessingTimeSessionWindows 会话时间窗口, 以processing时间为依据
实例化方式:ProcessingTimeSessionWindows.withGap(会话gap时间)
1.8 Time Window 案例
1.8.1 tumbling-time-window (滚动窗口-无重叠数据)
窗口可以作用与DataStream之上。
如果数据是未分流(keyby)的,那么就对全量数据加窗口
如果数据是分流后的,那么针对每个流加窗口(类似SQL的group by 后对每个分组做聚合)
可以看出,未分流的数据,只能使用带ALL关键字的方法
l 案例:
n 自定义一个Source, 每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字
n 对数据加窗口, 窗口1对未分流的数据统计数字总和
n 窗口2对按key分组后的数据统计每个key对应的数字总和
l 代码实现
/**
* 滚动-时间-窗口演示
* 自定义一个Source, 每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字
* 用时间窗口统计和
*/
public class TumblingTimeWindowDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source
DataStreamSource<Tuple2<String, Integer>> randomIntSource = env.addSource(new GenerateRandomNumEverySecond());
// 如果直接对source执行窗口的话, 是执行windowAll系列的方法
// 对未分组的数据统计总和, 每5秒统计一次
SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = randomIntSource
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);
// 安装key进行分流, 对分流后每个组进行求和统计, 窗口是滚动窗口, 每5秒一次
SingleOutputStreamOperator<Tuple2<String, Integer>> sumEachKey = randomIntSource
.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);
sumOfAll.print("Sum of all:");
sumEachKey.print("Sum each key:");
env.execute();
}
/*
自定义Source
每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字
*/
public static class GenerateRandomNumEverySecond implements SourceFunction<Tuple2<String, Integer>> {
private boolean isRun = true;
private final Random random = new Random();
private final List<String> keyList = Arrays.asList("hadoop", "spark", "flink");
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
while (this.isRun) {
String key = keyList.get(random.nextInt(3));
ctx.collect(Tuple2.of(key, random.nextInt(99)));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
this.isRun = false;
}
}
}
1.8.2 sliding-time-window (滑动窗口-有重叠数据)
按照时间来进行窗口划分,每次窗口的滑动距离小于窗口的长度,这样数据就会有一部分重复计算,我们参考上面的案例
/**
* 滑动时间窗口案例
* 自定义一个Source, 每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字
* 每隔5秒统计前10秒的数据, 分别统计
* 1. 全量数字之和
* 2. 分组后每个key对应的数字之和
*/
public class SlidingTimeWindowDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source
DataStreamSource<Tuple2<String, Integer>> source = env.addSource(new GenerateRandomNumEverySecond());
// 统计全量的滑动窗口
SingleOutputStreamOperator<Tuple2<String, Integer>> sumAll = source
.timeWindowAll(Time.seconds(10), Time.seconds(5)).sum(1);
// 按照key分组后统计
SingleOutputStreamOperator<Tuple2<String, Integer>> sumEachKey = source
.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1);
sumAll.print("Sum all>>>");
sumEachKey.print("Sum each key>>>");
env.execute();
}
/*
自定义Source
每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字
*/
public static class GenerateRandomNumEverySecond implements SourceFunction<Tuple2<String, Integer>> {
private boolean isRun = true;
private final Random random = new Random();
private final List<String> keyList = Arrays.asList("hadoop", "spark", "flink");
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
while (this.isRun) {
String key = keyList.get(random.nextInt(3));
Tuple2<String, Integer> value = Tuple2.of(key, random.nextInt(99));
ctx.collect(value);
System.out.println("------: " + value);
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
this.isRun = false;
}
}
}
结果:
------: (spark,33)
------: (spark,66)
------: (flink,25)
------: (flink,57)
Sum each key>>>:10> (flink,82)
Sum each key>>>:1> (spark,99)
Sum all>>>:11> (spark,181)
------: (spark,25)
------: (spark,80)
------: (spark,4)
------: (flink,61)
------: (hadoop,2)
Sum each key>>>:11> (hadoop,2)
Sum each key>>>:1> (spark,208)
Sum ea