多路复用:实时监听 HTTP 端口数据到控制台案例分析
案例需求:假设有一个生产场景,Flume1 在持续监控一个 HTTP 端口的数据流,要求根据 Event 中 Header 的某个 key 的值,将不同的 Event 发送到不同的 Channel 中,然后 Flume2 和 Flume3 将获取的数据通过 Sink 端分别写出到控制台。
需求分析:在实际开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的 Channel。此时会用到 Flume 拓扑结构中的 Multiplexing(多路复用)结构,Multiplexing 结构的原理是, 根据 Event 中 Header 的某个 key 的值,将不同的 Event 发送到不同的 Channel 中,所以我们需要使用 Multiplexing Channel Selector(多路复用通道选择器)。
在该实训中,Flume1 监听本机 4141 端口的数据流,我们以端口数据模拟日志,如果 Event headers 中 key 的值为“Flume”就发往 mc1 通道,如果 Event headers 中 key 的值为“Hadoop”就发往 mc2 通道,然后 Flume2 和 Flume3 分别获取 mc1 和 mc2 的数据并分别写出到控制台。过程如下图所示:
要求:配置 HTTP Source 监控端口 4141 数据流,配置 Avro Sink 输出数据到下一级 Flume。
flume1
-
创建 Agent 配置文件(HTTP Source + File Channel + Avro Sink)
进入 /root/software/apache-flume-1.9.0-bin/conf 目录,使用 touch 命令创建一个名为 flume1_http_avro.conf 的配置文件。 -
查看端口
使用 netstat -nlp | grep 4141 命令查看 4141、6666 和 6667 端口是否被占用。 -
编辑配置文件 flume1_http_avro.conf
使用 vim 命令打开 flume1_http_avro.conf 文件,在里面添加如下配置:
(1)配置 Flume Agent——a1
首先,我们需要为 a1 命名/列出组件,将数据源(Source)、缓冲通道(Channel)和接收器(Sink)分别命名为 httpSource、 mc1 、 mc2 、 avroSink1 和 avroSink2。
(2)描述和配置 HTTP Source
HTTP Source 可以通过 HTTP POST 和 GET 请求方式接收 Events 数据,GET 通常只能用于测试使用。HTTP 请求会被实现了 HTTPSourceHandler 接口的 handler(处理器)可插拔插件转成 Flume Events,这个 handler 接收一个 HttpServletRequest 后并返回一个 Flume Events 列表。从一次 HTTP 请求解析出来的若干个 Events 会以一个事务提交到 Channel,从而在诸如 File Channel 的一些 Channel 上提高效率。如果 handler 抛出异常,这个 HTTP 的响应状态码是 400。如果 Channel 满了或者无法发送 Events 到 Channel,此时会返回 HTTP 状态码503(服务暂时不可用)。
我们需要为 httpSource 设置以下属性:
属性名称 描述 设置值
channels 与 Source 绑定的 Channel mc1 mc2
type 数据源的类型 http
port 要监听的服务端口 4141(任意可用端口)
bind 要监听的主机名或 IP 地址 localhost(本机)
selector.type 配置 Channel 选择器。可选值:replicating 或 multiplexing,分别表示: 复制、多路复用 multiplexing
selector.header 配置 Event headers 中的 key subject
selector.mapping.Flume 配置 Event headers 中 key 对应的 value,如果是“Flume”就发往 mc1 mc1
selector.mapping.Hadoop 配置 Event headers 中 key 对应的 value,如果是“Hadoop”就发往 mc2 mc2
(3)描述和配置 Avro Sink
Avro Sink 可以作为 Flume 分层收集特性的下半部分。发送到此 Sink 的 Flume Event 将转换为 Avro Event 发送到配置的主机/端口上。Event 从配置的 Channel 中批量获取,数量根据配置的 batch-size 而定。
我们需要为 avroSink1 设置以下属性:
属性名称 描述 设置值
channel 与 Sink 绑定的 Channel mc1
type 接收器的类型 avro
hostname 要监听的主机名或 IP 地址 localhost(本机)
port 要监听的服务端口 6666(任意可用端口)
我们需要为 avroSink2 设置以下属性:
属性名称 描述 设置值
channel 与 Sink 绑定的 Channel mc2
type 接收器的类型 avro
hostname 要监听的主机名或 IP 地址 localhost(本机)
port 要监听的服务端口 6667(任意可用端口)
(4)描述和配置 File Channel
File Channel 是 Flume 的持久通道,它将所有的 Event 写入磁盘,因此不会丢失进程或机器关机、崩溃时的数据,可以很好的保证数据的完整性和一致性。File Channel 通过在一次事务中提交多个 Event 来提高吞吐量,做到了只要事务被提交,那么数据就不会有丢失。在具体配置 File Channel 时,建议 File Channel 设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
我们需要为 mc1 设置以下属性:
属性名称 描述 设置值
type 缓冲通道的类型 file
checkpointDir 存储检查点文件的目录,默认值:~/.flume/file-channel/checkpoint /root/flumedata/mc1_checkpoint
dataDirs 用于存储日志文件的目录,可以使用多个,多目录之间使用逗号分隔。在不同物理磁盘上使用多个目录可以提高文件通道的性能。默认值:~/.flume/file-channel/data /root/flumedata/mc1_data
我们需要为 mc2 设置以下属性:
属性名称 描述 设置值
type 缓冲通道的类型 file
checkpointDir 存储检查点文件的目录,默认值:~/.flume/file-channel/checkpoint /root/flumedata/mc2_checkpoint
dataDirs 用于存储日志文件的目录,可以使用多个,多目录之间使用逗号分隔。在不同物理磁盘上使用多个目录可以提高文件通道的性能。默认值:~/.flume/file-channel/data /root/flumedata/mc2_data
a1.sources = httpSource
a1.channels = mc1 mc2
a1.sinks = avroSink1 avroSink2
a1.sources.httpSource.channels = mc1 mc2
a1.sources.httpSource.type = http
a1.sources.httpSource.bind = localhost
a1.sources.httpSource.port = 4141
a1.sources.httpSource.selector.type = multiplexing
a1.sources.httpSource.selector.header = subject
a1.sources.httpSource.selector.mapping.Flume = mc1
a1.sources.httpSource.selector.mapping.Hadoop = mc2
a1.sinks.avroSink1.channel = mc1
a1.sinks.avroSink1.type = avro
a1.sinks.avroSink1.hostname = localhost
a1.sinks.avroSink1.port = 6666
a1.sinks.avroSink2.channel = mc2
a1.sinks.avroSink2.type = avro
a1.sinks.avroSink2.hostname = localhost
a1.sinks.avroSink2.port = 6667
a1.channels.mc1.type = file
a1.channels.mc1.checkpointDir = /root/flumedata/mc1_checkpoint
a1.channels.mc1.dataDirs = /root/flumedata/mc1_data
a1.channels.mc2.type = file
a1.channels.mc2.checkpointDir = /root/flumedata/mc2_checkpoint
a1.channels.mc2.dataDirs = /root/flumedata/mc2_data
vim flume2_avro_logger.conf
a2.sources = avroSource
a2.channels = memoryChannel
a2.sinks = loggerSink
a2.sources.avroSource.channels = memoryChannel
a2.sources.avroSource.type = avro
a2.sources.avroSource.bind = localhost
a2.sources.avroSource.port = 6666
a2.sinks.loggerSink.channel = memoryChannel
a2.sinks.loggerSink.type = logger
a2.channels.memoryChannel.type = memory
a2.channels.memoryChannel.capacity = 1000
a2.channels.memoryChannel.transactionCapacity = 100
vim flume3_avro_logger.conf
a3.sources = avroSource
a3.channels = memoryChannel
a3.sinks = loggerSink
a3.sources.avroSource.channels = memoryChannel
a3.sources.avroSource.type = avro
a3.sources.avroSource.bind = localhost
a3.sources.avroSource.port = 6667
a3.sinks.loggerSink.channel = memoryChannel
a3.sinks.loggerSink.type = logger
a3.channels.memoryChannel.type = memory
a3.channels.memoryChannel.capacity = 1000
a3.channels.memoryChannel.transactionCapacity = 100
flume-ng agent -c conf/ -f conf/flume3_avro_logger.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c conf/ -f conf/flume2_avro_logger.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c conf/ -f conf/flume1_http_avro.conf -n a1 -Dflume.root.logger=INFO,console
- 往监听端口发送数据
打开另一个终端,使用 curl -X POST -d 命令发起 POST 请求并往监听端口发送数据。具体命令如下所示:
curl -X POST -d '[{ "headers" :{"subject" : "Flume"},"body" : "shiny"}]' http://localhost:4141
curl -X POST -d '[{ "headers" :{"subject" : "Flume"},"body" : "angel"}]' http://localhost:4141
curl -X POST -d '[{ "headers" :{"subject" : "Hadoop"},"body" : "mark"}]' http://localhost:4141
curl -X POST -d '[{ "headers" :{"subject" : "Hadoop"},"body" : "cendy"}]' http://localhost:4141
curl -X POST -d '[{ "headers" :{"subject" : "BigData"},"body" : "judy"}]' http://localhost:4141
- 查看采集结果
(1)Flume Agent a1 终端
在启动源发送的代理终端查看控制台输出。如下图所示:
image.png
从上图可以看出,Flume 已经准确监听并采集到了监听应用发送的 HTTP 请求数据,并成功将接收的所有 Events 写入磁盘。
(2)Flume Agent a2 终端
在启动源第一个接事件的代理终端查看控制台输出。如下图所示:
image.png
这里可以清楚的看到,这个接事件代理只收到了2个事件,因为我们设置了多路复用,只接收 Event headers 中 value 为 “Flume”的事件。
(3)Flume Agent a3 终端
在启动源第二个接事件的代理终端查看控制台输出。如下图所示:
image.png
从上图可以看出,这个接事件代理接收了 Event headers 中 value 为 “Hadoop”的事件。另外,Event headers 中 value 为 “BigData”的事件2个接事件代理都没有接收。