在今天的文章中,我来讲述如何使用 Kafka 来部署 Elastic Stack。下面是我们最常用的一个 Elastic Stack 的部署方案图。
在上面我们可以看出来 Beats 的数据可以直接传入到 Elasticsearch 中,但是我们也可以看到另外一条路径,也就是 Beats 的数据也可以传入到 Kafka 中,并传入到 Logstash 中,最终导入到 Elasticsearch 中。那么为什么我们需要这个 Kafka 呢?
日志是不可预测的。 在发生生产事件后,恰恰在你最需要它们时,日志可能突然激增并淹没你的日志记录基础结构。 为了保护 Logstash 和 Elasticsearch 免受此类数据突发攻击,用户部署了缓冲机制以充当消息代理。
在本文中,我将展示如何使用 ELK Stack 和 Kafka 部署建立弹性数据管道所需的所有组件:
- Filebeat – 收集日志并将其转发到 Kafka 主题。
- Kafka – 代理数据流并将其排队。
- Logstash –汇总来自 Kafka 主题的数据,对其进行处理并将其发送到 Elasticsearch。
- Elasticsearch –索引数据。
- Kibana –用于分析数据。
环境
我的环境如下:
在我的配置中,我使用了一个 MacOS 安装 Elasticsearch 及 Kibana。另外我使用另外一个 Ubuntu 18.04 来安装我的 Kafaka, Filebeat 及 Logstash。他们的IP地址分别如上所示。我们可以使用一个VM来安装我们的 Ubuntu 18.04。
安装
为了能够完成我们的设置,我们做如下的安装:
安装 Elasticseach
如果大家还没安装好自己的 Elastic Stack 的话,那么请按照我之前的教程 “如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch” 安装好自己的 Elasticsearch。由于我们的 Elastic Stack 需要被另外一个 Ubuntu VM 来访问,我们需要对我们的 Elasticsearch 进行配置。首先使用一个编辑器打开在 config 目录下的 elasticsearch.yml 配置文件。我们需要修改 network.host 的 IP 地址。在你的 Mac 及 Linux 机器上,我们可以使用:
$ ifconfig
来查看到我们的机器的 IP 地址。针对我的情况,我的 macOS 机器的 IP 地址是:192.168.43.220。
在上面我们把 network.host 设置为 "_site",表明它绑定到我们的本地电脑的 IP 地址。详细说明请参阅 Elasticsearch 的 network.host 说明。
我们也必须在 elasticsearch.yml 的最后加上 discovery.type: single-node,表明我们是单个 node。
等修改完我们的 IP 地址后,我们保存 elasticsearch.yml 文件。然后重新运行我们的 elasticsearch。我们可以在一个浏览器中输入刚才输入的 IP 地址并加上端口号 9200。这样可以查看一下我们的 elasticsearch 是否已经正常运行了。
安装 Kibana
我们可以按照 “如何在 Linux,MacOS 及 Windows 上安装 Elastic 栈中的 Kibana” 中介绍的那样来安装我们的 Kibana。由于我们的 Elasticsearch 的 IP 地址已经改变,所以我们必须修改我们的 Kibana 的配置文件。我们使用自己喜欢的编辑器打开在 config 目录下的 kibana.yml 文件,并找到 server.host。把它的值修改为自己的电脑的 IP 地址。针对我的情是:
同时找到 elasticsearch.hosts,并把自己的 IP 地址输入进去:
保存我们的 kibana.yml 文件,并运行我们的 Kibana。同时在浏览器的地址中输入自己的IP地址及 5601 端口:
如果配置成功的话,我们就可以看到上面的画面。
安装 Logstash
我们可以参考文章 “如何安装 Elastic 栈中的 Logstash” 来在 Ubuntu OS 上安装我们的 Logstash。
sudo apt-get install default-jre
curl -L -O https://artifacts.elastic.co/downloads/logstash/logstash-7.3.0.tar.gz
tar -xzvf logstash-7.3.0.tar.gz
检查 Java 的版本信息:
我们创建一个叫做 apache.conf 的 Logstash 配置文件,并且它的内容如下:
apache.conf
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => "apache"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {
hosts => ["192.168.43.220:9200"]
}
如你所见-我们正在使用 Logstash Kafka 输入插件来定义 Kafka 主机和我们希望 Logstash 从中提取的主题。 我们将对日志进行一些过滤,并将数据发送到本地 Elasticsearch 实例。
安装 Filebeat
在 Ubuntu 上安装 Filebeat 也是非常直接的。我们可以先打开我们的 Kibana。
我们选择 “Apache logs”
我们可以看到如下的画面:
在上面,我们选择 DEB,因为 Ubuntu OS 是 debian 系统。按照上面的步骤,我们可以一步一步地安装 Filebeat。
简单地说:
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.0-amd64.deb
sudo dpkg -i filebeat-7.5.0-amd64.deb
你可以在 /etc/filebeat 找到 filebeat.yml 配置文件。我们修改这个文件的内容为:
filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /etc/filebeat/access.log
output.kafka:
codec.format:
string: '%{[@timestamp]} %{[message]}'
hosts: ["192.168.43.192:9092"]
topic: apache
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
在输入部分,我们告诉 Filebeat 要收集哪些日志 - apache 访问日志。 在输出部分,我们告诉 Filebeat 将数据转发到本地 Kafka 服务器和相关主题 apache(将在下一步中安装)。 Kafka 将在 Ubuntu OS 上的 9092 口上接受数据。请注意我在上面配置 access.log 的路径为 /etc/filebeat/access.log。在实际的使用中,我们需要根据 apache 的实际路径来配置,比如 /var/log/apache2/access.log。
请注意 codec.format 指令的使用-这是为了确保正确提取 message 和 timestamp 字段。 否则,这些行将以 JSON 发送到 Kafka。
安装 Kafka
我们的最后也是最后一次安装涉及设置 Apache Kafka(我们的消息代理)。Kafka 使用 ZooKeeper 来维护配置信息和同步,因此在设置 Kafka 之前,我们需要先安装 ZooKeeper:
sudo apt-get install zookeeperd
接下来,让我们下载并解压缩 Kafka:
wget https://apache.mivzakim.net/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -xzvf kafka_2.13-2.4.0.tgz
sudo cp -r kafka_2.13-2.4.0 /opt/kafka
现在,我们准备运行 Kafka,我们将使用以下脚本进行操作:
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
你应该开始在控制台中看到一些 INFO 消息:
我们接下来打开另外一个控制台中,并为 Apache 日志创建一个主题:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic apache
我们创建了一个叫做 apache 的 topic。上面的命令将返回如下的结果:
Created topic apache.
我们现在已经完全为开始管道做好了准备。
开始测试
为了测试方便,我们可以在地址下载一个 Apache access 的 log 文件,并命名为 access.log。我们可以把这个文件存于 /etc/filebeat 这个目录中,尽管这不是一个好的选择。在 access.log 的文档内容像如下文字:
80.135.37.131 - - [11/Sep/2019:23:56:45 +0000] "GET /item/giftcards/4852 HTTP/1.1" 200 91 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1"
68.186.206.172 - - [11/Sep/2019:23:56:50 +0000] "GET /category/cameras HTTP/1.1" 200 129 "/category/software" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"
启动 Kafka
在上面的步骤中,我们已经成功地启动了 Kafka,并同时创建了一个叫做 apache 的 topic
启动 Logstash
我们按照如下的方式来启动Logstash:
./bin/logstash -f apache.conf
我们在 Logstash 的安装目录里运行上面的命令。
启动 Filebeat
在 /etc/filebeat 目录中,有如下的文件:
liuxg@liuxg:/etc/filebeat$ pwd
/etc/filebeat
liuxg@liuxg:/etc/filebeat$ ls
access.log filebeat.reference.yml filebeat.yml.org
fields.yml filebeat.yml modules.d
access.log 在我们当前的目录下。我们按照如下的方式来运行 filebeat:
sudo service filebeat start
如果我们想重新运行 filebeat,可以打入如下的命令:
sudo service restart filebeat
如果由于某种原因,我们想对 access.log 重新被 filebeat 来执行一遍,那么我可以删除如下的 registery 目录:
cd /var/lib/filebeat/
rm -rf registry/
在删除这个 registry 目录后,我们再执行上面的 restart 指令即可。
管道开始流式传输日志需要花费几分钟。 要查看 Kafka 的实际效果,请在单独的标签中输入以下命令:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server
192.168.43.192:9092 --topic apache --from-beginning
请注意上面的 192.168.43.192:9092 是我们在上面 filebeat 中配置的地址。
我们可以在 Kibana 中查看 logstash 的文档:
我们也可以通过创建一个 logstash 的 index pattern 即可对我们的数据进行分析了。