1. 项目背景

  • 每个系统都有日志,当系统出现问题时,需要通过日志解决问题
  • 当系统机器较少时,登录到服务器上查看日志即可满足需求
  • 当系统机器规模庞大时,登录到机器上查看日志几乎不现实

2. 解决方案

a. 把机器上的日志实时收集,统一的存储中心系统
b. 然后再对这些日志建立索引,通过搜索即可以找到对应日志
c. 通过提供界面友好的web界面,通过web即可以完成日志搜索

3. 面临的问题/挑战

a. 实时日志量非常大,每天几十亿条
b. 日志准实时收集,延迟控制在分钟级别
c. 能够水平可扩展

4. 业界方案ELK

elk4.1 ELK简介
通俗来讲,ELK是由Elasticsearch(弹性搜索引擎)、Logstash(日志收集)、Kibana(查看日志/可视化的web界面)三个开源软件的组成的一个组合体,ELK是elastic公司研发的一套完整的日志收集、分析和展示的企业级解决方案,在这三个软件当中,每个软件用于完成不同的功能,ELK又称为ELK stack。

对照架构图,我们来看下这三大神兽的工作过程

在这里插入图片描述
1. 用户发送请求到服务端
2. 服务端将需要记载的日志的数据通过网络请求传送到logstash
3. logstash对数据进行过滤清洗后,再传给Elasticsearch
4. Elasticsearch 负责对数据创建索引,进行存储
5. 用户通过访问kibana的web页面,能够实时(延迟低于一秒)查看日志

reference:
elk配置
elk详述

4.2 elk方案问题
a. 运维成本高,每增加一个日志收集,都需要手动修改配置
b. 监控缺失,无法准确获取logstash的状态
c.logstash属于server角色,必然出现集中式的热点问题
d.因为还需要做大量的match操作(格式化日志),消耗的CPU也很多,不利用scale out

5. 带有kafka的日志系统设计

在这里插入图片描述
各组件介绍:
a. Log Agent,日志收集客户端,用来收集服务器上的日志,每个服务器上都有一个log Agent
b. Kafka,高吞吐量分布式队列,linkin开发,apache顶级开源项目
c. ES,elasticsearch,开源的搜索引擎,提供基于http restful的web接口
d. Hadoop,分布式计算框架,能够对大量数据进行分布式处理的平台

5.1 kafka应用场景:
1.异步处理, 把非关键流程异步化,提高系统的响应时间健壮性
在这里插入图片描述
在这里插入图片描述

2.应用解耦,通过消息队列
在这里插入图片描述
3.流量削峰
如双十一秒杀活动,访问量突然剧增,通过消息队列可以有效实现流量削峰作用,即限制一次性传入后端处理的数据量
在这里插入图片描述

5.2 zookeeper(分布式存储系统)应用场景
在日志收集系统中,一般kafka会连接一个zookeeper
1. 服务注册&服务发现
在这里插入图片描述
当服务提供者发生扩容或缩容时,服务提供者会将服务注册到注册中心;
注册中心将服务变更通知给服务消费者,服务消费者根据注册变动信息实现自动任务调度优化(将数据分配给新增的服务提供者或将停止服务的提供者提出调用请求)

2. 配置中心(自动化配置)
1.在wep平台修改了业务,将变动信息传输到zk
2.zk将服务业务变动信息发送给相应的业务应用,
3.相应的业务应用将变动信息拉到本地修改业务配置
实现自动化配置
在这里插入图片描述

3.分布式锁
zookeeper是强一致的
多个客户端同时在Zookeeper上创建相同的znode,只有一个创建成功

5.3 zookeeper与kafka安装:
由于zookeeper和kafka基于java,先安装JDK

sudo apt-get update
sudo apt-get install openjdk-8-jdk

zookeeper安装:
ubuntu 安装zookeeper
安装zookeepr

sudo apt-get install zookeeperd	

配置zookeeper

cat /etc/zookeeper/conf/zoo.cfg | more //查看zoo.cfg的配置信息
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper

# the port at which the clients will connect
clientPort=2181

# specify all zookeeper servers
# The fist port is used by followers to connect to the leader
# The second one is used for leader election
server.1=zookeeper1:2888:3888

启动zookeeper

启动server
$ sudo /usr/share/zookeeper/bin/zkServer.sh start
查看启动状态
$ sudo /usr/share/zookeeper/bin/zkServer.sh status
查看启动信息
ps -aux | grep zookeeper

链接server

sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181

查看日志信息

日志信息是可以配置的,通过zoo.cfg,默认在:
/var/log/zookeeper/zookeeper.log
可以查看日志信息查看一些错误和细节

kafka安装

ubuntu18.04下Kafka安装与部署

安装kafka:
ubuntu下可以用wget直接下载,我是下载到了/home/cyl/kafka目录

wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz

解压:

tar -zxvf kafka_2.12-3.1.0.tgz

重命名

mv kafka_2.12-3.1.0 ./kafka

创建日志存储目录

yunlongchen@cyl:~/kafka$ mkdir logs-1

修改kafka-server的配置文件

yunlongchen@cyl:~/kafka/kafka$ sudo vim config/server.properties 

修改配置文件中21、31、36和60行

broker.id=1
listeners=PLAINTEXT://10.141.184:9092 #为了能顺利启动broker 
advertised.listeners=PLAINTEXT://10.141.184:9092
log.dirs=/home/wzj/kafka/logs-1

启动Zookeeper
得先修改config/zookeeper.properties配置

yunlongchen@cyl:~/kafka/kafka$ sudo ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

建议使用:

$ sudo /usr/share/zookeeper/bin/zkServer.sh start
链接server
$ sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181

启动Kafka服务
启动流程参考
使用 kafka-server-start.sh 启动 kafka 服务

yunlongchen@cyl:~/kafka/kafka$ sudo ./bin/kafka-server-start.sh ./config/server.properties 

创建topic
使用 kafka-topics.sh 创建单分区单副本的 topic test

yunlongchen@cyl:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --create --bootstrap-server 10.141.65.188:9092 --replication-factor 1 --partitions 1 --topic nginxLog
此处不能使用localhost:9092,

遇到的问题
问题一:版本指令变更

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
报错“Exception in thread “main” joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option”
在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。

问题二:

WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

必须与在配置文件里面的listeners保存一致,例如

listeners=PLAINTEXT://192.168.156.131:9092
# 在命令使用时也必须使用192.168.156.131:9092作为连接的地址,如下
./kafka-console-producer.sh --broker-list 192.168.156.131:9092 --topic userlog

查看 topic 列表

yunlongchen@cyl:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --list --bootstrap-server 10.141.65.188:9092

产生消息,创建消息生产者

yunlongchen@cyl:~/kafka/kafka$ sudo ./bin/kafka-console-producer.sh --broker-list 10.141.65.188:9092 --topic nginxLog

消费消息,创建消息消费者

yunlongchen@cyl:~/kafka/kafka$ sudo ./bin/kafka-console-consumer.sh  --bootstrap-server 10.141.65.188:9092 --topic nginxLog --from-beginning

在生产消息的窗口,输入内容,在消费窗口就可以打印出来

查看Topic消息

yunlongchen@cyl:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --describe --bootstrap-server 10.141.65.188:9092 --topic nginxLog
Topic: nginxLog	TopicId: t6M81RsMRPGj2tZVXaxltw	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: nginxLog	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。
“Leader”: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。
“Replicas”: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。
“Isr”: 是一组“同步”副本。这是replications列表的子集,当前活着并被引导到领导者。

删除topic

yunlongchen@cyl:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --delete --bootstrap-server 10.141.65.188:9092 --topic nginxLogtest

启动命令:
bin/kafka-server-start.sh -daemon config/server.properties
 
创建topic
./kafka-topics.sh --create --bootstrap-server spark01:9092 --replication-factor 1 --partitions 1 --topic test2
 
查看topic
./kafka-topics.sh --bootstrap-server spark01:9092 --list
 
向指定topic中生产数据
./kafka-console-producer.sh --broker-list spark01:9092 --topic test2
例如:{"id":"1","name":"xiaoming","age":"20"}
 
查看topic具体内容
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning
 
创建消费者组
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --group kafkatest
 
查看消费者组
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --list
 
查看消费者详情
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --describe  --group kafkatest
 
消费数据
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning

6 代码实现

logagent 实现代码github
1.kafka demo:

package kafka

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {

	//配置kafka环境
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	client, err := sarama.NewSyncProducer([]string{"10.141.65.188:9092"}, config)
	if err != nil {
		fmt.Println("producer close, err:", err)
		return
	}
	defer client.Close()

	for i := 0; i < 10; i++ {
		msg := &sarama.ProducerMessage{}
		msg.Topic = "nginxLogTest"
		msg.Value = sarama.StringEncoder("this is a good test, my message is good~~12")

		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send message failed,", err)
			return
		}

		fmt.Printf("pid:%v offset:%v\n", pid, offset)
	}
}
  1. tailf demo
package tailf

import (
	"fmt"
	"time"

	"github.com/hpcloud/tail"
)

func main() { //main()
	filename := "./my.log"
	tails, err := tail.TailFile(filename, tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //定位读取位置
		MustExist: false,                                //要求文件必须存在或者暴露
		Poll:      true,
	})
	if err != nil {
		fmt.Println("tail file err:", err)
		return
	}
	var msg *tail.Line
	var ok bool
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		fmt.Println("msg:", msg)
	}
}
  1. config demo
package main

import (
	"fmt"

	"github.com/astaxie/beego/config"
)

func main() {
	conf, err := config.NewConfig("ini", "./logcollect.conf")
	if err != nil {
		fmt.Println("new config failed, err:", err)
		return
	}

	port, err := conf.Int("server::port")
	if err != nil {
		fmt.Println("read server:port failed, err:", err)
		return
	}
	fmt.Println("Port:", port)

	log_level := conf.String("logs::log_level")
	fmt.Println("log_level:", log_level)

	log_port, err := conf.Int("logs::port")
	if err != nil {
		fmt.Println("read logs:port failed, err:", err)
		return
	}
	fmt.Println("log_Port:", log_port)

	log_path := conf.String("logs::log_path")
	fmt.Println("log_path:", log_path)
}
  1. logs demo
package main

import (
	"encoding/json"
	"fmt"

	"github.com/astaxie/beego/logs"
)

func main() {
	config := make(map[string]interface{})
	config["filename"] = "./logcollect.log"
	config["level"] = logs.LevelTrace

	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("marshal failed, err:", err)
		return
	}

	logs.SetLogger(logs.AdapterFile, string(configStr))

	logs.Debug("this is a test, my name is %s", "stu01~~")
	logs.Trace("this is a trace, my name is %s", "stu02~~")
	logs.Warn("this is a warn, my name is %s", "stu03~~")
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐