欢迎大家关注我的公众号,添加我为好友!

       

        开始的时候感觉日志监控是比较NB的技术,感觉很神奇,那么多日志,为什么一下子就能够找到自己想要的?后来初步了解到了ELK(ElasticSearch + Logstash + Kibana)这个开源项目,然后我就开始在自己的虚拟机试着搞一下子。说的多不如做得多,我一开始也是什么都不会,但是我从0做到了1 (后期会从1到10,再从10到100)并且在一台生产的机器上成功部署了ELK,自我感觉已经很完美了。但是后来我知道了另外到一个问题,就是大批量日志涌入的时候会造成数据的丢失,传统的Logstash收集日志比较消耗资...... 一切问题铺面而来,开始的时候,脑子比较懵逼的,但是我知道,一个一个来就能够解决,事实也是如此。我当天的问题都是第二天解决的,呵呵 也解决了不是吗?好了不吹逼了!下面简单介绍一下这个可以抗住 T 级别的日志监控系统。

注意:如果单机版的ELK单个系统没有跑成功的小伙伴,建议去先搞定单机版再来学习集群,循序渐进以是一个过程

介绍一下我使用的版本:

CentOS-7-x86_64-DVD-1804.iso

jdk-8u191-linux-x64.tar.gz

kibana-6.5.0-linux-x86_64.tar.gz

elasticsearch-6.5.0.tar.gz

logstash-6.5.0.tar.gz

filebeat-6.5.0-linux-x86_64.tar.gz

kafka_2.11-2.1.0.tgz

zookeeper-3.4.12.tar.gz

1.首先来看一副 新浪的 ELK日志架构图(2015年的抗下32亿条日志)原文链接

 从上面这幅图中我们可以看到 日志被收集之后 推送到kafka集群中,使用kafka集群中间消息控件做一个队列,可以防止大数据量的涌入丢失问题。然后Logstash分批量从kafka集群中获取日志进行消费并过滤,过滤之后的结果推送到ElasticSearch集群中保存,最后通过Kibana前台展示界面进行展示。这个框架图还是比较容易理解的,既然理解了就放手去做吧。

2.虚拟机安装

我在这里部署了三台虚拟机,每台1G内存,后来发现整体运行的时候根本跑不起来,排查了好久,请教了好久大神才知道是内存的原因,所以建议有条件的每个都搞4G这样,可以避免很多错误。还需要注意的是,需要为每个虚拟机配置静ip,如果不知道为啥配置静态ip,那估计是你还没感受到电脑切换不同WiFi之后虚拟机的ip变化。因为安装涉及到许多配置文件,配置文件里面有ip,如果你感觉每次该配置文件没问题的话,我只能说,还是太年轻呀,哈哈哈哈哈!!!!!!

安装教程我就不教了,这里分享一下配置静态ip的方法吧,也是我从大佬哪里学来的哟,嘻嘻嘻!!!

最先要做的就是虚拟机指定IP配置:

1.检查VMware虚拟机服务是否都开启了。

2.打开VMware虚拟机网络配置。

3.选择“更改设置”。

4.一下配置按照我给的图片来就行了,记得点击“确定”。

5.下一步就是修改电脑VMnet8属性(在哪找的我就不说了,电脑上多找找就出来了)。

然后重启虚拟机,然后配置虚拟机的ip就可以了。

6.登录虚拟机然后执行命令“vim /etc/sysconfig/network‐scripts/ifcfg-ens33”编辑配置文件。

第一台主机IP:192.168.252.131(其他的参照下面配置即可)

第二台主机IP:192.168.252.132

第三台主机IP:192.168.252.133

每一台机器配置好之后切记要输入命令“ifconfig” 查看下网卡和IP配置,然后ping一下网络主机看一下网络是否通畅。

3.安装JDK(安装好的可以忽略,但是要注意版本)

如果你还不知道为啥安装jdk那就需要去ElasticSearch官网好好看看官方文档了,而且官方建议的是1.8版本以上的,最低也要是是1.6 为了方便起见,我这里使用了1.8。

1.使用一个趁手的工具把JDK上传到服务器上(你也可以下载,但是生产的环境有的时候是没有外网的,我使用的是SourceCRT)。

2.最好在服务器上创建一个目录来安装ELK 以后启动和修改文件都比较好找。

3.解压压缩包:“tar -zxvf jdk-8u191-linux-x64.tar.gz”。

4.解压好之后需要添加环境变量,输入命令:“vim /etc/profile”并且定位到最后位置,添加以下命令(JAVA_HOME必须根据实际目录来, 建议在解压后的目录下输入命令 pwd 然后拷贝路径并粘贴)。

JAVA_HOME= /home/es/elk/jdk1.8.0_191
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH

5.保存并退出,执行命令:“source /etc/profile”是配置文件重新生效,然后输入:“java -version

6.到此为止java环境安装成功,参考以上安装其他两台服务器JDK。

4.修改主机名

这个其实修改不修改都行,我想生产上你也没权限修改,除非你是大boss。既然是自己搞还是修改一下,正规一下吧。

1.输入命令:“hostnamectl set-hostname houry”。

2.输入命令:“reboot ”。

3.再次输入命令:“hostname”。

到此为止主机名已经被正确修改。

5.添加用户组

因为ElasticSearch不允许在root用户下启动所以还需要创建一个用户并授权,所以统一创建新的用户并在新用户上传文件(如果是root用户传文件,则新建的用户没有权限访问文件权限。也许你会想到赋权,但是正规生产环境是不允许这样操作来降低服务器安全性的,所以如果是root用户,可以利用CRT传项目的时候在新用户下传,简便办法就是在 crt 下切换用户登录,然后使用快捷键“alt+p”上传)这是我遇到的第一个坑。

1.输入命令添加用户:“useradd es ” 。

2.给刚刚创建的用户设置密码:“passwd es 添加密码” ,连续两次输入密码即可(如果输入有问题不合规范会有提示,可以忽略,确认即可)。

6.安装ElasticSearch集群

做完了之前准备工作之后就到了激动人心的安装集群的时刻,说实话,我在这里跳了不少坑,但是最后还是一步一步爬上来了,切记切换到创建的用户下安装集群,否则root用户状态下你装了也没有权限。

1.还是要把下载好的压缩包传到服务器上,并且移动到指定位置然后输入命令:“tar -zxvf elasticsearch-6.5.0.tar.gz”解压。

2.使用命令 cd 到解压过后的目录下,找到config文件夹然后编辑“elasticsearch.yml”文件。

vim  /home/es/elk/elasticsearch-6.5.0/config”(注意你的文件位置)。

3.更改文件配置,注意“cluster.name”三台机器要使用相同的名字,因为ElasticSearch会自动去发现集群(相同IP网段下)。还有就是ElasticSearch集群我做了主节点和数据节点分离,防止脑裂问题的出现(不明白脑裂的问题可以去官网查看说明:脑裂问题)解决办法就是选举或者节点分离。

为防止数据丢失,必须配置 “discovery.zen.minimum_master_nodes”,以便每个符合主节点的节点都知道必须可见的最大主节点数,才能形成集群,但是我在设置的时候有一些问题,下面会给出论证。

配置规则是:(master_eligible_nodes / 2) + 1

主节点:192.168.252.131 “elasticsearch.yml”配置文件。

还需要注意的是“network.host”的设置,网上很多设置成“0.0.0.0”全网断,这样是不安全的,最好还是设置成本机或者localhost(我没尝试过,感兴趣的可以尝试一下)。

需要注意的是节点配置,一点要认真仔细。

其他两台机器也可以参照这样的配置,但是有一些地方需要进行修改。 

数据节点:192.168.252.132 “elasticsearch.yml”配置文件。

数据节点:192.168.252.133 “elasticsearch.yml”配置文件。 

这里有我从网上找到的一些关于这个配置文件的介绍。

cluster.name:elasticsearch
配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。
node.name:”FranzKafka”
节点名,默认随机指定一个name列表中名字,该列表在es的jar包中config文件夹里name.txt文件中,其中有很多作者添加的有趣名字。
node.master:true
指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。
node.data:true
指定该节点是否存储索引数据,默认为true。
index.number_of_shards:5
设置默认索引分片个数,默认为5片。
index.number_of_replicas:1
设置默认索引副本个数,默认为1个副本。
path.conf:/path/to/conf
设置配置文件的存储路径,默认是es根目录下的config文件夹。
path.data:/path/to/data
设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:
path.data:/path/to/data1,/path/to/data2
path.work:/path/to/work
设置临时文件的存储路径,默认是es根目录下的work文件夹。
path.logs:/path/to/logs
设置日志文件的存储路径,默认是es根目录下的logs文件夹
path.plugins:/path/to/plugins
设置插件的存放路径,默认是es根目录下的plugins文件夹
bootstrap.mlockall:true
设置为true来锁住内存。因为当jvm开始swapping时es的效率会降低,所以要保证它不swap,可以把ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit-lunlimited`命令。
network.bind_host:192.168.0.1
设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。network.publish_host:192.168.0.1
设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。
network.host:192.168.0.1
这个参数是用来同时设置bind_host和publish_host上面两个参数。
transport.tcp.port:9300
设置节点间交互的tcp端口,默认是9300。
transport.tcp.compress:true
设置是否压缩tcp传输时的数据,默认为false,不压缩。
http.port:9200
设置对外服务的http端口,默认为9200。
http.max_content_length:100mb
设置内容的最大容量,默认100mb
http.enabled:false
是否使用http协议对外提供服务,默认为true,开启。
gateway.type:local
gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,hadoop的HDFS,和amazon的s3服务器,其它文件系统的设置方法下次再详细说。
gateway.recover_after_nodes:1
设置集群中N个节点启动时进行数据恢复,默认为1。
gateway.recover_after_time:5m
设置初始化数据恢复进程的超时时间,默认是5分钟。
gateway.expected_nodes:2
设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。
cluster.routing.allocation.node_initial_primaries_recoveries:4
初始化数据恢复时,并发恢复线程的个数,默认为4。
cluster.routing.allocation.node_concurrent_recoveries:2
添加删除节点或负载均衡时并发恢复线程的个数,默认为4。
indices.recovery.max_size_per_sec:0
设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。
indices.recovery.concurrent_streams:5
设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。
discovery.zen.minimum_master_nodes:1
设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)
discovery.zen.ping.timeout:3s
设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。
discovery.zen.ping.multicast.enabled:false
设置是否打开多播发现节点,默认是true。
discovery.zen.ping.unicast.hosts:[“host1″,”host2:port”,”host3[portX-portY]”]
设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。
其他两个找例子安装(Linux 大神肯定有其他办法) 有些修改需要注意

 

如果你仔细看了上面的配置文件,聪明的你也许会发现问题,一个困扰了我一整天的问题,就是官网上说的防止数据丢失的问题,为此我专门做了一些测试。

192.168.252.131 :mater ( 作为主节点 )

192.168.252.132 :data  (作为数据节点)

192.168.252.133 :data  (作为数据节点)

测试主机:192.168.252.131 :设置“discovery.zen.minimum_master_nodes 3”启动报错(错误提示:需要两个master节点,然而我只有一个)。

测试主机192.168.252.132 :设置“discovery.zen.minimum_master_nodes 2启动正常(设置成 3 失败)。

测试主机192.168.252.133 :设置discovery.zen.minimum_master_nodes 2启动正常(设置成 3 失败)。

猜想:因为master 只有一个,设置成 2 会报错 ,所以如果进行了角色分离配置那么这个值是仅跟当前节点是什么节点有关,而不是跟服务器 es安装的数量有关。

 4.编辑:“vim jvm.options”设置JVM堆大小。

-Xms512m
-Xmx512m

5.在启动之前最好修改一下配置文件,因为我深有体会,不修改就没法启动ElasticSearch,更不用说ElasticSearch集群了(不信的小伙伴可以先启动,在修改,也是没关系的,毕竟报错亲自见见也没有什么坏处)。

错误:启动之后报如下错误

办法:统一切换到root用户下,如果你要问我为啥?那我只能告诉你,普通用户没有权限,我也是被这个搞过一次。

6.输入命令:“vim /etc/security/limits.conf ”。

* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096

7.输入命令:“vim /etc/sysctl.conf”。

vm.max_map_count = 655360

8.保存之后再次输入命令:“sysctl -p”。

9.我在搭建集群环境的时候通过查询资料还有一些其他的错误,因为我安装过程中没有出现,所以也就不粘贴出来了。

如果以上的三台环境你都安装并且配置好了,那么接下来就是启动测试环节。

7.启动ElasticSearch集群测试

我再说一遍,记得切换用户,记得关闭防火墙,否者三个节点是不会通的!不通的!!不通的!!!

1.启动也是有顺序的,我们需要首先启动主节点,数据节点需要在主节点启动之后在启动的,否则就会出现数据节点找不到主节点的报错,也就是说,主节点你要是启动不起来,就要想办法,千方百计的启动起来,才能启动数据节点。

2.进入到主节点bin目录下执行命令“./elasticsearch” 在控制台看日志分析ElasticSearch是否启动成功(如果想后台执行则使用命令:“./elasticsearch -d”)。

看到主节点已经起来了,然后再去启动数据节点并观察主节点日志。。。。。。 

通过看日志可以发现另外两个节点已经成功加入,就问你 开心不开心? 反正我是开心的不行不行的,哈哈哈哈!!!!!! 

3.我们通过浏览器或者其他的请求工具来查看节点状态:

浏览器:

curl '192.168.252.131:9200/_cluster/health?pretty'
{
  "cluster_name" : "es-cluster",  #集群名称
  "status" : "green",           # 集群状态 green表示集群健康 yellow red 表示有问题
  "timed_out" : false,          # 是否有超时
  "number_of_nodes" : 3,      # 集群中节点的数量
  "number_of_data_nodes" : 2,  # 集群中data的数量
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

RestletClient:(我为什么使用这个工具?因为它好看呀^_^)

 4.我们可以看到集群状态是“green”,以下摘自《ElasticSearch权威指南-中文版》

当我们查看集群状态的时候,我们可能得到绿色、黄色或红色三种状态。绿色代表一切正常(集群功能齐 全);黄色意味着所有的数据都是可用的,但是某些复制没有被分配(集群功能齐全);    红色则代表因为 某些原因,某些数据不可用。注意,即使是集群状态是红色的,集群仍然是部分可用的(它仍然会利用可 用的分片来响应搜索请求),但是可能你需要尽快修复它,因为你有丢失的数据。

到此为止你可以放松休息一下,因为事情要一步一来,安装ElasticSearch已经很不容易了。

8.安装zookeeper集群

为什么要安装zookeeper呢?因为kafka需要呀!为什么要安装zookeeper集群呢?因为kafka集群需要呀!!使用kafka自带的不行吗?不行!!!安装难吗?哈哈哈,会了就不难了!!!!!!

1.我们需要去官网找稳定版本下载:zookeeper稳定版本下载

2.上传至服务器,然后输入命令“tar –zxvf zookeeper-3.4.12.tar.gz”解压。

3.到 conf 目录下查看:zoo_sample.cfg这个文件是官方给我们的zookeeper的样板文件。我们需要复制一份名为zoo.cfg的文件,zoo.cfg是zookeeper官方指定的文件命名规则。我们以在第一台虚拟机上的操作为例(上面的操作都是在第一台虚拟机上,你需要在每台虚拟机上都执行上述以及本次操作,相信自己你能行的!!!!!!)。

4.编辑“zoo.cfg”配置文件。

配置详解:

# The number of milliseconds of each tick
tickTime=2000   //Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,每隔tickTime时间就会发送一个心跳
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10    //配置 Zookeeper 接受客户端(此客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已超过initLimit个tickTime长度后 Zookeeper 服务器还没有收到客户端的返回信息,则表明客户端连接失败。总的时间长度就是 initLimit * tickTime 秒。
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5     //配置 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 syncLimit * tickTime 秒 
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/app/zookeeper/data     //Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里
dataLogDir=/app/zookeeper/logs  //若没提供的话则用dataDir。zookeeper的持久化都存储在这两个目录里。dataLogDir里是放到的顺序日志(WAL),指定的目录下有version-2文件夹(下有log.1文件)。而dataDir里放的是内存数据结构的snapshot,便于快速恢复。为了达到性能最大化,一般建议把dataDir和dataLogDir分到不同的磁盘上,以充分利用磁盘顺序写的特性。
# the port at which the clients will connect
clientPort=2181     //Zookeeper服务器监听的端口,以接受客户端的访问请求。
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=10.55.2.81:2888:3888
server.2=10.55.2.82:2888:3888
server.3=10.55.2.83:2888:3888
// server.n n是一个数字,表示这个是第几号服务器;“=”号后面是对应几号服务器的IP地址,2888是这个服务器与集群中的 Leader 服务器交换信息的端口;3888表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,此端口就是用来执行选举时服务器相互通信的端口。

5.输入命令:“mkdir zkdata zkdatalog”,在“zookeeper”目录下创建两个文件夹,也就是上面设置的“dataDir”和“dataLogDir”。

zkdata :          /home/es/elk/zookeeper-3.4.12/zkdata

zkdatalog:      /home/es/elk/zookeeper-3.4.12/zkdatalog

6.创建myid文件。以现在所在的第一台虚拟机192.168.250.131为例,对应server.1,通过上边的配置信息可以查到。创建myid文件的目的是为了让zookeeper知道自己在哪台服务器上,例如现在所在的虚拟机是192.168.252.131,它对应的id是1,那么就在myid文件中写入1。

命令:“echo "1">/home/es/elk/zookeeper-3.4.12/zkdata/myid” 切记完成之后记得  cat 命令一下,查看是否添加成功。

7.另外两台参考此配置完成安装。

echo "2">/home/es/elk/zookeeper-3.4.12/zkdata/myid

echo "3">/home/es/elk/zookeeper-3.4.12/zkdata/myid

9.启动zookeeper

集中注意力:因为前方有坑!坑!!坑!!!

1.zookeeper的开启顺序是有要求的,要先开启leader,剩下的节点才能正常启动。那么问题来了,究竟哪个是leader呢?管他呢,直接给三个都起来不就行了!哈哈,所以也就是有时候在起了一个然后查看节点状态的时候为啥报错,全部起来之后在查看就好了。

2.进入到bin目录下 执行命令:“./zkServer.sh start” 看到下面提示就是成功了(再依次启动另外两台)。

3.都启动完毕后我们再来查看状态,还是进入到bin目录下,执行命令:“./zkServer.sh status”。

可以看到一台leader 另外两台都是 follower。

到了这里是不是在开心一下子呢?哈哈哈,开心归开心,还有很多坑呢。。。。。。

10.Kafka集群安装

前面zookeeper的安装已经为我们做了不少工作了,我们只需要在进行kafka的安装就行了。

1.上传压缩输入命令:“tar -zxvf kafka_2.11-2.1.0.tgz”。

2.进入到 config目录下,编辑配置文件 :“ vim server.properties”。( 其他两台也参照这个配置即可,但是有坑,仔细往下看!!! )。

IP:192.168.252.131

IP:192.168.252.132

IP:192.168.252.133

看下面例子:

192.168.252.131主机:zookeeper.connect=localhost:2181           启动三秒后宕机

192.168.252.132主机:zookeeper.connect=localhost:2181           启动三秒后宕机

192.168.252.132主机:zookeeper.connect=localhost:2181           启动三秒后宕机

 

192.168.252.131主机:zookeeper.connect=192.168.252.31:2181,192.168.252.131:2181,192.168.252.133:2181      启动三秒后宕机

192.168.252.132主机:zookeeper.connect=192.168.252.31:2181,192.168.252.131:2181,192.168.252.133:2181      启动三秒后宕机

192.168.252.132主机:zookeeper.connect=192.168.252.31:2181,192.168.252.131:2181,192.168.252.133:2181      启动三秒后宕机

 

192.168.252.131主机:zookeeper.connect=192.168.252.31:2181,192.168.252.131:2181,192.168.252.133:2181      启动成功

192.168.252.132主机:zookeeper.connect=localhost:2181                                                                                                启动成功

192.168.252.132主机:zookeeper.connect=localhost:2181                                                                                                启动成功

我当时的感受是(MMP)。。。。。。也许是我的机器有毛病,希望大家遇到问题的时候不要气馁!!!

kafka详细配置(摘自网络)。

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

11.Kafka集群启动

1.启动kafka集群需要到 bin目录下执行命令:“./kafka-server-start.sh -daemon ../config/server.properties” 然后输入"jps"查看。如果上面的你按照我的配置之后kafka还是一秒躺,就需要删除一个文件,反正我的就是,删除之后就好了。

启动成功的提示:

2.如果失败就需要根据日志删除一个文件夹的全部内容,具体看日志。

cd /tmp/kafka-logs/

rm -rf *

然后再次启动。。。。。。

3.这里推荐两个Kafka可视化集群查看工具。

http://www.kafkatool.com/download.html  选择自己电脑版本下载安装。

https://blog.csdn.net/dabokele/article/details/52373960 这个比较好看点。

4.使用工具查看

到此kafka集群就算完成了,真的很不错了,继续加油。

12.Filebeat安装

因为Filebeat收集日志比较轻,占用资源较少,所以使用Filebeat收集日志推送到kafka集群中,下面就让我们一起来做吧。

1.下载压缩包,传递到服务器上然后输入命令:“tar -zxvf filebeat-6.5.0-linux-x86_64.tar.gz”。

2.编辑配置文件:“vim filebeat.yml”。

3.注意 : 如果使用 “%{[type]}”来配置的话需要注意版本问题,因为6.0.0之后,document_type这个选项被deprecated了,也就没法通过%{[type]}来访问。因此,同样的配置会造成filebeat无法获得topic的配置,进而不能给kafka发送消息。可以通过如下配置:

- type: log

  enabled: true

  paths:

    - /var/log/test1.log

    fields:

    log_topics: test1

output.kafka:

  enabled: true

  hosts: [{{kafka_url}}]

  topic: '%{[fields][log_topics]}'

#还有一点需要特别注意 就是上面的格式 也就是 需要注意 .yml文件的格式问题。

#对应的,在logstash上,如果要分别解释对应的topic:

input {

  kafka{

        bootstrap_servers => "{{kafka_url}}"

        topics => ["test1","test2"]

        codec => "json"

        consumer_threads => 2

        enable_auto_commit => true

        auto_commit_interval_ms => "1000"

        group_id=> test

  }

}

filter {

  if[fields][log_topics] == "test1" {

    grok {

      patterns_dir => ["./patterns"]

      match => {

        "message" => "%{PLATFORM_SYSLOG}"

      }

    }

  }

  if[fields][log_topics] == "test2" {

    grok {

      patterns_dir => ["./patterns"]

      match => {

        "message" => "%{IAM_SYSLOG}"

      }

    }

  }

4.做好以上配置之后就开始启动filebeat做测试吧,在目录下执行命令:“./filebeat -e -c filebeat.yml”(测试之前如果kafka集群里面有相同的topic 先删除它,让filebeat为我们自动创建)。

执行命令之前 的topics。 

看控制台日志:

执行命令之后的topics。

可以看到filebeat成功收集了日志并推送到了kafka中(其他三台主机按照这个例子就行了)。

14.Kibana安装

kibana主要是用来做一个可视化界面,还是需要上传文件到服务器(只用在主机上安装就行了)。

1.输入命令解压:“tar -zxvf kibana-6.5.0-linux-x86_64.tar.gz

2.编辑配置信息:“vim /home/es/elk/kibana-6.5.0-linux-x86_64/config/kibana.yml

15.启动kibana

进入到bin目录下执行命令:“./kibana” 稍等一会在浏览器中输入“http://192.168.252.131:5601”(注意自己的ip)成功之后就是如下界面,还是比较好看的,这个界面提示,我们没有索引,让我们创建一个。

kibana工具大家下去自行学习吧,因为功能很强大,特别是结合插件!!!!!!

16.Logstash安装

为什么要在最后安装Logstash呢?我也不知道为啥,估计是为了方便吧,安装完之后,做个提取转发就行了,嘻嘻嘻!!!

1.上传文件到服务器并使用命令解压:“tar -zxvf logstash-6.5.0.tar.gz”。

2.测试logstash:“/home/es/elk/logstash-6.5.0/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}' ”然后随便输入敲回车,等几秒钟,出现下面结果就是安装成功。

{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => 2018-11-25T14:11:09.006Z,
          "host" => "houry"
}

3.在Logstash目录下创建文件:“mkdir etc”, 然后进入etc 目录 并编辑配置文件:“vim logtstash_kafka”,输入以下内容并保存。其实这里还是有个坑的,就是出去部分,有个控制批量的设置,之前设置过大然后服务起不来,后来我去掉了,参照下面这个就可以。

input{
    kafka{

        bootstrap_servers => ["192.168.252.131:9092,192.168.252.132:9092,192.168.252.133:9092"]

        group_id => "logstash-hdfs2"
        auto_offset_reset => "earliest"
        consumer_threads => 5
        decorate_events => false

        topics => ["test"]
        type => "filebeat"
      }
}


output {
    elasticsearch {
        hosts => ["192.168.252.131:9200","192.168.252.132:9200","192.168.252.133:9200"]
        index => "tomcat_logs_index_%{+YYYY.MM.dd}"
        sniffing => true
        template_overwrite => true
    }
}

4.注意点就是topics的设置要和kafka里面设置的一样,还有kafka集群地址,我还是希望按照我上面的来一步一步来,先成功在优化。

还有一个就是配置的索引中有“/” 反正我是尝试过,最后ElasticSearch也没有产生索引。

5.启动logstash : “nohup /home/es/elk/logstash-6.5.0/bin/logstash  -f  /home/es/elk/logstash-6.5.0/etc/logtstash_kafka --path.data=/home/es/elk/logstash-6.5.0/logs/log1 > /dev/null 2>&1 &

6.等待大约三十秒刷新kibana即可发现创建的索引。

7.配置我们的缩索引开头。

9.选择按照时间戳。

10.然后点击发现就可以看到收集的日志了。

到此为止ELK集群+Kafka集群+FileBeat的安装就全部完成,如果还有问题可以与我讨论,毕竟三个臭皮匠顶上一个诸葛亮。

总结:

1.流程启动顺序是FIleBeat收集推动到kafka然后logstash过滤推动到ElasticSearch,再使用Kibana展现。

2.解决问题要一点一点的来,学会看日志,和使用Google的stackoverflow,毕竟“同性交友”最大的网站。

3.努力去学习英语,不然看不懂英文文档,还要靠Google 翻译。

4.实在决绝不了了 就去百度 找几个相关知识的群加群进去死不要脸的问。

5.相关的Linux基础知识太差。

6.努力上进。。。。。。

 

 

欢迎大家关注我的公众号,添加我为好友!

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐