Flume(原理解释、配置以及使用)
一、Flume是什么?
官方网站: http://flume.apache.org/
用户文档: http://flume.apache.org/FlumeUserGuide.html
开发文档: http://flume.apache.org/FlumeDeveloperGuide.html
特点:
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,
flume支持接收各类数据发送方,来收集数据,并对收集的数据做简单的处理,如arvo,exec等
flume支持将数据写入各类数据接收方,如文本,hdfs,hbase等
flume的数据流是由时间(Event)贯穿始终,事件是Flume的基本数据单位,它携带日志数据并且携带有头信息,这些Event由Agent外部的Source生成。
Source捕获事件后会进行特定的格式化,然后Source会把事件推入Channel中
Channel可以看作一个缓冲区,它会保存事件直到Sink处理完该事件。
Sink则负责持久化日志或者把事件推向另一个Source
flume具有可靠性和可恢复性
- Flume 使用事务性的方式保证传送Event整个过程的可靠性。 Sink 必须在Event 被存入 Channel 后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。比如 Flume支持在本地保存一份文件 channel 作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复。
2)flume的可恢复性
还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。
二、Flume的核心概念
Client:
客户端产生数据,运行在一个独立的线程。
Event:
一个数据单元,消息头和消息体组成(可以是日志记录,avro对象)
Flow:
从源头到达目的点的迁移的抽象
Agent:
一个独立的Flume进程,包含组件Source,Channel,Sink。一个agent可以包含多个sources和sinks。
Source:
数据收集组件。(source从Client收集数据,传递给Channel)
负责将数据捕获后进行特殊的格式化,将数据封装到事件(event)里,然后再将数据传送到Channel中。Flume中提供了很多内置Source,支持读取的文件格式又Avro,log4j,syslog,和http post(body为json格式)等。这样应用程序可以直接和Source对接。如果内置Source无法满足需求,Flume支持自定义Source。
Channel:
中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接 sources 和 sinks ,这个有点像一个队列。)
Channel是连接Source和Sink的组件,可以看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件。两个较为常用的Channel是MemoryChannel和FileChannel。
Sink:
从Channel中读取并移除Event, 将Event传递到FlowPipeline中的下一个Agent(如果有的话)(Sink从Channel收集数据,运行在一个独立线程。)
Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据, 也可以是其他agent的Source。在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。
三、Flume使用场景
3.1 多个agent顺序连接
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中
3.2 多个Agent的数据汇聚到同一个Agent
应用场景有:比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
3.3 多级流
Flume还支持多级流,什么多级流?结合在云开发中的应用来举个例子,当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。
3.4 load balance
Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上 。
四、Flume配置以及使用
4.1 列出两种source的配置
Source的类型有:
Avro Source、Thrift Source、Exec Source、JMS Source、Spooling Directory Source、Taildir Source、Twitter 1% firehose
Source (实验性的)、Kafka Source、NetCat TCP Source、NetCat UDP
Source、Sequence Generator Source、Syslog Sources、Syslog TCP
Source、Multiport Syslog TCP Source、Syslog UDP Source、HTTP
Source、Stress Source、Legacy Sources、Avro Legacy Source、Thrift Legacy
Source、Custom Source、Scribe Source需要的时候可查阅官网参数配置
第一种:Spooling Directory Source
这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。
属性名 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: spooldir. |
spoolDir | – | Flume Source监控的文件夹目录,该目录下的文件会被Flume收集 |
fileSuffix | .COMPLETED | 被Flume收集完成的文件被重命名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED |
deletePolicy | never | 是否删除已完成收集的文件,可选值: never 或 immediate |
fileHeader | false | 是否添加文件的绝对路径名(绝对路径+文件名)到header中。 |
fileHeaderKey | file | 添加绝对路径名到header里面所使用的key(配合上面的fileHeader一起使用) |
basenameHeader | false | 是否添加文件名(只是文件名,不包括路径)到header 中 |
basenameHeaderKey | basename | 添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用) |
includePattern | ^.*$ | 指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高 |
ignorePattern | ^$ | 指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePattern 和 includePattern 两个正则都匹配到,这个文件会被忽略。 |
trackerDir | .flumespool | 用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建 |
consumeOrder | oldest | 设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有: oldest 、 youngest 和 random 。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。 当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集 |
pollDelay | 500 | Flume监视目录内新文件产生的时间间隔,单位:毫秒 |
recursiveDirectorySearch | false | 是否收集子目录下的日志文件 |
maxBackoff | 4000 | 等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止。 |
batchSize | 100 | 每次批量传输到channel时的size大小 |
inputCharset | UTF-8 | 解析器读取文件时使用的编码(解析器会把所有文件当做文本读取) |
decodeErrorPolicy | FAIL | 当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败; REPLACE :替换掉这些无法解析的字符,通常是用U+FFFD; IGNORE :忽略无法解析的字符。 |
deserializer | LINE | 指定一个把文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口 |
deserializer.* | 解析器的相关属性,根据解析器不同而不同 | |
bufferMaxLines | – | (已废弃) |
bufferMaxLineLength | 5000 | (已废弃)每行的最大长度。改用 deserializer.maxLineLength 代替 |
selector.type | replicating | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 |
selector.* | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | – | 该source所使用的拦截器,多个用空格分开 |
interceptors.* | 拦截器相关的属性配置 |
示例:
user_friends.sources=user_friendsSource
user_friends.channels=user_friendsChannel
user_friends.sinks=user_friendsSink
//----------------------------------------------------------------------
// Source部分的配置,具体作用参考上标中标红字段
user_friends.sources.user_friendsSource.type=spooldir
user_friends.sources.user_friendsSource.spoolDir=/root/software/flumlogfile/ user_friends
user_friends.sources.user_friendsSource.deserializer=LINE
//提示deserializer.maxLineLength 的默认值是2048,这个数值对于日志行来说有点小,如果实际使用中日志每行字符数可能超过2048,超出的部分会被截断,需要根据自己的日志长度调大这个值。
user_friends.sources.user_friendsSource.deserializer.maxLineLength=32000
user_friends.sources.user_friendsSource.includePattern=user_friends_[0-9]{4} -[0-9]{2}-[0-9]{2}.csv
//----------------------------------------------------------------------
user_friends.channels.user_friendsChannel.type=file
user_friends.channels.user_friendsChannel.checkpointDir=/root/software/fluml ogfile/checkpoint/user_friends
user_friends.channels.user_friendsChannel.dataDirs=/root/software/flumlogfil e/data/user_friends
user_friends.sinks.user_friendsSink.type=hdfs
user_friends.sinks.user_friendsSink.hdfs.fileType=DataStream
user_friends.sinks.user_friendsSink.hdfs.filePrefix=userfriend
user_friends.sinks.user_friendsSink.hdfs.fileSuffix=.csv
user_friends.sinks.user_friendsSink.hdfs.path=hdfs://192.168.21.2:9000/kb11f ile/userfriend/%Y-%m-%d
user_friends.sinks.user_friendsSink.hdfs.useLocalTimeStamp=true
user_friends.sinks.user_friendsSink.hdfs.batchSize=640
user_friends.sinks.user_friendsSink.hdfs.rollCount=0
user_friends.sinks.user_friendsSink.hdfs.rollSize=6400000
user_friends.sinks.user_friendsSink.hdfs.rollInterval=30
user_friends.sources.user_friendsSource.channels=user_friendsChannel
第二种:Kafka Source
Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。
属性名 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | Source使用的Kafka集群实例列表 |
kafka.consumer.group.id | flume | 消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组 |
kafka.topics | – | 将要读取消息的目标 Kafka topic 列表,多个用逗号分隔 |
kafka.topics.regex | – | 会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。 |
batchSize | 1000 | |
batchDurationMillis | 1000 | 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。 |
backoffSleepIncrement | 1000 | 当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。 |
maxBackoffSleep | 5000 | Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。 |
useFlumeEventFormat | false | 默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。 |
setTopicHeader | true | 当设置为true时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。 |
topicHeader | topic | 如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。 |
migrateZookeeperOffsets | true | 如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。 |
kafka.consumer.security.protocol | PLAINTEXT | 设置使用哪种安全协议写入Kafka。可选值:SASL_PLAINTEXT、SASL_SSL 和 SSL有关安全设置的其他信息,请参见下文。 |
more consumer security props | 如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置 | |
Other Kafka Consumer Properties | – | 其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如:kafka.consumer.auto.offset.reset |
4.2 列出两种Channel的配置
channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。
Channel的类型有: Memory Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel、Pseudo Transaction Channel、Custom
Channel channel 是在 Agent 上暂存 Event 的缓冲池。 Event由source添加,由sink消费后删除。
第一种:Memory Channel
内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: memory |
capacity | 100 | 内存中存储 Event 的最大数 |
transactionCapacity | 100 | source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大) |
keep-alive | 3 | 添加或删除一个 Event 的超时时间(秒) |
byteCapacityBufferPercentage | 20 | 指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比 |
byteCapacity | Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。 |
示例:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
第二种:File Channel
示例:
user_friends.channels = c1
user_friends.channels.user_friendsChannel.type=file
user_friends.channels.user_friendsChannel.checkpointDir=/root/software/fluml ogfile/checkpoint/user_friends
user_friends.channels.user_friendsChannel.dataDirs=/root/software/flumlogfil e/data/user_friends
4.3 列出三种Flume Sinks的配置
Sink的类型有:HDFS Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、IRC Sink、File Roll Sink、Null Sink、HBaseSinks、HBaseSink、AsyncHBaseSink、MorphlineSolrSink、ElasticSearchSink、Kite Dataset Sink、Kafka Sink、HTTP Sink、Custom Sink
HDFS Sink
Sink将Event写入Hadoop分布式文件系统。
Sink可以根据写入时间、文件大小和Event数量定期滚动文件
Sink可以根据Event自带的时间戳或系统时间等属性对数据进行分区
属性名 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 连接的 channel |
type | – | 组件类型,这个是: hdfs |
hdfs.path | – | HDFS目录路径(例如:hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Flume在HDFS文件夹下创建新文件的固定前缀 |
hdfs.fileSuffix | – | Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置) |
hdfs.inUsePrefix | – | Flume正在写入的临时文件前缀,默认没有 |
hdfs.inUseSuffix | .tmp | Flume正在写入的临时文件后缀 |
hdfs.rollInterval | 30 | 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 |
hdfs.rollSize | 1024 | 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 |
hdfs.rollCount | 10 | 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) |
hdfs.idleTimeout | 0 | 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒 |
hdfs.batchSize | 100 | 向 HDFS 写入内容时每次批量操作的 Event 数量 |
hdfs.codeC | – | 压缩算法。可选值:gzip 、 bzip2 、 lzo 、 lzop` 、 ``snappy |
hdfs.fileType | SequenceFile | 文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数 |
hdfs.maxOpenFiles | 5000 | 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭 |
hdfs.minBlockReplicas | – | 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。 |
hdfs.writeFormat | Writable | 文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。 |
hdfs.callTimeout | 10000 | 允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒) |
hdfs.threadsPoolSize | 10 | 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等) |
hdfs.rollTimerPoolSize | 1 | 每个HDFS Sink实例调度定时文件滚动的线程数 |
hdfs.kerberosPrincipal | – | 用于安全访问 HDFS 的 Kerberos 用户主体 |
hdfs.kerberosKeytab | – | 用于安全访问 HDFS 的 Kerberos keytab 文件 |
hdfs.proxyUser | 代理名 | |
hdfs.round | false | 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符) |
hdfs.roundValue | 1 | 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30 |
hdfs.roundUnit | second | 向下舍入的单位,可选值: second 、 minute 、 hour |
hdfs.timeZone | Local Time | 解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai |
hdfs.useLocalTimeStamp | false | 使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳) |
hdfs.closeTries | 0 | 开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;如果设置为0,Sink会一直尝试重命名文件直到成功为止;关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。 |
hdfs.retryInterval | 180 | 连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。 |
serializer | TEXT | Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。 |
serializer.* | 根据上面 serializer 配置的类型来根据需要添加序列化器的参数 |
示例:
user_friends.sinks.user_friendsSink.type=hdfs
user_friends.sinks.user_friendsSink.hdfs.fileType=DataStream
user_friends.sinks.user_friendsSink.hdfs.filePrefix=userfriend
user_friends.sinks.user_friendsSink.hdfs.fileSuffix=.csv
user_friends.sinks.user_friendsSink.hdfs.path=hdfs://192.168.21.2:9000/kb11file/userfriend/%Y-%m-%d
user_friends.sinks.user_friendsSink.hdfs.useLocalTimeStamp=true
user_friends.sinks.user_friendsSink.hdfs.batchSize=640
user_friends.sinks.user_friendsSink.hdfs.rollCount=0
user_friends.sinks.user_friendsSink.hdfs.rollSize=6400000
user_friends.sinks.user_friendsSink.hdfs.rollInterval=30
Logger Sink
Sink将Event输出到日志中,一般用于测试、调用
提示:
一般情况下不会将输出数据流中的原始数据输入到运行日志中,但是为了满足某些特殊情况,可以借助Logger Sink将其输出。
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: logger |
maxBytesToLog | 16 | Event body 输出到日志的最大字节数,超出的部分会被丢弃 |
配置:
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Kafka Sink
Sink可以将数据发送到Kafka topic上,从而实现Flume与Kafka集成,以便于处理系统可以处理来自各种Flume Source的数据
1.9版本:
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | – | Kafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔 |
kafka.topic | default-flume-topic | 用于发布消息的 Kafka topic 名称 。如果这个参数配置了值,消息就会被发布到这个 topic 上。如果Event header中包含叫做“topic”的属性, Event 就会被发布到 header 中指定的 topic 上,而不会发布到 kafka.topic 指定的 topic 上。支持任意的 header 属性动态替换, 比如%{lyf}就会被 Event header 中叫做“lyf”的属性值替换(如果使用了这种动态替换,建议将 Kafka 的 auto.create.topics.enable 属性设置为 true )。 |
flumeBatchSize | 100 | 一批中要处理的消息数。设置较大的值可以提高吞吐量,但是会增加延迟。 |
kafka.producer.acks | 1 | 在考虑成功写入之前,要有多少个副本必须确认消息。可选值, 0 :(从不等待确认); 1 :只等待leader确认; -1 :等待所有副本确认。 设置为-1可以避免某些情况 leader 实例失败的情况下丢失数据。 |
useFlumeEventFormat | false | 默认情况下,会直接将 Event body 的字节数组作为消息内容直接发送到 Kafka topic 。如果设置为true,会以 Flume Avro 二进制格式进行读取。 与 Kafka Source 上的同名参数或者 Kafka channel 的 parseAsFlumeEvent 参数相关联,这样以对象的形式处理能使生成端发送过来的 Event header 信息得以保留。 |
defaultPartitionId | – | 指定所有 Event 将要发送到的 Kafka 分区ID,除非被 partitionIdHeader 参数的配置覆盖。 默认情况下,如果没有设置此参数,Event 会被 Kafka 生产者的分发程序分发,包括 key(如果指定了的话),或者被 kafka.partitioner.class 指定的分发程序来分发 |
partitionIdHeader | – | 设置后,Sink将使用 Event header 中使用此属性的值命名的字段的值,并将消息发送到 topic 的指定分区。 如果该值表示无效分区,则将抛出 EventDeliveryException。 如果存在标头值,则此设置将覆盖 defaultPartitionId 。假如这个参数设置为“lyf”,这个 Sink 就会读取 Event header 中的 lyf 属性的值,用该值作为分区ID |
allowTopicOverride | true | 如果设置为 true,会读取 Event header 中的名为 topicHeader 的的属性值,用它作为目标 topic。 |
topicHeader | topic | 与上面的 allowTopicOverride 一起使用,allowTopicOverride 会用当前参数配置的名字从 Event header 获取该属性的值,来作为目标 topic 名称 |
kafka.producer.security.protocol | PLAINTEXT | 设置使用哪种安全协议写入 Kafka。可选值:SASL_PLAINTEXT 、 SASL_SSL 和 SSL, 有关安全设置的其他信息,请参见下文。 |
more producer security props | 如果使用了 SASL_PLAINTEXT 、 SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为生产者增加安全相关的参数配置 | |
Other Kafka Producer Properties | – | 其他一些 Kafka 生产者配置参数。任何 Kafka 支持的生产者参数都可以使用。唯一的要求是使用“kafka.producer.”这个前缀来配置参数,比如:kafka.producer.linger.ms |
1.6版本:(1.9中已弃用)
属性 | 默认值 | 解释 |
---|---|---|
brokerList | – | 改用 kafka.bootstrap.servers |
topic | default-flume-topic | 改用 kafka.topic |
batchSize | 100 | 改用 kafka.flumeBatchSize |
requiredAcks | 1 | 改用 kafka.producer.acks |
配置:
interceptor.sinks.hisink.type=org.apache.flume.sink.kafka.KafkaSink
interceptor.sinks.hisink.batchSize=640
interceptor.sinks.hisink.brokerList=192.168.21.2:9092
interceptor.sinks.hisink.topic=hi2
五、课堂笔记
5.1 安装配置以及测试
第一步:解压flume压缩包,并重命名:
# tar xf flume-ng-1.6.0-cdh5.14.0.tar.gz
# mv apache-flume-1.6.0-cdh5.14.0-bin/ flume160
第二步:安装nc和telnet:
nc(服务器端Server):可以实现监听服务器端口,并与客户端(最多只能接受一个客户端)对指定服务器进行端口扫描,作为客户端连接到远程服务器进行通信
telnet(客户端Client):连接服务器端口,并进行通信
测试:(在两个session中分别输入)
nc -lk 44444
telnet localhost 44444
5.2 示例1:source从控制台输入、sink从控制台输出
配置文件:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume agent:
[root@hadoop2 flume]# ./bin/flume-ng agent --name a1 --conf ./conf/ --conf-file ./conf/kb11Job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
控制台输入:
[root@hadoop2 kb11Job]# telnet 127.0.0.1 44444
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hhhhh
OK
flume监控输出:
2021-05-25 10:14:54,576 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 68 68 68 68 0D hhhhh. }
5.3 示例2:source来源于文件、sink从控制台输出
读取 xxx.csv文件
interceptor.sources.interceptorSource.type=spooldir
interceptor.sources.interceptorSource.spoolDir=/root/software/flumlogfile/interceptor
interceptor.sources.interceptorSource.deserializer=LINE
interceptor.sources.interceptorSource.deserializer.maxLineLength=32000
interceptor.sources.interceptorSource.includePattern=interceptor_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
读取 xxx.txt文件
a2.sources.r1.type=exec
a2.sources.r1.command=tail -f /opt/flume160/conf/jobkb09/tmp/tmp.txt
控制台输出:
interceptor.sinks.othersink.type=logger
5.4 示例3:加载csv文件,并sink到hdfs或kafka
sink输出到hdfs:
interceptor.sinks.hellosink.type=hdfs
interceptor.sinks.hellosink.hdfs.fileType=DataStream
interceptor.sinks.hellosink.hdfs.filePrefix=hello
interceptor.sinks.hellosink.hdfs.fileSuffix=.csv
interceptor.sinks.hellosink.hdfs.path=hdfs://192.168.21.2:9000/kb11file/hello2/%Y-%m-%d
interceptor.sinks.hellosink.hdfs.useLocalTimeStamp=true
interceptor.sinks.hellosink.hdfs.batchSize=640
interceptor.sinks.hellosink.hdfs.rollCount=0
interceptor.sinks.hellosink.hdfs.rollSize=6400000
interceptor.sinks.hellosink.hdfs.rollInterval=3
sink输出到kafka消费者(consumer):
interceptor.sinks.hisink.type=org.apache.flume.sink.kafka.KafkaSink
interceptor.sinks.hisink.batchSize=640
interceptor.sinks.hisink.brokerList=192.168.21.2:9092
interceptor.sinks.hisink.topic=hi2
六、操作步骤总结
查看文件头:head -n l xxx.csv
第一步:
在flume目录下创建.conf文件,进行source,channel,sink的配置
vi /xxx/flume/Demo/xxxxx.conf
具体配置如上第四节所示
第二步:
在相应路径下创建channel中设置的 存放扫描欲读取文件的目录、checkpoint和date的目录
[root@hadoop2 events]# mkdir -p /root/software/flumlogfile/user
[root@hadoop2 events]# mkdir -p /root/software/flumlogfile/data/user
[root@hadoop2 events]# mkdir -p /root/software/flumlogfile/checkpoint/user
第三步:
将source欲读取文件cp到在第二步中创建的路径下
(注意,文件名要能满足Source中设置的正则匹配,否则会读取失败)
user_friends.sources.user_friendsSource.includePattern=user_friends_[0-9]{4}
-[0-9]{2}-[0-9]{2}.csv
[root@hadoop2 events]# cp ./users.csv /root/software/flumlogfile/user/user_2021-05-24.csv
第四步:
flume—Agent的启动:
–name:agent名称、必填
–conf-file:指定要执行的配置文件
[root@hadoop2 flume]# ./bin/flume-ng agent --name user_friends --conf ./conf/ --conf-file ./conf/xx/xxx.conf -Dflume.root.logger=INFO,console
如果sink到了kafka需要启动消费者服务查看消息:
kafka-console-consumer.sh --topic mydemo --bootstrap-server 192.168.21.2:9092 --from-beginning
如果sink到了hdfs需要到浏览器xxxxxx:50070上查看消息
更多推荐