Flume的安装及使用

Flume的安装

1、上传至虚拟机,并解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/

在环境变量中增加如下命令,可以使用 soft 快速切换到 /usr/local/soft
alias soft=‘cd /usr/local/soft/’

2、重命名目录,并配置环境变量
mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
source /etc/profile
3、查看flume版本
flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]# 
4、测试flume
  • 监控一个目录,将数据打印出来

    • 配置文件
    # a1表示agent的名字 可以自定义
    # 给sources(在一个agent里可以定义多个source)取个名字
    a1.sources = r1
    # 给channel个名字
    a1.channels = c1
    # 给channel个名字
    a1.sinks = k1 
    
    # 对source进行配置
    # agent的名字.sources.source的名字.参数 = 参数值
    
    # source的类型 spoolDir(监控一个目录下的文件的变化)
    a1.sources.r1.type = spooldir 
    # 监听哪一个目录
    a1.sources.r1.spoolDir = /root/data 
    # 是否在event的headers中保存文件的绝对路径
    a1.sources.r1.fileHeader = true
    # 给拦截器取个名字 i1
    a1.sources.r1.interceptors = i1 
    # 使用timestamp拦截器,将处理数据的时间保存到event的headers中
    a1.sources.r1.interceptors.i1.type = timestamp
    
    # 配置channel
    a1.channels.c1.type = memory 
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # 配置sink为logger
    # 直接打印到控制台
    a1.sinks.k1.type = logger
    
    # 将source、channel、sink组装成agent
    a1.sources.r1.channels = c1 
    a1.sinks.k1.channel = c1
    
    • 启动agent
    flume-ng agent -n a1 -f ./spoolingtest.conf -Dflume.root.logger=DEBUG,console
    
    • 新建/root/data目录
    mkdir /root/data
    
    • 在/root/data/目录下新建文件,输入内容,观察flume进程打印的日志
    # 随意在a.txt中加入一些内容
    vim /root/data/a.txt
    
5、flume的使用
  • spoolingToHDFS.conf

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定spooldir的属性
    a.sources.r1.type = spooldir 
    a.sources.r1.spoolDir = /root/data 
    a.sources.r1.fileHeader = true 
    a.sources.r1.interceptors = i1 
    a.sources.r1.interceptors.i1.type = timestamp
    #指定sink的类型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /flume/data/dir1
    # 指定文件名前缀
    a.sinks.k1.hdfs.filePrefix = student
    # 指定达到多少数据量写一次文件 单位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少条写一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件类型为 流 来什么输出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件输出格式 为text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后缀
    a.sinks.k1.hdfs.fileSuffix = .txt
    
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 在 /root/data/目录下准备数据
    The Zen of Python, by Tim Peters
    
    Beautiful is better than ugly.
    Explicit is better than implicit.
    Simple is better than complex.
    Complex is better than complicated.
    Flat is better than nested.
    Sparse is better than dense.
    Readability counts.
    Special cases aren't special enough to break the rules.
    Although practicality beats purity.
    Errors should never pass silently.
    Unless explicitly silenced.
    In the face of ambiguity, refuse the temptation to guess.
    There should be one-- and preferably only one --obvious way to do it.
    Although that way may not be obvious at first unless you're Dutch.
    Now is better than never.
    Although never is often better than *right* now.
    If the implementation is hard to explain, it's a bad idea.
    If the implementation is easy to explain, it may be a good idea.
    Namespaces are one honking great idea -- let's do more of those!
    
    • 启动agent
    flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
    
  • hbaseLogToHDFS

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定spooldir的属性
    a.sources.r1.type = exec 
    a.sources.r1.command = tail -f /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
    #指定sink的类型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /flume/data/dir2
    # 指定文件名前缀
    a.sinks.k1.hdfs.filePrefix = hbaselog
    # 指定达到多少数据量写一次文件 单位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少条写一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件类型为 流 来什么输出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件输出格式 为text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后缀
    a.sinks.k1.hdfs.fileSuffix = .txt
    
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
  • hbaselogToHBase

    • 在hbase中创建log表
    create 'log','cf1'
    
    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定spooldir的属性
    a.sources.r1.type = exec 
    a.sources.r1.command = cat /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
    #指定sink的类型
    a.sinks.k1.type = hbase
    a.sinks.k1.table = log
    a.sinks.k1.columnFamily = cf1
    
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 100000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
  • netcatLogger

    监听telnet端口

    • 安装telnet
    yum install telnet
    
    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定spooldir的属性
    a.sources.r1.type = netcat 
    a.sources.r1.bind = 0.0.0.0 
    a.sources.r1.port = 8888 
    
    #指定sink的类型
    a.sinks.k1.type = logger
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 启动

      • 先启动agent
      flume-ng agent -n a -f ./netcatToLogger.conf -Dflume.root.logger=DEBUG,console
      
      • 再启动telnet
      telnet master 8888
      
  • httpToLogger

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定spooldir的属性
    a.sources.r1.type = http 
    a.sources.r1.port = 6666 
    
    #指定sink的类型
    a.sinks.k1.type = logger
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 启动

      • 先启动agent
      flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
      
      • 再使用curl发起一个http请求
      curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://master:6
      
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐