1、Cluster Deployment
1.1、JDK
cat >> /etc/ansible/playbook/install-jdk.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    - name: Distribute the JDK archive
      copy: src=/opt/software/jdk-21.0.5_linux-x64_bin.tar.gz dest=/opt/software
    - name: Create the install directory
      file: path=/usr/local/java state=directory
    - name: Unpack the JDK
      shell: tar -xvzf /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz -C /usr/local/java
    - name: Configure environment variables
      blockinfile:
        path: /etc/profile
        block: |
          export JAVA_HOME=/usr/local/java/jdk-21.0.5
          export PATH=$JAVA_HOME/bin:$PATH
        marker: "# {mark} JDK"
EOF
ansible-playbook /etc/ansible/playbook/install-jdk.yml
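A quick way to confirm the rollout is an Ansible ad-hoc check against the same cluster group; a minimal sketch (sourcing /etc/profile explicitly, since non-login shells skip it):
# java -version prints to stderr, so redirect it to see the output
ansible cluster -m shell -a 'source /etc/profile && java -version 2>&1'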
1.2、Base Environment Configuration
cat >> /etc/ansible/playbook/modify_env.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    # Set the hostname
    - name: set hostname
      shell: hostnamectl set-hostname {{ hostname }}
    - name: distribute hosts to nodes
      copy:
        src: /etc/hosts
        dest: /etc
    # Stop the firewall
    - name: stop firewalld
      service:
        name: firewalld
        state: stopped
        enabled: no
    # Disable SELinux
    - name: setenforce 0
      shell: "setenforce 0"
      failed_when: false
    - name: set selinux disabled
      replace:
        path: /etc/selinux/config
        regexp: '^SELINUX=enforcing'
        replace: 'SELINUX=disabled'
    - name: Raise the max file handle limits
      lineinfile:
        path: /etc/security/limits.conf
        insertafter: '### AFTER THIS LINE'
        line: "{{ item }}"
        state: present
      with_items:
        - '* soft nproc 65536'
        - '* hard nproc 65536'
        - '* soft nofile 131072'
        - '* hard nofile 131072'
        - '* hard memlock unlimited'
        - '* soft memlock unlimited'
    # Disable transparent huge pages on boot
    - name: disable THP
      blockinfile:
        path: /etc/rc.local
        block: |
          echo never > /sys/kernel/mm/transparent_hugepage/enabled
          echo never > /sys/kernel/mm/transparent_hugepage/defrag
        marker: "# {mark} THP"
    - name: Change permissions
      shell: chmod +x /etc/rc.d/rc.local
    # Disable swap
    - name: comment swap out of /etc/fstab
      replace:
        path: /etc/fstab
        regexp: '^(\s*)([^#\n]+\s+)(\w+\s+)swap(\s+.*)$'
        replace: '#\1\2\3swap\4'
        backup: yes
    - name: Disable SWAP
      shell: |
        swapoff -a
EOF
ansible-playbook /etc/ansible/playbook/modify_env.yml
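The playbook's effects can be spot-checked the same way; a sketch of ad-hoc verification (THP is only disabled via rc.local on the next boot, so the last command applies it immediately):
ansible cluster -m shell -a 'getenforce'
ansible cluster -m shell -a 'free -m | grep -i swap'
ansible cluster -m shell -a 'echo never > /sys/kernel/mm/transparent_hugepage/enabled'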
1.3、Passwordless SSH
cat >> /etc/ansible/playbook/ssh-pubkey.yml << 'EOF'
- hosts: cluster
  gather_facts: no
  remote_user: root
  tasks:
    - name: Install the control node's public key
      authorized_key:
        user: root
        key: "{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
        state: present
EOF
ansible-playbook /etc/ansible/playbook/ssh-pubkey.yml
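The lookup reads the control node's public key, so the keypair must exist before the playbook runs; a minimal sketch:
# Generate a keypair on the Ansible control node if one is missing
[ -f /root/.ssh/id_rsa.pub ] || ssh-keygen -t rsa -N '' -f /root/.ssh/id_rsa
# Spot-check passwordless login afterwards
ssh root@kylin-02 hostname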
1.4、Deploy Kafka
cat >> /etc/ansible/playbook/install-kafka.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    - name: Distribute the Kafka archive
      copy: src=/opt/software/kafka_2.13-4.0.0.tgz dest=/opt/software
    - name: Create the module directory
      file: path=/opt/module state=directory
    - name: Unpack Kafka
      shell: tar -xvzf /opt/software/kafka_2.13-4.0.0.tgz -C /opt/module/
    - name: Rename to kafka
      shell: mv /opt/module/kafka_2.13-4.0.0 /opt/module/kafka
    - name: Grant ownership to starrocks
      shell: chown -R starrocks:starrocks /opt/module/kafka
    - name: Configure environment variables
      blockinfile:
        path: /home/starrocks/.bashrc
        block: |
          # kafka
          export KAFKA_HOME=/opt/module/kafka
          export PATH=$KAFKA_HOME/bin:$PATH
        marker: "# {mark} KAFKA"
EOF
ansible-playbook /etc/ansible/playbook/install-kafka.yml
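As with the JDK, an ad-hoc check confirms the unpack, ownership, and PATH; a sketch (assuming the starrocks login shell sources .bashrc):
ansible cluster -m shell -a 'su - starrocks -c "kafka-topics.sh --version"'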
1.5、Modify the Kafka Configuration
vi /opt/module/kafka/config/server.properties
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
## Give each Kafka node its own id; the other nodes' ids increase along with their IPs
node.id=1
# List of controller endpoints used to connect to the controller cluster
controller.quorum.bootstrap.servers=kylin-01:9093
## id and host of every Kafka controller in the cluster; port 9093 is the controller (management) port
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# Maximum size of a socket request, guards against OOM; must be at least message.max.bytes
socket.request.max.bytes=536870912
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/data
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# Maximum message size: 512 MB
message.max.bytes=536870912
# Maximum request size: 1 GB (note: max.request.size is a producer-side setting)
max.request.size=1073741824
# Maximum bytes a replica fetches per request
replica.fetch.max.bytes=536870912
# Socket receive buffer used for replication from the leader
replica.socket.receive.buffer.bytes=536870912
# Replica connection reliability
replica.socket.timeout.ms=60000
replica.lag.time.max.ms=30000
1.6、Modify the Configuration on the Other Nodes
kylin-02:
node.id=2
controller.quorum.bootstrap.servers=kylin-02:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
advertised.listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
kylin-03:
node.id=3
controller.quorum.bootstrap.servers=kylin-03:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
advertised.listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
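Rather than editing every file by hand, the per-node values can be stamped in with sed; a sketch for node 2 (only the listener and bootstrap lines carry the local hostname — the voters list must keep all three hosts):
i=2; h=kylin-0$i
sed -i \
  -e "s|^node.id=.*|node.id=$i|" \
  -e "s|^controller.quorum.bootstrap.servers=.*|controller.quorum.bootstrap.servers=$h:9093|" \
  -e "s|^listeners=.*|listeners=PLAINTEXT://$h:9092,CONTROLLER://$h:9093|" \
  -e "s|^advertised.listeners=.*|advertised.listeners=PLAINTEXT://$h:9092,CONTROLLER://$h:9093|" \
  /opt/module/kafka/config/server.properties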
1.7、Change the Log Directory
vi bin/kafka-run-class.sh
# Log directory to use
# Changed: use a dedicated log directory
LOG_DIR=/data/kafka/log
if [ "x$LOG_DIR" = "x" ]; then
LOG_DIR="$base_dir/logs"
fi
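Both log.dirs and LOG_DIR point under /data/kafka, which must exist and be writable by the service user before the first start; a sketch:
ansible cluster -m shell -a 'mkdir -p /data/kafka/data /data/kafka/log && chown -R starrocks:starrocks /data/kafka'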
1.8、Initialize the Cluster
Generate a unique storage/cluster ID
bin/kafka-storage.sh random-uuid
Format the Kafka storage directory with the ID generated above (run on every node)
bin/kafka-storage.sh format -t m6BZb8yRSzmdNvW9kzOoQg -c config/server.properties
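Every node must be formatted with the same cluster ID, so it helps to capture the UUID once and push the format command out over SSH; a minimal sketch:
CLUSTER_ID=$(/opt/module/kafka/bin/kafka-storage.sh random-uuid)
for h in kylin-01 kylin-02 kylin-03; do
  ssh "$h" "/opt/module/kafka/bin/kafka-storage.sh format -t $CLUSTER_ID -c /opt/module/kafka/config/server.properties"
done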
1.9、Start the Cluster
Run the start command on every node
bin/kafka-server-start.sh -daemon config/server.properties
Tail the service log
tail -f /data/kafka/log/server.log
Check the Kafka node status
bin/kafka-broker-api-versions.sh --bootstrap-server kylin-01:9092
Check Kafka's listening ports
netstat -tuln | grep 9092
Check the Kafka process with ps
ps aux | grep kafka
Watch the Kafka process with top or htop
top -p <PID>
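To avoid hopping between terminals, the port and process checks can be looped over all three nodes from one host; a sketch:
for h in kylin-01 kylin-02 kylin-03; do
  echo "== $h =="
  ssh "$h" 'netstat -tuln | grep -E ":(9092|9093)"; ps aux | grep "[k]afka\.Kafka" | wc -l'
done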
2、Operation Commands
2.1、Basic Operation Commands
## List topics
kafka-topics.sh --bootstrap-server kylin-01:9092 --list
## Describe a topic
kafka-topics.sh --bootstrap-server kylin-01:9092 --describe --topic <topic-id>
## Create a topic with 5 partitions and a replication factor of 2; the broker count must be >= the replication factor (brokers need not be specified explicitly at creation time)
kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic <topic-id> --partitions 5 --replication-factor 2
## Delete a topic
kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic <topic-id>
## List consumer groups (--list)
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --list
## Describe a specific consumer group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --group <group-id>
## Delete a specific group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --delete --group <group-id>
## Start a console producer
kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-id>
## Start a console consumer
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --consumer-property group.id=<group-id> --from-beginning
## Describe all consumer groups (--all-groups)
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups
## Query consumer member info (--members)
## Member info for all consumer groups
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups --members
## Member info for a specific consumer group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --members --group <group-id>
## Reset offsets to latest
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-latest --execute
## Dry run: reset offsets to a given position
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --dry-run
## Reset offsets to a given position
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --execute
## Get the offsets for a given timestamp
kafka-get-offsets.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --time 1740499200000
## Expand a topic's partitions
kafka-topics.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --alter --partitions 16
## Fetch data within a timestamp range (filter on the printed CreateTime)
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '$2>=1742450425000 && $2<=1742464825000 {print $0}'
## Read from a given partition and offset, filtering by message content
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --partition 0 --offset 100 | grep '1027729757'
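A quick end-to-end smoke test strings the basic commands together; a sketch using a throwaway topic name:
kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic smoke-test --partitions 3 --replication-factor 2
echo "hello kafka $(date +%s)" | kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic smoke-test
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic smoke-test --from-beginning --max-messages 1
kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic smoke-test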
2.2、Backlog (Lag) Monitoring
Create the Doris table
create table kafka_topic_monitor(
    exec_id bigint comment 'id',
    begin_day_id bigint comment 'current date',
    topic string comment 'Kafka topic',
    groups string comment 'Kafka consumer group',
    offset_lag bigint comment 'total topic lag',
    exec_time datetime comment 'execution time',
    in_db_time datetime comment 'ingestion time',
    interval_time bigint comment 'delay in minutes'
)
unique key (exec_id,begin_day_id)
comment 'Kafka monitoring'
distributed by hash(exec_id,begin_day_id) buckets 3;
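Doris speaks the MySQL protocol, so the DDL can be applied with the same mysql client the monitoring script uses below; a sketch (the file name is illustrative):
mysql -hkylin-01 -P19030 -uroot -p123456 dw < kafka_topic_monitor.sql
mysql -hkylin-01 -P19030 -uroot -p123456 dw -e 'desc kafka_topic_monitor'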
The monitoring script
#! /bin/bash
TOPIC="pay_total_random"
GROUP_ID="etl-dw"
BOOTSTRAP_SERVER="kylin-01:9092,kylin-02:9092,kylin-03:9092"
LAG=100000
TABLE_NAME="b_user_event_attr_12"
TOTAL=100
# Get the current Kafka lag per partition
sh /data/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER --group $GROUP_ID --describe | grep $TOPIC | awk -F " " '{print $6}' > ./kafka-lag.txt
# Initialize the lag total to 0
TOTAL_LAG=0
while read line
do
    TOTAL_LAG=$((TOTAL_LAG + line))
done < ./kafka-lag.txt
# Decide from the backlog whether to restart the program
if [ $TOTAL_LAG -gt $LAG ]; then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Kafka Topic $TOPIC IN GROUP $GROUP_ID total lag $TOTAL_LAG exceeds $LAG, stopping the DW module"
    sh /etl/etl-dw/shell/stop-dw.sh
    # Check whether the stop succeeded
    if [ $? -eq 0 ]; then
        nohup sh /etl/etl-dw/shell/start-dw.sh &
        echo "$(date +"%Y-%m-%d %H:%M:%S") DW module restarted"
    else
        echo "$(date +"%Y-%m-%d %H:%M:%S") Failed to stop the DW module"
    fi
else
    echo "$(date +"%Y-%m-%d %H:%M:%S") Kafka Topic $TOPIC IN GROUP $GROUP_ID total lag $TOTAL_LAG is within $LAG, leaving the DW module running"
fi
# Query Doris ingestion over the last 3 hours
query=$(cat <<SQL
select
max(minute_time) minute_time
from
(
select
date_format(from_unixtime(begin_date),'%Y%m%d%H%i') minute_time
,count(*) as total
from $TABLE_NAME
where begin_day_id = cast(date_format(now(),'%Y%m%d') as bigint)
and from_unixtime(begin_date) >= date_sub(now(),interval 3 hour)
group by date_format(from_unixtime(begin_date),'%Y%m%d%H%i')
having count(*) > $TOTAL
) a;
SQL
)
BEGIN_DATE=$(mysql -hkylin-01 -uroot -p123456 -P19030 dw -sN -e "$query")
# Write the Kafka lag into Doris
insert=$(cat <<SQL
insert into kafka_topic_monitor
select
date_format(now(),'%Y%m%d%H%i%s')
,date_format(now(),'%Y%m%d')
,"$TOPIC"
,"$GROUP_ID"
,"$TOTAL_LAG"
,now()
,str_to_date('$BEGIN_DATE','%Y%m%d%H%i')
,minutes_diff(now(),str_to_date('$BEGIN_DATE','%Y%m%d%H%i'))
;
SQL
)
# echo $insert
mysql -hkylin-01 -uroot -p123456 -P19030 dw -sN -e "$insert"
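The read loop above chokes if the LAG column contains "-" (a partition with no committed offset yet); an optional hardening sketch that sums numerically in a single awk pass:
TOTAL_LAG=$(sh /data/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER --group $GROUP_ID --describe 2>/dev/null \
  | grep "$TOPIC" | awk '$6 ~ /^[0-9]+$/ {sum += $6} END {print sum + 0}')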
Configure the cron job
# Kafka monitoring, runs hourly
0 * * * * sh /etl/etl-dw/shell/check-kafka-lag.sh >> /etl/logs/etl-dw/check-kafka-lag.log 2>&1
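If a run ever outlives the hour (for example when Kafka is unresponsive), overlapping invocations can be prevented with flock; an optional variant:
# flock -n skips the run while the previous one still holds the lock
0 * * * * flock -n /tmp/check-kafka-lag.lock sh /etl/etl-dw/shell/check-kafka-lag.sh >> /etl/logs/etl-dw/check-kafka-lag.log 2>&1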
2.3、Start on Boot
Configure the systemd unit
vi /etc/systemd/system/kafka.service
[Unit]
Description=Kafka Service
After=network.target
[Service]
Environment="JAVA_HOME=/usr/local/java/jdk-21.0.5"
Type=forking
ExecStart=/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
ExecStop=/opt/module/kafka/bin/kafka-server-stop.sh
PrivateTmp=true
# Run as the non-root service user
User=starrocks
Group=starrocks
[Install]
WantedBy=multi-user.target
Start the service and manage it with systemctl
./kafka-server-stop.sh    # stop the Kafka instance that was started manually earlier
systemctl daemon-reload           # reload unit files
systemctl start kafka.service     # start Kafka
systemctl status kafka.service    # check the Kafka service status
systemctl enable kafka.service    # enable start on boot
Verify that Kafka is enabled on boot
systemctl is-enabled kafka.service
If this returns enabled, start-on-boot is configured. To be rigorous, simulate a real server restart with: reboot
3、Tuning
3.1、JVM Parameter Tuning
1、By default the broker process starts with only 1 GB of heap, which in practice can cause frequent GC and hurt the Kafka cluster's performance and stability. To judge whether the allocation needs to grow, first find the Kafka process id with jps
jps -m
2、Then inspect the Kafka process's GC behavior by pid (the trailing 1000 means refresh every second)
jstat -gcutil <pid> 1000
or
jstat -gc <pid> 1000 10
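The two steps can be combined once the process name is known; a sketch that samples GC stats five times at one-second intervals:
jstat -gcutil $(jps -m | awk '/Kafka/ {print $1; exit}') 1000 5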
3、Focus on the YGC, YGCT, FGC, and FGCT columns:
- In JVM memory management the heap is split into Young and Old generations, and the Young generation further into one Eden and two Survivor spaces. Eden is where new objects are born. When Eden fills up, a Minor GC is triggered: the JVM copies Eden's live objects into Survivor 0 (S0), then S0 -> S1, always keeping one Survivor space empty. An object that survives enough copies is promoted to the Old generation, and when the Old generation runs out of room, GC halts every application thread for a Full GC. That pause is called Stop The World (STW), which is why Full GC hurts workload performance so badly.
- Minor GC targets the young generation (Eden + S0 + S1); its frequency can be influenced through the JVM pause-time goal (-XX:MaxGCPauseMillis). The mark-copy-clear cycle of a Minor GC decides which objects are alive via reachability analysis.
- S0/S1: Survivor space utilization
- E: Eden utilization
- O: Old generation utilization
- M: Metaspace utilization
- CCS: compressed class space utilization
- YGC: number of young GCs
- YGCT: time spent in young GC
- FGC: number of full GCs
- FGCT: time spent in full GC
- jstat -gc additionally shows Eden capacity (EC column) and usage (EU column)
4、If YGC or FGC is very frequent, too little memory has been allocated. Edit the kafka-server-start.sh file
5、Change the KAFKA_HEAP_OPTS parameter; the test profile below uses 2 GB and the production profile allocates 16 GB to Kafka. If the server has memory to spare, 16 GB is a reasonable choice.
# JDK8
# Test environment
export KAFKA_HEAP_OPTS="-Xmx2g -Xms2g -Xmn1G -XX:MetaspaceSize=256m -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=40 -XX:G1HeapRegionSize=4M"
# Production environment
# Young generation 12 GB; with SurvivorRatio=5 it splits 5:1:1, so Eden gets about 8.6 GB (12 GB x 5/7) and each Survivor about 1.7 GB
export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn12G -XX:SurvivorRatio=5 -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=40 -XX:G1HeapRegionSize=16M"
- UseG1GC: G1 suits large-heap, low-latency workloads and is the recommended collector for Kafka in production.
- MaxGCPauseMillis=50: targets a maximum GC pause of 50 ms, for latency-sensitive workloads such as real-time processing. If GC logs show pauses frequently exceeding this target, tune G1HeapRegionSize or add heap.
- InitiatingHeapOccupancyPercent=40: starts a concurrent GC cycle at 40% heap occupancy. Setting this too low makes GC cycles too frequent; 40-45% balances GC frequency against throughput.
- G1HeapRegionSize=16M: a 16 MB region suits the 16 GB heap here and needs no further tuning; for heaps above 32 GB, 32M reduces GC bookkeeping.
- -XX:SurvivorRatio=N sets Eden to N times the size of one Survivor space; with SurvivorRatio=3, Eden:S0:S1 = 3:1:1 and each Survivor takes 1/(3+1+1) = 20% of the young generation (see the sketch below).
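The Eden/Survivor split for any -Xmn and SurvivorRatio follows directly from the N:1:1 rule; a small arithmetic sketch in shell:
# Young generation split for -Xmn12G -XX:SurvivorRatio=5
XMN_MB=12288; RATIO=5
EDEN_MB=$((XMN_MB * RATIO / (RATIO + 2)))     # 12288*5/7 ≈ 8777 MB
SURVIVOR_MB=$((XMN_MB / (RATIO + 2)))         # 12288/7 ≈ 1755 MB each
echo "Eden: ${EDEN_MB}MB, each Survivor: ${SURVIVOR_MB}MB"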
6、Heap memory
Generate a heap report (jmap -heap was removed in JDK 9+; on the JDK 21 installed above, use jhsdb instead)
jhsdb jmap --heap --pid <pid>
Key output fields
Heap Configuration:
    MinHeapFreeRatio = 40                         # lower bound on free heap ratio (grow threshold)
    MaxHeapFreeRatio = 70                         # upper bound on free heap ratio (shrink threshold)
    MaxHeapSize = 17179869184 (16384.0MB)         # maximum heap size (-Xmx)
New Generation (Eden + Survivors):
    NewSize = 1073741824 (1024.0MB)               # initial young generation size (-Xmn or dynamic)
    MaxNewSize = 1073741824 (1024.0MB)            # maximum young generation size
Old Generation:
    OldSize = 16106127360 (15360.0MB)             # current old generation size
Heap Usage:
New Generation (Eden + Survivors):
    Eden Space: capacity = 8589934592 (8192.0MB)  # Eden capacity
                used = 2147483648 (2048.0MB)      # Eden used
Survivor Space:
    capacity = 1073741824 (1024.0MB)              # capacity of one Survivor space
    used = 0 (0.0MB)                              # used
Old Generation:
    capacity = 16106127360 (15360.0MB)
    used = 5368709120 (5120.0MB)