Elastic:使用 Kafka 部署 Elastic Stack

本文详述了如何利用Kafka作为消息代理,增强ElasticStack的日志处理能力,构建稳定的数据管道。通过Filebeat收集日志,经由Kafka传输至Logstash进行数据处理,最终存储于Elasticsearch中,实现高效的数据分析。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

在今天的文章中,我来讲述如何使用 Kafka 来部署 Elastic Stack。下面是我们最常用的一个 Elastic Stack 的部署方案图。 

在上面我们可以看出来 Beats 的数据可以直接传入到 Elasticsearch 中,但是我们也可以看到另外一条路径,也就是 Beats 的数据也可以传入到 Kafka 中,并传入到 Logstash 中,最终导入到 Elasticsearch 中。那么为什么我们需要这个 Kafka 呢?

日志是不可预测的。 在发生生产事件后,恰恰在你最需要它们时,日志可能突然激增并淹没你的日志记录基础结构。 为了保护 Logstash 和 Elasticsearch 免受此类数据突发攻击,用户部署了缓冲机制以充当消息代理。

在本文中,我将展示如何使用 ELK Stack 和 Kafka 部署建立弹性数据管道所需的所有组件:

  • Filebeat – 收集日志并将其转发到 Kafka 主题。
  • Kafka – 代理数据流并将其排队。
  • Logstash –汇总来自 Kafka 主题的数据,对其进行处理并将其发送到 Elasticsearch。
  • Elasticsearch –索引数据。
  • Kibana –用于分析数据。

环境

我的环境如下:

在我的配置中,我使用了一个 MacOS 安装 Elasticsearch 及 Kibana。另外我使用另外一个 Ubuntu 18.04 来安装我的 Kafaka, Filebeat 及 Logstash。他们的IP地址分别如上所示。我们可以使用一个VM来安装我们的 Ubuntu 18.04。

安装

为了能够完成我们的设置,我们做如下的安装:

安装 Elasticseach

如果大家还没安装好自己的 Elastic Stack 的话,那么请按照我之前的教程 “如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch” 安装好自己的 Elasticsearch。由于我们的 Elastic Stack 需要被另外一个 Ubuntu VM 来访问,我们需要对我们的 Elasticsearch 进行配置。首先使用一个编辑器打开在 config 目录下的 elasticsearch.yml 配置文件。我们需要修改 network.host 的 IP 地址。在你的 Mac 及 Linux 机器上,我们可以使用:

$ ifconfig

来查看到我们的机器的 IP 地址。针对我的情况,我的 macOS 机器的 IP 地址是:192.168.43.220。

在上面我们把 network.host 设置为 "_site",表明它绑定到我们的本地电脑的 IP 地址。详细说明请参阅 Elasticsearch 的 network.host 说明

我们也必须在 elasticsearch.yml 的最后加上 discovery.type: single-node,表明我们是单个 node。

等修改完我们的 IP 地址后,我们保存 elasticsearch.yml 文件。然后重新运行我们的 elasticsearch。我们可以在一个浏览器中输入刚才输入的 IP 地址并加上端口号 9200。这样可以查看一下我们的 elasticsearch 是否已经正常运行了。

安装 Kibana

我们可以按照 “如何在 Linux,MacOS 及 Windows 上安装 Elastic 栈中的 Kibana” 中介绍的那样来安装我们的 Kibana。由于我们的 Elasticsearch 的 IP 地址已经改变,所以我们必须修改我们的 Kibana 的配置文件。我们使用自己喜欢的编辑器打开在 config 目录下的 kibana.yml 文件,并找到 server.host。把它的值修改为自己的电脑的 IP 地址。针对我的情是:

同时找到 elasticsearch.hosts,并把自己的 IP 地址输入进去:

保存我们的 kibana.yml 文件,并运行我们的 Kibana。同时在浏览器的地址中输入自己的IP地址及 5601 端口:

如果配置成功的话,我们就可以看到上面的画面。

安装 Logstash

我们可以参考文章 “如何安装 Elastic 栈中的 Logstash” 来在 Ubuntu OS 上安装我们的 Logstash。

sudo apt-get install default-jre
curl -L -O https://artifacts.elastic.co/downloads/logstash/logstash-7.3.0.tar.gz
tar -xzvf logstash-7.3.0.tar.gz

检查 Java 的版本信息:

我们创建一个叫做 apache.conf 的 Logstash 配置文件,并且它的内容如下:

apache.conf

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => "apache"
    }
}

filter {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  geoip {
      source => "clientip"
    }
}

output {
  elasticsearch {
    hosts => ["192.168.43.220:9200"]
  }

如你所见-我们正在使用 Logstash Kafka 输入插件来定义 Kafka 主机和我们希望 Logstash 从中提取的主题。 我们将对日志进行一些过滤,并将数据发送到本地 Elasticsearch 实例。

安装 Filebeat

在 Ubuntu 上安装 Filebeat 也是非常直接的。我们可以先打开我们的 Kibana。

我们选择 “Apache logs”

我们可以看到如下的画面:

在上面,我们选择 DEB,因为 Ubuntu OS 是 debian 系统。按照上面的步骤,我们可以一步一步地安装 Filebeat。

简单地说:

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.0-amd64.deb
sudo dpkg -i filebeat-7.5.0-amd64.deb

你可以在 /etc/filebeat 找到 filebeat.yml 配置文件。我们修改这个文件的内容为:

filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /etc/filebeat/access.log
output.kafka:
  codec.format:
    string: '%{[@timestamp]} %{[message]}'
  hosts: ["192.168.43.192:9092"]
  topic: apache
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

在输入部分,我们告诉 Filebeat 要收集哪些日志 - apache 访问日志。 在输出部分,我们告诉 Filebeat 将数据转发到本地 Kafka 服务器和相关主题 apache(将在下一步中安装)。 Kafka 将在 Ubuntu OS 上的 9092 口上接受数据。请注意我在上面配置 access.log 的路径为 /etc/filebeat/access.log。在实际的使用中,我们需要根据 apache 的实际路径来配置,比如 /var/log/apache2/access.log。
请注意 codec.format 指令的使用-这是为了确保正确提取 message 和 timestamp 字段。 否则,这些行将以 JSON 发送到 Kafka。

安装 Kafka

我们的最后也是最后一次安装涉及设置 Apache Kafka(我们的消息代理)。Kafka 使用 ZooKeeper 来维护配置信息和同步,因此在设置 Kafka 之前,我们需要先安装 ZooKeeper:

sudo apt-get install zookeeperd

接下来,让我们下载并解压缩 Kafka:

wget https://apache.mivzakim.net/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -xzvf kafka_2.13-2.4.0.tgz
sudo cp -r kafka_2.13-2.4.0 /opt/kafka

现在,我们准备运行 Kafka,我们将使用以下脚本进行操作:

sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

你应该开始在控制台中看到一些 INFO 消息:

我们接下来打开另外一个控制台中,并为 Apache 日志创建一个主题:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic apache

我们创建了一个叫做 apache 的 topic。上面的命令将返回如下的结果:

Created topic apache.

我们现在已经完全为开始管道做好了准备。

开始测试

为了测试方便,我们可以在地址下载一个 Apache access 的 log 文件,并命名为 access.log。我们可以把这个文件存于 /etc/filebeat 这个目录中,尽管这不是一个好的选择。在 access.log 的文档内容像如下文字:

80.135.37.131 - - [11/Sep/2019:23:56:45 +0000] "GET /item/giftcards/4852 HTTP/1.1" 200 91 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1"
68.186.206.172 - - [11/Sep/2019:23:56:50 +0000] "GET /category/cameras HTTP/1.1" 200 129 "/category/software" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"

启动 Kafka

在上面的步骤中,我们已经成功地启动了 Kafka,并同时创建了一个叫做 apache 的 topic

启动 Logstash

我们按照如下的方式来启动Logstash:

 ./bin/logstash -f apache.conf 

我们在 Logstash 的安装目录里运行上面的命令。

启动 Filebeat

在 /etc/filebeat 目录中,有如下的文件:

liuxg@liuxg:/etc/filebeat$ pwd
/etc/filebeat
liuxg@liuxg:/etc/filebeat$ ls
access.log  filebeat.reference.yml  filebeat.yml.org
fields.yml  filebeat.yml            modules.d

access.log 在我们当前的目录下。我们按照如下的方式来运行 filebeat:

sudo service filebeat start

如果我们想重新运行 filebeat,可以打入如下的命令:

sudo service restart filebeat

如果由于某种原因,我们想对 access.log 重新被 filebeat 来执行一遍,那么我可以删除如下的 registery 目录:

  cd /var/lib/filebeat/
  rm -rf registry/

在删除这个 registry 目录后,我们再执行上面的 restart 指令即可。

管道开始流式传输日志需要花费几分钟。 要查看 Kafka 的实际效果,请在单独的标签中输入以下命令:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server
192.168.43.192:9092 --topic apache --from-beginning

请注意上面的 192.168.43.192:9092 是我们在上面 filebeat 中配置的地址。

我们可以在 Kibana 中查看 logstash 的文档:

我们也可以通过创建一个 logstash  的 index pattern 即可对我们的数据进行分析了。

评论 6
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值