A Hands-On Real-Time BI Mini Project on Sogou Logs with Flink 1.14.3, Kafka, and FineBI

Preface

Overview

IT moves fast these days: blink and Flink is already at 1.14.4, and FineBI can now do real-time BI... So I picked up the classic Sogou-logs (sougoulogs) mini project to practice and enjoy the thrill of stepping into one pit after another.

Thanks to Flink 1.14 achieving true stream-batch unification at the API level, batch jobs can also be written in the streaming style, and a Kappa architecture is much easier to operate than the stream/batch-separated Lambda architecture. Of course, some vendors go the other way and switch existing Spark and Spark Streaming jobs to offline Hive (running MapReduce, not even Tez) to free up memory for a Flink cluster running real-time stream computing. To each their own.

This mini project reads logs from a CSV file and sinks them into Kafka to simulate the data-collection stage. It then reads from the MQ, processes the data, and sinks it to MySQL. Finally, FineBI is used to build a real-time dashboard for visualization. It suits apprentices like me who have used older Flink versions, know SQL, but are not much good at ECharts.

Preview of the Result

Direct link on Bilibili

CSDN video

A Sogou-logs BI practice project with Flink + Kafka + FineBI

Component Versions

Built on the USDP 2.0 cluster set up earlier.
Flink: manually upgraded to 1.14.3;
Kafka: the bundled 2.0.1, plus the bundled Kafka Eagle;
FineBI: 5.1 personal free edition.

Preparation

Create the Kafka Topic

[root@zhiyong2 lib]# kafka-topics.sh --zookeeper zhiyong2:2181 --delete --topic sougoulogs
Topic sougoulogs is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --list
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --create --topic sougoulogs --replication-factor 3 --partitions 3
Created topic "sougoulogs".
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --list
sougoulogs

Using Kafka

Kafka is the most widely used MQ for real-time workloads; Pulsar has also been gaining popularity lately. For us apprentices, having a GUI makes things a lot friendlier.

Start Kafka and Kafka Eagle

Just start them from USDP's Web UI, which is extremely convenient. You can also open the visual tool Kafka Eagle:

http://zhiyong4:8048/
admin,123456

Test the Kafka Setup

In Kafka Eagle's Mock page, pick the sougoulogs topic and generate some data:

00:00:01 123456 [哈哈\] 2 3 www.baidu.com

After clicking Send, you can also see the data by writing SQL in KSQL:
Type this in the query box:

select * from sougoulogs where `partition` in (0,1,2) limit 5

You can of course also start a console consumer to check the data:

[root@zhiyong2 ~]# kafka-console-consumer.sh --bootstrap-server zhiyong2:9092 --topic sougoulogs
00:00:01 123456 [哈哈\] 2 3 www.baidu.com

Big-data components are all SQL-ified these days, which is great news for the SQL boys: knowing only SQL is enough for many data-warehouse jobs. That mostly works because the data sources are tidy, structured data. Once you need to handle semi-structured or unstructured data, or build a platform, SQL alone quickly falls short.

Create the MySQL Database and Tables

The MySQL database and tables hold the computed results that the BI layer displays.

create database sougoulogs;
use sougoulogs;
create table if not exists newscount (
	name varchar(50) not null,
    count int(11) not null
);

create table if not exists periodcount(
	logtime varchar(50) not null,
    count int(11) not null
);

Real-Time BI

There are three widgets in total, backed by just two tables and three SQL queries:

SELECT `name`,`count` FROM newscount ORDER by `count` desc LIMIT 10	--newscount
select count(1) from newscount --counts
select `logtime`,`count` from periodcount order by logtime desc limit 10 --periodcount

The new FineBI feels a bit like Tableau and is quite friendly to apprentices; the official site has detailed tutorials, so I will not repeat them here. The rough flow:
add a connection → add a real-time data source → add datasets → configure widgets and build the dashboard.

Even without knowing Spring or ECharts, you can still present the data.

Writing the Flink Programs

pom dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sougoulogs</artifactId>
        <groupId>com.zhiyong</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Repositories: aliyun, cloudera, apache (in that order) -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
    </repositories>

    <artifactId>flinkDemo</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.14.3</flink.version>
        <scala.version>2.12.14</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies; in production these can be left out of the JAR. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink filesystem (Hadoop) support -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-filesystems</artifactId>
            <version>${flink.version}</version>
            <type>pom</type>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <!--            <version>1.14.4</version>-->
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!--        Enable Lombok annotations -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
        </dependency>

        <!--        MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>

    </dependencies>




    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Shade plugin to build a fat JAR (includes all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Optionally set the JAR's main class -->
                                    <!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Mock Data

package com.zhiyong.flinkDemo;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;

/**
 * @program: sougoulogs
 * @description: custom source that simulates streaming data
 * @author: zhiyong
 * @create: 2022-04-13 18:01
 **/
public class MockData {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> data = env.addSource(new CsvSource());

        data.print("自定义数据源产生的数据:");

        env.execute("模拟数据");

    }

    static class CsvSource extends RichSourceFunction<String> {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String line = null;
        String csvPath = ConstConfig.csvPath;

        String lastStr = "";// previous record

        volatile boolean needRun = true;// set to false by cancel(); volatile so run() sees the change

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (needRun) {
                while (null != (line = br.readLine())) {
                    //System.out.println("read line: " + line);
                    ctx.collect(line);

                    // throttle the replay: sleep in proportion to the gap between the
                    // first fields of two consecutive records
                    if (StringUtils.isNoneBlank(lastStr)
                            && !line.split(",")[0].equals(lastStr.split(",")[0])) {
                        Thread.sleep((Long.parseLong(line.split(",")[0])
                                - Long.parseLong(lastStr.split(",")[0])) * 1000);
                    }

                    lastStr = line;
                }
            }
        }

        @Override
        public void cancel() {
            needRun = false;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            fis = new FileInputStream(csvPath);
            isr = new InputStreamReader(fis, "UTF-8");
            br = new BufferedReader(isr);
        }

        @Override
        public void close() throws Exception {
            super.close();
            { // close the streams at the end so the file is not held open
                if (null != br) {
                    br.close();
                }
                if (null != isr) {
                    isr.close();
                }
                if (null != fis) {
                    fis.close();
                }
            }
        }
    }
}

The custom source prevents Flink from reading the whole CSV at once in batch fashion and exiting immediately, which would hide the streaming behavior. Throttling also avoids out-of-memory and other resource problems during development.

Sink Data to Kafka

package com.zhiyong.flinkDemo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * @program: sougoulogs
 * @description: read data from the log file and write it to Kafka
 * @author: zhiyong
 * @create: 2022-04-13 12:02
 **/
public class File2Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String inputPath = "E:/study/flink/data/test1";
        //DataStreamSource<String> data = env.readTextFile(inputPath);// effectively batch: finishes as soon as the file is read, too fast

        DataStreamSource<String> data = env.addSource(new MockData.CsvSource());

        // build the Kafka sink (producer)
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("transaction.timeout.ms",1000*60*10 + "");// must stay below the broker's 15-minute transaction.max.timeout.ms
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
                // fixes the error: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
                .setKafkaProducerConfig(kafkaProperties)
                //.setTransactionalIdPrefix("")
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sougoulogs")

                        .setValueSerializationSchema(new SimpleStringSchema())

                        .build()
                )
                .build();

        data.sinkTo(kafkaSink);

        env.execute("File2Kafka");
    }
}

Sink Data to MySQL

package com.zhiyong.flinkDemo;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * @program: sougoulogs
 * @description: read data from Kafka with Flink and write it to MySQL
 * @author: zhiyong
 * @create: 2022-04-06 22:49
 **/
public class Kafka2MySQL_Flink implements Serializable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//        Properties prop = new Properties();
//        prop.setProperty("bootstrap.servers","zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
//        prop.setProperty("group.id","sougoulogs");
//
//        FlinkKafkaConsumer<String> kafkaConsumer =
//                new FlinkKafkaConsumer<>("sougoulogs",new SimpleStringSchema(),prop);// deprecated approach
//
//        DataStreamSource<String> data = env.addSource(kafkaConsumer);

//        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();


        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()// generic type goes after the dot
                .setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
                .setTopics("sougoulogs")
                .setGroupId("sougoulogs")
                .setStartingOffsets(OffsetsInitializer.earliest())
//                .setValueOnlyDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        System.out.println("构建环境完成");

        source.print();

        SingleOutputStreamOperator<String> cleanStream = source
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.split(",").length == 6;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> newsCountsStream = cleanStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(Tuple2.of(value.toLowerCase().split(",")[2], 1));
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> periodCountsStream = cleanStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(new Tuple2<>(value.toLowerCase().split(",")[0], 1));
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        newsCountsStream.print("新闻计数流数据:");
        periodCountsStream.print("阶段汇总流数据:");

        // write the results to MySQL
        newsCountsStream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            private Connection conn = null;
            private Statement stmt = null;

            // a streaming job keeps one connection open and writes through it
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                Class.forName(ConstConfig.JDBC_Driver);
                conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
            }

            // auto-generated, not needed
//            @Override
//            public void setRuntimeContext(RuntimeContext t) {
//                super.setRuntimeContext(t);
//            }

            // invoked for every record to write it out
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                try {
                    String name = value.f0.replaceAll("[\\[\\]]", "");
                    Integer count = value.f1;

                    String sql1 = "select 1 from newscount where name ='" + name + "'";
                    String sql2 = "update newscount set count =" + count + " where name ='" + name + "'";
                    String sql3 = "insert into newscount(name,count) values('" + name + "'," + count + ")";

                    stmt = conn.createStatement();
                    ResultSet resultSet = stmt.executeQuery(sql1);
                    if (resultSet.next()) {
                        stmt.execute(sql2);
                    } else {
                        stmt.execute(sql3);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // a streaming job normally never reaches close()
            @Override
            public void close() throws Exception {
                super.close();
                if (null != stmt) {
                    stmt.close();
                }
                if (null != conn) {
                    conn.close();
                }
            }
        });

        periodCountsStream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
            private Connection conn = null;
            private Statement stmt = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                Class.forName(ConstConfig.JDBC_Driver);
                conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
            }

            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                super.invoke(value, context);
                try {
                    String logtime = value.f0;
                    Integer count = value.f1;

                    String sql1 = "select 1 from periodcount where logtime ='" + logtime + "'";
                    String sql2 = "update periodcount set count =" + count + " where logtime ='" + logtime + "'";
                    String sql3 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")";

                    stmt = conn.createStatement();
                    ResultSet resultSet = stmt.executeQuery(sql1);
                    if (resultSet.next()) {
                        stmt.execute(sql2);
                    } else {
                        stmt.execute(sql3);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void close() throws Exception {
                super.close();
                if (null != stmt) {
                    stmt.close();
                }
                if (null != conn) {
                    conn.close();
                }
            }
        });

        env.execute("kafka2MySQL");

    }
}
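
A side note on the sink logic above: building SQL by string concatenation works for a demo but is fragile around quoting. Below is a small sketch of my own (not part of the original project; it assumes a unique key is added on newscount.name, for example ALTER TABLE newscount ADD UNIQUE KEY uk_name (name)) that does the same update-or-insert with a single PreparedStatement:

import java.sql.Connection;
import java.sql.PreparedStatement;

public class NewsCountUpsert {
    // Hypothetical helper: replaces the select-then-update/insert block inside invoke().
    static void upsert(Connection conn, String name, int count) throws Exception {
        // MySQL upsert; requires a UNIQUE key on newscount.name
        String sql = "insert into newscount(name, count) values(?, ?) "
                + "on duplicate key update count = values(count)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, name);
            ps.setInt(2, count);
            ps.executeUpdate();
        }
    }
}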

Configuration Class

package com.zhiyong.flinkDemo;

import java.io.Serializable;

/**
 * @program: sougoulogs
 * @description: hard-coded configuration class
 * @author: zhiyong
 * @create: 2022-04-06 22:43
 **/
public class ConstConfig  implements Serializable {
    public static final String JDBC_Driver = "com.mysql.cj.jdbc.Driver";
    public static final String URL = "jdbc:mysql://192.168.88.100:3306/sougoulogs";
    public static final String user = "root";
    public static final String password = "123456";
    public static final String csvPath = "E:/study/studyBigDataProj/sougouLogs/data/sougou500w_2.txt";
}

This class centralizes the configuration so it is easy to change. A streaming job is usually started once and then runs for a long time, and the metrics it computes are mostly fixed up front, so hard-coding the settings is fine in most cases. Dynamic parameter passing, as used in offline batch jobs, matters more when building reusable components or a platform.
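
If run-time parameters are wanted, a minimal sketch of my own (not part of the original project; the class name DynamicConfigDemo is hypothetical) using Flink's ParameterTool to read the same settings from program arguments, falling back to the hard-coded defaults:

package com.zhiyong.flinkDemo;

import org.apache.flink.api.java.utils.ParameterTool;

public class DynamicConfigDemo {
    public static void main(String[] args) {
        // e.g. --mysql.url jdbc:mysql://... --mysql.user root --mysql.password 123456
        ParameterTool params = ParameterTool.fromArgs(args);
        String url = params.get("mysql.url", ConstConfig.URL);
        String user = params.get("mysql.user", ConstConfig.user);
        String password = params.get("mysql.password", ConstConfig.password);
        System.out.println("Using MySQL URL: " + url + ", user: " + user);
    }
}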

Problems Encountered

Flink 1.14's Brand-New Kafka Connector

The version is so new that you have to dig through the source code to figure out how to use it, with pitfalls everywhere. Older versions would have looked roughly like this:

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
        prop.setProperty("group.id","sougoulogs");

        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("sougoulogs",new SimpleStringSchema(),prop);// deprecated approach

        DataStreamSource<String> data = env.addSource(kafkaConsumer);

Still, deprecated APIs should be avoided whenever possible!!!

Looking at the source code:

package org.apache.flink.connector.kafka.source;

/**
 * The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link
 * KafkaSource}. The following example shows how to create a KafkaSource emitting records of <code>
 * String</code> type.
 *
 * <pre>{@code
 * KafkaSource<String> source = KafkaSource
 *     .<String>builder()
 *     .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
 *     .setGroupId("MyGroup")
 *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
 *     .setDeserializer(new TestingKafkaRecordDeserializationSchema())
 *     .setStartingOffsets(OffsetsInitializer.earliest())
 *     .build();
 * }</pre>
 *
 * <p>See {@link KafkaSourceBuilder} for more details.
 *
 * @param <OUT> the output type of the source.
 */
public class KafkaSource<OUT>
        implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
                ResultTypeQueryable<OUT> {
                }

Fortunately the Javadoc shows how to build the object, so I copied the example and adapted it:

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()// generic type goes after the dot
                .setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
                .setTopics("sougoulogs")
                .setGroupId("sougoulogs")
                .setStartingOffsets(OffsetsInitializer.earliest())
//                .setValueOnlyDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

When building the Kafka sink, exactly-once delivery is available, so of course I enabled it, and promptly got this error:
Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
Since the broker's maximum transaction timeout defaults to 15 minutes, I simply set the producer's transaction timeout to 10 minutes to avoid the error:

Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("transaction.timeout.ms",1000*60*10 + "");// must stay below the broker's 15-minute transaction.max.timeout.ms
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
                // fixes the error: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
                .setKafkaProducerConfig(kafkaProperties)
                //.setTransactionalIdPrefix("")
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sougoulogs")

                        .setValueSerializationSchema(new SimpleStringSchema())

                        .build()
                )
                .build();

        data.sinkTo(kafkaSink);

MySQL Character Encoding

At runtime the following error appears:

构建环境完成
00:00:01,123456,[哈哈],2,3,www.baidu.com
新闻计数流数据:> ([哈哈],1)
阶段汇总流数据:> ([哈哈],1)
java.sql.SQLException: Illegal mix of collations (latin1_swedish_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '='
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
	at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$7.invoke(Kafka2MySQL_Flink.java:184)
	at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$7.invoke(Kafka2MySQL_Flink.java:161)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:750)
java.sql.SQLException: Illegal mix of collations (latin1_swedish_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '='
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
	at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$6.invoke(Kafka2MySQL_Flink.java:137)
	at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$6.invoke(Kafka2MySQL_Flink.java:107)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:750)

This happens because the MySQL instance bundled with USDP mixes Latin1 with UTF-8 and GBK. Obviously I was not going to touch the server settings and risk cluster stability for this little project, so I ran:

[root@zhiyong1 usdp]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 171
Server version: 5.7.30 MySQL Community Server (GPL)

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> SHOW VARIABLES LIKE 'character_set_%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
8 rows in set (0.00 sec)

mysql>  SHOW VARIABLES LIKE 'collation_%';
+----------------------+-------------------+
| Variable_name        | Value             |
+----------------------+-------------------+
| collation_connection | utf8_general_ci   |
| collation_database   | latin1_swedish_ci |
| collation_server     | latin1_swedish_ci |
+----------------------+-------------------+
3 rows in set (0.00 sec)

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| db_hive_metastore  |
| db_hue             |
| db_ranger          |
| db_udp             |
| dolphinscheduler   |
| mysql              |
| performance_schema |
| sougoulogs         |
| sys                |
+--------------------+
10 rows in set (0.00 sec)

mysql> use sougoulogs;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql>
mysql> show tables;
+----------------------+
| Tables_in_sougoulogs |
+----------------------+
| newscount            |
| periodcount          |
+----------------------+
2 rows in set (0.00 sec)

mysql>
mysql> alter table `newscount` charset=utf8
    -> ;
Query OK, 0 rows affected (0.00 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> alter table `periodcount` charset=utf8;
Query OK, 0 rows affected (0.00 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> alter table `newscount` convert to character set utf8;
Query OK, 17 rows affected (0.03 sec)
Records: 17  Duplicates: 0  Warnings: 0

mysql> alter table `periodcount` convert to character set utf8;
Query OK, 12 rows affected (0.03 sec)
Records: 12  Duplicates: 0  Warnings: 0

Changing the encoding of these two ADS (result) tables is all that is needed.
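
As a complementary measure (untested here, so treat it as an assumption), the connection character set can also be declared in the JDBC URL so that Connector/J talks UTF-8 to the server; this does not replace the ALTER TABLE statements above, since the stored table collation itself was Latin1:

package com.zhiyong.flinkDemo;

// Sketch only: UTF-8 connection parameters for MySQL Connector/J 8.x.
// This affects the client/connection charset, not the stored table collation.
public class ConstConfigUtf8 {
    public static final String URL = "jdbc:mysql://192.168.88.100:3306/sougoulogs"
            + "?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
}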

FineBI Does Not Auto-Refresh

I assumed that with a real-time data source FineBI would refresh automatically. Too naive. Following the official guide, the local paths are:

C:\FineBI5.1\webapps\webroot\WEB-INF\lib\fine-bi-adapter-5.1.jar
fine-bi-adapter-5.1.jar\com\finebi\web\html\show.html

Insert the following at the end of that file inside the JAR:

</body>
<!-- insert starts here -->
<!-- add auto-refresh support -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
<!-- insert ends here -->
</html>

Create refresh.js under C:\FineBI5.1\webapps\webroot:

setTimeout(function () {
 var b =document.title;
 var a =BI.designConfigure.reportId;// get the dashboard id
 // check the dashboard id so that only the specified dashboard auto-refreshes
 if (a=="7fcbb0a362314ca4b015ee26f39f5a79") {
  setInterval(function () {
   BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
   //Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
   BI.Utils.broadcastAllWidgets2Refresh(true);
  }, 5000);// refresh interval in ms
 }
}, 2000)

Refreshing too frequently uses a fair amount of memory and CPU; a lowly GT730 display card can just about cope.

The Old-School Approaches

Of course there are plenty of older approaches that can achieve the same effect, but sooner or later they will fade away.

What follows is a record of history whose moment has passed.

Flume

Flume may still come in handy for log collection, but its latest release dates from 2018-12-18, which makes it very mature (read: ancient)...

Configuration

Write a Flume configuration file File2Avro.properties that tails a file and sends the records, Avro-serialized, to two other Flume nodes.

[root@zhiyong2 flume]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/flume
[root@zhiyong2 flume]# ll
总用量 164
drwxrwxrwx. 2 hadoop hadoop    62 810 2021 bin
-rwxrwxrwx. 1 hadoop hadoop 85602 1218 2018 CHANGELOG
drwxrwxrwx. 2 hadoop hadoop   197 41 21:27 conf
-rwxrwxrwx. 1 hadoop hadoop  5681 1218 2018 DEVNOTES
-rwxrwxrwx. 1 hadoop hadoop  2873 1218 2018 doap_Flume.rdf
drwxrwxrwx. 2 hadoop hadoop  8192 810 2021 lib
-rwxrwxrwx. 1 hadoop hadoop 43405 1218 2018 LICENSE
-rwxrwxrwx. 1 hadoop hadoop   249 1218 2018 NOTICE
drwxr-xr-x  3 root   root      24 412 20:30 plugins.d
-rwxrwxrwx. 1 hadoop hadoop  2483 1218 2018 README.md
-rwxrwxrwx. 1 hadoop hadoop  1958 1218 2018 RELEASE-NOTES
drwxrwxrwx. 2 hadoop hadoop     6 31 23:10 run
drwxrwxrwx. 2 hadoop hadoop    68 726 2021 tools
[root@zhiyong2 flume]# cd conf/
[root@zhiyong2 conf]# ll
总用量 28
-rwxr-xr-x  1 root   root   1661 41 21:25 flume-conf.properties
-rwxrwxrwx. 1 hadoop hadoop 1661 1218 2018 flume-conf.properties.template
-rwxrwxrwx. 1 hadoop hadoop 1455 31 23:10 flume-env.ps1
-rwxrwxrwx. 1 hadoop hadoop 1455 1218 2018 flume-env.ps1.template
-rwxrwxrwx. 1 hadoop hadoop 1593 31 23:10 flume-env.sh
-rwxrwxrwx. 1 hadoop hadoop 1568 1218 2018 flume-env.sh.template
-rwxrwxrwx. 1 hadoop hadoop 3117 31 23:10 log4j.properties
[root@zhiyong2 conf]# touch File2Avro.properties
[root@zhiyong2 conf]# ll
总用量 28
-rw-r--r--  1 root   root      0 413 00:00 File2Avro.properties
-rwxr-xr-x  1 root   root   1661 41 21:25 flume-conf.properties
-rwxrwxrwx. 1 hadoop hadoop 1661 1218 2018 flume-conf.properties.template
-rwxrwxrwx. 1 hadoop hadoop 1455 31 23:10 flume-env.ps1
-rwxrwxrwx. 1 hadoop hadoop 1455 1218 2018 flume-env.ps1.template
-rwxrwxrwx. 1 hadoop hadoop 1593 31 23:10 flume-env.sh
-rwxrwxrwx. 1 hadoop hadoop 1568 1218 2018 flume-env.sh.template
-rwxrwxrwx. 1 hadoop hadoop 3117 31 23:10 log4j.properties
[root@zhiyong2 conf]# vim File2Avro.properties
#The content of the properties file is shown below
[root@zhiyong2 conf]# chmod 777 ./File2Avro.properties

The content of this properties file:

agent1.sources = source1
agent1.channels = fileChannel
agent1.sinkgroups = sinkgroup1
agent1.sinks = sink1 sink2

agent1.sources.source1.type = TAILDIR
# track read offsets in a JSON file
agent1.sources.source1.positionFile = /bigdataproj/data/source/logs/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /bigdataproj/data/source/logs/sogou.log
agent1.sources.source1.channels = fileChannel

# file-channel checkpoint and data directories
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /bigdataproj/data/channel/checkpointDir
agent1.channels.fileChannel.dataDirs = /bigdataproj/data/channel/dataDirs

# sink group with round-robin load balancing
agent1.sinkgroups.sinkgroup1.sinks = sink1 sink2
agent1.sinkgroups.sinkgroup1.processor.type = load_balance
agent1.sinkgroups.sinkgroup1.processor.backoff = true
agent1.sinkgroups.sinkgroup1.processor.selector = round_robin
agent1.sinkgroups.sinkgroup1.processor.selector.maxTimeOut=10000

# Avro sink to zhiyong3
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.channel = fileChannel
agent1.sinks.sink1.batchSize = 1
agent1.sinks.sink1.hostname = zhiyong3
agent1.sinks.sink1.port = 12345

# Avro sink to zhiyong4
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.channel = fileChannel
agent1.sinks.sink2.batchSize = 1
agent1.sinks.sink2.hostname = zhiyong4
agent1.sinks.sink2.port = 12345

Write another Flume configuration file Avro2HBaseAndKafka.properties that reads the Avro stream and writes the data to both HBase and Kafka.

[root@zhiyong3 conf]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/flume/conf
[root@zhiyong3 conf]# touch Avro2HBaseAndKafka.properties
[root@zhiyong3 conf]# chmod 777 ./Avro2HBaseAndKafka.properties
[root@zhiyong3 conf]# ll
总用量 24
-rwxrwxrwx  1 root   root      0 413 00:04 Avro2HBaseAndKafka.properties
-rw-rw-r--. 1 hadoop hadoop 1661 1218 2018 flume-conf.properties.template
-rwxr-xr-x. 1 hadoop hadoop 1455 31 23:10 flume-env.ps1
-rw-rw-r--. 1 hadoop hadoop 1455 1218 2018 flume-env.ps1.template
-rwxr-xr-x. 1 hadoop hadoop 1593 31 23:10 flume-env.sh
-rw-rw-r--. 1 hadoop hadoop 1568 1218 2018 flume-env.sh.template
-rwxrwxr-x. 1 hadoop hadoop 3117 31 23:10 log4j.properties
[root@zhiyong3 conf]# vim Avro2HBaseAndKafka.properties
#The content of the properties file is shown below

[root@zhiyong3 conf]# scp ./Avro2HBaseAndKafka.properties root@zhiyong4:$PWD
Avro2HBaseAndKafka.properties                                                                                               100% 1617   466.5KB/s   00:00
[root@zhiyong3 conf]#

The content of this properties file:

agent1.sources = source1
agent1.channels = HBaseChannel KafkaChannel 
agent1.sinks =  HBaseSink kafkaSink

# Define and configure an avro
agent1.sources.source1.type = avro
agent1.sources.source1.channels = KafkaChannel HBaseChannel
# listen on all interfaces
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 12345

agent1.sources.source1.selector.type = replicating

# channel feeding the HBase sink
agent1.channels.HBaseChannel.type = memory
agent1.channels.HBaseChannel.capacity = 10000
agent1.channels.HBaseChannel.transactionCapacity = 10000
agent1.channels.HBaseChannel.keep-alive = 5

# Define and configure a  sink
agent1.sinks.HBaseSink.type = asynchbase
agent1.sinks.HBaseSink.channel = HBaseChannel
agent1.sinks.HBaseSink.table = sougoulogs
agent1.sinks.HBaseSink.serializer = org.apache.flume.sink.hbase.SougoulogsAsyncHbaseEventSerializer
agent1.sinks.HBaseSink.zookeeperQuorum = zhiyong2:2181,zhiyong3:2181,zhiyong4:2181
agent1.sinks.HBaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
agent1.sinks.HBaseSink.znodeParent = /hbase
agent1.sinks.HBaseSink.columnFamily = info

# channel feeding the Kafka sink
agent1.channels.KafkaChannel.type = memory
agent1.channels.KafkaChannel.capacity = 10000
agent1.channels.KafkaChannel.transactionCapacity = 10000
agent1.channels.KafkaChannel.keep-alive = 5

agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = sougoulogs
agent1.sinks.kafkaSink.brokerList = zhiyong2:9092,zhiyong3:9092,zhiyong4:9092
agent1.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder
agent1.sinks.kafkaSink.producer.acks = 1
agent1.sinks.kafkaSink.producer.requiredAcks = 1
agent1.sinks.kafkaSink.channel = KafkaChannel

With that in place, it starts up successfully:

[root@zhiyong3 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/Avro2HBaseAndKafka.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: kafkaSink: Successfully registered new MBean.
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: kafkaSink started
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
2022-04-13 00:18:08 INFO source.AvroSource: Avro source source1 started.


Start another Flume agent the same way:

[root@zhiyong3 ~]# ssh zhiyong4
Last login: Tue Apr 12 20:39:23 2022 from zhiyong3
[root@zhiyong4 ~]# date
2022年 04月 13日 星期三 00:11:17 CST
[root@zhiyong4 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/Avro2HBaseAndKafka.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:19:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: kafkaSink: Successfully registered new MBean.
2022-04-13 00:19:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: kafkaSink started
2022-04-13 00:19:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-04-13 00:19:58 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
2022-04-13 00:19:58 INFO source.AvroSource: Avro source source1 started.


Then start the Flume agent that collects the data:

[root@zhiyong2 ~]# mkdir -p /bigdataproj/data/source/logs
[root@zhiyong2 ~]# cd /bigdataproj/data/source/logs
[root@zhiyong2 logs]# touch sogou.log
[root@zhiyong2 logs]# chmod 777 ./sogou.log
[root@zhiyong2 logs]# cd
[root@zhiyong2 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/File2Avro.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:25:59 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2022-04-13 00:25:59 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
2022-04-13 00:25:59 INFO sink.AbstractRpcSink: Rpc sink sink1: Building RpcClient with hostname: zhiyong3, port: 12345
2022-04-13 00:25:59 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2022-04-13 00:25:59 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2022-04-13 00:25:59 INFO sink.AbstractRpcSink: Rpc sink sink1 started.

Writing to HBase directly from Flink actually looks simpler than customizing Flume. Flume-to-HBase is mostly seen with HBase 1; only Flume 1.9 can be adapted for HBase 2, and HBase 3 has been on the horizon for a while. Let the old waves break on the beach. The tail-a-log-into-Kafka part, however, is still quite practical.
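
For reference, a minimal sketch of my own (untested on this cluster) of what writing to HBase straight from Flink could look like via the Table API HBase connector; it assumes the flink-connector-hbase-2.2 dependency is added and that the HBase table sougoulogs with column family info already exists:

package com.zhiyong.flinkDemo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Stream2HBaseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register an HBase sink table: rowkey maps to the HBase row key,
        // the ROW<...> column maps to the "info" column family.
        tEnv.executeSql(
                "CREATE TABLE hbase_sougoulogs (" +
                " rowkey STRING," +
                " info ROW<datatime STRING, userid STRING, searchname STRING," +
                "          retorder STRING, cliorder STRING, cliurl STRING>," +
                " PRIMARY KEY (rowkey) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'hbase-2.2'," +
                " 'table-name' = 'sougoulogs'," +
                " 'zookeeper.quorum' = 'zhiyong2:2181,zhiyong3:2181,zhiyong4:2181'" +
                ")");

        // A source table over the Kafka topic and an INSERT INTO hbase_sougoulogs
        // would complete the pipeline; omitted here to keep the sketch short.
    }
}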

Spark

Spark is still widely used for batch. Back in 2018 people were still doing simple micro-batch processing with Spark Streaming, but that was four years ago... and when writing to a sink like MySQL through foreach, Structured Streaming in practice only guarantees at-least-once rather than end-to-end exactly-once... For standalone batch processing, Spark is of course still the first choice.

pom dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sougoulogs</artifactId>
        <groupId>com.zhiyong</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sparkDemo</artifactId>

    <!-- Repositories: aliyun, cloudera, apache (in that order) -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.0.1</spark.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <!--        Enable Lombok annotations -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
        </dependency>

<!--        MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Shade plugin to build a fat JAR (includes all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Optionally set the JAR's main class -->
                                    <!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
                                </transformer>
                            </transformers>
                      </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Configuration Class

package com.zhiyong.sparkDemo

// Centralized, hard-coded configuration
object Constants {
  val url = "jdbc:mysql://192.168.88.100/sougoulogs"
  val userName = "root"
  val passWord = "123456"
  val kafkaServer = "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092"
  val groupId = "sougoulogs"
  val offset = "earliest"
  val topic = "sougoulogs"
  val mysql8driver = "com.mysql.cj.jdbc.Driver"

}

Define a Connection Pool

package com.zhiyong.sparkDemo;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;

/**
 * @program: sougoulogs
 * @description: a simple MySQL connection pool
 * @author: zhiyong
 * @create: 2022-04-06 19:46
 **/
public class MySQLPool implements Serializable {
    private String url ="";
    private String user ="";
    private String password ="";
    private int maxNum = 5;// maximum number of connections in use at the same time
    private int currentNum =0;// number of connections created so far
    private LinkedList<Connection> connections = new LinkedList<>();// idle (available) connections

    /**
     * Initialize the pool with connection settings
     * @param url JDBC connection string
     * @param user  user name
     * @param password password
     */
    public void init(String url, String user, String password){
        this.url=url;
        this.user=user;
        this.password=password;
    }

    /**
     * Take a connection from the pool
     * @return a connection object
     * @throws Exception if the driver cannot be loaded or the connection fails
     */
    public synchronized Connection getConn() throws Exception {
        if (connections.isEmpty()){
            addDrive();
            Connection connection = DriverManager.getConnection(url, user, password);
            connections.push(connection);// put it into the pool
            currentNum++;
        }
        return connections.poll();// take one from the pool
    }

    /**
     * Load the JDBC driver; if the pool is already at its maximum size, wait for a connection to be returned
     */
    private void addDrive() throws Exception {
        if (currentNum >= maxNum && connections.isEmpty()) {
            Thread.sleep(1000);// wait for a connection to come back
            addDrive();
        } else {
            Class.forName(Constants.mysql8driver());// load the driver
        }
    }

    /**
     * Return a connection to the pool
     * @param conn the connection to give back
     */
    private void clearConn(Connection conn){
        connections.push(conn);
    }


}

Since Spark itself is written in Scala, Java and Scala can actually be mixed in the same module (in practice this also needs something like the scala-maven-plugin in the build section, which the pom above does not show).

The Spark Streaming Approach

package com.zhiyong.sparkDemo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, DriverManager, Statement}

// Spark Streaming: read from Kafka, process, then write to MySQL
object Kafka2MySQL_Streaming {
  // count views per news topic
  def newsCount(records: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null // JDBC connection
    var statement: Statement = null // JDBC statement

    try {
      conn = DriverManager.getConnection(Constants.url, Constants.userName, Constants.passWord) // obtain a connection

      records.foreach(data => {
        val name = data._1.replaceAll("[\\[\\]]", "")
        val count = data._2

        val sql1 = "select 1 from newscount where name = '" + name + "'"// check whether the row already exists
        val sql2 = "update newscount set count = " + count + " where name = '" + name + "'"
        val sql3 = "insert into newscount(name,count) values('" + name + "'," + count + ")"

        statement = conn.createStatement()
        val resultSet = statement.executeQuery(sql1)
        if (resultSet.next()) {
          statement.executeUpdate(sql2) //有数据就更新
        } else {
          statement.execute(sql3) //没有数据就插入
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally { //最后要关流
      if (null != statement) {
        statement.close()
      }
      if (null != conn) {
        conn.close()
      }
    }

  }

  // Count page views per time slot
  def periodCount(records: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var statement: Statement = null

    try {
      conn = DriverManager.getConnection(Constants.url, Constants.userName, Constants.passWord)
      records.foreach(data => {
        val logtime = data._1
        val count = data._2

        val sql1 = "select 1 from periodcount where logtime = '" + logtime + "'"
        val sql2 = "update periodcount set count = " + count + " where logtime='" + logtime + "'"
        val sql3 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")"

        statement = conn.createStatement()
        val resultSet = statement.executeQuery(sql1)

        if (resultSet.next()) {
          statement.execute(sql2)
        } else {
          statement.execute(sql3)
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != statement) {
        statement.close()
      }
      if (null != conn) {
        conn.close()
      }
    }
  }


  // Main entry point
  def main(args: Array[String]): Unit = {
    print("start:Kafka2MySQL_Streaming")
    val sparkConf = new SparkConf().setAppName("sougoulogs_Streaming")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1)) // one micro-batch per second

    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> Constants.kafkaServer,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> Constants.groupId,
      "auto.offset.reset" -> Constants.offset,
      "enable.auto.commit" -> java.lang.Boolean.TRUE
    )

    val topics = Array(Constants.topic)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    //    val lines = stream.map(record => record.value)
    // drop dirty records: keep only lines that split into exactly 6 fields
    val cleanStream = stream.map(_.value())
      .map(_.split(","))
      .filter(_.length == 6)

    // stream of per-news-topic view counts
    val newsCountDStream = cleanStream
      .map(x => (x(2), 1))
      .reduceByKey(_ + _)

    // stream of per-time-slot view counts
    val periodCountDStream = cleanStream
      .map(x => (x(0), 1))
      .reduceByKey(_ + _)

    // write each micro-batch to MySQL, partition by partition
    newsCountDStream.foreachRDD(rdd => {
      print("computing news-topic view counts from Kafka and writing the results to MySQL, per partition")
      rdd.foreachPartition(newsCount)
    })

    periodCountDStream.foreachRDD(rdd => {
      print("computing time-slot view counts from Kafka and writing the results to MySQL, per partition")
      rdd.foreachPartition(periodCount)
    })

    // start the streaming job
    ssc.start()
    // required for DStream jobs: block until the job is stopped, otherwise the driver exits right after start()
    ssc.awaitTermination()

  }

}
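The select-then-update/insert pattern above is fine for a demo, but it concatenates values straight into SQL and needs two round trips per record. A hedged alternative sketch, dropped into Kafka2MySQL_Streaming in place of newsCount (it reuses the existing imports): upsertNewsCount is a hypothetical helper, and it assumes a UNIQUE key is added on newscount.name, which the DDL shown earlier does not declare.

  // Sketch only: single-statement upsert via PreparedStatement.
  // Assumes e.g.: alter table newscount add unique key uk_name(name)
  def upsertNewsCount(records: Iterator[(String, Int)]): Unit = {
    val conn = DriverManager.getConnection(Constants.url, Constants.userName, Constants.passWord)
    val ps = conn.prepareStatement(
      "insert into newscount(name, count) values(?, ?) on duplicate key update count = ?")
    try {
      records.foreach { case (rawName, count) =>
        val name = rawName.replaceAll("[\\[\\]]", "") // strip the brackets, same as newsCount
        ps.setString(1, name)
        ps.setInt(2, count)
        ps.setInt(3, count)
        ps.executeUpdate()
      }
    } finally {
      ps.close()
      conn.close()
    }
  }

periodcount could be handled the same way with a unique key on logtime.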

Structured Streaming approach

package com.zhiyong.sparkDemo

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Structured Streaming: read from Kafka, process, and write the results to MySQL
object Kafka2MySQL_StructuredStreaming {

  // Main entry point
  def main(args: Array[String]): Unit = {
    print("start:Kafka2MySQL_StructuredStreaming")
    val spark = SparkSession.builder()
      .appName("sougoulogs_StructuredStreaming")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._ // implicit encoders needed for the Dataset operations below


    val cleanSs = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Constants.kafkaServer)
      .option("subscribe", Constants.topic)
      .load()
      .selectExpr("cast(key as string)", "cast(value as string)") //Java版Spark可以传String[]
      .as[(String, String)]
      .map(_._2)
      .map(_.split(","))
      .filter(6 == _.length)

    val newsCountsSs = cleanSs.map(_ (2))
      .groupBy("value")
      .count().toDF("name", "count")

    val periodCountsSs = cleanSs.map(_ (0))
      .groupBy("value")
      .count()
      .toDF("logtime", "count")

    val sink1 = new Sink1(Constants.url, Constants.userName, Constants.passWord)
    val sink2 = new Sink2(Constants.url, Constants.userName, Constants.passWord)

    val query1 = newsCountsSs.writeStream
      .foreach(sink1)
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .start()

    val query2 = periodCountsSs.writeStream
      .foreach(sink2)
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .start()

    query1.awaitTermination()
    query2.awaitTermination()

  }

}
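Note that, unlike the DStream version, this Kafka source never sets a starting offset, so by default it only reads from the latest offsets. A debugging fragment (to drop into main() above in place of the two MySQL queries; the names debugStream/debugQuery are illustrative) that starts from the earliest offset and prints to Spark's built-in console sink makes it easier to check the parsing before touching MySQL:

    // Fragment for main() above: sanity-check the raw stream on stdout before wiring up MySQL.
    val debugStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Constants.kafkaServer)
      .option("subscribe", Constants.topic)
      .option("startingOffsets", Constants.offset) // "earliest", matching auto.offset.reset in the DStream version
      .load()
      .selectExpr("cast(value as string)")

    val debugQuery = debugStream.writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .start()
    debugQuery.awaitTermination()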

The two corresponding Sinks:

package com.zhiyong.sparkDemo

import org.apache.spark.sql.{ForeachWriter, Row}

import java.sql.{Connection, ResultSet, Statement}

/**
 * Custom Sink that writes rows to MySQL
 *
 * @param url      JDBC connection string
 * @param user     username
 * @param password password
 */
class Sink1(url: String, user: String, password: String) extends ForeachWriter[Row] {
  var statement: Statement = _
  var resultset: ResultSet = _
  var conn: Connection = _


  /**
   * Initialize the connection: fetch a Connection from the pool
   *
   * @param partitionId
   * @param epochId
   * @return
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    Class.forName(Constants.mysql8driver)
    val pool = new MySQLPool()
    pool.init(url, user, password)
    conn = pool.getConn
    statement = conn.createStatement()
    true
  }

  override def process(value: Row): Unit = {
    val name = value.getAs[String]("name").replaceAll("[\\[\\]]", "") // replaceAll treats the pattern as a regex and strips the brackets
    val count = value.getAs[Long]("count").toInt
    val sql1 = "select 1 from newscount where name = '" + name + "'"
    val sql2 = "insert into newscount(name,count) values('" + name + "'," + count + ")"
    val sql3 = "update newscount set count = " + count + " where name = '" + name + "'"

    try {
      resultset = statement.executeQuery(sql1)
      if (resultset.next()) {
        statement.execute(sql3)
      } else {
        statement.execute(sql2)
      }
    } catch {
      case e: Exception => print("出现异常“" + e.getMessage)
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != statement) {
      statement.close()
    }
    if (null != conn) {
      conn.close()
    }
  }
}

The other Sink is almost identical:

package com.zhiyong.sparkDemo

import org.apache.spark.sql.{ForeachWriter, Row}

import java.sql.{Connection, ResultSet, Statement}

class Sink2(url: String, user: String, password: String) extends ForeachWriter[Row]{
  var statement: Statement = _
  var resultset: ResultSet = _
  var conn: Connection = _

  /**
   * Initialize the connection: fetch a Connection from the pool
   *
   * @param partitionId
   * @param epochId
   * @return
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    Class.forName(Constants.mysql8driver)
    val pool = new MySQLPool()
    pool.init(url, user, password)
    conn = pool.getConn
    statement = conn.createStatement()
    true
  }

  override def process(value: Row): Unit = {
    val logtime = value.getAs[String]("logtime")
    val count = value.getAs[Long]("count").toInt
    val sql1 = "select 1 from periodcount where logtime = '" + logtime + "'"
    val sql2 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")"
    val sql3 = "update periodcount set count = " + count + " where logtime = '" + logtime + "'"

    try {
      resultset = statement.executeQuery(sql1)
      if (resultset.next()) {
        statement.execute(sql3)
      } else {
        statement.execute(sql2)
      }
    } catch {
      case e: Exception => print("出现异常“" + e.getMessage)
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != statement) {
      statement.close()
    }
    if (null != conn) {
      conn.close()
    }
  }
}
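Sink1 and Sink2 differ only in the table name and the key column. A hedged refactor sketch (MySQLUpsertSink is a hypothetical name, not part of the project) parameterizes both, so one writer class covers the two tables:

// Sketch only: one ForeachWriter parameterized by table and key column, covering both tables.
import org.apache.spark.sql.{ForeachWriter, Row}

import java.sql.{Connection, DriverManager, ResultSet, Statement}

class MySQLUpsertSink(url: String, user: String, password: String,
                      table: String, keyCol: String) extends ForeachWriter[Row] {
  var statement: Statement = _
  var resultset: ResultSet = _
  var conn: Connection = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    Class.forName(Constants.mysql8driver)
    conn = DriverManager.getConnection(url, user, password)
    statement = conn.createStatement()
    true
  }

  override def process(value: Row): Unit = {
    val key = value.getAs[String](keyCol).replaceAll("[\\[\\]]", "")
    val count = value.getAs[Long]("count").toInt
    resultset = statement.executeQuery(
      "select 1 from " + table + " where " + keyCol + " = '" + key + "'")
    if (resultset.next()) {
      statement.execute("update " + table + " set count = " + count + " where " + keyCol + " = '" + key + "'")
    } else {
      statement.execute("insert into " + table + "(" + keyCol + ",count) values('" + key + "'," + count + ")")
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (null != statement) statement.close()
    if (null != conn) conn.close()
  }
}

Usage would look like .foreach(new MySQLUpsertSink(Constants.url, Constants.userName, Constants.passWord, "newscount", "name")) for query1 and the same with "periodcount"/"logtime" for query2.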

Appending mock data to a file

package com.zhiyong.sparkDemo;

import java.io.*;

/**
 * @program: sougoulogs
 * @description: Mock data generator
 * @author: zhiyong
 * @create: 2022-04-04 23:43
 **/
public class MockData {
    // Main entry point
    public static void main(String[] args) {
        String inputPath = args[0];
        String outputPath = args[1];
        String interval = args[2];
        System.out.println("input path: " + inputPath);
        System.out.println("output log path: " + outputPath);
        System.out.println("interval between records: " + interval + "ms");
        try {
            mock(inputPath, outputPath, interval);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Generate mock data by replaying an existing log file
     *
     * @param inputPath  path of the file to read
     * @param outputPath path of the file to append to
     * @param interval   interval between mocked records, in ms
     */
    private static void mock(String inputPath, String outputPath, String interval) throws IOException {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String line = null;

        try {
            int counter = 1;// line counter
            fis = new FileInputStream(inputPath);
            isr = new InputStreamReader(fis, "GBK");
            br = new BufferedReader(isr);
            while (null != (line = br.readLine())) {
                System.out.println("producing record #" + counter + ": " + line);
                writeData(outputPath, line);
                counter++;
                Thread.sleep(Long.parseLong(interval));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally { // always close the streams so the file is not held open
            if (null != br) {
                br.close();
            }
            if (null != isr) {
                isr.close();
            }
            if (null != fis) {
                fis.close();
            }
        }
    }

    /**
     * Append one record to the output file
     *
     * @param outputPath output path
     * @param line       the record to write
     */
    private static void writeData(String outputPath, String line) throws IOException {
        FileOutputStream fos = null;
        OutputStreamWriter osr = null;
        BufferedWriter bw = null;
        try {
            fos = new FileOutputStream(outputPath,true);// append mode
            osr = new OutputStreamWriter(fos);
            bw = new BufferedWriter(osr);
            bw.write(line);
            bw.write("\n");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null!=bw){
                bw.close();
            }
            if (null!=osr){
                osr.close();
            }
            if(null!=fos){
                fos.close();
            }
        }
    }
}

To feed Flume's tail-based collection, something has to keep appending records to a file automatically instead of echoing lines into it by hand; that is what MockData above is for.

In Flink this kind of plumbing is hardly needed: overriding a method in one of the rich functions, or writing a small custom source, does the job directly. Tools like Flume and Sqoop arguably exist mainly for the convenience of SQL-only users who don't write Java. These days there are also mature products such as Alibaba Cloud DataPhin and Huawei Cloud FusionInsight, plus open-source components like Kyuubi where data-integration jobs can be run from configuration alone, so the barrier to entry for big data has dropped a lot.
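As a rough illustration of that point (a sketch only, under the project's Flink 1.14 + flink-connector-kafka setup: ReplayFileToKafka and the local file path are hypothetical, and this is not the project's actual Flink code), a small custom SourceFunction can replay the log file straight into the sougoulogs topic, replacing MockData plus Flume:

// Sketch only: replay a local log file into the sougoulogs topic at a fixed interval.
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.io.Source

object ReplayFileToKafka {

  class FileReplaySource(path: String, intervalMs: Long) extends SourceFunction[String] {
    @volatile private var running = true

    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
      val source = Source.fromFile(path, "GBK")
      try {
        for (line <- source.getLines() if running) {
          ctx.collect(line)          // emit one record
          Thread.sleep(intervalMs)   // throttle, like MockData's interval
        }
      } finally {
        source.close()
      }
    }

    override def cancel(): Unit = running = false
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val sink = KafkaSink.builder[String]()
      .setBootstrapServers(Constants.kafkaServer)
      .setRecordSerializer(
        KafkaRecordSerializationSchema.builder[String]()
          .setTopic(Constants.topic)
          .setValueSerializationSchema(new SimpleStringSchema())
          .build())
      .build()

    env.addSource(new FileReplaySource("/tmp/SogouQ.sample", 1000L)) // hypothetical local path
      .sinkTo(sink)

    env.execute("replay-file-to-kafka")
  }
}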
