Flink实践代码-FlinkSQL使用 Join 完成表关联

本文介绍了如何使用Flink SQL进行数据流的关联查询。首先阐述了思路,包括构建运行环境、接入数据流和进行关联查询。然后提供了核心代码,通过TableAPI将两个数据流转换为Table并进行字段裁剪,最后使用Join操作基于特定条件('bdid'等于'creator')关联两个表。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

1.代码与含义解释
1.1 思路

Flink 获取数据流后,需要做数据过滤那么首先就要有一下几个步骤:

  1. 构建运行环境
  2. 接入数据流
  3. 两个数据流关联查询结论
1.2 直接上代码
package com.youtree.flink;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static org.apache.flink.table.api.Expressions.$;

/******************************************************
 * @program        : com.youtree.flink
 * @ClassName      : Visit_Info_for_table_Join
 * @Author         : willcui
 * @Date           : 2024/4/1 12:04
 * @Description    :   
 * @Version        :  
/************************************************
### FlinkSQL 中维关联使用方法 #### MySQL 维关联Flink SQL中,可以利用`Table API & SQL`以及`Flink JDBC连接器`来实现MySQL维关联操作。通过定义一个带有特定选项的catalog或直接创建临时视图的方式引入外部MySQL作为维参与计算[^1]。 对于简单的单字段匹配场景可以直接指定key列完成join动作;而面对复杂键值组合的情况,则需借助字符串拼接(concat)或是自定义格函数(table function),尽管过程稍显繁琐却也提供了灵活性[^2]。 ```sql CREATE TABLE mysql_dim_table ( id BIGINT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/testdb', 'table-name' = 'dim_table' ); --与维Join示例 SELECT t1.order_id, t1.product_id, dim.name AS product_name FROM order_stream AS t1 LEFT JOIN mysql_dim_table FOR SYSTEM_TIME AS OF t1.proctime AS dim ON t1.product_id = dim.id; ``` #### Redis 维关联优化 考虑到性能因素,在某些应用场景下选用Redis作为缓存层存储频繁访问的小型维度信息不失为一种明智之举。这不仅能够加速查询效率还能减轻源端数据库的压力。不过值得注意的是当涉及到复合主键时确实存在一定的不便之处,此时可考虑采用哈希结构或其他高级特性简化处理逻辑。 #### 广播机制下的高效Join策略 为了进一步提升作业执行效能并减少网络传输开销,官方推荐针对规模较小且更新频率较低的静态维采取广播分发模式实施全量复制至各个TaskManager节点本地内存之中供即时查阅引用[^4]。 ```sql -- 创建广播维 CREATE TEMPORARY VIEW broadcasted_dimension AS WITH WATERMARK FOR proctime AS proctime - INTERVAL '5' SECOND SELECT * FROM kafka_source; -- 应用广播提示符进行优化后的Join语句 SELECT /*+ MAPJOIN(broadcasted_dimension) */ ... FROM stream_table LEFT JOIN broadcasted_dimension ON stream_table.key = broadcasted_dimension.key; ``` #### 时间旅行功能支持历史版本回溯 最后值得一提的是Temporal Table Function Join所提供的强大能力——允许基于事件时间戳精准定位到对应时间节点上的有效记录从而达成过去某一刻状态重现的目的,这对于审计追踪类需求尤为适用。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值