Kafka 4.0入门到熟练

1、集群部署

1.1、JDK

cat >> /etc/ansible/playbook/install-jdk.yml << EOF 
- hosts: cluster
  remote_user: root
  tasks:
  - name: 分发JDK
    copy: src=/opt/software/jdk-21.0.5_linux-x64_bin.tar.gz  dest=/opt/software
  - name: 解压JDK
    shell: tar -xvzf /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz -C /usr/local/java
  - name: 配置环境变量
    blockinfile:
      path: /etc/profile
      block: |
        export JAVA_HOME=/usr/local/java/jdk-21.0.5
        export PATH=$JAVA_HOME/bin:$PATH
      marker: "# {mark} JDK"
EOF
ansible-playbook install-jdk.yml

1.2、基础环境配置

cat >> /etc/ansible/playbook/modify_env.yml << EOF 
- hosts: cluster
  remote_user: root
  tasks:
    #设置主机名
    - name: set hostname
      shell: hostnamectl set-hostname {{hostname}}

    - name: distribute hosts to nodes
      copy:
        src: /etc/hosts
        dest: /etc

    #关闭防火墙
    - name: stop firewalld
      service:
        name: firewalld
        state: stopped
        enabled: no

    #关闭selinux
    - name: setenforce 0
      shell: "setenforce 0"
      failed_when: false

    - name: set selinux disabled
      replace:
        path: /etc/selinux/config
        regexp: '^SELINUX=enforcing'
        replace: 'SELINUX=disabled'

    - name: 设置最大文件句柄数
      lineinfile:
        path: /etc/security/limits.conf
        insertafter: '### AFTER THIS LINE'
        line: "{{ item }}"
        state: present
      with_items:
        - '*       soft   noproc  65536'
        - '*       hard   noproc  65536'
        - '*       soft   nofile  131072'
        - '*       hard   nofile  131072'
        - '*       hard memlock unlimited'
        - '*       soft memlock unlimited'

    - name: 关闭 THP
      lineinfile:
        path: /etc/rc.local
        line: |
          echo never > /sys/kernel/mm/transparent_hugepage/enabled
          echo never > /sys/kernel/mm/transparent_hugepage/defrag

    - name: Change permissions
      shell: chmod +x /etc/rc.d/rc.local

    - name: 关闭swap
      replace:
        path: /etc/fstab
        regexp: '^(\s*)([^#\n]+\s+)(\w+\s+)swap(\s+.*)$'
        replace: '#\1\2\3swap\4'
        backup: yes
    - name: Disable SWAP
      shell: |
        swapoff -a
EOF
ansible-playbook modify_env.yml

1.3、免密

cat >> /etc/ansible/playbook/ssh-pubkey.yml << EOF 
- hosts: cluster
  gather_facts: no
  remote_user: root
  tasks:
  - name: 免密登录
    authorized_key:
      user: root
      key: "{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
      state: present
EOF
ansible-playbook ssh-pubkey.yml

1.4、部署Kafka

cat >> /etc/ansible/playbook/install-kafka.yml << EOF 
- hosts: cluster
  remote_user: root
  tasks:
  - name: 分发 kafka
    copy: src=/opt/software/kafka_2.13-4.0.0.tgz  dest=/opt/software
  - name: 解压 kafka
    shell: tar -xvzf /opt/software/kafka_2.13-4.0.0.tgz -C /opt/module/
  - name: rename to kafka
    shell: mv /opt/module/kafka_2.13-4.0.0 /opt/module/kafka
  - name: 赋权starrocks
    shell: chown -R starrocks:starrocks /opt/module/kafka
  - name: 配置环境变量
    blockinfile:
      path: /home/starrocks/.bashrc
      block: |
        # kafka
        export KAFKA_HOME=/opt/module/kafka
        export PATH=$KAFKA_HOME/bin:$PATH
      marker: "# {mark} KAFKA"

EOF
ansible-playbook install-kafka.yml

1.5、修改Kafka配置

vi /opt/module/kafka/config/server.properties

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
## 为不同kafka 服务分配不同的 id值,其他服务id随ip的递增而增加
node.id=1

# List of controller endpoints used connect to the controller cluster
controller.quorum.bootstrap.servers=kylin-01:9093

## 接入集群的kafka 服务的ip 和 编号,端口号使用监控和管理端口 9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093

listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093

# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# socket请求的最大数值,防止OOM,大于message.max.bytes
socket.request.max.bytes=536870912

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/data

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# 最大消息大小512MB
message.max.bytes=536870912
# 最大请求字节大小1GB
max.request.size=1073741824
# replica每次获取数据的最大大小
replica.fetch.max.bytes=536870912
# leader复制时的socket缓存大小
replica.socket.receive.buffer.bytes=536870912
# 副本连接可靠性
replica.socket.timeout.ms=60000
replica.lag.time.max.ms=30000

1.6、修改其它节点配置

node.id=2
controller.quorum.bootstrap.servers=kylin-02:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
advertised.listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
node.id=3
controller.quorum.bootstrap.servers=kylin-03:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
advertised.listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093

1.7、修改日志目录

vi bin/kafka-run-class.sh

# Log directory to use
# 修改日志目录
LOG_DIR=/data/kafka/log

if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi

1.8、初始化集群

生成存储目录唯一ID

bin/kafka-storage.sh random-uuid

格式化 kafka 存储目录(每个节点都需要执行)

bin/kafka-storage.sh format -t m6BZb8yRSzmdNvW9kzOoQg -c config/server.properties

1.9、启动集群

每个节点都执行启动服务命令

bin/kafka-server-start.sh -daemon config/server.properties

查看服务日志

 tail -f /data/kafka/log/server.log

查看 kafka 节点状态

bin/kafka-broker-api-versions.sh --bootstrap-server kylin-01:9092

查看 Kafka 的端口监听状态

netstat -tuln | grep 9092

使用 ps 命令查看 Kafka 进程

ps aux | grep kafka

使用 top 或 htop 查看 Kafka 进程

top -p <PID>

2、操作命令

2.1、基础操作命令

## 查看主题
kafka-topics.sh --bootstrap-server kylin-01:9092 --list
 
## 查看主题明细
kafka-topics.sh --bootstrap-server kylin-01:9092 --describe <topic-id>
 
## 创建主题,分区 partition 为5,副本 replication-factor 为2,broker 数应 大于等于 副本数。(broker 不必在创建 topic 时显示指定)
kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic <topic-id> --partitions 5 --replication-factor 2
 
## 删除主题
kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic <topic-id>
 
## 查看消费者列表--list
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --list
 
## 查看指定消费组详情
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --group <group-id>
 
## 删除特定group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --delete --group <group-id>
 
## 打开一个生产者
kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-id>
 
## 打开一个消费者
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id>  --consumer-property group.id=<group-id>  --from-beginning 
 
## 查看所有消费组详情--all-groups
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups
 
查询消费者成员信息--members
 
## 所有消费组成员信息
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups --members
 
## 指定消费组成员信息
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --members --group <group-id>
 
## 修改到最新offset
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-latest --execute

## 预演-重设位移位置
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --dry-run

## 重设位移位置
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --execute

## 获取指定时间戳的offset
kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server kylin-01:9092 --topic <topic-id> --time 1740499200000

## topic扩容
kafka-topic.sh  --bootstrap-server kylin-01:9092 --topic <topic-id> --alter --partitions 16

## 指定时间范围获取数据
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '>=1742450425000 && <=1742464825000 {print bash}' 
#
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '>=1742450425000 && <=1742464825000 {print $0}' 
#
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --partition 0 --offset 100 | grep '1027729757'

2.2、积压监控

创建Doris表

create table kafka_topic_monitor(
exec_id bigint comment 'id',
begin_day_id bigint comment '当前日期',
topic string comment 'Kafka topic',
groups string comment 'Kafka消费者组',
offset_lag bigint comment 'topic积压总量',
exec_time datetime comment '执行时间',
in_db_time datetime comment '入库时间',
interval_time bigint comment '延迟时间/分钟'
)
unique key (exec_id,begin_day_id)
comment 'Kafka监控' 
distributed by hash(exec_id,begin_day_id) buckets 3;

监控脚本

#! /bin/bash
TOPIC="pay_total_random"
GROUP_ID="etl-dw"
BOOTSTRAP_SERVER="kylin-01:9092,kylin-02:9092,kylin-03:9092"
LAG=100000
TABLE_NAME="b_user_event_attr_12"
TOTAL=100
# 获取Kafka当前LAG
sh /data/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER --group $GROUP_ID --describe | grep $TOPIC | awk -F " " '{print $6}' > ./kafka-lag.txt
# 初始化积压LAG总和为0
TOTAL_LAG=0
while read line 
do
	TOTAL_LAG=$((TOTAL_LAG+ line))
done < ./kafka-lag.txt
# 判断消息挤压情况,是否重启程序
if [ $TOTAL_LAG -gt $LAG ]; then 
	echo "$(date +"%Y-%m-%d %H:%M:%S") Kafka Topic " $TOPIC " IN GROUP " $GROUP_ID "消息积压总量为:" $TOTAL_LAG " 大于 " $LAG ",停止DW模块"
	sh /etl/etl-dw/shell/stop-dw.sh
	# 判断是否执行成功
	if [ $? -eq 0 ]; then 
		nohup sh /etl/etl-dw/shell/start-dw.sh &
		echo "$(date +"%Y-%m-%d %H:%M:%S") 启动DW模块完成"
	else 
		echo "$(date +"%Y-%m-%d %H:%M:%S") 停止DW模块失败"
else
	echo "$(date +"%Y-%m-%d %H:%M:%S") Kafka Topic " $TOPIC " IN GROUP " $GROUP_ID "消息积压总量为:" $TOTAL_LAG " 小于 " $LAG ",停止DW模块"
fi

# 查询近3小时Doris入库情况
query=$(cat <<SQL
select 
max(minute_time) minute_time
from 
(
select 
date_format(from_unixtime(begin_date,'%Y%m%d%H%i')) minute_time
,count(*) as total
from $TABLE_NAME 
where begin_day_id = cast(date_format(now(),'%Y%m%d') as bigint) 
and from_unixtime(begin_date) >= date_sub(now(),interval 3 hour)
group by date_format(from_unixtime(begin_date),'%Y%m%d%H%i')
having count(*) > $TOTAL
) a;
SQL
)

BEGIN_DATE=$(mysql -hkylin-01 -uroot -p123456 -P19030 dw -sN -e "$query")

# 将Kafka延迟情况写入Doris
insert=$(cat <<SQL
insert into kafka_topic_monitor
select 
  date_format(now(),'%Y%m%%d%H%i%s')
 ,date_format(now(),'%Y%m%d')
 ,"$TOPIC"
 ,"$GROUP_ID"
 ,"$TOTAL_LAG"
 ,now()
 ,str_to_date($BEGIN_DATE,'%Y%m%d%H%i%')
 ,minute_diff(now(),str_to_date($BEGIN_DATE,'%Y%m%d%H%i%')) 
 ;
SQL
)

# echo $insert
mysql -hkylin-01 -uroot -p123456 -P19030 dw -sN -e "$insert"

配置定时任务

# Kafka监控,每小时执行一次
0 * * * * sh /etl/etl-dw/shell/check-kafka-lag.sh >> /etl/logs/etl-dw/check-kafka-lag.log 2>&1

2.3、开机自启动

配置开机自启参数
vi /etc/systemd/system/kafka.service

[Unit]
Description=Kafka Service
After=network.target

[Service]
Environment="JAVA_HOME=/usr/local/java/jdk-21.0.5"
Type=forking
ExecStart=/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
ExecStop=/opt/module/kafka/bin/kafka-server-stop.sh
PrivateTmp=true
# 普通用户名
User=starrocks
Group=starrocks

[Install]
WantedBy=multi-user.target

启动并执行服务管理命令

./kafka-server-stop.sh  -daemon ../config/server.properties  #关闭原用命令启动的kafka服务
 systemctl daemon-reload                                 #重新加载
 systemctl start kafka.service                           #启动kafka
 systemctl status kafka.service                          #查看kafka进程状态
 systemctl enable kafka.service                          #设置开机自启

验证kafka开启自启

systemctl is-enabled kafka.service

返回了enabled说明开机自启功能配置成功了,但是需要严谨些,用以下命令模拟真正的服务器重启场景:reboot

3、调优

3.1、JVM 参数调优

1、默认启动的 Broker 进程只会使用 1G 内存,在实际使用中可能会导致进程频繁 GC,影响 Kafka 集群的性能和稳定性。要判断是否需要调整内存分配的大小,首先通过 jps 命令查看 kafka 进程 id

jps -m

2、然后通过如下命令根据进程 id 查看 kafka 进程 GC 情况(命令最后的 1000 表示每秒刷新一次)

jstat -gcutil <pid> 1000
或
jstat -gc <pid> 1000 10

3、主要看 YGC,YGCT,FGC,FGCT 这几个参数:

  • 在JVM内存管理中,把Heap空间分为Young/Old两个分区,Young分区又包括一个Eden和两个Survivor分区。Eden区代表着对象的"诞生地",是新创建对象的初始存放区域。当Eden区满了之后,就会触发Minor GC,JVM将Eden区存活对象拷贝到Survivor 0(S0),然后S0—>S1,始终保持一个Survivor区为全空状态。如果一个对象拷贝多次还存活,就会进入Old Gen分区,当Old Gen分区没有足够空间时,GC会停止所有程序线程,进行Full GC。所有线程被暂停叫:Stop The World(STW),就是天塌了,所有Full GC对任务性能影响很大。
  • Minor GC 主要针对新生代(Eden+S0+S1),通过JVM参数(-XX:MaxGCPauseMillis)来控制Minor
    GC的频率;Minor GC的标记-复制-清空过程,判断对象是否存活,用的是可达性算法。
  • S0/S1:Survivor区使用率
  • E:Eden区使用率
  • O:老年代使用率
  • M:元空间使用率
  • CCS:压缩类空间使用率
  • YGC:young gc 发生的次数
  • YGCT:young gc 消耗的时间
  • FGC:full gc 发生的次数
  • FGCT:full gc 消耗的时间
  • 查看Eden区容量(EC列)及使用量(EU列)

在这里插入图片描述

4、发现 YGC 很频繁,或者 FGC 很频繁,就说明内存分配的少了。可以修改 kafka-server-start.sh 文件

5、修改其中的 KAFKA_HEAP_OPTS 参数,比如下面我们给给 kafka 分配了 10G 内存。如果服务器内存充裕,也可以分配 16G。

# JDK8
# 测试环境
export KAFKA_HEAP_OPTS="-Xmx2g -Xms2g -Xmn1G -XX:MetaspaceSize=256m -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=40 -XX:G1HeapRegionSize=4M"

# 生产环境
# 新生代分配12GB,Eden区占10GB(12GB × 5/6),每个Survivor区占1GB‌
export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn12G -XX:SurvivorRatio=5 -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=40 -XX:G1HeapRegionSize=16M"

  • ‌UseG1GC‌:G1适合大内存低延迟场景,是Kafka生产环境的推荐选择‌。
  • ‌MaxGCPauseMillis=50‌:目标最大GC停顿时间设为50ms,适用于对延迟敏感的场景(如实时数据处理)。若GC日志显示实际停顿时间频繁超过此阈值,需结合G1HeapRegionSize调整或增加堆内存‌。
  • ‌InitiatingHeapOccupancyPercent=40:堆占用率40%时启动并发GC周期,此值偏低可能导致GC频率过高。建议提升至40~45%以平衡GC频率与吞吐量‌。
  • ‌G1HeapRegionSize=16M‌:Region大小设为16MB适用于10GB堆内存,无需调整;若堆内存超过32GB,可设为32M以降低GC复杂度‌。
  • -XX:SurvivorRatio=3 表示Eden区与单个Survivor区的容量比例为N:1; Eden:S0:S1 = 3:1:1,每个Survivor占新生代的1/(3+1+1)=20%

6、堆内存
生成堆内存报告

jmap -heap <pid>

关键输出字段解析

Heap Configuration:  
   MinHeapFreeRatio         = 40      # 堆空闲比例下限(触发扩容阈值)  
   MaxHeapFreeRatio         = 70      # 堆空闲比例上限(触发缩容阈值)  
   MaxHeapSize              = 17179869184 (16384.0MB)  # 堆最大容量(-Xmx值)  
   New Generation (Eden + Survivors):  
      NewSize               = 1073741824 (1024.0MB)    # 新生代初始大小(-Xmn或动态分配)  
      MaxNewSize            = 1073741824 (1024.0MB)    # 新生代最大容量  
   Old Generation:  
      OldSize               = 16106127360 (15360.0MB)   # 老年代当前容量  
Heap Usage:  
   New Generation (Eden + Survivors):  
      Eden Space:           capacity = 8589934592 (8192.0MB)  # Eden区容量  
         used               = 2147483648 (2048.0MB)           # 已使用量  
      Survivor Space:  
         capacity           = 1073741824 (1024.0MB)           # 单个Survivor区容量  
         used               = 0 (0.0MB)                        # 使用量  
   Old Generation:  
      capacity              = 16106127360 (15360.0MB)  
      used                  = 5368709120 (5120.0MB)  

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值