Flink 函数

Flink 函数

相关博客:

Flink-函数 | 用户自定义函数(UDF)标量函数 | 表函数 | 聚合函数 | 表聚合函数

一、Flink Table API 和 SQL 内置函数

Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。

SQL中支持的很多函数,Table API 和 SQL都已经做了实现

  • 比较函数

    • SQL:

      value1 = value2

      value1 > value2

    • Table API

      ANY1 === ANY2

      ANY1 > ANY2

  • 逻辑函数

    • SQL:

      boolean1= boolean2

      boolean IS FALSE

      NOT boolean

      boolean IS FALSE

      NOT boolean

    • Table API

      BOOLEAN1 || BOOLEAN2

      BOOLEAN.isFalse

      !BOOLEAN

  • 算数函数

    • SQL:

      numeric1 + numeric2

      POWER(numeric1, numeric2)

    • Table API

      NUMERIC1 + NUMERIC2

      NUMERIC1.POWER(NUMERIC2)

  • 字符串函数

    • SQL:

      string1 + string2

      UPPER(string)

      CHAR_LENGTH(string)

    • Table API

      STRING1 + STRING2

      STRING.upperCase()

      STRING.charLength()

  • 时间函数

    • SQL:

      DATE string

      TIMESTAMP string

      CURRENT_TIME

      interval string range

    • Table API

      STRING.toDate

      STRING.toTimestamp

      currentTime()

      NUMERIC.days

  • 聚合函数

    • SQL:

      COUNT()

      SUM(expression)

      RANK()

      ROW_NUMBER()

    • Table API

      FIELD.count

      FIELD.sum

二、用户自定义函数(UDF)

用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力.

一些系统内置函数无法解决的需求,可以用UDF来自定义实现

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用

函数通过调用 registerFunction() 方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样Table API 或 SQL 解析器就可以识别并正确地解释它。

sql函数有两大类型:

  • scalar Function类似于map,一对一
  • Table Function类似与flatMap,一对多

2.1 标量函数(Scalar Functions)

定义标量函数,可以将0、1或多个标量值,映射到新的标量值

为了定义标量函数,必须扩展基类ScalarFunction,并实现求值(eval)方法。

标量函数的行为由求值方法决定,求值方法必须public公开声明并命名为eval

例如:

package com.root.udf;

import com.root.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

/**
 * @author Kewei
 * @Date 2022/3/9 15:44
 */

public class UDFTest1_Scalar {
   
    public static void main(String[] args) throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");

        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
   
            String[] field = line.split(",");
            return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
        });

        Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");

        // 自定义标量函数,实现求id的hash值
        HashCode hashCode = new HashCode(23);
        // 注册UDF函数
        tableEnv.registerFunction("hashcode",hashCode);

        // Table API
        Table resultTable = table.select("id, ts, hashcode(id)");

        // SQL
        tableEnv.createTemporaryView("sensor",table);
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashcode(id) from sensor");

        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");

        env
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

努力生活的黄先生

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值