前言
Flink 除了可以在一条数据流上使用各种时间窗口算法来处理数据,还支持两条数据流上的一段时间窗口内的数据进行联接操作,类似于SQL中的 join 操作。
双流联接主要的应用场景有:
- 电商领域:将用户的浏览行为流和购买行为流进行 JOIN,以了解用户在浏览特定商品后是否进行了购买,从而分析用户的购买决策路径和偏好。
- 金融领域:将交易流和账户信息流进行 JOIN,实时监控账户的交易情况和余额变动,以便及时发现异常交易或风险。
- 物联网领域:将设备的状态流和传感器的监测数据流进行 JOIN,全面了解设备的运行状况和相关环境参数。
- 在线广告:将用户的点击流和广告投放流进行 JOIN,评估广告投放的效果和用户的响应情况。
双流联接的代码结构
Flink 中的双流链接的代码基本结构如下:
dataStream1.join(dataStream2)
.where(KeySelector)
.equalTo(KeySelector)
.window(WindowAssigner)
.apply(JoinFunction)
既然是双流联接,首先得有两条数据流,即两个 DataStream,用其中一个 DataStream 去 join 另外一个 DataStream,就可以得到 JoinedStreams。然后再通过 where 和 equalTo 指定联接的条件,只有两条流中元素对应的 KeySelector 返回的 key 相同的元素才会联接成功,联接成功的两个元素才会被 JoinFunction 处理。
没有联接条件的双流联接结果是笛卡尔积,数据量大的时候切记一定要避免。如下所示,数字流和字母流无条件联接,最终 JoinFunction 将处理九次。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream1 = environment.fromElements(1, 2, 3);
DataStream<String> dataStream2 = environment.fromElements("A", "B", "C");
dataStream1.join(dataStream2)
.where(IN -> 1)
.equalTo(IN -> 1)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(3))
.apply(new JoinFunction<Integer, String, String>() {
@Override
public String join(Integer first, String second) throws Exception {
return first + "_" + second;
}
}).print();
environment.execute();
}
Window Join
Flink 双流联接的第一种,是窗口关联,即两条数据流上相同窗口之间数据的联接操作。这个窗口可以是Flink内置的各种时间窗口、计数窗口、甚至是你自定义的窗口。
窗口关联的操作是 Inner Join,必须要两个窗口内都有数据,如果其中一个窗口没有数据,关联的结果也是空的,JoinFunction 不会被触发。
举个例子,在电商系统中,要计算推荐商品的下单转换率,即 商品下单的次数 除以 系统推荐商品给用户的展示次数。有两条数据流,分别是商品展示流 showDataStream,和用户订单流 orderDataStream。
如下代码,让 showDataStream 去 join orderDataStream,联接条件是 itemId 相同,给两条数据流分配一秒钟的滚动窗口,处理关联结果时,用 MapState 记录商品对应的下单次数。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ItemShowModel> showDataStream = environment.addSource(new ItemShowSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
DataStream<UserOrderModel> orderDataStream = environment.addSource(new UserOrderSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
showDataStream.join(orderDataStream)
// 联接条件是itemId相同
.where(ItemShowModel::getItemId)
.equalTo(UserOrderModel::getItemId)
// 基于事件时间的滚动窗口
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1L)))
// 处理关联结果
.apply(new RichJoinFunction<ItemShowModel, UserOrderModel, Tuple2<Long, Integer>>() {
@Override
public Tuple2<Long, Integer> join(ItemShowModel first, UserOrderModel second) throws Exception {
// MapState记录itemId对应的下单次数
MapState<Long, Integer> counterMap = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Integer>("counterMap", Long.class, Integer.class));
int count = Optional.ofNullable(counterMap.get(second.getItemId())).orElse(0) + 1;
counterMap.put(second.getItemId(), count);
return Tuple2.of(second.getItemId(), count);
}
}).print();
environment.execute();
}
CoGroup
Flink 双流联接的第二种,是 CoGroup,它强调的是分组后再联接。
Window Join 的联接操作是 Inner Join,如果某个窗口内没有数据,窗口将不会被计算。但是 CoGroup 可以让我们实现 Outer Join、Left Join、Right Join、当然也可以是 Inner Join 操作,更加灵活。
CoGroup 会先将各自窗口内的元素按照 KeySelector 分组,相同 key 的元素会被分为一组,所以 Flink 会将这些元素封装到 Iterable,然后将分组进行联接操作,联接成功的两组元素会被 CoGroupFunction 处理,所以 CoGroupFunction 的处理方法的两个入参类型是 Iterable,分别代表两个窗口内的一组元素。有了这两组元素,我们就可以进行自由的联接操作了。
@FunctionalInterface
@Public
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
void coGroup(Iterable<IN1> var1, Iterable<IN2> var2, Collector<O> var3) throws Exception;
}
还是以计算下单转化率为例,上一个程序有个缺陷,如果商品展示流有数据,但是没用户下单,窗口就不会计算。但我们希望的是,即使没用户下单,我们也要把这部分数据算进去,所以利用 CoGroup 操作重写一下。
如下代码,两条流通过 coGroup 联接,分配一秒的滚动窗口,把窗口内商品的展示次数和下单次数封装到 ShowOrderCount 对象并写入 MapState,然后交给下游算子计算转化率并输出。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ItemShowModel> showDataStream = environment.addSource(new ItemShowSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
DataStream<UserOrderModel> orderDataStream = environment.addSource(new UserOrderSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
showDataStream.coGroup(orderDataStream)
.where(ItemShowModel::getItemId)
.equalTo(UserOrderModel::getItemId)
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(1L)))
.apply(new RichCoGroupFunction<ItemShowModel, UserOrderModel, ShowOrderCount>() {
@Override
public void coGroup(Iterable<ItemShowModel> first, Iterable<UserOrderModel> second, Collector<ShowOrderCount> out) throws Exception {
Iterator<ItemShowModel> showModelIterator = first.iterator();
if (showModelIterator.hasNext()) {
final Long itemId = showModelIterator.next().getItemId();
MapState<Long, ShowOrderCount> counterMap = getRuntimeContext().getMapState(new MapStateDescriptor<Long, ShowOrderCount>("counterMap", Long.class, ShowOrderCount.class));
ShowOrderCount showOrderCount = Optional.ofNullable(counterMap.get(itemId)).orElse(new ShowOrderCount(itemId, 0, 0));
showOrderCount.showCount += CollectionUtils.size(first);
showOrderCount.orderCount += CollectionUtils.size(second);
counterMap.put(itemId, showOrderCount);
out.collect(showOrderCount);
}
}
}).process(new ProcessFunction<ShowOrderCount, Object>() {
@Override
public void processElement(ShowOrderCount showClickCount, ProcessFunction<ShowOrderCount, Object>.Context context, Collector<Object> collector) throws Exception {
final Long itemId = showClickCount.getItemId();
final BigDecimal orderRate = BigDecimal.valueOf(showClickCount.getOrderCount())
.multiply(BigDecimal.valueOf(100))
.divide(BigDecimal.valueOf(showClickCount.getShowCount()), 2, RoundingMode.FLOOR);
System.err.println("itemId:" + itemId + " --> " + orderRate + "%");
}
});
environment.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class ShowOrderCount {
private Long itemId;
private Integer showCount;
private Integer orderCount;
}
运行Flink作业,控制台输出示例如下:
itemId:1 --> 13.33%
itemId:8 --> 10.00%
itemId:9 --> 40.00%
itemId:6 --> 21.42%
itemId:4 --> 25.00%
itemId:7 --> 14.28%
itemId:1 --> 18.75%
itemId:3 --> 72.72%
Interval Join
Flink 双流联接的第三种,是 Interval Join。
Window Join 和 CoGroup 两种联接操作都有一个局限性,只能实现两条流上相同窗口内的元素联接,无法跨窗口联接,Interval Join 正式要解决这个问题的。
Interval Join 要做的是双流在一段时间区间内的联接。还是以计算下单转化率为例,假设分配1分钟的滚动时间窗口,商品在 10:00:01 时间展示给用户,展示流元素被分配到 [10:00,10:01] 窗口,用户犹豫了一会儿,在 10:01:30 下单了,订单流元素被分配到 [10:01,10:02] 窗口,因为窗口不同,所以这两个元素无法关联上,但这笔订单的转化确实来自一分钟前的商品展示推荐,如此一来,计算的结果就不准确了。
Interval Join 是如何解决这个问题的呢?它去掉了时间窗口划分的边界,允许商品展示流去关联订单流一段时间范围内的数据,这个时间范围就是双流 intervalJoin 后 between 方法指定的上下边界。需要注意的是,Interval Join 也是 Inner Join 操作的,而且只能基于事件时间语义。
举个例子,商品展示流去 intervalJoin 订单流,关联的时间范围上边界延后五分钟,即商品展示后五分钟内产生的订单,都会被计入订单转化,这样结果就会更加准确了。
如下代码示例:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ItemShowModel> showDataStream = environment.addSource(new ItemShowSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
DataStream<UserOrderModel> orderDataStream = environment.addSource(new UserOrderSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
showDataStream.keyBy(ItemShowModel::getItemId)
.intervalJoin(orderDataStream.keyBy(UserOrderModel::getItemId))
.between(Duration.ZERO, Duration.ofSeconds(5L))
.process(new ProcessJoinFunction<ItemShowModel, UserOrderModel, Object>() {
@Override
public void processElement(ItemShowModel itemShowModel, UserOrderModel userOrderModel, ProcessJoinFunction<ItemShowModel, UserOrderModel, Object>.Context context, Collector<Object> collector) throws Exception {
......
}
});
environment.execute();
}
尾巴
Flink 提供了三种双流联接的方式,Window Join 是基于相同窗口内的 Inner Join 操作,CoGroup 强调的是先分组再联接,基于它可以实现各种各样的联接操作,Interval Join 的特点是打破了相同窗口联接的限制,可以联接给定时间范围内的元素。
双流联接 提供了一种高效且实时的数据整合能力,它能够将来自两个不同流的数据按照指定的关联条件进行实时匹配和整合,从而为后续的数据分析和处理提供了更全面、更准确的数据基础。