Flink 函数
文章目录
相关博客:
Flink-函数 | 用户自定义函数(UDF)标量函数 | 表函数 | 聚合函数 | 表聚合函数
一、Flink Table API 和 SQL 内置函数
Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。
SQL中支持的很多函数,Table API 和 SQL都已经做了实现
-
比较函数
-
SQL:
value1 = value2
value1 > value2
-
Table API
ANY1 === ANY2
ANY1 > ANY2
-
-
逻辑函数
-
SQL:
boolean1= boolean2
boolean IS FALSE
NOT boolean
boolean IS FALSE
NOT boolean
-
Table API
BOOLEAN1 || BOOLEAN2
BOOLEAN.isFalse
!BOOLEAN
-
-
算数函数
-
SQL:
numeric1 + numeric2
POWER(numeric1, numeric2)
-
Table API
NUMERIC1 + NUMERIC2
NUMERIC1.POWER(NUMERIC2)
-
-
字符串函数
-
SQL:
string1 + string2
UPPER(string)
CHAR_LENGTH(string)
-
Table API
STRING1 + STRING2
STRING.upperCase()
STRING.charLength()
-
-
时间函数
-
SQL:
DATE string
TIMESTAMP string
CURRENT_TIME
interval string range
-
Table API
STRING.toDate
STRING.toTimestamp
currentTime()
NUMERIC.days
-
-
聚合函数
-
SQL:
COUNT()
SUM(expression)
RANK()
ROW_NUMBER()
-
Table API
FIELD.count
FIELD.sum
-
二、用户自定义函数(UDF)
用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力.
一些系统内置函数无法解决的需求,可以用UDF来自定义实现
在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
函数通过调用 registerFunction()
方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样Table API 或 SQL 解析器就可以识别并正确地解释它。
sql函数有两大类型:
- scalar Function类似于map,一对一
- Table Function类似与flatMap,一对多
2.1 标量函数(Scalar Functions)
定义标量函数,可以将0、1或多个标量值,映射到新的标量值
为了定义标量函数,必须扩展基类ScalarFunction
,并实现求值(eval)方法。
标量函数的行为由求值方法决定,求值方法必须public公开声明并命名为eval
例如:
package com.root.udf;
import com.root.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
/**
* @author Kewei
* @Date 2022/3/9 15:44
*/
public class UDFTest1_Scalar {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
String[] field = line.split(",");
return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
});
Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");
// 自定义标量函数,实现求id的hash值
HashCode hashCode = new HashCode(23);
// 注册UDF函数
tableEnv.registerFunction("hashcode",hashCode);
// Table API
Table resultTable = table.select("id, ts, hashcode(id)");
// SQL
tableEnv.createTemporaryView("sensor",table);
Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashcode(id) from sensor");
tableEnv.toAppendStream(resultTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");
env