Flume capturing data into Kafka (integration)
1. flume-apache.conf -- no changes needed
** monitors the Apache web application's log file
2. flume-hive.conf -- no changes needed
** monitors the Hive log file
$ sbin/start-dfs.sh ;sbin/start-yarn.sh ;mr-jobhistory-daemon.sh start historyserver
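The access_log lines the Apache agent tails are Common Log Format entries (the stock `CustomLog` default; adjust the regex if your httpd.conf uses a different `LogFormat`). A small parser sketch to show what each shipped event looks like — the field names here are illustrative, not anything Flume itself produces:

```python
import re

# Apache Common Log Format: host ident user [time] "request" status size
LOG_RE = re.compile(
    r'(?P<host>\S+) (?P<ident>\S+) (?P<user>\S+) '
    r'\[(?P<time>[^\]]+)\] "(?P<request>[^"]*)" '
    r'(?P<status>\d{3}) (?P<size>\S+)'
)

def parse_access_line(line):
    """Return a dict of fields for one access_log line, or None if malformed."""
    m = LOG_RE.match(line)
    return m.groupdict() if m else None

sample = '192.168.1.10 - - [10/Oct/2016:13:55:36 +0800] "GET /index.html HTTP/1.1" 200 2326'
print(parse_access_line(sample)['status'])  # -> 200
```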
3. Edit flume-collector.conf ------------
a8.sources = r1
a8.channels = c1
a8.sinks = k1
# Describe/configure the source
a8.sources.r1.type = exec
a8.sources.r1.command = tail -F /var/log/httpd/access_log
a8.sources.r1.shell = /bin/bash -c
a8.channels.c1.type = memory
a8.channels.c1.capacity = 1000
a8.channels.c1.transactionCapacity = 100
# Describe the sink
a8.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a8.sinks.k1.brokerList = vampire04:9092
a8.sinks.k1.topic = testTopic
# Bind the source and sink to the channel
a8.sources.r1.channels = c1
a8.sinks.k1.channel = c1
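Conceptually, the exec source runs `tail -F` and turns each new line into an event, the memory channel buffers up to `capacity` events, and the sink drains them in transactions of at most `transactionCapacity`. A scaled-down Python sketch of that buffering flow (pure illustration, not Flume's implementation):

```python
from collections import deque

class MemoryChannel:
    """Bounded in-memory buffer, like Flume's memory channel (sizes scaled down)."""
    def __init__(self, capacity=10, transaction_capacity=3):
        self.capacity = capacity
        self.tx_capacity = transaction_capacity
        self.events = deque()

    def put(self, event):
        # Flume applies back-pressure to the source when the channel is full
        if len(self.events) >= self.capacity:
            raise RuntimeError("channel full")
        self.events.append(event)

    def take_batch(self):
        """Drain up to transactionCapacity events, like one sink transaction."""
        batch = []
        while self.events and len(batch) < self.tx_capacity:
            batch.append(self.events.popleft())
        return batch

# source side: each tailed line becomes an event
channel = MemoryChannel()
for line in ["GET /a 200", "GET /b 404", "GET /c 200", "GET /d 500"]:
    channel.put(line)

# sink side: drain in transactions of at most 3
print(channel.take_batch())  # first 3 events
print(channel.take_batch())  # the remaining 1
```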
Testing (the following steps need several terminal windows open):
1. Refresh the web page
$ su -
# service httpd start
$ su - tom
$ tail -F /var/log/httpd/access_log
$ bin/flume-ng agent --conf conf/ --name a8 --conf-file conf/a8.conf
2. Start CDH Hadoop, then start Hive
$ tail -F /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
> show databases;
$ bin/flume-ng agent --conf conf/ --name a8 --conf-file conf/a8.conf
3. Start agent a8 (the --name must match the a8 property prefix in the config):
$ bin/flume-ng agent --conf conf/ --name a8 --conf-file conf/a8.conf
Go into CDH Hadoop and watch the log for changes. Note: adjust the path for your environment (watching a .temp file makes the effect more visible).
4. Start ZooKeeper
$ bin/zkServer.sh start
Start Kafka
$ bin/kafka-server-start.sh config/server.properties
Start a Kafka console consumer (messages are printed to the console):
$ bin/kafka-console-consumer.sh --zookeeper blue01.mydomain:2181 --topic testTopic --from-beginning
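The --from-beginning flag makes the consumer start at offset 0 of the topic's log instead of only seeing messages produced after it attaches. A toy model of that offset semantics (an illustration only, not the Kafka client API):

```python
class ToyTopic:
    """A topic partition as an append-only log with per-consumer offsets."""
    def __init__(self):
        self.log = []

    def produce(self, msg):
        self.log.append(msg)

    def consumer(self, from_beginning=False):
        # from_beginning=False mimics the default: start at the log's tail
        offset = 0 if from_beginning else len(self.log)
        def poll():
            nonlocal offset
            msgs = self.log[offset:]
            offset = len(self.log)
            return msgs
        return poll

topic = ToyTopic()
topic.produce("old event")

tail_consumer = topic.consumer()                     # sees only new messages
full_consumer = topic.consumer(from_beginning=True)  # replays the whole log

topic.produce("new event")
print(tail_consumer())  # ['new event']
print(full_consumer())  # ['old event', 'new event']
```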