flume收集数据的详细过程?

3个月前 (02-04) 0 点赞 0 收藏 0 评论 1 已阅读

一、启动zookeeper

zkServer.sh start

二、启动kafka集群

kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

三、在kafka中通过命令创建一个主题 myorders

kafka-topics.sh --create --bootstrap-server master:9092 --replication-factor 1 --partitions 1 --topic myorders

四、在kafka中查看创建的主题,确认是否创建成功

kafka-topics.sh --list --bootstrap-server master:9092

五、启动kafka的消费者,查看myorders主题中是否有数据

kafka-console-consumer.sh --bootstrap-server master:9092 --topic myorders -from-beginning

六、安装flume

(1)解压flume到指定的目录,如/opt/module/

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

(2)更改一下名字

cd /opt/module/mv apache-flume-1.9.0-bin/ flume

(3)配置flume环境变量(只配置master机器上的即可)

vi /root/.bash_profile添加下面信息   export FLUME_HOME=/opt/module/flume   export PATH=$PATH:$FLUME_HOME/bin使文件生效source /root/.bash_profile

七、配置采集端口的采集器(配置文件)

cd /opt/module/flume/conf vi dk_cjq.conf

将下面的配置写到监听器文件dk_cjq.conf中(来源、缓存、去哪):

#给3个组件起名字a1.sources=r1a1.channels=c1a1.sinks=k1#说明数据来源于端口数据,机器,端口号a1.sources.r1.type=netcata1.sources.r1.bind=localhosta1.sources.r1.port=10050#说明缓存的位置a1.channels.c1.type=memory#说明采集的数据最终到哪去a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092a1.sinks.k1.kafka.topic=myorders#将3个组件关联起来a1.sources.r1.channels=c1a1.sinks.k1.channel=c1

八、启动flume(占用你端口采集器所声明的端口)

flume-ng agent -c conf -f /opt/module/flume/conf/dk_cjq.conf --name a1 -DFlume.root.logger=INFO,console

九、测试

新开一个master终端,输入:nc localhost 10050,随便输入数据,此时注意kafka的消费者是否出现数据

十、测试采集文件的flume配置

(1)编写一个采集文件变化的采集器配置

cd /opt/module/flume/confvi wj_cjq.conf

配置文件中加入下面这些信息

#给3个组件起名字a1.sources=r1a1.channels=c1a1.sinks=k1#说明数据来源于文件的变化a1.sources.r1.type=execa1.sources.r1.command=tail -F /opt/mylog.log#说明缓存的位置a1.channels.c1.type=memory#说明采集的数据最终到哪去a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092a1.sinks.k1.kafka.topic=myorders#将3个组件关联起来a1.sources.r1.channels=c1a1.sinks.k1.channel=c1

(2)启动flume并指定使用这个配置文件

flume-ng agent -c conf -f /opt/module/flume/conf/wj_cjq.conf --name a1 -DFlume.root.logger=INFO,console

(3)修改监听文件的内容,查看是否有数据出现在kafka中,如果mylog.log文件不存在,要先创建。

echo "1111122222" >> mylog.log


本文收录在
0评论

登录

忘记密码 ?

切换登录

注册