分享
5 - 基于Flink1.13的TableApi和SQL.docx
下载文档

ID:3397039

大小:3.75MB

页数:82页

格式:DOCX

时间:2024-04-28

收藏 分享赚钱
温馨提示:
1. 部分包含数学公式或PPT动画的文件,查看预览时可能会显示错乱或异常,文件下载后无此问题,请放心下载。
2. 本文档由用户上传,版权归属用户,汇文网负责整理代发布。如果您对本文档版权有争议请及时联系客服。
3. 下载前请仔细阅读文档内容,确认文档内容符合您的需求后进行下载,若出现内容与标题不符可向本站投诉处理。
4. 下载文档时可能由于网络波动等原因无法下载或下载错误,付费完成后未能成功下载的用户请联系客服处理。
网站客服:3074922707
基于Flink1.13的TableApi和SQL 基于 Flink1 13 TableApi SQL
Flink Table API和Flink SQL的全面解析 1. 整体概述 1.1 什么是 Table API 和 Flink SQL Apache Flink是批流统一的处理框架,具有两个关系API-Table API和SQL-用于统一流和批处理的上层API。 Table API是Java,Scala和Python的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。 Flink的SQL就是可以在代码中写sql,实现一些查询操作,支持基于实现SQL标准的Apache Calcite(Apache开源SQL解析工具)。 无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。 官网介绍:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/overview/ 1.2 为什么需要Table API & SQL l 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行; l 高性能:可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划; l 简单易学:易于理解,不同行业和领域的人都懂,学习成本较低; l 标准稳定:语义遵循SQL标准,非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少; l 流批统一:可以做到API层面上流与批的统一,相同的SQL逻辑,既可流模式运行,也可批模式运行,Flink底层Runtime本身就是一个流与批统一的引擎 1.3 Table API& SQL发展历程 l 架构升级 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。随着版本的不断更新,API 也出现了很多不兼容的地方。 在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能 图示,在Flink 1.9 之前,Flink API 层 一直分为DataStream API 和 DataSet API,Table API & SQL 位于 DataStream API 和 DataSet API 之上。可以看出流处理和批处理有各自独立的api (流处理DataStream,批处理DataSet)。而且有不同的执行计划解析过程,codegen过程也完全不一样,完全没有流批一体的概念,面向用户不太友好。 在Flink1.9之后新的架构中,有两个查询处理器:Flink Query Processor,也称作Old Planner和Blink Query Processor,也称作Blink Planner。为了兼容老版本Table及SQL模块,插件化实现了Planner,Flink原有的Flink Planner不变,后期版本会被移除。新增加了Blink Planner,新的代码及特性会在Blink planner模块上实现。批或者流都是通过解析为Stream Transformation来实现的,不像Flink Planner,批是基于Dataset,流是基于DataStream。 l 查询处理器的选择 查询处理器是 Planner 的具体实现,通过parser、optimizer、codegen(代码生成技术)等流程将 Table API & SQL作业转换成 Flink Runtime 可识别的 Transformation DAG,最终由 Flink Runtime 进行作业的调度和执行。 Flink Query Processor查询处理器针对流计算和批处理作业有不同的分支处理,流计算作业底层的 API 是 DataStream API, 批处理作业底层的 API 是 DataSet API Blink Query Processor查询处理器则实现流批作业接口的统一,底层的 API 都是Transformation,这就意味着我们和Dataset完全没有关系了 Flink1.11之后Blink Query Processor查询处理器已经是默认的了 1.4 两种planner(old & blink)的区别 l Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。 l Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。 l 旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。 l PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。 l Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。 l 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/common/ 2. 概念和通用API Table API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是Table,用作查询的输入和输出。本文介绍了 Table API 和 SQL 查询程序的通用结构、如何注册 Table 、如何查询 Table 以及如何输出 Table 。 2.1 需要引入的依赖 参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/overview/ <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- flink执行计划,这是1.9版本之前的--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- blink执行计划,1.11+默认的--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies> l flink-table-common:这个包中主要是包含 Flink Planner 和 Blink Planner一些共用的代码。 l flink-table-api-java:这部分是用户编程使用的 API,包含了大部分的 API。 l flink-table-api-scala:这里只是非常薄的一层,仅和 Table API 的 Expression 和 DSL 相关。 l 两个 Planner:flink-table-planner 和 flink-table-planner-blink。 l 两个 Bridge:flink-table-api-scala-bridge 和 flink-table-api-java-bridge, Flink Planner 和 Blink Planner 都会依赖于具体的 JavaAPI,也会依赖于具体的 Bridge,通过 Bridge 可以将 API 操作相应的转化为Scala 的 DataStream、DataSet,或者转化为 JAVA 的 DataStream 或者Data Set 2.2 基本程序结构 所有用于批处理和流处理的 Table API 和 SQL 程序结构,与流式处理的程序结构类似;也可以近似地认为有这么几步:首先创建执行环境,然后定义source、transform和sink。 具体操作流程如下: // 创建表的执行环境 TableEnvironment tableEnv = ...; // 创建一张表,用来读取数据 tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )"); //注册一张表,用来输出计算结果数据 tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )"); //通过Table API查询算子,得到一张结果表 Table table2 = tableEnv.from("table1").select(...); //通过SQL查询语句,得到一张结果表 Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... "); //将结果表写入到输出表中 TableResult tableResult = table2.executeInsert("outputTable"); tableResult... 2.3 创建 TableEnvironment TableEnvironment 是 Table API 和 SQL 的核心概念。它负责: l 注册catalog l 在内部 catalog 中注册表 l 执行 SQL 查询 l 注册用户自定义函数 l 将 DataStream转换为表 l 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用 在创建TableEnv的时候,可以多传入一个EnvironmentSettings或者TableConfig参数,可以用来配置 TableEnvironment 的一些特性。 比如: l 配置老版本的流式查询(Flink-Streaming-Query): // **********************// FLINK STREAMING QUERY// ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; EnvironmentSettings fsSettings = EnvironmentSettings.newInstance() .useOldPlanner() // 使用老版本planner .inStreamingMode() // 流处理模式 .build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings); l 基于老版本的批处理环境(Flink-Batch-Query): // ******************// FLINK BATCH QUERY// ****************** import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv); l 基于blink版本的流处理环境(Blink-Streaming-Query): // **********************// BLINK STREAMING QUERY// ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner()// 使用新版本planner .inStreamingMode()// 流处理模式 .build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings); l 基于blink版本的批处理环境(Blink-Batch-Query): // ******************// BLINK BATCH QUERY// ****************** import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner()// 使用新版本planner .inBatchMode()// 批处理模式 .build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); 2.4 在Catalog中注册表 2.4.1 表(Table)的概念 TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:Catalog名、数据库(database)名和对象名(表名)。如果 catalog 或者数据库没有指明,就会使用当前默认值。 Table 可以是虚拟的(视图 VIEWS)也可以是常规的(表 TABLES)。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。 表TABLES描述的是外部数据,例如文件、数据库表或者消息队列。 2.4.2 临时表(Temporary Table)和永久表(Permanent Table) 表可以是临时的,并与单个 Flink 会话(session)的生命周期相关 表也可以是永久的,并且在多个 Flink 会话和群集(cluster)中可见 永久表需要 catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除。 另一方面,临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。 2.4.3 创建表 2.4.3.1 虚拟表 在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建: // get a TableEnvironment TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section // table is the result of a simple projection query Table projTable = tableEnv.from("X").select(...); // register the Table projTable as table "projectedTable" tableEnv.createTemporaryView("projectedTable", projTable); l 注意事项 从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享(注:Blink 计划器的TableEnvironment会优化成只执行一次) 2.4.3.2 Connector Tables 另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。 tableEnvironment .connect(...) .withFormat(...) .withSchema(...) .inAppendMode() .createTemporaryTable("MyTable") 2.4.3.3 扩展表标识符 表总是通过三元标识符注册,包括 catalog 名、数据库名和表名。 用户可以指定一个 catalog 和数据库作为 “当前catalog” 和"当前数据库"。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定, 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。 标识符遵循 SQL 标准,因此使用时需要用反引号(`)进行转义。 TableEnvironment tEnv = ...; tEnv.useCatalog("custom_catalog"); tEnv.useDatabase("custom_database"); Table table = ...; //在custom_catalog”的目录中注册名称“exampleView”的视图 //在“custom_database”的数据库中 tableEnv.createTemporaryView("exampleView", table); //在“custom_catalog”的目录中注册名称“exampleView”的视图 //在“other_database”的数据库中 tableEnv.createTemporaryView("other_database.exampleView", table); //在“custom_catalog”的目录中注册名称“example.view”的视图 //在“custom_database”的数据库中 tableEnv.createTemporaryView("`example.View`", table); //在“other_catalog”的目录中注册名称“exampleView”的视图 //在“other_database”的数据库中 tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table); 2.5 查询表 利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。 Flink给我们提供了两种查询方式:Table API和 SQL。 2.5.1 Table API的调用 Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段,filter(…)表示筛选条件。 注意: 通过导入可以启用Java Table API org.apache.flink.table.api.java.*。下面的示例演示如何构造Java Table API程序以及如何将表达式指定为字符串。对于Expression DSL,还必须导入静态org.apache.flink.table.api.Expressions.* import static org.apache.flink.table.api.Expressions.*; 代码中的实现如下: // 3. 表的查询转换 Table orderTable = tableEnv.from("inputTable"); // 3.1 简单查询转换 Table resultTable = orderTable .select($("id"),$("timestamp"),$("category"),$("areaName"),$("money")) .filter($("areaName").isEqual("北京")); 也可以加上聚合操作,比如我们统计每个城市订单数据出现的个数,做个count统计: // 3. 表的查询转换 Table orderTable = tableEnv.from("inputTable"); // 3.2 聚合转换 Table aggResultSqlTable = orderTable .groupBy($("areaName")) .select($("areaName"), $("id").count().as("cnt")); 2.5.2 SQL查询 Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。 l 下面的示例演示了如何指定查询并将结果作为 Table 对象返回 // 3. 表的查询转换 Table orderTable = tableEnv.from("inputTable"); // 3.1 简单查询转换 Table resultTable2 = tableEnv.sqlQuery("select id,`timestamp`,category,areaName,money from inputTable where areaName='北京'"); 也可以加上聚合操作,比如我们统计每个城市订单数据出现的个数,做个count统计: // 3. 表的查询转换 Table orderTable = tableEnv.from("inputTable"); // 3.2 聚合转换 Table aggResultSqlTable2 = tableEnv.sqlQuery("select areaName, count(1) as cnt from inputTable group by areaName"); l 如下的示例展示了如何指定一个更新查询,将查询的结果插入到已注册的表中 // get a TableEnvironment TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section // register "Orders" table // register "RevenueFrance" output table // compute revenue for all customers from France and emit to "RevenueFrance" tableEnv.executeSql( "INSERT INTO RevenueFrance " + "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" ); 2.6 将DataStream 转换成表 Flink允许我们把Table和DataStream做转换:可以基于一个DataStream,先流式地读取数据源,然后map成样例类,再把它转成Table。Table的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义schema了。 2.6.1 代码表达 代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。 这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次map操作(或者Table API的 select操作)。 代码具体如下: // 从文件读取数据 String filePath = TableApiTest.class.getClassLoader().getResource("order.csv").getPath(); DataStreamSource<String> inputStream = env.readTextFile(filePath); // map成样例类类型 SingleOutputStreamOperator<OrderInfo> dataStream = inputStream.map(new MapFunction<String, OrderInfo>() { @Override public OrderInfo map(String data) throws Exception { String[] dataArray = data.split(","); return new OrderInfo(dataArray[0], dataArray[1], Double.parseDouble(dataArray[2]), dataArray[3]); } }); // 2. 基于tableEnv,将流转换成表 Table dataTable = tableEnv.fromDataStream(dataStream); 2.6.2 数据类型与Table schema的对应 在上面的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。 另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。 l 基于名称的对应: Table dataTable = tableEnv.fromDataStream(dataStream,$("id").as("ts"), $("timestamp").as("myId"), $("money"), $("category")); l 基于位置的对应: Table dataTable3 = tableEnv.fromDataStream(dataStream,$("id"), $("timestamp"), $("money"), $("category")); Flink的DataStream和 DataSet API支持多种类型。 组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。 元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应,也是可以的: 元组类型,默认的名称是 “_1”, “_2”;而原子类型,默认名称是 ”f0”。 2.6.3 使用案例 /** * 将DataStream 转换成表 */ public class DataStreamToTable { public static void main(String[] args) throws Exception { // 0. 创建流执行环境,读取数据并转换成样例类类型 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); env.setParallelism(1); // 从文件读取数据 String filePath = TableApiTest.class.getClassLoader().getResource("order.csv").getPath(); DataStreamSource<String> inputStream = env.readTextFile(filePath); // map成样例类类型 SingleOutputStreamOperator<OrderInfo> dataStream = inputStream.map(new MapFunction<String, OrderInfo>() { @Override public OrderInfo map(String data) throws Exception { String[] dataArray = data.split(",");

此文档下载收益归作者所有

下载文档
你可能关注的文档
收起
展开