Preface
Overview
IT moves fast these days. Blink and Flink is already at 1.14.4, and FineBI can even do real-time BI now... So I picked up the classic Sougoulogs mini project to practice on and enjoy the thrill of stepping into one pit after another.
Thanks to Flink 1.14 finally unifying stream and batch at the API level, batch jobs can also be written in the streaming style, and a Kappa architecture is much easier to operate than a Lambda architecture that keeps separate stream and batch pipelines. Of course, some vendors go the other way and move their old Spark and Spark Streaming jobs to offline Hive (running MapReduce, not even Tez) to free up memory for a Flink cluster doing real-time stream computing. To each their own.
This mini project reads logs from a CSV file and sinks them into Kafka to simulate data collection, then reads from the MQ, processes the data and sinks the results into MySQL, and finally builds a real-time dashboard in FineBI for visualization. It suits apprentices like me who have used older Flink versions, know SQL, but are not much good at ECharts.
Result preview
Direct link on Bilibili:
Practicing a Sogou-logs BI mini project with Flink + Kafka + FineBI
Component versions
Runs on the USDP 2.0 cluster set up earlier.
Flink: 1.14.3, manually upgraded;
Kafka: the bundled 2.0.1, plus the bundled KafkaEagle;
FineBI: 5.1 free personal edition.
Preparation
Create the Kafka topic
[root@zhiyong2 lib]# kafka-topics.sh --zookeeper zhiyong2:2181 --delete --topic sougoulogs
Topic sougoulogs is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --list
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --create --topic sougoulogs --replication-factor 3 --partitions 3
Created topic "sougoulogs".
[root@zhiyong2 ~]# kafka-topics.sh --zookeeper zhiyong2:2181 --list
sougoulogs
Using Kafka
Kafka is still the most widely used MQ for real-time work, although Pulsar has been gaining traction lately. For apprentices, having a GUI makes things a lot friendlier.
Start Kafka and KafkaEagle
Just start them from the USDP web UI, which is very convenient. You can also open KafkaEagle for a visual view:
http://zhiyong4:8048/
admin,123456
Test Kafka
In KafkaEagle's Mock page you can pick the sougoulogs topic and generate a record:
00:00:01 123456 [哈哈\] 2 3 www.baidu.com
After Send you can also query the data with SQL in KSQL:
Type into the box:
select * from sougoulogs where `partition` in (0,1,2) limit 5
You can also start a console consumer to inspect the data:
[root@zhiyong2 ~]# kafka-console-consumer.sh --bootstrap-server zhiyong2:9092 --topic sougoulogs
00:00:01 123456 [哈哈\] 2 3 www.baidu.com
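If you prefer verifying from code, a minimal sketch with the plain Kafka Java producer also works (the broker list is assumed from the cluster above; the record layout simply mirrors the mock data):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092"); // assumed broker list
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // one record in the same comma-separated layout the Flink job expects later
            producer.send(new ProducerRecord<>("sougoulogs", "00:00:01,123456,[哈哈],2,3,www.baidu.com")).get();
        }
    }
}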
Big-data components have all gone SQL-first these days, which is a real gift to the SQL Boys: knowing only SQL is enough for warehouse-related roles, mostly because the data sources are tidy, structured data. For semi-structured or unstructured data, or for building platforms, a SQL-only skill set quickly runs short.
Create the MySQL database and tables
These MySQL tables hold the computed results that the BI layer will display.
create database sougoulogs;
use sougoulogs;
create table if not exists newscount (
name varchar(50) not null,
count int(11) not null
);
create table if not exists periodcount(
logtime varchar(50) not null,
count int(11) not null
);
Real-time BI
Three dashboard components in total, using only 2 tables and 3 SQL statements:
SELECT `name`,`count` FROM newscount ORDER by `count` desc LIMIT 10 --newscount
select count(1) from newscount --counts
select `logtime`,`count` from periodcount order by logtime desc limit 10 --periodcount
The newer FineBI feels a bit like Tableau and is quite apprentice-friendly, and the official site has detailed tutorials, so I will not repeat them. The rough flow:
Add a connection → add a real-time data source → add a dataset → configure components and draw the dashboard.
Even without knowing Spring or ECharts you can still present the data.
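Before wiring up the dashboard it is worth confirming that the three SQLs above actually return rows; a rough JDBC sketch (the connection details are assumed to match the ConstConfig class later in this post):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BiSqlCheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://192.168.88.100:3306/sougoulogs"; // assumed, same as ConstConfig.URL
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement()) {
            String[] sqls = {
                    "SELECT `name`,`count` FROM newscount ORDER BY `count` DESC LIMIT 10",
                    "SELECT count(1) FROM newscount",
                    "SELECT `logtime`,`count` FROM periodcount ORDER BY logtime DESC LIMIT 10"
            };
            for (String sql : sqls) {
                try (ResultSet rs = stmt.executeQuery(sql)) {
                    int rows = 0;
                    while (rs.next()) {
                        rows++;
                    }
                    System.out.println(rows + " rows <- " + sql);
                }
            }
        }
    }
}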
Writing the Flink program
pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sougoulogs</artifactId>
<groupId>com.zhiyong</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<!-- 指定仓库位置,依次为aliyun、cloudera、apache仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<artifactId>flinkDemo</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.3</flink.version>
<scala.version>2.12.14</scala.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Apache Flink 的依赖, 这些依赖项,生产环境可以不打包到JAR文件中. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink操作hadoop-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-filesystems</artifactId>
<version>${flink.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<!-- <version>1.14.4</version>-->
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 可以使用Lombok的@注解-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
</dependency>
<!-- MySQL驱动包-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 可以设置jar包的入口类(可选) -->
<!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Mock data
package com.zhiyong.flinkDemo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
/**
* @program: sougoulogs
* @description: 模拟生成数据的方法
* @author: zhiyong
* @create: 2022-04-13 18:01
**/
public class MockData {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> data = env.addSource(new CsvSource());
data.print("自定义数据源产生的数据:");
env.execute("模拟数据");
}
static class CsvSource extends RichSourceFunction<String> {
FileInputStream fis = null;
InputStreamReader isr = null;
BufferedReader br = null;
String line = null;
String csvPath = ConstConfig.csvPath;
String lastStr = "";//上一条数据
boolean needRun = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (needRun) {
while (null != (line = br.readLine())) {
//System.out.println("读取到数据" + line);
ctx.collect(line);
if (StringUtils.isNoneBlank(lastStr) && !line.split(",")[0].equals(lastStr.split(",")[0])) {
    // replay at the original pace: sleep for the gap (in seconds) between the two timestamps
    Thread.sleep((Long.parseLong(line.split(",")[0])
            - Long.parseLong(lastStr.split(",")[0])) * 1000);
}
lastStr = line;
}
}
}
@Override
public void cancel() {
needRun = false;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
fis = new FileInputStream(csvPath);
isr = new InputStreamReader(fis, "UTF-8");
br = new BufferedReader(isr);
}
@Override
public void close() throws Exception {
super.close();
{ // 最后要关流,防止文件持续占用
if (null != br) {
br.close();
}
if (null != isr) {
isr.close();
}
if (null != fis) {
fis.close();
}
}
}
}
}
The point of the custom source is to stop Flink from treating the CSV as a batch job, reading the whole file in one go and then exiting, which would hide the streaming behaviour. Throttling the read also avoids out-of-memory and other resource problems during development.
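As a side note, if replaying the original timestamps is not needed, a cruder variant simply sleeps a fixed interval per record; a minimal sketch in the same spirit as CsvSource (the 500 ms pace is an arbitrary assumption):

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Emits one line roughly every 500 ms instead of replaying the original timestamps
public class FixedRateCsvSource extends RichSourceFunction<String> {
    private volatile boolean needRun = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (BufferedReader br = Files.newBufferedReader(Paths.get(ConstConfig.csvPath), StandardCharsets.UTF_8)) {
            String line;
            while (needRun && null != (line = br.readLine())) {
                ctx.collect(line);
                Thread.sleep(500); // fixed pacing, arbitrary value
            }
        }
    }

    @Override
    public void cancel() {
        needRun = false;
    }
}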
Sink data to Kafka
package com.zhiyong.flinkDemo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
* @program: sougoulogs
* @description: 从Log读数据写入Kafka
* @author: zhiyong
* @create: 2022-04-13 12:02
**/
public class File2Kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String inputPath = "E:/study/flink/data/test1";
//DataStreamSource<String> data = env.readTextFile(inputPath);//实际为批处理,跑完数据直接结束,太快
DataStreamSource<String> data = env.addSource(new MockData.CsvSource());
// 构建Kafka生产者
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("transaction.timeout.ms",1000*60*10 + "");//需要<15分钟
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
//解决报错Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
.setKafkaProducerConfig(kafkaProperties)
//.setTransactionalIdPrefix("")
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("sougoulogs")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
data.sinkTo(kafkaSink);
env.execute("File2Kafka");
}
}
Sink data to MySQL
package com.zhiyong.flinkDemo;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
/**
* @program: sougoulogs
* @description: 使用Flink读Kafka数据写入MySQL
* @author: zhiyong
* @create: 2022-04-06 22:49
**/
public class Kafka2MySQL_Flink implements Serializable {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Properties prop = new Properties();
// prop.setProperty("bootstrap.servers","zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
// prop.setProperty("group.id","sougoulogs");
//
// FlinkKafkaConsumer<String> kafkaConsumer =
// new FlinkKafkaConsumer<>("sougoulogs",new SimpleStringSchema(),prop);//过时方法
//
// DataStreamSource<String> data = env.addSource(kafkaConsumer);
// SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//在.后指定泛型
.setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
.setTopics("sougoulogs")
.setGroupId("sougoulogs")
.setStartingOffsets(OffsetsInitializer.earliest())
// .setValueOnlyDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> source = env
.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
System.out.println("构建环境完成");
source.print();
SingleOutputStreamOperator<String> cleanStream = source
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.split(",").length == 6;
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> newsCountsStream = cleanStream
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(Tuple2.of(value.toLowerCase().split(",")[2], 1));
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
.sum(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> periodCountsStream = cleanStream
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(value.toLowerCase().split(",")[0], 1));
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
.sum(1);
newsCountsStream.print("新闻计数流数据:");
periodCountsStream.print("阶段汇总流数据:");
// 落数据
newsCountsStream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
private Connection conn = null;
private Statement stmt = null;
// stream应用使用同一个连接持续写入
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName(ConstConfig.JDBC_Driver);
conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
}
// 自动生成,没什么用
// @Override
// public void setRuntimeContext(RuntimeContext t) {
// super.setRuntimeContext(t);
// }
// 托管写数据的方法
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
try {
String name = value.f0.replaceAll("[\\[\\]]", "");
Integer count = value.f1;
String sql1 = "select 1 from newscount where name ='" + name + "'";
String sql2 = "update newscount set count =" + count + " where name ='" + name + "'";
String sql3 = "insert into newscount(name,count) values('" + name + "'," + count + ")";
stmt = conn.createStatement();
ResultSet resultSet = stmt.executeQuery(sql1);
if (resultSet.next()) {
stmt.execute(sql2);
} else {
stmt.execute(sql3);
}
} catch (Exception e) {
e.printStackTrace();
}
}
// stream应用正常情况不会关闭
@Override
public void close() throws Exception {
super.close();
if (null != stmt) {
stmt.close();
}
if (null != conn) {
conn.close();
}
}
});
periodCountsStream.addSink(new RichSinkFunction<Tuple2<String, Integer>>() {
private Connection conn = null;
private Statement stmt = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName(ConstConfig.JDBC_Driver);
conn = DriverManager.getConnection(ConstConfig.URL, ConstConfig.user, ConstConfig.password);
}
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
super.invoke(value, context);
try {
String logtime = value.f0;
Integer count = value.f1;
String sql1 = "select 1 from periodcount where logtime ='" + logtime + "'";
String sql2 = "update periodcount set count =" + count + " where logtime ='" + logtime + "'";
String sql3 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")";
stmt = conn.createStatement();
ResultSet resultSet = stmt.executeQuery(sql1);
if (resultSet.next()) {
stmt.execute(sql2);
} else {
stmt.execute(sql3);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws Exception {
super.close();
if (null != stmt) {
stmt.close();
}
if (null != conn) {
conn.close();
}
}
});
env.execute("kafka2MySQL");
}
}
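The sinks above assemble SQL by string concatenation, which is fine for a demo but brittle once a keyword contains a quote character. A hedged sketch of the same select-then-update-or-insert logic for newscount using PreparedStatement, callable from invoke() with the connection opened in open():

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Helper that could replace the concatenated sql1/sql2/sql3 in the newscount sink
public class NewsCountUpsert {
    public static void upsert(Connection conn, String name, int count) throws Exception {
        try (PreparedStatement query = conn.prepareStatement("select 1 from newscount where name = ?")) {
            query.setString(1, name);
            try (ResultSet rs = query.executeQuery()) {
                // parameter order (count, name) is the same for both statements
                String sql = rs.next()
                        ? "update newscount set count = ? where name = ?"
                        : "insert into newscount(count, name) values(?, ?)";
                try (PreparedStatement upsert = conn.prepareStatement(sql)) {
                    upsert.setInt(1, count);
                    upsert.setString(2, name);
                    upsert.executeUpdate();
                }
            }
        }
    }
}

The periodcount sink could use the same pattern with logtime in place of name.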
Configuration class
package com.zhiyong.flinkDemo;
import java.io.Serializable;
/**
* @program: sougoulogs
* @description: 写死的配置类
* @author: zhiyong
* @create: 2022-04-06 22:43
**/
public class ConstConfig implements Serializable {
public static final String JDBC_Driver = "com.mysql.cj.jdbc.Driver";
public static final String URL = "jdbc:mysql://192.168.88.100:3306/sougoulogs";
public static final String user = "root";
public static final String password = "123456";
public static final String csvPath = "E:/study/studyBigDataProj/sougouLogs/data/sougou500w_2.txt";
}
This class centralizes the configuration so it is easy to change. A streaming job is usually started once and then left running for a long time, and the metrics are defined up front and rarely change, so hard-coding is fine in most cases. Dynamic parameters, as used in offline batch jobs, matter more when building components or a platform.
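That said, if parameters ever need to change without recompiling, Flink's ParameterTool can read them from program arguments and fall back to the constants above; a minimal sketch (the argument names are made up for illustration):

import org.apache.flink.api.java.utils.ParameterTool;

public class ArgsConfigDemo {
    public static void main(String[] args) {
        // e.g. --mysql.url jdbc:mysql://host:3306/sougoulogs --csv.path /data/sougou.txt
        ParameterTool params = ParameterTool.fromArgs(args);
        String url = params.get("mysql.url", ConstConfig.URL);        // fall back to the hard-coded value
        String csvPath = params.get("csv.path", ConstConfig.csvPath);
        System.out.println("url=" + url + ", csvPath=" + csvPath);
    }
}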
Problems encountered
The brand-new Kafka connector in Flink 1.14
The version is so new that you have to dig through the source code to figure out how to use it, and the pits are everywhere. Older versions probably looked like this:
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","zhiyong2:9092,zhiyong3:9092,zhiyong4:9092");
prop.setProperty("group.id","sougoulogs");
FlinkKafkaConsumer<String> kafkaConsumer =
new FlinkKafkaConsumer<>("sougoulogs",new SimpleStringSchema(),prop);//过时方法
DataStreamSource<String> data = env.addSource(kafkaConsumer);
Still, deprecated methods should be avoided whenever possible!!!
Referring to the source code:
package org.apache.flink.connector.kafka.source;
/**
* The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link
* KafkaSource}. The following example shows how to create a KafkaSource emitting records of <code>
* String</code> type.
*
* <pre>{@code
* KafkaSource<String> source = KafkaSource
* .<String>builder()
* .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
* .setGroupId("MyGroup")
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializer(new TestingKafkaRecordDeserializationSchema())
* .setStartingOffsets(OffsetsInitializer.earliest())
* .build();
* }</pre>
*
* <p>See {@link KafkaSourceBuilder} for more details.
*
* @param <OUT> the output type of the source.
*/
public class KafkaSource<OUT>
implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
ResultTypeQueryable<OUT> {
}
Luckily the Javadoc shows how to build the object, so I copied it over and tweaked it:
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//在.后指定泛型
.setBootstrapServers("zhiyong2:9092,zhiyong3:9092,zhiyong4:9092")
.setTopics("sougoulogs")
.setGroupId("sougoulogs")
.setStartingOffsets(OffsetsInitializer.earliest())
// .setValueOnlyDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
With the new connector in place, Kafka offers exactly-once semantics, which you obviously enable when you can, and then this error appeared:
Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
Since the broker-side maximum transaction timeout defaults to 15 minutes, I simply set 10 minutes to get past the error:
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("transaction.timeout.ms",1000*60*10 + "");//需要<15分钟
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092")
//解决报错Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
.setKafkaProducerConfig(kafkaProperties)
//.setTransactionalIdPrefix("")
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("sougoulogs")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
data.sinkTo(kafkaSink);
MySQL character encoding
At runtime this shows up:
构建环境完成
00:00:01,123456,[哈哈],2,3,www.baidu.com
新闻计数流数据:> ([哈哈],1)
阶段汇总流数据:> ([哈哈],1)
java.sql.SQLException: Illegal mix of collations (latin1_swedish_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '='
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$7.invoke(Kafka2MySQL_Flink.java:184)
at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$7.invoke(Kafka2MySQL_Flink.java:161)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
java.sql.SQLException: Illegal mix of collations (latin1_swedish_ci,IMPLICIT) and (utf8mb4_general_ci,COERCIBLE) for operation '='
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1201)
at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$6.invoke(Kafka2MySQL_Flink.java:137)
at com.zhiyong.flinkDemo.Kafka2MySQL_Flink$6.invoke(Kafka2MySQL_Flink.java:107)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
This happens because the MySQL instance bundled with USDP mixes Latin1, UTF-8 and GBK encodings. Obviously the cluster's stability should not be put at risk for this little project.
Run:
[root@zhiyong1 usdp]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 171
Server version: 5.7.30 MySQL Community Server (GPL)
Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> SHOW VARIABLES LIKE 'character_set_%';
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | latin1 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | latin1 |
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
8 rows in set (0.00 sec)
mysql> SHOW VARIABLES LIKE 'collation_%';
+----------------------+-------------------+
| Variable_name | Value |
+----------------------+-------------------+
| collation_connection | utf8_general_ci |
| collation_database | latin1_swedish_ci |
| collation_server | latin1_swedish_ci |
+----------------------+-------------------+
3 rows in set (0.00 sec)
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| db_hive_metastore |
| db_hue |
| db_ranger |
| db_udp |
| dolphinscheduler |
| mysql |
| performance_schema |
| sougoulogs |
| sys |
+--------------------+
10 rows in set (0.00 sec)
mysql> use sougoulogs;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql>
mysql> show tables;
+----------------------+
| Tables_in_sougoulogs |
+----------------------+
| newscount |
| periodcount |
+----------------------+
2 rows in set (0.00 sec)
mysql>
mysql> alter table `newscount` charset=utf8
-> ;
Query OK, 0 rows affected (0.00 sec)
Records: 0 Duplicates: 0 Warnings: 0
mysql> alter table `periodcount` charset=utf8;
Query OK, 0 rows affected (0.00 sec)
Records: 0 Duplicates: 0 Warnings: 0
mysql> alter table `newscount` convert to character set utf8;
Query OK, 17 rows affected (0.03 sec)
Records: 17 Duplicates: 0 Warnings: 0
mysql> alter table `periodcount` convert to character set utf8;
Query OK, 12 rows affected (0.03 sec)
Records: 12 Duplicates: 0 Warnings: 0
Only the encoding of these two ADS (serving) tables needs to be changed.
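As an extra safeguard (untested here, so treat it as an assumption), the encoding can also be pinned on the client side through standard MySQL Connector/J URL parameters, so the JDBC session negotiates utf8 regardless of the server default:

public class ConstConfigUtf8 {
    // Same server as ConstConfig.URL, with the encoding pinned via Connector/J parameters
    public static final String URL =
            "jdbc:mysql://192.168.88.100:3306/sougoulogs"
                    + "?useUnicode=true&characterEncoding=utf8&useSSL=false";
}

Whether this alone is enough depends on the server-side collation, which is why the ALTER TABLE statements above remain the primary fix.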
FineBI does not auto-refresh
I assumed that once FineBI had a real-time data source it would refresh on its own. Turns out that was naive. Following the official guide:
Local path:
C:\FineBI5.1\webapps\webroot\WEB-INF\lib\fine-bi-adapter-5.1.jar
fine-bi-adapter-5.1.jar\com\finebi\web\html\show.html
Insert at the end of that file inside the jar:
</body>
<!-- 这个位置插入 -->
<!-- 增加刷新功能 -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
<!-- 这个位置插入 -->
</html>
Create refresh.js under C:\FineBI5.1\webapps\webroot:
setTimeout(function () {
var b =document.title;
var a =BI.designConfigure.reportId;//获取仪表板id
//对仪表板id进行判断,实现指定仪表板刷新
if (a=="7fcbb0a362314ca4b015ee26f39f5a79") {
setInterval(function () {
BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
//Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
BI.Utils.broadcastAllWidgets2Refresh(true);
}, 5000);//定时刷新的频率,单位ms
}
}, 2000)
Refreshing too often eats a fair amount of memory and CPU; a GT730 display-only card can just about cope.
The old-school way
There are plenty of older approaches that can achieve the same effect, but sooner or later they will fade away with time.
What follows is a record of history whose moment has passed.
Flume
It may still come in handy for log collection; the latest release dates to 2018-12-18, which makes it very mature (read: ancient)...
Configuration
Write a Flume configuration file File2Avro.properties that reads the file and ships it, Avro-serialized, to two other Flume nodes.
[root@zhiyong2 flume]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/flume
[root@zhiyong2 flume]# ll
总用量 164
drwxrwxrwx. 2 hadoop hadoop 62 8月 10 2021 bin
-rwxrwxrwx. 1 hadoop hadoop 85602 12月 18 2018 CHANGELOG
drwxrwxrwx. 2 hadoop hadoop 197 4月 1 21:27 conf
-rwxrwxrwx. 1 hadoop hadoop 5681 12月 18 2018 DEVNOTES
-rwxrwxrwx. 1 hadoop hadoop 2873 12月 18 2018 doap_Flume.rdf
drwxrwxrwx. 2 hadoop hadoop 8192 8月 10 2021 lib
-rwxrwxrwx. 1 hadoop hadoop 43405 12月 18 2018 LICENSE
-rwxrwxrwx. 1 hadoop hadoop 249 12月 18 2018 NOTICE
drwxr-xr-x 3 root root 24 4月 12 20:30 plugins.d
-rwxrwxrwx. 1 hadoop hadoop 2483 12月 18 2018 README.md
-rwxrwxrwx. 1 hadoop hadoop 1958 12月 18 2018 RELEASE-NOTES
drwxrwxrwx. 2 hadoop hadoop 6 3月 1 23:10 run
drwxrwxrwx. 2 hadoop hadoop 68 7月 26 2021 tools
[root@zhiyong2 flume]# cd conf/
[root@zhiyong2 conf]# ll
总用量 28
-rwxr-xr-x 1 root root 1661 4月 1 21:25 flume-conf.properties
-rwxrwxrwx. 1 hadoop hadoop 1661 12月 18 2018 flume-conf.properties.template
-rwxrwxrwx. 1 hadoop hadoop 1455 3月 1 23:10 flume-env.ps1
-rwxrwxrwx. 1 hadoop hadoop 1455 12月 18 2018 flume-env.ps1.template
-rwxrwxrwx. 1 hadoop hadoop 1593 3月 1 23:10 flume-env.sh
-rwxrwxrwx. 1 hadoop hadoop 1568 12月 18 2018 flume-env.sh.template
-rwxrwxrwx. 1 hadoop hadoop 3117 3月 1 23:10 log4j.properties
[root@zhiyong2 conf]# touch File2Avro.properties
[root@zhiyong2 conf]# ll
总用量 28
-rw-r--r-- 1 root root 0 4月 13 00:00 File2Avro.properties
-rwxr-xr-x 1 root root 1661 4月 1 21:25 flume-conf.properties
-rwxrwxrwx. 1 hadoop hadoop 1661 12月 18 2018 flume-conf.properties.template
-rwxrwxrwx. 1 hadoop hadoop 1455 3月 1 23:10 flume-env.ps1
-rwxrwxrwx. 1 hadoop hadoop 1455 12月 18 2018 flume-env.ps1.template
-rwxrwxrwx. 1 hadoop hadoop 1593 3月 1 23:10 flume-env.sh
-rwxrwxrwx. 1 hadoop hadoop 1568 12月 18 2018 flume-env.sh.template
-rwxrwxrwx. 1 hadoop hadoop 3117 3月 1 23:10 log4j.properties
[root@zhiyong2 conf]# vim File2Avro.properties
# the contents of the properties file are below
[root@zhiyong2 conf]# chmod 777 ./File2Avro.properties
Contents of this properties file:
agent1.sources = source1
agent1.channels = fileChannel
agent1.sinkgroups = sinkgroup1
agent1.sinks = sink1 sink2
agent1.sources.source1.type = TAILDIR
# 通过JSON记录偏移量
agent1.sources.source1.positionFile = /bigdataproj/data/source/logs/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /bigdataproj/data/source/logs/sogou.log
agent1.sources.source1.channels = fileChannel
# 配置channel的检查点
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /bigdataproj/data/channel/checkpointDir
agent1.channels.fileChannel.dataDirs = /bigdataproj/data/channel/dataDirs
# 配置sink为负载均衡,轮询
agent1.sinkgroups.sinkgroup1.sinks = sink1 sink2
agent1.sinkgroups.sinkgroup1.processor.type = load_balance
agent1.sinkgroups.sinkgroup1.processor.backoff = true
agent1.sinkgroups.sinkgroup1.processor.selector = round_robin
agent1.sinkgroups.sinkgroup1.processor.selector.maxTimeOut=10000
# 以Avro序列化落到zhiyong3
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.channel = fileChannel
agent1.sinks.sink1.batchSize = 1
agent1.sinks.sink1.hostname = zhiyong3
agent1.sinks.sink1.port = 12345
# 以Avro序列化落到zhiyong4
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.channel = fileChannel
agent1.sinks.sink2.batchSize = 1
agent1.sinks.sink2.hostname = zhiyong4
agent1.sinks.sink2.port = 12345
Write a Flume configuration file Avro2HBaseAndKafka.properties that reads the Avro stream and writes the data to both HBase and Kafka.
[root@zhiyong3 conf]# pwd
/opt/usdp-srv/srv/udp/2.0.0.0/flume/conf
[root@zhiyong3 conf]# touch Avro2HBaseAndKafka.properties
[root@zhiyong3 conf]# chmod 777 ./Avro2HBaseAndKafka.properties
[root@zhiyong3 conf]# ll
总用量 24
-rwxrwxrwx 1 root root 0 4月 13 00:04 Avro2HBaseAndKafka.properties
-rw-rw-r--. 1 hadoop hadoop 1661 12月 18 2018 flume-conf.properties.template
-rwxr-xr-x. 1 hadoop hadoop 1455 3月 1 23:10 flume-env.ps1
-rw-rw-r--. 1 hadoop hadoop 1455 12月 18 2018 flume-env.ps1.template
-rwxr-xr-x. 1 hadoop hadoop 1593 3月 1 23:10 flume-env.sh
-rw-rw-r--. 1 hadoop hadoop 1568 12月 18 2018 flume-env.sh.template
-rwxrwxr-x. 1 hadoop hadoop 3117 3月 1 23:10 log4j.properties
[root@zhiyong3 conf]# vim Avro2HBaseAndKafka.properties
# the contents of the properties file are below
[root@zhiyong3 conf]# scp ./Avro2HBaseAndKafka.properties root@zhiyong4:$PWD
Avro2HBaseAndKafka.properties 100% 1617 466.5KB/s 00:00
[root@zhiyong3 conf]#
Contents of this properties file:
agent1.sources = source1
agent1.channels = HBaseChannel KafkaChannel
agent1.sinks = HBaseSink kafkaSink
# Define and configure an avro
agent1.sources.source1.type = avro
agent1.sources.source1.channels = KafkaChannel HBaseChannel
# 监听所有主机
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 12345
agent1.sources.source1.selector.type = replicating
# 配置HBase的channel
agent1.channels.HBaseChannel.type = memory
agent1.channels.HBaseChannel.capacity = 10000
agent1.channels.HBaseChannel.transactionCapacity = 10000
agent1.channels.HBaseChannel.keep-alive = 5
# Define and configure a sink
agent1.sinks.HBaseSink.type = asynchbase
agent1.sinks.HBaseSink.channel = HBaseChannel
agent1.sinks.HBaseSink.table = sougoulogs
agent1.sinks.HBaseSink.serializer = org.apache.flume.sink.hbase.SougoulogsAsyncHbaseEventSerializer
agent1.sinks.HBaseSink.zookeeperQuorum = zhiyong2:2181,zhiyong3:2181,zhiyong4:2181
agent1.sinks.HBaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
agent1.sinks.HBaseSink.znodeParent = /hbase
agent1.sinks.HBaseSink.columnFamily = info
# 配置Kafka的channel
agent1.channels.KafkaChannel.type = memory
agent1.channels.KafkaChannel.capacity = 10000
agent1.channels.KafkaChannel.transactionCapacity = 10000
agent1.channels.KafkaChannel.keep-alive = 5
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = sougoulogs
agent1.sinks.kafkaSink.brokerList = zhiyong2:9092,zhiyong3:9092,zhiyong4:9092
agent1.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder
agent1.sinks.kafkaSink.producer.acks = 1
agent1.sinks.kafkaSink.producer.requiredAcks = 1
agent1.sinks.kafkaSink.channel = KafkaChannel
With that in place it starts up successfully:
[root@zhiyong3 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/Avro2HBaseAndKafka.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: kafkaSink: Successfully registered new MBean.
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: kafkaSink started
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-04-13 00:18:08 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
2022-04-13 00:18:08 INFO source.AvroSource: Avro source source1 started.
Start the other Flume agent the same way:
[root@zhiyong3 ~]# ssh zhiyong4
Last login: Tue Apr 12 20:39:23 2022 from zhiyong3
[root@zhiyong4 ~]# date
2022年 04月 13日 星期三 00:11:17 CST
[root@zhiyong4 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/Avro2HBaseAndKafka.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:19:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: kafkaSink: Successfully registered new MBean.
2022-04-13 00:19:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: kafkaSink started
2022-04-13 00:19:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-04-13 00:19:58 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
2022-04-13 00:19:58 INFO source.AvroSource: Avro source source1 started.
Then start the Flume agent that collects the data:
[root@zhiyong2 ~]# mkdir -p /bigdataproj/data/source/logs
[root@zhiyong2 ~]# cd /bigdataproj/data/source/logs
[root@zhiyong2 logs]# touch sogou.log
[root@zhiyong2 logs]# chmod 777 ./sogou.log
[root@zhiyong2 logs]# cd
[root@zhiyong2 ~]# flume-ng agent -n agent1 -c conf -f /opt/usdp-srv/srv/udp/2.0.0.0/flume/conf/File2Avro.properties -Dflume.root.logger=INFO,console
Info: 。。。
2022-04-13 00:25:59 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2022-04-13 00:25:59 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
2022-04-13 00:25:59 INFO sink.AbstractRpcSink: Rpc sink sink1: Building RpcClient with hostname: zhiyong3, port: 12345
2022-04-13 00:25:59 INFO sink.AvroSink: Attempting to create Avro Rpc client.
2022-04-13 00:25:59 INFO api.NettyAvroRpcClient: Using default maxIOWorkers
2022-04-13 00:25:59 INFO sink.AbstractRpcSink: Rpc sink sink1 started.
Having Flink write to HBase directly actually looks simpler than customizing Flume. Flume's HBase sink was mostly used with HBase 1.x, only Flume 1.9 can be extended for HBase 2.x, and HBase 3 previews have already appeared, so let the old wave crash on the beach. Tail-ing log files into Kafka is still the genuinely useful part.
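For reference, a rough sketch of what "Flink writing to HBase directly" could look like with the plain HBase 2.x client wrapped in a RichSinkFunction; the table name, column family and column list follow the Flume configuration above, but this is an illustration, not code that was run in this project:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// One raw log line per Put; rowkey = userid + "_" + logtime is an arbitrary choice for the sketch
public class HBaseLogSink extends RichSinkFunction<String> {
    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zhiyong2,zhiyong3,zhiyong4");
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf("sougoulogs"));
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        String[] fields = value.split(",");
        if (fields.length != 6) {
            return; // skip dirty records, same rule as the filter in Kafka2MySQL_Flink
        }
        Put put = new Put(Bytes.toBytes(fields[1] + "_" + fields[0]));
        String[] columns = {"datatime", "userid", "searchname", "retorder", "cliorder", "cliurl"};
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(columns[i]), Bytes.toBytes(fields[i]));
        }
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (null != table) {
            table.close();
        }
        if (null != connection) {
            connection.close();
        }
    }
}

Usage would be along the lines of cleanStream.addSink(new HBaseLogSink()); note that hbase-client is not in the pom above, which is part of why this stays a sketch.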
Spark
Spark is still heavily used for batch. Back in 2018 people were still doing simple micro-batch work with Spark Streaming, but that was four years ago... And with a custom foreach sink like the one below, Structured Streaming only guarantees at-least-once rather than end-to-end exactly-once... For pure batch processing Spark is of course still the first choice.
pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sougoulogs</artifactId>
<groupId>com.zhiyong</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sparkDemo</artifactId>
<!-- 指定仓库位置,依次为aliyun、cloudera、apache仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12.12</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.1</spark.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 添加spark依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- 可以使用Lombok的@注解-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
</dependency>
<!-- MySQL驱动包-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 可以设置jar包的入口类(可选) -->
<!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Configuration class
package com.zhiyong.sparkDemo
// 该类存储配置
object Constants {
val url = "jdbc:mysql://192.168.88.100/sougoulogs"
val userName = "root"
val passWord = "123456"
val kafkaServer = "zhiyong2:9092,zhiyong3:9092,zhiyong4:9092"
val groupId = "sougoulogs"
val offset = "earliest"
val topic = "sougoulogs"
val mysql8driver = "com.mysql.cj.jdbc.Driver"
}
Define a connection pool
package com.zhiyong.sparkDemo;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
/**
* @program: sougoulogs
* @description: MySQL的连接池
* @author: zhiyong
* @create: 2022-04-06 19:46
**/
public class MySQLPool implements Serializable {
private String url ="";
private String user ="";
private String password ="";
private int maxNum = 5;// 最大同时使用的连接数
private int currentNum =0;//当前产生的连接数
private LinkedList<Connection> connections = new LinkedList<>();//可用连接
/**
* 传参用的构造方法
* @param url JDBC连接串
* @param user 用户名
* @param password 密码
*/
public void init(String url, String user, String password){
this.url=url;
this.user=user;
this.password=password;
}
/**
* 获取连接池的连接对象
* @return 返回连接对象
* @throws SQLException 抛异常
*/
public synchronized Connection getConn() throws Exception {
if (connections.isEmpty()){
addDrive();
Connection connection = DriverManager.getConnection(url, user, password);
connections.push(connection);//入栈
currentNum++;
}
return connections.poll();//弹栈
}
/**
* 加载驱动
*/
private void addDrive() throws Exception {
if (maxNum>currentNum&&(!connections.isEmpty())){
Thread.sleep(1000);//等待获取连接
addDrive();
} else {
Class.forName(Constants.mysql8driver());//加载驱动
}
}
/**
* 清除连接
* @param conn 传入连接对象
*/
private void clearConn(Connection conn){
connections.push(conn);
}
}
Since Spark itself is written in Scala, Java and Scala can actually be mixed in the same project.
The Spark Streaming way
package com.zhiyong.sparkDemo
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.sql.{Connection, DriverManager, Statement}
// 使用SparkStreaming:读取Kafka数据,处理后落入MySQL
object Kafka2MySQL_Streaming {
// 统计新闻浏览量
def newsCount(records: Iterator[(String, Int)]): Unit = {
var conn: Connection = null //声明SQL连接
var statement: Statement = null //生命SQL执行对象
try {
conn = DriverManager.getConnection(Constants.url, Constants.userName, Constants.passWord) //获取连接对象
records.foreach(data => {
val name = data._1.replaceAll("[\\[\\]]", "")
val count = data._2
val sql1 = "select 1 from newscount where name = '" + name + "'"//用于判断是否已经有这条数据
val sql2 = "update newscount set count = " + count + " where name = '" + name + "'"
val sql3 = "insert into newscount(name,count) values('" + name + "'," + count + ")"
statement = conn.createStatement()
val resultSet = statement.executeQuery(sql1)
if (resultSet.next()) {
statement.executeUpdate(sql2) //有数据就更新
} else {
statement.execute(sql3) //没有数据就插入
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally { //最后要关流
if (null != statement) {
statement.close()
}
if (null != conn) {
conn.close()
}
}
}
// 统计时段浏览量
def periodCount(records: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var statement: Statement = null
try {
conn = DriverManager.getConnection(Constants.url, Constants.userName, Constants.passWord)
records.foreach(data => {
val logtime = data._1
val count = data._2
val sql1 = "select 1 from periodcount where logtime = '" + logtime + "'"
val sql2 = "update periodcount set count = " + count + " where logtime='" + logtime + "'"
val sql3 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")"
statement = conn.createStatement()
val resultSet = statement.executeQuery(sql1)
if (resultSet.next()) {
statement.execute(sql2)
} else {
statement.execute(sql3)
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (null != statement) {
statement.close()
}
if (null != conn) {
conn.close()
}
}
}
// 主方法
def main(args: Array[String]): Unit = {
print("start:Kafka2MySQL_Streaming")
val sparkConf = new SparkConf().setAppName("sougoulogs_Streaming")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1)) // 每秒做一次micro-batch
// 统一配置Kafka的参数
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> Constants.kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> Constants.groupId,
  "auto.offset.reset" -> Constants.offset,
  "enable.auto.commit" -> java.lang.Boolean.TRUE
)
val topics = Array(Constants.topic)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// val lines = stream.map(record => record.value)
// 去除脏数据
val cleanStream = stream.map(_.value())
.map(_.split(","))
.filter(_.length == 6)
// 获取新闻浏览量指标的流
val newsCountDStream = cleanStream
.map(x => (x(2), 1))
.reduceByKey(_ + _)
// 获取时段浏览量的流
val periodCountDStream = cleanStream
.map(x => (x(0), 1))
.reduceByKey(_ + _)
// 处理流
newsCountDStream.foreachRDD(rdd => {
print("每个分区都执行读取Kafka数据计算新闻话题浏览量并将结果写入MySQL")
rdd.foreachPartition(newsCount)
})
periodCountDStream.foreachRDD(rdd => {
print("每个分区都执行读取Kafka数据计算时段浏览量并将结果写入MySQL")
rdd.foreachPartition(periodCount)
})
//启动流
ssc.start()
// Spark的Streaming(DStream)必须有这句,否则会报错
ssc.awaitTermination()
}
}
The Structured Streaming way
package com.zhiyong.sparkDemo
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
// 使用SparkStructuredStreaming:读取Kafka数据,处理后写入MySQL
object Kafka2MySQL_StructuredStreaming {
//
// 主方法
def main(args: Array[String]): Unit = {
print("start:Kafka2MySQL_StructuredStreaming")
val spark = SparkSession.builder()
.appName("")
.master("local[2]")
.getOrCreate()
import spark.implicits._//导入隐式推测防止报错
val cleanSs = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Constants.kafkaServer)
.option("subscribe", Constants.topic)
.load()
.selectExpr("cast(key as string)", "cast(value as string)") //Java版Spark可以传String[]
.as[(String, String)]
.map(_._2)
.map(_.split(","))
.filter(6 == _.length)
val newsCountsSs = cleanSs.map(_ (2))
.groupBy("value")
.count().toDF("name", "count")
val periodCountsSs = cleanSs.map(_ (0))
.groupBy("value")
.count()
.toDF("logtime", "count")
val sink1 = new Sink1(Constants.url, Constants.userName, Constants.passWord)
val sink2 = new Sink2(Constants.url, Constants.userName, Constants.passWord)
val query1 = newsCountsSs.writeStream
.foreach(sink1)
.outputMode(OutputMode.Update())
.trigger(Trigger.ProcessingTime("1 seconds"))
.start()
val query2 = periodCountsSs.writeStream
.foreach(sink2)
.outputMode(OutputMode.Update())
.trigger(Trigger.ProcessingTime("1 seconds"))
.start()
query1.awaitTermination()
query2.awaitTermination()
}
}
The two corresponding sinks:
package com.zhiyong.sparkDemo
import org.apache.spark.sql.{ForeachWriter, Row}
import java.sql.{Connection, ResultSet, Statement}
/**
* 自定义Sink,将数据写入MySQL
*
* @param url 连接串
* @param user 用户名
* @param password 密码
*/
class Sink1(url: String, user: String, password: String) extends ForeachWriter[Row] {
var statement: Statement = _
var resultset: ResultSet = _
var conn: Connection = _
/**
* 初始化连接,从连接池获取连接对象
*
* @param partitionId
* @param epochId
* @return
*/
override def open(partitionId: Long, epochId: Long): Boolean = {
Class.forName(Constants.mysql8driver)
val pool = new MySQLPool()
pool.init(url, user, password)
conn = pool.getConn
statement = conn.createStatement()
true
}
override def process(value: Row): Unit = {
val name = value.getAs[String]("name").replaceAll("[\\[\\]]", "")
val count = value.getAs[Long]("count").asInstanceOf[Int]
val sql1 = "select 1 from newscount where name = '" + name + "'"
val sql2 = "insert into newscount(name,count) values('" + name + "'," + count + ")"
val sql3 = "update newscount set count = " + count + " where name = '" + name + "'"
try {
resultset = statement.executeQuery(sql1)
if (resultset.next()) {
statement.execute(sql3)
} else {
statement.execute(sql2)
}
} catch {
case e: Exception => print("出现异常“" + e.getMessage)
}
}
override def close(errorOrNull: Throwable): Unit = {
if (null != statement) {
statement.close()
}
if (null != conn) {
conn.close()
}
}
}
The other one is much the same:
package com.zhiyong.sparkDemo
import org.apache.spark.sql.{ForeachWriter, Row}
import java.sql.{Connection, ResultSet, Statement}
class Sink2(url: String, user: String, password: String) extends ForeachWriter[Row]{
var statement: Statement = _
var resultset: ResultSet = _
var conn: Connection = _
/**
* 初始化连接,从连接池获取连接对象
*
* @param partitionId
* @param epochId
* @return
*/
override def open(partitionId: Long, epochId: Long): Boolean = {
Class.forName(Constants.mysql8driver)
val pool = new MySQLPool()
pool.init(url, user, password)
conn = pool.getConn
statement = conn.createStatement()
true
}
override def process(value: Row): Unit = {
val logtime = value.getAs[String]("logtime")
val count = value.getAs[Long]("count").asInstanceOf[Int]
val sql1 = "select 1 from periodcount where logtime = '" + logtime + "'"
val sql2 = "insert into periodcount(logtime,count) values('" + logtime + "'," + count + ")"
val sql3 = "update periodcount set count = " + count + " where logtime = '" + logtime + "'"
try {
resultset = statement.executeQuery(sql1)
if (resultset.next()) {
statement.execute(sql3)
} else {
statement.execute(sql2)
}
} catch {
case e: Exception => print("出现异常“" + e.getMessage)
}
}
override def close(errorOrNull: Throwable): Unit = {
if (null != statement) {
statement.close()
}
if (null != conn) {
conn.close()
}
}
}
Appending data
package com.zhiyong.sparkDemo;
import java.io.*;
/**
* @program: sougoulogs
* @description: 模拟数据生成
* @author: zhiyong
* @create: 2022-04-04 23:43
**/
public class MockData {
// 主方法
public static void main(String[] args) {
String inputPath = args[0];
String outputPath = args[1];
String interval = args[2];
System.out.println("读数路径:" + interval);
System.out.println("写log路径“" + outputPath);
System.out.println("产生数据间隔:" + interval + "ms");
try {
mock(inputPath, outputPath, interval);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 模拟生成数据
*
* @param inputPath 读取文件的路径
* @param outputPath 写入数据的路径
* @param interval mock数据的时间间隔
*/
private static void mock(String inputPath, String outputPath, String interval) throws IOException {
FileInputStream fis = null;
InputStreamReader isr = null;
BufferedReader br = null;
String line = null;
try {
int counter = 1;//记录行号
fis = new FileInputStream(inputPath);
isr = new InputStreamReader(fis, "GBK");
br = new BufferedReader(isr);
while (null != (line = br.readLine())) {
System.out.println("产生第" + counter + "条数据:" + line);
wrieData(outputPath, line);
counter++;
Thread.sleep(Long.parseLong(interval));
}
} catch (Exception e) {
e.printStackTrace();
} finally { // 最后要关流,防止文件持续占用
if (null != br) {
br.close();
}
if (null != isr) {
isr.close();
}
if (null != fis) {
fis.close();
}
}
}
/**
* 写数据
*
* @param outputPath 输出路径
* @param line 每行数据
*/
private static void wrieData(String outputPath, String line) throws IOException {
FileOutputStream fos = null;
OutputStreamWriter osr = null;
BufferedWriter bw = null;
try {
fos = new FileOutputStream(outputPath,true);//允许追加
osr = new OutputStreamWriter(fos);
bw = new BufferedWriter(osr);
bw.write(line);
bw.write("\n");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null!=bw){
bw.close();
}
if (null!=osr){
osr.close();
}
if(null!=fos){
fos.close();
}
}
}
}
To feed Flume's taildir source, an automated way of appending records is needed instead of manually echoing and redirecting into the file. In Flink, though, the various rich functions make it easy to just override the relevant methods, with none of this hassle. Tools like Flume and Sqoop are perhaps mainly there to help SQL Boys who cannot write Java. Besides, these days there are mature products such as Alibaba Cloud DataPhin and Huawei Cloud FusionInsight, plus open-source components like Kyuubi where data integration runs on configuration alone, so the bar to entry for big data really has dropped a lot.
其实在Flink里使用各种富函数,重新定义方法就很方便,不需要这么麻烦。Flume、Sqoop这类工具,可能只是方便不会写Java的SQL Boy们吧。不过这年头已经有阿里云DataPhin,华为云FusionInsight之类的成熟产品,还有kyuubi这类可以直接配置就能跑数据集成的开源组件,大数据的门槛属实低了好多。