Flink Table API and Flink SQL
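Related post:
Flink: Introduction to the Table API and Flink SQL | Batch and Stream Processing in Old vs. New Flink Versions | Reading Files and Consuming Kafka Data | Querying Tables with the API and SQL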
I. Overview
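- Flink provides a unified high-level API for both batch and stream processing.
- The Table API is a query API embedded in the Java and Scala languages. It lets you compose queries from relational operators (such as selection, filtering, and joins) in a very intuitive way.
- Flink's SQL support is based on Apache Calcite, which implements the SQL standard.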
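To make "composing relational operators" concrete, the sketch below mimics a selection (`where`) followed by a projection (`select`) using only `java.util.stream`. It is a plain-Java analogy, not Flink's Table API: the `Reading` class, the `query` helper, and the sample values are all invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class RelationalOperatorsSketch {

    // Hypothetical row type, used only for this illustration.
    static class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    // Rough analogue of: SELECT id, temperature FROM readings WHERE id = <wantedId>
    static List<String> query(List<Reading> readings, String wantedId) {
        return readings.stream()
                .filter(r -> r.id.equals(wantedId))      // selection (where)
                .map(r -> r.id + "," + r.temperature)    // projection (select)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample rows.
        List<Reading> readings = Arrays.asList(
                new Reading("sensor_1", 1L, 35.8),
                new Reading("sensor_6", 2L, 15.4),
                new Reading("sensor_1", 3L, 36.3));
        query(readings, "sensor_1").forEach(System.out::println);
    }
}
```

The Table API and SQL express the same two steps declaratively, which lets Flink plan and optimize the query instead of running it operator by operator as written.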
Add the dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<!-- Dependencies for writing to files and for JDBC -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.10.1</version>
    <scope>provided</scope>
</dependency>
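The usage example in this post imports `com.root.SensorReading`, a class the post itself does not show. Here is a minimal sketch of what it might look like, assuming it is a plain POJO. The field names `id` and `temperature` come from the queries in the example; the name `timestamp` for the middle `Long` field and the `toString` format are guesses. Flink treats a class as a POJO, and exposes its fields by name to the Table API, only when the class is public, has a public no-argument constructor, and its fields are either public or reachable through getters and setters.

```java
// Assumed to live in package com.root, matching the import in the usage example.
public class SensorReading {
    private String id;
    private Long timestamp;       // hypothetical name for the second CSV column
    private Double temperature;

    // Flink's POJO serializer requires a public no-argument constructor.
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp
                + ", temperature=" + temperature + "}";
    }
}
```

With a class shaped like this, each line of the input file would be expected to hold three comma-separated values, for instance an ID, an epoch timestamp, and a temperature.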
A simple usage example:
package com.root.table;

import com.root.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author Kewei
 * @Date 2022/3/7 16:31
 */
public class TableTest1_Example {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Create a Table environment on top of the streaming environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Read the file and parse each comma-separated line into a SensorReading
        String path = "data/sensor.txt";
        DataStreamSource<String> inputStream = env.readTextFile(path);
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // Long.valueOf / Double.valueOf replace the deprecated boxing constructors
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // 3. Create a Table from the DataStream
        Table dataTable = tableEnv.fromDataStream(dataStream);

        // 4. Run a simple query with the Table API
        Table resultTable = dataTable.select("id, temperature")
                .where("id = 'sensor_1'");

        // 5. Register the Table as a temporary view so SQL can refer to it by name
        tableEnv.createTemporaryView("sensor", dataTable);

        // 6. Run the same query with SQL
        String sql = "select id, temperature from sensor where id = 'sensor_1'";
        Table resultSqlTable = tableEnv.sqlQuery(sql);

        // 7. Convert both Tables back to DataStreams and print them
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql"