Flink使用Watermark处理延迟数据-侧输出流

一、关于Flink的Watermark

1.12版本之后默认时间语义为Event time(事件时间),并且实际使用也是以事件时间为主,故这边背景均以基于时间事件的来说明。

支持event time的流式处理框架需要一种能够测量event time 进度的方式;比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间, 从而在程序中去关闭这个窗口。

事件时间可以不依赖处理时间来表示时间的进度.例如,在程序中,即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间.另外一方面,使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据, 这些数据可以照样被正确处理,就像实时发生的一样能够进入正确的窗口。

这种在Flink中去测量事件时间的进度的机制就是watermark(水印/水位)。watermark作为数据流的一部分在流动, 并且携带一个时间戳t。

Flink内置了两种水印生成器:

  1. Monotonously Increasing Timestamps(时间戳单调增长:其实就是允许的延迟为0),看源码可知该生成器也是继承了允许固定时间延迟的类,只是设置允许延迟为0。
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
	return (ctx) -> new AscendingTimestampsWatermarks<>();
}

public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
	/**
	 * Creates a new watermark generator with for ascending timestamps.
	 */
	public AscendingTimestampsWatermarks() {
		super(Duration.ofMillis(0));
	}
}
  1. Fixed Amount of Lateness(允许固定时间的延迟)
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

二、乱序数据中对延迟数据的处理

已经添加了wartemark之后, 仍有数据会迟到怎么办? Flink的窗口, 也允许迟到数据.

当触发了窗口计算后, 会先计算当前的结果, 但是此时并不会关闭窗口.以后每来一条迟到数据, 则触发一次这条数据所在窗口计算(增量计算).

那么什么时候会真正的关闭窗口呢? wartermark 超过了窗口结束时间+等待时间

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

注意:允许迟到只能运用在event time上

举例文字说明:

如上图所画,数据流中数字代表事件时间戳,圆圈代表一个事件;一个乱序数据流从7秒开始,代码上设定10秒的滚动窗口,允许延迟时间为7秒,则Flink在对数据处理时的流程如下:
在这里插入图片描述

①7秒的事件时间会进入第一个0-10秒的窗口(window)
②14秒的事件时间会进入第二个10-20秒的窗口(window),第一个窗口并不会关闭
③8秒的事件时间会进入第一个0-10秒的窗口(window)
④17秒的事件时间会进入第二个10-20秒的窗口(window),水印超过窗口结束时间+等待时间(10+7),第一个窗口关闭并计算输出
⑤9秒的事件时间由于第一个窗口已经关闭,按代码设定从侧输出流输出

2.代码实例:

先上结果,运行Flink_Watermark_SideOutput类从hadoop102服务器9999端口接收数据,数据第二列为时间戳字段,7秒的数据进入第一个[0,10)的窗口,后续虽然进来14秒的数据进入第一个窗口,但第一个窗口并不会关闭,接着8秒进来的数据还是能进入了第一个[0,10)的窗口,当17秒的数据进来,触发了watermark的关闭条件,第一窗口关闭,最后一条9秒数据只能从侧输出流输出。
在这里插入图片描述
Flink_Watermark_SideOutput类代码:

package com.test;

import com.test.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;

public class Flink_Watermark_SideOutput {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> stream = env
                .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                });
        // 创建水印生产策略
        WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(0)) // 最大容忍的延迟时间
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { // 指定时间戳
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000;
                    }
                });

        SingleOutputStreamOperator<String> result = stream
                .assignTimestampsAndWatermarks(wms)
                .keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(7))
                .sideOutputLateData(new OutputTag<WaterSensor>("side_1") {
                }) // 设置侧输出流
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        String msg = "当前key: " + key
                                + " 窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
                                + elements.spliterator().estimateSize() + "条数据" +
                                "watermark: " + context.currentWatermark();
                        out.collect(context.window().toString());
                        out.collect(msg);
                    }
                });

        result.print();
        result.getSideOutput(new OutputTag<WaterSensor>("side_1"){}).print("side>>>");
        env.execute();
    }}

补充WaterSensor 实体类:

package com.test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * 水位传感器:用于接收水位数据
 *
 * id:传感器编号
 * ts:时间戳
 * vc:水位
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
}

学习交流,有任何问题还请随时评论指出交流。

<think>嗯,用户问Flinkwatermark是不是随着数据动的。首先,我需要确认用户对Flink的基本概念是否了解,比如数据、事件时间、处理时间,以及watermark的作用。用户可能是在学习Flink的时候遇到了watermark的概念,想更深入理解它的工作机制。 用户的问题看似简单,但需要明确watermark的生成和传播机制。首先,watermark是用来处理事件时间乱序问题的,它本身是一个特殊的时间戳,会被插入到数据中。接下来要考虑的是,watermark是随着数据一起动的吗?或者说,它是如何在不同算子之间传递的? 根据已有的知识,Flink数据数据元素和水位线组成,水位线会随着数据在算子之间传递。每个算子会根据接收到的watermark来更新自己的事件时间时钟,从而触发计算,比如窗口的闭合。这里需要解释清楚watermark是嵌入在数据中的,作为特殊记录动,并且会影响下游算子的时间处理逻辑。 可能用户真正想知道的是watermark的传播机制,以及它对程序的影响。比如,如果某个算子的处理速度慢,是否会影响下游watermark的推进?或者,在并行任务中,watermark是如何协调的?这时候需要说明Flinkwatermark对齐机制,即每个子任务会跟踪自己的watermark,而整个任务的watermark是所有子任务中最小的那个,这可能会引起处理延迟的问题。 另外,用户可能在实际应用中遇到了乱序数据处理的问题,想知道如何正确设置watermark的生成策略,比如周期性的生成还是根据特定事件生成。这时候需要提到不同的生成方式,比如AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks,以及它们适用的场景。 还需要注意用户可能的误区,比如认为watermark是全局唯一的,或者每个数据都带有自己的watermark。但实际上,watermark是周期性生成并传播的,每个算子维护自己的watermark状态,并根据上游传递的watermark来更新。 总结下来,回答需要涵盖以下几点: 1. Watermark的定义和作用:处理事件时间乱序,触发事件时间相关的计算。 2. Watermark的生成方式:周期性或基于事件生成。 3. Watermark的传播机制:随数据动,算子间传递并影响各自的事件时间时钟。 4. 并行环境下的处理:子任务的watermark对齐机制。 5. 实际应用中的配置建议:根据数据特性选择合适的生成策略,设置允许的延迟时间。 还需要给出一个例子,比如使用周期性生成watermark的代码片段,说明如何设置,并解释其效果。这样用户能更直观地理解watermark是如何在数据中工作的。 最后,确保回答结构清晰,分点说明,语言简洁,避免过于技术化的术语,让初学者也能理解。同时,检查是否有错误,比如是否每个算子都有自己的watermark,或者watermark的传播是否受数据顺序影响等,确保信息的准确性。</think>Flink 中的 **Watermark(水位线)** 确实会随着数据在算子(Operator)之间动,但其本质是一种**特殊的时间戳标记**,用于在事件时间(Event Time)语义下解决数据乱序问题。以下是详细解释: --- ### 一、Watermark 的核心作用 1. **事件时间处理** 当使用 `Event Time` 时,数据可能因网络延迟等原因乱序到达。Watermark 的作用是: - **标记事件时间进度**:表示“当前时间之前的数据理论上已到达”(允许一定的延迟)。 - **触发计算**:例如窗口(Window)的闭合、定时器(Timer)触发。 2. **乱序容忍机制** 通过设置 `allowedLateness`,Watermark 允许在窗口关闭后短暂保留数据,以处理延迟到达的数据--- ### 二、Watermark动机制 1. **生成方式** - **周期性生成**:通过 `AssignerWithPeriodicWatermarks`,按固定时间间隔生成(如每 200ms)。 - **事件驱动生成**:通过 `AssignerWithPunctuatedWatermarks`,根据特定数据生成(如带标记的数据)。 2. **嵌入数据** - Watermark 会被插入到数据中,作为**特殊记录**(非业务数据)向下游传递。 - **示例代码**(周期性生成): ```java DataStream<Event> stream = ...; stream .assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, timestamp) -> event.getTimestamp()) ); ``` 3. **传递规则** - **广播性动**:每个算子的输入分区(Input Partition)会收到来自上游的 Watermark- **对齐机制**:算子会跟踪所有输入分区的 Watermark,取 **最小值** 作为当前事件时间。 *(例如:上游有 3 个分区,Watermark 分别为 10:00、10:02、10:01,则当前算子的 Watermark 为 10:00)* --- ### 三、关键特性与影响 1. **全局进度推进** - Watermark动推动整个作业的**事件时间时钟**向前推进。 - 若某个分区的数据长期停滞(无 Watermark 更新),全局事件时间会被阻塞(需注意数据倾斜问题)。 2. **窗口触发条件** 当 Watermark ≥ **窗口结束时间** 时,窗口会触发计算。例如: - 窗口范围:`[10:00, 10:15)` - Watermark 达到 `10:15` 时,窗口关闭并输出结果。 3. **与数据的关系** - Watermark **不依赖数据内容**,仅依赖时间戳。 -数据的时间戳 ≤ 当前 Watermark,该数据可能被丢弃(除非配置了延迟容忍)。 --- ### 四、典型问题与调优 1. **Watermark 生成过慢** - 现象:窗口延迟触发。 - 调优:缩短周期性生成间隔,或优化事件驱动逻辑。 2. **数据倾斜导致对齐阻塞** - 现象:某个分区的 Watermark 停滞,拖慢全局进度。 - 调优:检查数据分布,修复倾斜源(如 `keyBy` 后的热点 Key)。 3. **乱序时间过长** - 现象:Watermark 触发窗口后仍有大量迟到数据- 调优:增大 `allowedLateness` 或调整 Watermark 生成策略(如 `forBoundedOutOfOrderness` 的时间阈值)。 --- ### 五、总结 - **Watermark 是随着数据动的**,但其动逻辑独立于业务数据,用于协调全局事件时间进度。 - 正确设置 Watermark 策略(生成频率、乱序容忍度)是保证 Flink 事件时间处理可靠性的关键。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值