温馨提示:
1. 部分包含数学公式或PPT动画的文件,查看预览时可能会显示错乱或异常,文件下载后无此问题,请放心下载。
2. 本文档由用户上传,版权归属用户,汇文网负责整理代发布。如果您对本文档版权有争议请及时联系客服。
3. 下载前请仔细阅读文档内容,确认文档内容符合您的需求后进行下载,若出现内容与标题不符可向本站投诉处理。
4. 下载文档时可能由于网络波动等原因无法下载或下载错误,付费完成后未能成功下载的用户请联系客服处理。
网站客服:3074922707
基于Flink1.13的TableApi和SQL
基于
Flink1
13
TableApi
SQL
Flink Table API和Flink SQL的全面解析
1. 整体概述
1.1 什么是 Table API 和 Flink SQL
Apache Flink是批流统一的处理框架,具有两个关系API-Table API和SQL-用于统一流和批处理的上层API。
Table API是Java,Scala和Python的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。
Flink的SQL就是可以在代码中写sql,实现一些查询操作,支持基于实现SQL标准的Apache Calcite(Apache开源SQL解析工具)。
无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。
官网介绍:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/overview/
1.2 为什么需要Table API & SQL
l 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行;
l 高性能:可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;
l 简单易学:易于理解,不同行业和领域的人都懂,学习成本较低;
l 标准稳定:语义遵循SQL标准,非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;
l 流批统一:可以做到API层面上流与批的统一,相同的SQL逻辑,既可流模式运行,也可批模式运行,Flink底层Runtime本身就是一个流与批统一的引擎
1.3 Table API& SQL发展历程
l 架构升级
自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。随着版本的不断更新,API 也出现了很多不兼容的地方。
在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能
图示,在Flink 1.9 之前,Flink API 层 一直分为DataStream API 和 DataSet API,Table API & SQL 位于 DataStream API 和 DataSet API 之上。可以看出流处理和批处理有各自独立的api (流处理DataStream,批处理DataSet)。而且有不同的执行计划解析过程,codegen过程也完全不一样,完全没有流批一体的概念,面向用户不太友好。
在Flink1.9之后新的架构中,有两个查询处理器:Flink Query Processor,也称作Old Planner和Blink Query Processor,也称作Blink Planner。为了兼容老版本Table及SQL模块,插件化实现了Planner,Flink原有的Flink Planner不变,后期版本会被移除。新增加了Blink Planner,新的代码及特性会在Blink planner模块上实现。批或者流都是通过解析为Stream Transformation来实现的,不像Flink Planner,批是基于Dataset,流是基于DataStream。
l 查询处理器的选择
查询处理器是 Planner 的具体实现,通过parser、optimizer、codegen(代码生成技术)等流程将 Table API & SQL作业转换成 Flink Runtime 可识别的 Transformation DAG,最终由 Flink Runtime 进行作业的调度和执行。
Flink Query Processor查询处理器针对流计算和批处理作业有不同的分支处理,流计算作业底层的 API 是 DataStream API, 批处理作业底层的 API 是 DataSet API
Blink Query Processor查询处理器则实现流批作业接口的统一,底层的 API 都是Transformation,这就意味着我们和Dataset完全没有关系了
Flink1.11之后Blink Query Processor查询处理器已经是默认的了
1.4 两种planner(old & blink)的区别
l Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。
l Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。
l 旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。
l PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。
l Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
l 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/common/
2. 概念和通用API
Table API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是Table,用作查询的输入和输出。本文介绍了 Table API 和 SQL 查询程序的通用结构、如何注册 Table 、如何查询 Table 以及如何输出 Table 。
2.1 需要引入的依赖
参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/overview/
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink执行计划,这是1.9版本之前的-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- blink执行计划,1.11+默认的-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
l flink-table-common:这个包中主要是包含 Flink Planner 和 Blink Planner一些共用的代码。
l flink-table-api-java:这部分是用户编程使用的 API,包含了大部分的 API。
l flink-table-api-scala:这里只是非常薄的一层,仅和 Table API 的 Expression 和 DSL 相关。
l 两个 Planner:flink-table-planner 和 flink-table-planner-blink。
l 两个 Bridge:flink-table-api-scala-bridge 和 flink-table-api-java-bridge,
Flink Planner 和 Blink Planner 都会依赖于具体的 JavaAPI,也会依赖于具体的 Bridge,通过 Bridge 可以将 API 操作相应的转化为Scala 的 DataStream、DataSet,或者转化为 JAVA 的 DataStream 或者Data Set
2.2 基本程序结构
所有用于批处理和流处理的 Table API 和 SQL 程序结构,与流式处理的程序结构类似;也可以近似地认为有这么几步:首先创建执行环境,然后定义source、transform和sink。
具体操作流程如下:
// 创建表的执行环境
TableEnvironment tableEnv = ...;
// 创建一张表,用来读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
//注册一张表,用来输出计算结果数据
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
//通过Table API查询算子,得到一张结果表
Table table2 = tableEnv.from("table1").select(...);
//通过SQL查询语句,得到一张结果表
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
//将结果表写入到输出表中
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...
2.3 创建 TableEnvironment
TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:
l 注册catalog
l 在内部 catalog 中注册表
l 执行 SQL 查询
l 注册用户自定义函数
l 将 DataStream转换为表
l 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
在创建TableEnv的时候,可以多传入一个EnvironmentSettings或者TableConfig参数,可以用来配置 TableEnvironment 的一些特性。
比如:
l 配置老版本的流式查询(Flink-Streaming-Query):
// **********************// FLINK STREAMING QUERY// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
.useOldPlanner() // 使用老版本planner
.inStreamingMode() // 流处理模式
.build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
l 基于老版本的批处理环境(Flink-Batch-Query):
// ******************// FLINK BATCH QUERY// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
l 基于blink版本的流处理环境(Blink-Streaming-Query):
// **********************// BLINK STREAMING QUERY// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()// 使用新版本planner
.inStreamingMode()// 流处理模式
.build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
l 基于blink版本的批处理环境(Blink-Batch-Query):
// ******************// BLINK BATCH QUERY// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()// 使用新版本planner
.inBatchMode()// 批处理模式
.build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
2.4 在Catalog中注册表
2.4.1 表(Table)的概念
TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:Catalog名、数据库(database)名和对象名(表名)。如果 catalog 或者数据库没有指明,就会使用当前默认值。
Table 可以是虚拟的(视图 VIEWS)也可以是常规的(表 TABLES)。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。 表TABLES描述的是外部数据,例如文件、数据库表或者消息队列。
2.4.2 临时表(Temporary Table)和永久表(Permanent Table)
表可以是临时的,并与单个 Flink 会话(session)的生命周期相关
表也可以是永久的,并且在多个 Flink 会话和群集(cluster)中可见
永久表需要 catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除。
另一方面,临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。
2.4.3 创建表
2.4.3.1 虚拟表
在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
l 注意事项
从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享(注:Blink 计划器的TableEnvironment会优化成只执行一次)
2.4.3.2 Connector Tables
另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
2.4.3.3 扩展表标识符
表总是通过三元标识符注册,包括 catalog 名、数据库名和表名。
用户可以指定一个 catalog 和数据库作为 “当前catalog” 和"当前数据库"。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定, 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。
标识符遵循 SQL 标准,因此使用时需要用反引号(`)进行转义。
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
//在custom_catalog”的目录中注册名称“exampleView”的视图
//在“custom_database”的数据库中
tableEnv.createTemporaryView("exampleView", table);
//在“custom_catalog”的目录中注册名称“exampleView”的视图
//在“other_database”的数据库中
tableEnv.createTemporaryView("other_database.exampleView", table);
//在“custom_catalog”的目录中注册名称“example.view”的视图
//在“custom_database”的数据库中
tableEnv.createTemporaryView("`example.View`", table);
//在“other_catalog”的目录中注册名称“exampleView”的视图
//在“other_database”的数据库中
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
2.5 查询表
利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。
Flink给我们提供了两种查询方式:Table API和 SQL。
2.5.1 Table API的调用
Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段,filter(…)表示筛选条件。
注意:
通过导入可以启用Java Table API org.apache.flink.table.api.java.*。下面的示例演示如何构造Java Table API程序以及如何将表达式指定为字符串。对于Expression DSL,还必须导入静态org.apache.flink.table.api.Expressions.*
import static org.apache.flink.table.api.Expressions.*;
代码中的实现如下:
// 3. 表的查询转换
Table orderTable = tableEnv.from("inputTable");
// 3.1 简单查询转换
Table resultTable = orderTable
.select($("id"),$("timestamp"),$("category"),$("areaName"),$("money"))
.filter($("areaName").isEqual("北京"));
也可以加上聚合操作,比如我们统计每个城市订单数据出现的个数,做个count统计:
// 3. 表的查询转换
Table orderTable = tableEnv.from("inputTable");
// 3.2 聚合转换
Table aggResultSqlTable = orderTable
.groupBy($("areaName"))
.select($("areaName"), $("id").count().as("cnt"));
2.5.2 SQL查询
Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。
l 下面的示例演示了如何指定查询并将结果作为 Table 对象返回
// 3. 表的查询转换
Table orderTable = tableEnv.from("inputTable");
// 3.1 简单查询转换
Table resultTable2 = tableEnv.sqlQuery("select id,`timestamp`,category,areaName,money from inputTable where areaName='北京'");
也可以加上聚合操作,比如我们统计每个城市订单数据出现的个数,做个count统计:
// 3. 表的查询转换
Table orderTable = tableEnv.from("inputTable");
// 3.2 聚合转换
Table aggResultSqlTable2 = tableEnv.sqlQuery("select areaName, count(1) as cnt from inputTable group by areaName");
l 如下的示例展示了如何指定一个更新查询,将查询的结果插入到已注册的表中
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
2.6 将DataStream 转换成表
Flink允许我们把Table和DataStream做转换:可以基于一个DataStream,先流式地读取数据源,然后map成样例类,再把它转成Table。Table的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义schema了。
2.6.1 代码表达
代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。
这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次map操作(或者Table API的 select操作)。
代码具体如下:
// 从文件读取数据
String filePath = TableApiTest.class.getClassLoader().getResource("order.csv").getPath();
DataStreamSource<String> inputStream = env.readTextFile(filePath);
// map成样例类类型
SingleOutputStreamOperator<OrderInfo> dataStream = inputStream.map(new MapFunction<String, OrderInfo>() {
@Override
public OrderInfo map(String data) throws Exception {
String[] dataArray = data.split(",");
return new OrderInfo(dataArray[0], dataArray[1], Double.parseDouble(dataArray[2]), dataArray[3]);
}
});
// 2. 基于tableEnv,将流转换成表
Table dataTable = tableEnv.fromDataStream(dataStream);
2.6.2 数据类型与Table schema的对应
在上面的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。
另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。
l 基于名称的对应:
Table dataTable = tableEnv.fromDataStream(dataStream,$("id").as("ts"), $("timestamp").as("myId"), $("money"), $("category"));
l 基于位置的对应:
Table dataTable3 = tableEnv.fromDataStream(dataStream,$("id"), $("timestamp"), $("money"), $("category"));
Flink的DataStream和 DataSet API支持多种类型。
组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。
元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应,也是可以的:
元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。
2.6.3 使用案例
/**
* 将DataStream 转换成表
*/
public class DataStreamToTable {
public static void main(String[] args) throws Exception {
// 0. 创建流执行环境,读取数据并转换成样例类类型
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(1);
// 从文件读取数据
String filePath = TableApiTest.class.getClassLoader().getResource("order.csv").getPath();
DataStreamSource<String> inputStream = env.readTextFile(filePath);
// map成样例类类型
SingleOutputStreamOperator<OrderInfo> dataStream = inputStream.map(new MapFunction<String, OrderInfo>() {
@Override
public OrderInfo map(String data) throws Exception {
String[] dataArray = data.split(",");