Log Collection with Flume and Kafka

  • Introduction

This document is a record of my own learning, and the steps below were carried out on Windows. If you want to follow along, install Flume and Kafka first; both can be downloaded straight from their official websites. Suggestions and corrections are very welcome.

Simple version

  • Spring Boot + Flume log collection: Logback ships the application logs to Flume, and Flume writes them out through its logger sink (console / Flume log file)

Spring Boot configuration

Add the logback-flume appender dependency to the Spring Boot project:

<dependency>
    <groupId>com.teambytes.logback</groupId>
    <artifactId>logback-flume-appender_2.10</artifactId>
    <version>0.0.9</version>
</dependency>

Configure the logback.xml log configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Log storage path -->
    <property name="log.path" value="D:\\tmp\\log" />
    <!--    <property name="log.path" value="D:\log"/>-->
    <!-- Log output pattern -->
    <property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n"/>

    <!-- Console output -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
    </appender>

    <!-- System log output -->
    <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/sys-info.log</file>
        <!-- Rolling policy: roll the log file based on time -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Log file name pattern -->
            <fileNamePattern>${log.path}/sys-info.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- Keep at most 60 days of history -->
            <maxHistory>60</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <!-- Level to filter on -->
            <level>INFO</level>
            <!-- On match: accept (log the event) -->
            <onMatch>ACCEPT</onMatch>
            <!-- On mismatch: deny (drop the event) -->
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/sys-error.log</file>
        <!-- Rolling policy: roll the log file based on time -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Log file name pattern -->
            <fileNamePattern>${log.path}/sys-error.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- Keep at most 60 days of history -->
            <maxHistory>60</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <!-- Level to filter on -->
            <level>ERROR</level>
            <!-- On match: accept (log the event) -->
            <onMatch>ACCEPT</onMatch>
            <!-- On mismatch: deny (drop the event) -->
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- User access log output -->
    <appender name="sys-user" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/sys-user.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Roll over daily -->
            <fileNamePattern>${log.path}/sys-user.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- Keep at most 60 days of history -->
            <maxHistory>60</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
    </appender>

    <!-- Test appender -->
    <appender name="file666" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/file666.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Roll over hourly (yyyy-MM-dd.HH); maxHistory of 1440 keeps 60 days of hourly files -->
            <fileNamePattern>${log.path}/file666.%d{yyyy-MM-dd.HH}.log</fileNamePattern>
            <maxHistory>1440</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
    </appender>

    <!-- Add the following Flume appender -->
    <appender name="flumeTest" class="com.teambytes.logback.flume.FlumeLogstashV1Appender">
        <flumeAgents>localhost:4545</flumeAgents>
        <flumeProperties>
            connect-timeout=4000;
            request-timeout=8000
        </flumeProperties>
        <batchSize>100</batchSize>
        <reportingWindow>1000</reportingWindow>
        <additionalAvroHeaders>
            myHeader = myValue
        </additionalAvroHeaders>
        <application>JustryDeng's Application</application>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>
                %d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %message%n%ex
            </pattern>
        </layout>
    </appender>

    <logger name="zz.wyk.fk.controller.UserController" level="info">
        <appender-ref ref="flumeTest"/>
    </logger>

    <!-- Spring log level control -->
    <logger name="org.springframework" level="warn"/>

    <root level="info">
        <appender-ref ref="console"/>
        <!-- System operation logs -->
        <appender-ref ref="file_info"/>
        <appender-ref ref="file_error"/>
    </root>

    <!-- System user operation logs -->
    <logger name="sys-user" level="info">
        <appender-ref ref="sys-user"/>
    </logger>

    <logger name="zz.wyk.fk" level="info">
        <appender-ref ref="file666"/>
    </logger>

</configuration>

The key piece is the flumeTest appender: any logger bound to it (here zz.wyk.fk.controller.UserController) has its log events forwarded to the Flume agent over Avro.
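A minimal sketch of such a controller is shown below. The endpoint and the log message are made up for illustration; only the class/logger name has to match the <logger name="zz.wyk.fk.controller.UserController"> entry in logback.xml above.

package zz.wyk.fk.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller used to generate log events.
// Its logger name matches the <logger> entry in logback.xml,
// so every log call here is also handed to the flumeTest appender.
@RestController
public class UserController {

    private static final Logger log = LoggerFactory.getLogger(UserController.class);

    @GetMapping("/hello")
    public String hello() {
        // Written to the normal appenders and also sent to the Flume Avro source on localhost:4545.
        log.info("hello from UserController");
        return "ok";
    }
}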

Flume configuration file

  • Create an example.conf configuration file in Flume's conf directory; it receives the Logback events through an Avro source:
 a1.sources = r1
 a1.sinks = k1
 a1.channels = c1
 
 # Data into Flume: Avro source
 a1.sources.r1.type = avro
 a1.sources.r1.bind = localhost
 a1.sources.r1.port = 4545
 
 # Data out of Flume: the logger sink prints events to the console / Flume log file
 a1.sinks.k1.type = logger
 
 # Channel storage type
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000
 a1.channels.c1.transactionCapacity = 100
 
 # Bind the source and the sink to the channel
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1

Start Flume: flume-ng.cmd agent -conf ../conf -conf-file ../conf/example.conf -name a1 -property flume.root.logger=INFO,console

Check the Flume log output: the events sent by the application show up in the agent's console (screenshot omitted).

Verified OK.

Collecting logs with Flume + Kafka

Configure Flume to collect the application logs (over Avro, as above) and have its sink publish the events to Kafka.

Spring Boot configuration

  • The Spring Boot project needs both the logback-flume appender dependency and the spring-kafka dependency:
<dependency>
    <groupId>com.teambytes.logback</groupId>
    <artifactId>logback-flume-appender_2.10</artifactId>
    <version>0.0.9</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • Configure a Kafka consumer, specifying the topic and groupId it listens on:
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class FlumeConsumer {

    // Listens on the topic that the Flume Kafka sink writes to.
    @KafkaListener(topics = "kafka-flume-topic", groupId = "flume-consumer")
    public void flume(String msg) {
        System.out.println("Received message from Flume: " + msg);
    }
}
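For the listener above to connect, Spring Boot only needs to know where the broker is (typically spring.kafka.bootstrap-servers=localhost:9092 plus a consumer group in application.properties). If you prefer explicit Java configuration instead, a minimal sketch is shown below; the class name and the localhost:9092 broker address are assumptions for a local test, not part of the original setup.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed local broker; adjust to your environment.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flume-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // Container factory used by @KafkaListener methods such as FlumeConsumer.flume(...).
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}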
  • Configure the Flume sink so that events are written to Kafka:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source (Avro, same as before)
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4545

# Configure the Kafka sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList=localhost:9092
a1.sinks.k1.topic=kafka-flume-topic
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.serializer.appendNewline=false

# Configure the channel storage type
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
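The sink writes to the topic kafka-flume-topic. Many local brokers auto-create topics on first use, but if auto-creation is disabled you can create the topic up front. A minimal sketch using the Kafka AdminClient is shown below; the class name, broker address, and partition/replication values are assumptions for a local test.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateFlumeTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed local broker; adjust to your environment.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition with replication factor 1 is enough for a local test.
            NewTopic topic = new NewTopic("kafka-flume-topic", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}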

That completes the Spring Boot + Flume + Kafka setup for shipping Logback output to Kafka.
Verification result: the @KafkaListener consumer prints the log messages forwarded by Flume (screenshot omitted).

Command to start Flume: flume-ng.cmd agent -conf ../conf -conf-file ../conf/example.conf -name a1 -property flume.root.logger=INFO,console
