Flink实时数仓项目—ODS层日志数据到DWD层
前言
前面已经将日志数据和业务数据采集到了Kafka中,Kafka中的ods_xx主题就作为了实时数仓的ODS层。
行为日志分为三类,页面日志、启动日志和曝光日志,这三类日志的格式不一样,我们需要分别进行处理,然后将处理完的数据再写入到Kafka中,作为日志的DWD层。
一、日志数据需要做的处理
1.识别新老用户
本身客户端业务有新老用户的标识,但是不够准确(比如有些用户卸载了软件,然后再次下载了回来,is_new仍然为1),这就需要实时计算再次确认,如下图:
2.日志数据的处理
前言中提到日志数据分为三类,页面日志、启动日志和曝光日志。我们要将这三种不同的日志区分开来,分别发送到不同的Kafka的主题中。
Flink中可以使用测输出流这个方法将不同类型的数据放到不同的流里,这里设计将页面日志输出到主流,启动日志输出到启动日志的测输出流,曝光日志输出到曝光日志的测输出流。
3.发送数据到Kafka
在日志数据拆分之后,分别将不同类型的日志数据发送到不同的Kafka的主题中。
二、功能实现
1.读取Kafka数据并转换数据格式
首先,要从Kafka的ods_base_log主题中读取日志数据,但是读取出来的数据是String类型的,我们要将数据转化为jsonObject类型的,然后才方便从里面取出对应的数据。
因为是Flink从Kafka中读取数据,所以我们可以将获取KafkaConsumer也封装成一个方法,MyKafkaUtil方法如下:
public class MyKafkaUtil {
//Kafka链接地址
private static String KAFKA_SERVE="hadoop102:9092,hadoop103:9092,hadoop104:9092";
//Kafka消费者的配置
static Properties sinkProperties=new Properties();
//Kafka生产者的配置
static Properties sourceProperties=new Properties();
static{
sinkProperties.setProperty("bootstrap.servers",KAFKA_SERVE);
sourceProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVE);
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
return new FlinkKafkaProducer<String>(topic,new SimpleStringSchema(),sinkProperties);
}
public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){
sourceProperties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),sourceProperties);
}
}
接下来,编写主程序,消费Kafka里的数据(注意:可能会出现异常):
//2、消费ods_vase_log的数据
String sourceTopic="ods_base_log";
String group