一、Flume是什么?

官方网站: http://flume.apache.org/
用户文档: http://flume.apache.org/FlumeUserGuide.html
开发文档: http://flume.apache.org/FlumeDeveloperGuide.html

特点:
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,
flume支持接收各类数据发送方,来收集数据,并对收集的数据做简单的处理,如arvo,exec等
flume支持将数据写入各类数据接收方,如文本,hdfs,hbase等

flume的数据流是由时间(Event)贯穿始终,事件是Flume的基本数据单位,它携带日志数据并且携带有头信息,这些Event由Agent外部的Source生成。
Source捕获事件后会进行特定的格式化,然后Source会把事件推入Channel中
Channel可以看作一个缓冲区,它会保存事件直到Sink处理完该事件。
Sink则负责持久化日志或者把事件推向另一个Source

flume具有可靠性和可恢复性

  1. Flume 使用事务性的方式保证传送Event整个过程的可靠性。 Sink 必须在Event 被存入 Channel 后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。比如 Flume支持在本地保存一份文件 channel 作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复。
    2)flume的可恢复性
    还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。

二、Flume的核心概念

Client:
客户端产生数据,运行在一个独立的线程。

Event:
一个数据单元,消息头和消息体组成(可以是日志记录,avro对象)

Flow:
从源头到达目的点的迁移的抽象

Agent:
一个独立的Flume进程,包含组件Source,Channel,Sink。一个agent可以包含多个sources和sinks。
在这里插入图片描述

Source:
数据收集组件。(source从Client收集数据,传递给Channel)
负责将数据捕获后进行特殊的格式化,将数据封装到事件(event)里,然后再将数据传送到Channel中。Flume中提供了很多内置Source,支持读取的文件格式又Avro,log4j,syslog,和http post(body为json格式)等。这样应用程序可以直接和Source对接。如果内置Source无法满足需求,Flume支持自定义Source。
在这里插入图片描述

Channel:
中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接 sources 和 sinks ,这个有点像一个队列。)
 Channel是连接Source和Sink的组件,可以看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件。两个较为常用的Channel是MemoryChannel和FileChannel。
 在这里插入图片描述

Sink:
从Channel中读取并移除Event, 将Event传递到FlowPipeline中的下一个Agent(如果有的话)(Sink从Channel收集数据,运行在一个独立线程。)
 Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据, 也可以是其他agent的Source。在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。
 在这里插入图片描述

三、Flume使用场景

3.1 多个agent顺序连接

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中
在这里插入图片描述

3.2 多个Agent的数据汇聚到同一个Agent

应用场景有:比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
在这里插入图片描述

3.3 多级流

Flume还支持多级流,什么多级流?结合在云开发中的应用来举个例子,当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。
  在这里插入图片描述

3.4 load balance

Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上 。
在这里插入图片描述

四、Flume配置以及使用

4.1 列出两种source的配置

Source的类型有:
Avro Source、Thrift Source、Exec Source、JMS Source、Spooling Directory Source、Taildir Source、Twitter 1% firehose
Source (实验性的)、Kafka Source、NetCat TCP Source、NetCat UDP
Source、Sequence Generator Source、Syslog Sources、Syslog TCP
Source、Multiport Syslog TCP Source、Syslog UDP Source、HTTP
Source、Stress Source、Legacy Sources、Avro Legacy Source、Thrift Legacy
Source、Custom Source、Scribe Source需要的时候可查阅官网参数配置

第一种:Spooling Directory Source
这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。

属性名默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: spooldir.
spoolDirFlume Source监控的文件夹目录,该目录下的文件会被Flume收集
fileSuffix.COMPLETED被Flume收集完成的文件被重命名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED
deletePolicynever是否删除已完成收集的文件,可选值: never 或 immediate
fileHeaderfalse是否添加文件的绝对路径名(绝对路径+文件名)到header中。
fileHeaderKeyfile添加绝对路径名到header里面所使用的key(配合上面的fileHeader一起使用)
basenameHeaderfalse是否添加文件名(只是文件名,不包括路径)到header 中
basenameHeaderKeybasename添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用)
includePattern^.*$指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高
ignorePattern^$指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePattern 和 includePattern 两个正则都匹配到,这个文件会被忽略。
trackerDir.flumespool用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建
consumeOrderoldest设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有: oldest 、 youngest 和 random 。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。 当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集
pollDelay500Flume监视目录内新文件产生的时间间隔,单位:毫秒
recursiveDirectorySearchfalse是否收集子目录下的日志文件
maxBackoff4000等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止。
batchSize100每次批量传输到channel时的size大小
inputCharsetUTF-8解析器读取文件时使用的编码(解析器会把所有文件当做文本读取)
decodeErrorPolicyFAIL当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败; REPLACE :替换掉这些无法解析的字符,通常是用U+FFFD; IGNORE :忽略无法解析的字符。
deserializerLINE指定一个把文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口
deserializer.*解析器的相关属性,根据解析器不同而不同
bufferMaxLines(已废弃)
bufferMaxLineLength5000(已废弃)每行的最大长度。改用 deserializer.maxLineLength 代替
selector.typereplicating可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配置

示例:

user_friends.sources=user_friendsSource
user_friends.channels=user_friendsChannel
user_friends.sinks=user_friendsSink

//----------------------------------------------------------------------
// Source部分的配置,具体作用参考上标中标红字段
user_friends.sources.user_friendsSource.type=spooldir
user_friends.sources.user_friendsSource.spoolDir=/root/software/flumlogfile/                            user_friends
user_friends.sources.user_friendsSource.deserializer=LINE
//提示deserializer.maxLineLength 的默认值是2048,这个数值对于日志行来说有点小,如果实际使用中日志每行字符数可能超过2048,超出的部分会被截断,需要根据自己的日志长度调大这个值。
user_friends.sources.user_friendsSource.deserializer.maxLineLength=32000
user_friends.sources.user_friendsSource.includePattern=user_friends_[0-9]{4}                            -[0-9]{2}-[0-9]{2}.csv
//----------------------------------------------------------------------

user_friends.channels.user_friendsChannel.type=file
user_friends.channels.user_friendsChannel.checkpointDir=/root/software/fluml                            ogfile/checkpoint/user_friends
user_friends.channels.user_friendsChannel.dataDirs=/root/software/flumlogfil                            e/data/user_friends

user_friends.sinks.user_friendsSink.type=hdfs
user_friends.sinks.user_friendsSink.hdfs.fileType=DataStream
user_friends.sinks.user_friendsSink.hdfs.filePrefix=userfriend
user_friends.sinks.user_friendsSink.hdfs.fileSuffix=.csv
user_friends.sinks.user_friendsSink.hdfs.path=hdfs://192.168.21.2:9000/kb11f                            ile/userfriend/%Y-%m-%d
user_friends.sinks.user_friendsSink.hdfs.useLocalTimeStamp=true
user_friends.sinks.user_friendsSink.hdfs.batchSize=640
user_friends.sinks.user_friendsSink.hdfs.rollCount=0
user_friends.sinks.user_friendsSink.hdfs.rollSize=6400000
user_friends.sinks.user_friendsSink.hdfs.rollInterval=30

user_friends.sources.user_friendsSource.channels=user_friendsChannel

第二种:Kafka Source
Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。

属性名默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.serversSource使用的Kafka集群实例列表
kafka.consumer.group.idflume消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组
kafka.topics将要读取消息的目标 Kafka topic 列表,多个用逗号分隔
kafka.topics.regex会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。
batchSize1000
batchDurationMillis1000一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。
backoffSleepIncrement1000当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。
maxBackoffSleep5000Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。
useFlumeEventFormatfalse默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。
setTopicHeadertrue当设置为true时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。
topicHeadertopic如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。
migrateZookeeperOffsetstrue如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。
kafka.consumer.security.protocolPLAINTEXT设置使用哪种安全协议写入Kafka。可选值:SASL_PLAINTEXT、SASL_SSL 和 SSL有关安全设置的其他信息,请参见下文。
more consumer security props如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置
Other Kafka Consumer Properties其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如:kafka.consumer.auto.offset.reset

4.2 列出两种Channel的配置

channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。

Channel的类型有: Memory Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel、Pseudo Transaction Channel、Custom
Channel channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。

第一种:Memory Channel
内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。

属性默认值解释
type组件类型,这个是: memory
capacity100内存中存储 Event 的最大数
transactionCapacity100source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)
keep-alive3添加或删除一个 Event 的超时时间(秒)
byteCapacityBufferPercentage20指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比
byteCapacityChannel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

示例:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

第二种:File Channel

示例:

user_friends.channels = c1
user_friends.channels.user_friendsChannel.type=file
user_friends.channels.user_friendsChannel.checkpointDir=/root/software/fluml                            ogfile/checkpoint/user_friends
user_friends.channels.user_friendsChannel.dataDirs=/root/software/flumlogfil                            e/data/user_friends

4.3 列出三种Flume Sinks的配置

Sink的类型有:HDFS Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、IRC Sink、File Roll Sink、Null Sink、HBaseSinks、HBaseSink、AsyncHBaseSink、MorphlineSolrSink、ElasticSearchSink、Kite Dataset Sink、Kafka Sink、HTTP Sink、Custom Sink

HDFS Sink
Sink将Event写入Hadoop分布式文件系统。
Sink可以根据写入时间、文件大小和Event数量定期滚动文件
Sink可以根据Event自带的时间戳或系统时间等属性对数据进行分区

属性名默认值解释
channel与 Sink 连接的 channel
type组件类型,这个是: hdfs
hdfs.pathHDFS目录路径(例如:hdfs://namenode/flume/webdata/)
hdfs.filePrefixFlumeDataFlume在HDFS文件夹下创建新文件的固定前缀
hdfs.fileSuffixFlume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)
hdfs.inUsePrefixFlume正在写入的临时文件前缀,默认没有
hdfs.inUseSuffix.tmpFlume正在写入的临时文件后缀
hdfs.rollInterval30当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize1024当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount10当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout0关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize100向 HDFS 写入内容时每次批量操作的 Event 数量
hdfs.codeC压缩算法。可选值:gzip 、 bzip2 、 lzo 、 lzop` 、 ``snappy
hdfs.fileTypeSequenceFile文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
hdfs.maxOpenFiles5000允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
hdfs.minBlockReplicas指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。
hdfs.writeFormatWritable文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。
hdfs.callTimeout10000允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)
hdfs.threadsPoolSize10每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
hdfs.rollTimerPoolSize1每个HDFS Sink实例调度定时文件滚动的线程数
hdfs.kerberosPrincipal用于安全访问 HDFS 的 Kerberos 用户主体
hdfs.kerberosKeytab用于安全访问 HDFS 的 Kerberos keytab 文件
hdfs.proxyUser代理名
hdfs.roundfalse是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
hdfs.roundValue1向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
hdfs.roundUnitsecond向下舍入的单位,可选值: second 、 minute 、 hour
hdfs.timeZoneLocal Time解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai
hdfs.useLocalTimeStampfalse使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)
hdfs.closeTries0开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;如果设置为0,Sink会一直尝试重命名文件直到成功为止;关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。
hdfs.retryInterval180连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。
serializerTEXTEvent 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。
serializer.*根据上面 serializer 配置的类型来根据需要添加序列化器的参数

示例:

user_friends.sinks.user_friendsSink.type=hdfs
user_friends.sinks.user_friendsSink.hdfs.fileType=DataStream
user_friends.sinks.user_friendsSink.hdfs.filePrefix=userfriend
user_friends.sinks.user_friendsSink.hdfs.fileSuffix=.csv
user_friends.sinks.user_friendsSink.hdfs.path=hdfs://192.168.21.2:9000/kb11file/userfriend/%Y-%m-%d
user_friends.sinks.user_friendsSink.hdfs.useLocalTimeStamp=true
user_friends.sinks.user_friendsSink.hdfs.batchSize=640
user_friends.sinks.user_friendsSink.hdfs.rollCount=0
user_friends.sinks.user_friendsSink.hdfs.rollSize=6400000
user_friends.sinks.user_friendsSink.hdfs.rollInterval=30

Logger Sink
Sink将Event输出到日志中,一般用于测试、调用
提示:
一般情况下不会将输出数据流中的原始数据输入到运行日志中,但是为了满足某些特殊情况,可以借助Logger Sink将其输出。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: logger
maxBytesToLog16Event body 输出到日志的最大字节数,超出的部分会被丢弃

配置:

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Kafka Sink
Sink可以将数据发送到Kafka topic上,从而实现Flume与Kafka集成,以便于处理系统可以处理来自各种Flume Source的数据
1.9版本:

属性默认值解释
type组件类型,这个是: org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.serversKafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔
kafka.topicdefault-flume-topic用于发布消息的 Kafka topic 名称 。如果这个参数配置了值,消息就会被发布到这个 topic 上。如果Event header中包含叫做“topic”的属性, Event 就会被发布到 header 中指定的 topic 上,而不会发布到 kafka.topic 指定的 topic 上。支持任意的 header 属性动态替换, 比如%{lyf}就会被 Event header 中叫做“lyf”的属性值替换(如果使用了这种动态替换,建议将 Kafka 的 auto.create.topics.enable 属性设置为 true )。
flumeBatchSize100一批中要处理的消息数。设置较大的值可以提高吞吐量,但是会增加延迟。
kafka.producer.acks1在考虑成功写入之前,要有多少个副本必须确认消息。可选值, 0 :(从不等待确认); 1 :只等待leader确认; -1 :等待所有副本确认。 设置为-1可以避免某些情况 leader 实例失败的情况下丢失数据。
useFlumeEventFormatfalse默认情况下,会直接将 Event body 的字节数组作为消息内容直接发送到 Kafka topic 。如果设置为true,会以 Flume Avro 二进制格式进行读取。 与 Kafka Source 上的同名参数或者 Kafka channel 的 parseAsFlumeEvent 参数相关联,这样以对象的形式处理能使生成端发送过来的 Event header 信息得以保留。
defaultPartitionId指定所有 Event 将要发送到的 Kafka 分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被 Kafka 生产者的分发程序分发,包括 key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发
partitionIdHeader设置后,Sink将使用 Event header 中使用此属性的值命名的字段的值,并将消息发送到 topic 的指定分区。 如果该值表示无效分区,则将抛出 EventDeliveryException。 如果存在标头值,则此设置将覆盖 defaultPartitionId 。假如这个参数设置为“lyf”,这个 Sink 就会读取 Event header 中的 lyf 属性的值,用该值作为分区ID
allowTopicOverridetrue如果设置为 true,会读取 Event header 中的名为 topicHeader 的的属性值,用它作为目标 topic。
topicHeadertopic与上面的 allowTopicOverride 一起使用,allowTopicOverride 会用当前参数配置的名字从 Event header 获取该属性的值,来作为目标 topic 名称
kafka.producer.security.protocolPLAINTEXT设置使用哪种安全协议写入 Kafka。可选值:SASL_PLAINTEXT 、 SASL_SSL 和 SSL, 有关安全设置的其他信息,请参见下文。
more producer security props如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者增加安全相关的参数配置
Other Kafka Producer Properties其他一些 Kafka 生产者配置参数。任何 Kafka 支持的生产者参数都可以使用。唯一的要求是使用“kafka.producer.”这个前缀来配置参数,比如:kafka.producer.linger.ms

1.6版本:(1.9中已弃用)

属性默认值解释
brokerList改用 kafka.bootstrap.servers
topicdefault-flume-topic改用 kafka.topic
batchSize100改用 kafka.flumeBatchSize
requiredAcks1改用 kafka.producer.acks

配置:

interceptor.sinks.hisink.type=org.apache.flume.sink.kafka.KafkaSink
interceptor.sinks.hisink.batchSize=640
interceptor.sinks.hisink.brokerList=192.168.21.2:9092
interceptor.sinks.hisink.topic=hi2

五、课堂笔记

5.1 安装配置以及测试

第一步:解压flume压缩包,并重命名:

# tar xf  flume-ng-1.6.0-cdh5.14.0.tar.gz
# mv apache-flume-1.6.0-cdh5.14.0-bin/  flume160

在这里插入图片描述
第二步:安装nc和telnet:
nc(服务器端Server):可以实现监听服务器端口,并与客户端(最多只能接受一个客户端)对指定服务器进行端口扫描,作为客户端连接到远程服务器进行通信
telnet(客户端Client):连接服务器端口,并进行通信
测试:(在两个session中分别输入)

nc -lk 44444
telnet localhost 44444

5.2 示例1:source从控制台输入、sink从控制台输出

配置文件:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume agent:

[root@hadoop2 flume]# ./bin/flume-ng agent --name a1 --conf ./conf/ --conf-file ./conf/kb11Job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
控制台输入:
[root@hadoop2 kb11Job]# telnet 127.0.0.1 44444
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hhhhh
OK

flume监控输出:
2021-05-25 10:14:54,576 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)]  Event: { headers:{} body: 68 68 68 68 68 0D hhhhh. }

5.3 示例2:source来源于文件、sink从控制台输出

读取 xxx.csv文件

interceptor.sources.interceptorSource.type=spooldir
interceptor.sources.interceptorSource.spoolDir=/root/software/flumlogfile/interceptor
interceptor.sources.interceptorSource.deserializer=LINE
interceptor.sources.interceptorSource.deserializer.maxLineLength=32000
interceptor.sources.interceptorSource.includePattern=interceptor_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

读取 xxx.txt文件

a2.sources.r1.type=exec
a2.sources.r1.command=tail -f /opt/flume160/conf/jobkb09/tmp/tmp.txt

控制台输出:

interceptor.sinks.othersink.type=logger

5.4 示例3:加载csv文件,并sink到hdfs或kafka

sink输出到hdfs:

interceptor.sinks.hellosink.type=hdfs
interceptor.sinks.hellosink.hdfs.fileType=DataStream
interceptor.sinks.hellosink.hdfs.filePrefix=hello
interceptor.sinks.hellosink.hdfs.fileSuffix=.csv
interceptor.sinks.hellosink.hdfs.path=hdfs://192.168.21.2:9000/kb11file/hello2/%Y-%m-%d
interceptor.sinks.hellosink.hdfs.useLocalTimeStamp=true
interceptor.sinks.hellosink.hdfs.batchSize=640
interceptor.sinks.hellosink.hdfs.rollCount=0
interceptor.sinks.hellosink.hdfs.rollSize=6400000
interceptor.sinks.hellosink.hdfs.rollInterval=3

sink输出到kafka消费者(consumer):

interceptor.sinks.hisink.type=org.apache.flume.sink.kafka.KafkaSink
interceptor.sinks.hisink.batchSize=640
interceptor.sinks.hisink.brokerList=192.168.21.2:9092
interceptor.sinks.hisink.topic=hi2

六、操作步骤总结

查看文件头:head -n l xxx.csv

第一步:
在flume目录下创建.conf文件,进行source,channel,sink的配置
vi /xxx/flume/Demo/xxxxx.conf
具体配置如上第四节所示

第二步:
在相应路径下创建channel中设置的 存放扫描欲读取文件的目录、checkpoint和date的目录

[root@hadoop2 events]# mkdir -p /root/software/flumlogfile/user
[root@hadoop2 events]# mkdir -p /root/software/flumlogfile/data/user
[root@hadoop2 events]# mkdir -p /root/software/flumlogfile/checkpoint/user

第三步:
将source欲读取文件cp到在第二步中创建的路径下
(注意,文件名要能满足Source中设置的正则匹配,否则会读取失败)

user_friends.sources.user_friendsSource.includePattern=user_friends_[0-9]{4}
-[0-9]{2}-[0-9]{2}.csv

[root@hadoop2 events]# cp ./users.csv /root/software/flumlogfile/user/user_2021-05-24.csv

第四步:
flume—Agent的启动:
–name:agent名称、必填
–conf-file:指定要执行的配置文件

[root@hadoop2 flume]# ./bin/flume-ng agent --name user_friends --conf ./conf/ --conf-file ./conf/xx/xxx.conf -Dflume.root.logger=INFO,console

如果sink到了kafka需要启动消费者服务查看消息:

kafka-console-consumer.sh --topic mydemo --bootstrap-server 192.168.21.2:9092 --from-beginning

如果sink到了hdfs需要到浏览器xxxxxx:50070上查看消息

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐