flume收集数据的详细过程?
3个月前 (02-04)
0 点赞
0 收藏
0 评论
1 已阅读
一、启动zookeeper
zkServer.sh start
二、启动kafka集群
kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
三、在kafka中通过命令创建一个主题 myorders
kafka-topics.sh --create --bootstrap-server master:9092 --replication-factor 1 --partitions 1 --topic myorders
四、在kafka中查看创建的主题,确认是否创建成功
kafka-topics.sh --list --bootstrap-server master:9092
五、启动kafka的消费者,查看myorders主题中是否有数据
kafka-console-consumer.sh --bootstrap-server master:9092 --topic myorders -from-beginning
六、安装flume
(1)解压flume到指定的目录,如/opt/module/
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(2)更改一下名字
cd /opt/module/mv apache-flume-1.9.0-bin/ flume
(3)配置flume环境变量(只配置master机器上的即可)
vi /root/.bash_profile添加下面信息 export FLUME_HOME=/opt/module/flume export PATH=$PATH:$FLUME_HOME/bin使文件生效source /root/.bash_profile
七、配置采集端口的采集器(配置文件)
cd /opt/module/flume/conf vi dk_cjq.conf
将下面的配置写到监听器文件dk_cjq.conf中(来源、缓存、去哪):
#给3个组件起名字a1.sources=r1a1.channels=c1a1.sinks=k1#说明数据来源于端口数据,机器,端口号a1.sources.r1.type=netcata1.sources.r1.bind=localhosta1.sources.r1.port=10050#说明缓存的位置a1.channels.c1.type=memory#说明采集的数据最终到哪去a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092a1.sinks.k1.kafka.topic=myorders#将3个组件关联起来a1.sources.r1.channels=c1a1.sinks.k1.channel=c1
八、启动flume(占用你端口采集器所声明的端口)
flume-ng agent -c conf -f /opt/module/flume/conf/dk_cjq.conf --name a1 -DFlume.root.logger=INFO,console
九、测试
新开一个master终端,输入:nc localhost 10050,随便输入数据,此时注意kafka的消费者是否出现数据
十、测试采集文件的flume配置
(1)编写一个采集文件变化的采集器配置
cd /opt/module/flume/confvi wj_cjq.conf
配置文件中加入下面这些信息
#给3个组件起名字a1.sources=r1a1.channels=c1a1.sinks=k1#说明数据来源于文件的变化a1.sources.r1.type=execa1.sources.r1.command=tail -F /opt/mylog.log#说明缓存的位置a1.channels.c1.type=memory#说明采集的数据最终到哪去a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092a1.sinks.k1.kafka.topic=myorders#将3个组件关联起来a1.sources.r1.channels=c1a1.sinks.k1.channel=c1
(2)启动flume并指定使用这个配置文件
flume-ng agent -c conf -f /opt/module/flume/conf/wj_cjq.conf --name a1 -DFlume.root.logger=INFO,console
(3)修改监听文件的内容,查看是否有数据出现在kafka中,如果mylog.log文件不存在,要先创建。
echo "1111122222" >> mylog.log
还没有任何评论,你来说两句吧