注意:如果是做kafka链接flume实验的小伙伴,可以直接从第五步开始

一,Windows下配置环境变量

此电脑->右键属性->系统->高级系统设置->环境变量

(1) 环境变量名:FLUME_HOME

变量值:就是安装的flume的路径

例如我自己的是:FLUME_HOME;E:\flume\apache-flume-1.9.0-bin

(2)在path变量中添加 %FLUME_HOME%\conf;%FLUME_HOME%\bin;然后依次保存我们所做的修改即可。 编辑 apache-flume-1.9.0-bin\conf 下的flume-env.sh(如果没有,复制flume-env.sh.template重命名为flume-env.sh) 在最后行输入:export JAVA_HOME = 安装的jdk路径

(3)打开cmd输入:flume-ng version,显示相对应的版本号,则flume安装成功

二,测试运行flume

我们可以配置一个agent,然后保存为example.conf文件,把它放在apache-flume-1.9.0-bin\conf目录下面

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

接下来就是运行这个agent了,我们可以直接在cmd的格式下进入conf文件夹,出现下面两个效果,则说明成功运行了flume

命令:flume-ng agent --conf ../conf --conf-file ../conf/example.conf --name a1 -property flume.root.logger=INFO,console

 运行完后,我们可以看到localhost端口号为44444,可以通过telent客户端测试一下flume

注意:Windows下开启Telnet服务

步骤:控制面板->程序->程序和功能->启用或关闭Windows功能->Telnet客户端   勾选 确定即可。

三,Telnet客户端测试

打开一个新的cmd 进行:telnet localhost 44444

当我们在telnet客户端,输入一些信息后,flume端会监听到并打印日志

四 ,安装过程中不显示版本号,报错,出现下面这样的情况

  Did not find E:\flume\apache-flume-1.9.0-bin\conf\flume-env.ps1
  Test-Path : 路径中具有非法字符。

解决方法:进入conf/flume-env.ps1下,然后在该文件中ctrl+F,发现Test-Path共出现在三个地方:GetHadoopHome、GetHbaseHome、GetHiveHome。尝试注释掉这些代码行(339-405)或直接删除,再次运行后发现flume可以正常显示version了。
 

五,flume和kafka组合使用

(1)打开第1个cmd窗口,在kafka的解压路径下,启动Zookeeper服务: 

 .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

(2)打开第2个cmd窗口,在kafka的解压路径下,启动Kafka服务:

  .\bin\windows\kafka-server-start.bat .\config\server.properties

(3)打开第3个cmd窗口,在kafka的解压路径下,创建一个名为tests的Topic:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tests

(4)我们在Flume的安装目录的conf子目录下创建一个配置文件kafka.conf,内容如下:

#设置名称
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#配置Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = localhost:9092
a1.sources.r1.kafka.topics = tests    

#配置Sink
a1.sinks.k1.type = logger

#配置channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=500000
a1.channels.c1.transactionCapacity=600

#绑定sink source到channels上
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

(5)打开第4个cmd窗口,在flume的解压路径下,启动Flume:

 .\bin\flume-ng.cmd agent --conf ./conf --conf-file ./conf/kafka.conf --name a1 -property flume.root.logger=INFO,console

(6)打开第5个cmd窗口,执行以下命令:

  telnet localhost 44444

注意:Windows下开启Telnet服务

步骤:控制面板->程序->程序和功能->启用或关闭Windows功能->Telnet客户端   勾选 确定即可。

执行上面命令以后,我们可以在该窗口内用键盘任意输入一些单词,比如“this is my valentines”。这个单词会发送给Flume,然后,Flume发送给Kafka。

(7)打开第6个cmd窗口,在flume的解压路径下,执行如下命令:

 .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic tests --from-beginning

这个时候如果你的屏幕上显示“this is my valentines”这几个单词,就说明kafka成功收到的数据,证明你的flume和kafka操作没有出现问题。

注意:安装Python第三方包,可以使用国内豆瓣源下载,这样会快一些

python -m pip install (库名) --force-reinstall -i https://pypi.doubanio.com/simple/ pip

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐