springboot + flume + kafka收集日志
springboot 集成 flume 收集日志存储到 kafka中
·
日志采集flume-kafka
- 前言介绍
本文档是个人学习的记录,以下流程是在windows环境下操作的,要学习的同学可以先安装好flume、kafka直接去官网下载就可以,有什么不足之处欢迎大家提出给与宝贵建议;
简单版本
- springboot+flume采集日志,将logback日志收集到flume中,flume以日志文件形式输出
springboot配置
springboot中引入logback-flume依赖
<dependency>
<groupId>com.teambytes.logback</groupId>
<artifactId>logback-flume-appender_2.10</artifactId>
<version>0.0.9</version>
</dependency>
配置日志文件logback.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 日志存放路径 -->
<property name="log.path" value="D:\\tmp\\log" />
<!-- <property name="log.path" value="D:\log"/>-->
<!-- 日志输出格式 -->
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>
<!-- 控制台输出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
</appender>
<!-- 系统日志输出 -->
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/sys-info.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/sys-info.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>INFO</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/sys-error.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/sys-error.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>ERROR</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 用户访问日志输出 -->
<appender name="sys-user" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/sys-user.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 按天回滚 daily -->
<fileNamePattern>${log.path}/sys-user.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
</appender>
<!-- 测试 -->
<appender name="file666" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/file666.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 按天回滚 daily -->
<fileNamePattern>${log.path}/file666.%d{yyyy-MM-dd.HH}.log</fileNamePattern>
<maxHistory>1440</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
</appender>
<!-- 加入如下appender代码 -->
<appender name="flumeTest" class="com.teambytes.logback.flume.FlumeLogstashV1Appender">
<flumeAgents>localhost:4545</flumeAgents>
<flumeProperties>
connect-timeout=4000;
request-timeout=8000
</flumeProperties>
<batchSize>100</batchSize>
<reportingWindow>1000</reportingWindow>
<additionalAvroHeaders>
myHeader = myValue
</additionalAvroHeaders>
<application>JustryDeng's Application</application>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>
%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %message%n%ex
</pattern>
</layout>
</appender>
<logger name="zz.wyk.fk.controller.UserController" level="info">
<appender-ref ref="flumeTest"/>
</logger>
<!-- Spring日志级别控制 -->
<logger name="org.springframework" level="warn"/>
<root level="info">
<appender-ref ref="console"/>
</root>
<!--系统操作日志-->
<root level="info">
<appender-ref ref="file_info"/>
<appender-ref ref="file_error"/>
</root>
<!--系统用户操作日志-->
<logger name="sys-user" level="info">
<appender-ref ref="sys-user"/>
</logger>
<logger name="zz.wyk.fk" level="info">
<appender-ref ref="file666"/>
</logger>
</configuration>
比较重要的是flumeTest将指定的日志文件输入到flume组件中
flume配置文件
- 在flume的conf目录下新建一个example.conf配置文件收集logback日志文件采用avro形式
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 数据接入flume
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4545
# 从flume接出数据: logger采用控制台或者文件输入方式
a1.sinks.k1.type = logger
# 数据通道存储方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将数据接入和数据输出与通道进行关系绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume: flume-ng.cmd agent -conf …/conf -conf-file …/conf/example.conf -name a1 -property flume.root.logger=INFO,console
查看flume日志:
验证OK.
flume + kafka模式采集日志
配置flume采集日志,将日志信息以文件格式存储到flume中,将flume的消息输出到kafka中
springboot配置
- springboot需要配置flume的logback和kafka配置
<dependency>
<groupId>com.teambytes.logback</groupId>
<artifactId>logback-flume-appender_2.10</artifactId>
<version>0.0.9</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 配置一个kafka的消费者:指定消费者topic和groupId
@Configuration
public class FlumeConsumer {
@KafkaListener(topics = "kafka-flume-topic", groupId = "flume-consumer")
public void flume(String msg) {
System.out.println("接收flume输出的消息:" + msg);
}
}
- 配置flume的sinks输出到kafka中的配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置sources
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4545
# 配置sinks
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList=localhost:9092
a1.sinks.k1.topic=kafka-flume-topic
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.serializer.appendNewline=false
# 配置channels通道存储方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 绑定连接
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
springBoot + flume + kafka 收集日志logback输出到kafka中配置到此已经完结;
验证结果:
flume启动的命令: flume-ng.cmd agent -conf …/conf -conf-file …/conf/example.conf -name a1 -property flume.root.logger=INFO,console
更多推荐
已为社区贡献1条内容
所有评论(0)