Flink实时数仓项目—ODS层日志数据到DWD层

本文介绍了如何使用Flink处理日志数据,包括识别新老用户、日志数据分类及发送到Kafka。通过读取Kafka中的ODS层日志,转化数据格式,根据日志类型分流并写入对应DWD层主题。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >


前言

前面已经将日志数据和业务数据采集到了Kafka中,Kafka中的ods_xx主题就作为了实时数仓的ODS层。
行为日志分为三类,页面日志、启动日志和曝光日志,这三类日志的格式不一样,我们需要分别进行处理,然后将处理完的数据再写入到Kafka中,作为日志的DWD层。


一、日志数据需要做的处理

1.识别新老用户

本身客户端业务有新老用户的标识,但是不够准确(比如有些用户卸载了软件,然后再次下载了回来,is_new仍然为1),这就需要实时计算再次确认,如下图:
在这里插入图片描述

2.日志数据的处理

前言中提到日志数据分为三类,页面日志、启动日志和曝光日志。我们要将这三种不同的日志区分开来,分别发送到不同的Kafka的主题中。
Flink中可以使用测输出流这个方法将不同类型的数据放到不同的流里,这里设计将页面日志输出到主流,启动日志输出到启动日志的测输出流,曝光日志输出到曝光日志的测输出流。

3.发送数据到Kafka

在日志数据拆分之后,分别将不同类型的日志数据发送到不同的Kafka的主题中。

二、功能实现

1.读取Kafka数据并转换数据格式

首先,要从Kafka的ods_base_log主题中读取日志数据,但是读取出来的数据是String类型的,我们要将数据转化为jsonObject类型的,然后才方便从里面取出对应的数据。
因为是Flink从Kafka中读取数据,所以我们可以将获取KafkaConsumer也封装成一个方法,MyKafkaUtil方法如下:

public class MyKafkaUtil {
   
    //Kafka链接地址
    private static String KAFKA_SERVE="hadoop102:9092,hadoop103:9092,hadoop104:9092";

    //Kafka消费者的配置
    static Properties sinkProperties=new Properties();

    //Kafka生产者的配置
    static Properties sourceProperties=new Properties();

    static{
   
        sinkProperties.setProperty("bootstrap.servers",KAFKA_SERVE);

        sourceProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVE);
    }


    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
   
        return new FlinkKafkaProducer<String>(topic,new SimpleStringSchema(),sinkProperties);
    }

    public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){
   
        sourceProperties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),sourceProperties);

    }
}

接下来,编写主程序,消费Kafka里的数据(注意:可能会出现异常):

        //2、消费ods_vase_log的数据
        String sourceTopic="ods_base_log";
        String group
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值