企业日志中心——ELFK+kafka+zookeeper部署

一、组件介绍

1.Elasticsearch

是一个基于Lucene的搜索服务器。提供搜索、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定、可靠、快速,安装使用方便。

2.Logstash

主要是用来日志的搜集、分析、过滤日志的工具。用于管理日志和事件的工具,你可以用它去收集日志、转换日志、解析日志并将他们作为数据提供给其他模块调用,例如搜索、存储等。

3.Kibana

是一个优秀的前端日志展示框架,他可以非常详细的将日志转化为各种图表,为用户提供强大的数据可视化支持,它能够搜索、展示存储在Elasticsearch中索引数据。使用它可以很方便的用图表、表格、地图展示和分析数据。

4.Kafka

数据缓冲队列。作为消息队列解耦了处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统。
  • 以容错持久的方式存储记录流。
  • 处理记录发生的流。

5.Filebeat

隶属于Beats,轻量级数据收集引擎。基于原先Logstash-fowarder的源码改造出来。换句话说:Filebeat就是新版的Logstash,也会是ELK Stack在Agent的第一选择,目前Beats包含四种工具:

  • Packetbeat(搜索网络流量数据)
  • Metricbeat(搜索系统、进程和文件系统级别的CPU和内存使用情况等数据。通过从操作系统和服务收集指标,帮助监控服务器及其托管的服务。)
  • Filebeat(搜集文件数据)
  • Winlogbeat(搜集windows事件日志数据)

二、环境介绍

主机名IP地址安装软件配置要求
node1192.168.222.20ES/zookeeper/kafka/kibanaMem>=4G
node2192.168.222.30ES/zookeeper/kafkaMem>=4G
node3192.168.222.40Logstash/zookeeper/kafka/apacheMem>=4G
node4192.168.222.50Fliebeat/zookeeper/kafkaMem>=4G

三、搭建架构

官网地址:https://www.elastic.co/cn/

官方搭建文档:https://www.elastic.co/guide/index.html

这个架构图从左到右,总共分为5层,每层实现的功能和含义分别介绍如下:

第1层:数据采集层

数据采集层位于最左边的业务服务器集群上,在每个业务服务器上面安装了Filebeat做日志收集,然后把采集到的原始日志发送到Kafka+Zookeeper集群上。

第2层:消息队列层

原始日志发送到Kafka+Zookeeper集群上后,会进行集中存储,此时,Filbeat是消息的生产者,存储的消息可以随时被消费。

第3层:数据分析层

Logstash作为消费者,会去Kafka+Zookeeper集群节点实时拉取原始日志,然后将获取到的原始日志根据规则进行分析、清洗、过滤,最后将清洗好的日志转发至Elasticsearch集群。

第4层:数据持久化存储

Elasticsearch集群在接收到Logstash发送过来的数据后,执行写磁盘,建索引库等操作,最后将结构化的数据存储到Elasticsearch集群上。

第5层:数据查询、展示层

Kibana是一个可视化的数据展示平台,当有数据检索请求时,它从Elasticsearch集群上读取数据,然后进行可视化出图和多维度分析。

四、搭建ELFK+Kafka+Zookeeper

1.准备阶段

#关闭防火墙关闭核心防护
#同步时间源(全部节点操作)
[root@node1 opt]# ntpdate ntp1.aliyun.com
20 Sep 16:06:17 ntpdate[84829]: adjust time server 120.25.115.20 offset 0.008022 sec
#JDK的安装(全部节点操作)将软件包拖入/opt目录
[root@node1 opt]# tar zxf jdk-8u91-linux-x64.tar.gz -C /usr/local
[root@node1 opt]# cd /usr/local/
[root@node1 local]# mv jdk1.8.0_91/ jdk
[root@node1 local]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
[root@node1 local]# source /etc/profile
[root@node1 local]# java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

#配置hosts(全部节点)
[root@node1 local]# vim /etc/hosts
192.168.222.20 node1
192.168.222.30 node2
192.168.222.40 node3
192.168.222.50 node4

2.安装zookeeper

#拖入安装包并解压(全部节点)
[root@node1 opt]# tar zxf apache-zookeeper-3.5.7-bin.tar.gz -C /usr/local
#修改配置文件(全部节点)
[root@node1 apache-zookeeper-3.5.7-bin]# cd /usr/local/apache-zookeeper-3.5.7-bin/conf/
[root@node1 conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@node1 conf]# cp zoo_sample.cfg zoo.cfg
[root@node1 conf]# vim zoo.cfg
#2行,通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
tickTime=2000
#5行,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
initLimit=10
#8行,Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
syncLimit=5
#12行,修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataDir=/usr/local/apache-zookeeper-3.5.7-bin/data
#13行,添加,指定存放日志的目录,目录需要单独创建
dataLogDir=/usr/local/apache-zookeeper-3.5.7-bin/logs
#15行,客户端连接端口
clientPort=2181
#末行添加集群信息
server.1=192.168.222.20:3188:3288
server.2=192.168.222.30:3188:3288
server.3=192.168.222.40:3188:3288
server.4=192.168.222.50:3188:3288
注:server.A=B:C:D
A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
B是这个服务器的地址。
C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。 
#创建数据目录和日志目录(全部节点)
[root@node1 conf]# cd /usr/local/apache-zookeeper-3.5.7-bin/
[root@node1 apache-zookeeper-3.5.7-bin]# ls
bin  conf  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.txt
[root@node1 apache-zookeeper-3.5.7-bin]# mkdir data
[root@node1 apache-zookeeper-3.5.7-bin]# mkdir logs
[root@node1 apache-zookeeper-3.5.7-bin]# ls
bin   data  lib          logs        README.md
conf  docs  LICENSE.txt  NOTICE.txt  README_packaging.txt
#创建myid文件(全部节点)
[root@node1 apache-zookeeper-3.5.7-bin]# echo 1 > /usr/local/apache-zookeeper-3.5.7-bin/data/myid
[root@node2 apache-zookeeper-3.5.7-bin]# echo 2 > /usr/local/apache-zookeeper-3.5.7-bin/data/myid
[root@node3 apache-zookeeper-3.5.7-bin]# echo 3 > /usr/local/apache-zookeeper-3.5.7-bin/data/myid
[root@node4 apache-zookeeper-3.5.7-bin]# echo 4 > /usr/local/apache-zookeeper-3.5.7-bin/data/myid
#创建zookeeper启动脚本(全部节点)
[root@node1 ~]# vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/apache-zookeeper-3.5.7-bin'
case $1 in
start)
    echo "---------- zookeeper 启动 ------------"
    $ZK_HOME/bin/zkServer.sh start
;;
stop)
    echo "---------- zookeeper 停止 ------------"
    $ZK_HOME/bin/zkServer.sh stop
;;
restart)
    echo "---------- zookeeper 重启 ------------"
    $ZK_HOME/bin/zkServer.sh restart
;;
status)
    echo "---------- zookeeper 状态 ------------"
    $ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac
#设置开机自启并启动(全部节点)
[root@node1 ~]# chmod +x /etc/init.d/zookeeper 
[root@node1 ~]# chkconfig --add zookeeper
[root@node1 ~]# systemctl start zookeeper
[root@node1 ~]# systemctl status zookeeper

3.安装Kafka

#安装kafka软件包
[root@node1 opt]# rz -E
rz waiting to receive.
[root@node1 opt]# ls
apache-zookeeper-3.5.7-bin.tar.gz  kafka_2.13-2.8.1.tgz
jdk-8u91-linux-x64.tar.gz          rh
[root@node1 opt]# tar zxf kafka_2.13-2.8.1.tgz -C /usr/local
#修改配置文件
[root@node1 opt]# cd /usr/local/kafka_2.13-2.7.1/config/
[root@node1 config]# cp server.properties server.properties.bak
[root@node1 config]# vim server.properties
##31行,指定监听的IP(本机的ip地址)和端口
listeners=PLAINTEXT://192.168.222.20:9092
##36行,指定监听的IP(本机的ip地址)和端口
advertised.listeners=PLAINTEXT://192.168.222.20:9092
##42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.network.threads=3
##45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
num.io.threads=8
##48行,发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
##51行,接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
##54行,请求套接字的缓冲区大小
socket.request.max.bytes=104857600
##60行,kafka运行日志存放的路径,也是数据存放的路径
log.dirs=/usr/local/kafka_2.13-2.8.1/logs
##65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.partitions=1
##69行,用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
##103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.retention.hours=168
##110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
log.segment.bytes=1073741824
##123行,配置连接Zookeeper集群地址
zookeeper.connect=192.168.222.20:2181,192.168.222.30:2181,192.168.222.40:2181,192.168.222.50:2181
#修改环境变量
[root@node1 ~]# echo "export KAFKA_HOME=/usr/local/kafka_2.13-2.8.1" >> /etc/profile
[root@node1 ~]# echo "export PATH=$PATH:$KAFKA_HOME/bin" >> /etc/profile
[root@node1 ~]# source /etc/profile
#配置kafka启动脚本
[root@node1 ~]# vim /etc/init.d/kafka
[root@node1 ~]# vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka_2.13-2.8.1'
case $1 in
start)
    echo "---------- Kafka 启动 ------------"
    ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
    echo "---------- Kafka 停止 ------------"
    ${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
    $0 stop
    $0 start
;;
status)
    echo "---------- Kafka 状态 ------------"
    count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac
#开启服务并设置开机自启动
[root@node1 ~]# chmod +x /etc/init.d/kafka 
[root@node1 ~]# chkconfig --add kafka
[root@node1 ~]# systemctl start kafka
[root@node1 ~]# systemctl status kafka

Kafka命令行操作(单节点)
3.1.创建topic
[root@node1 ~]# cd /usr/local/kafka_2.13-2.8.1/
[root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --create --zookeeper 192.168.222.20:2181,192.168.222.30:2181,192.168.222.40:2181,192.168.222.50:2181 --replication-factor 1 --partitions 3 --topic test

注:
--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
--replication-factor:定义分区副本数,1 代表单副本,不能超过broker(服务器)个数
--partitions:定义分区数(不定义将自动使用配置文件中的设置)
--topic:定义 topic 名称

3.2.查看当前topic列表
[root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --list --zookeeper 192.168.222.20

3.3.查看topic详细信息
[root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --describe --zookeeper 192.168.222.20

3.4.发布消息
[root@node1 kafka_2.13-2.8.1]# bin/kafka-console-producer.sh --broker-list 192.168.222.20:9092,192.168.222.30:9092,192.168.222.40:9092,192.168.222.50:9092 --topic test
>test
>123
>abcd
>^C

3.5.消费消息
[root@node1 kafka_2.13-2.8.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.222.20:9092,192.168.222.30:9092,192.168.222.40:9092,192.168.222.50:9092 --topic test --from-beginning

–from-beginning:会把主题中以往所有的数据都读取出来

3.6.扩大分区
root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --zookeeper 192.168.222.20:2181 --alter --topic test --partitions 6

3.7.删除主题
[root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --delete --zookeeper 192.168.222.20:2181 --topic test

3.8.kafka-topics.sh脚本中的参数
参数描述
alter用于修改主题
config 键值对创建或修改主题时,用于设置主题级别的参数
create创建主题
delete删除主题
delete-config 配置名称删除主题级别被覆盖的配置
describe查看主题的详细信息
disable-rack-aware创建主题时不考虑机架信息
help打印帮助信息
if-exists修改或删除主题时使用,只有当主题存在时才会执行动作
if-not-exists创建主题时使用,只有主题不存在时才会执行动作
list列出所有可用的主题
partitons 分区数创建主题或增加分区时指定的分区数
replica-assignment 分配方案手工指定分区副本分配方案
replication-factor 副本数创建主题时指定副本因子
topic 主题名称指定主题名称
topics-with-overrides使用describe查看主题信息时,只展示包含覆盖配置的主题
unavailable-partitions使用describe查看主题信息时,只展示包含没有leader副本的分区
under-replicated-partitions使用describe查看主题信息时,只展示包含失效副本的分区
zookeeper指定连接的zk的地址,可以改成 --bootstrap-server

4.安装Elasticsearch

#安装elasticsearch-rpm软件包
[root@node1 opt]# rpm -ivh elasticsearch-5.5.0.rpm 
#加载系统服务
[root@node1 opt]# systemctl daemon-reload
[root@node1 opt]# systemctl enable elasticsearch.service
#更改Elasticsearch主配置文件
[root@node1 ~]# cd /etc/elasticsearch/
[root@node1 elasticsearch]# cp elasticsearch.yml elasticsearch.yml.bak
[root@node1 elasticsearch]# vim elasticsearch.yml
#17行;取消注释,修改;集群名字
cluster.name: my-elfk-cluster
#23行;取消注释,修改;节点名字(node2修改成node2)
node.name: node1
#33行;取消注释,修改;数据存放路径
path.data: /data/elfk_data
#37行;取消注释,修改;日志存放路径
path.logs: /var/log/elasticsearch
#43行;取消注释,修改;不在启动的时候锁定内存
bootstrap.memory_lock: false
#55行;取消注释,修改;提供服务绑定的IP地址,0.0.0.0代表所有地址
network.host: 0.0.0.0
#59行;取消注释;侦听端口为9200(默认)
http.port: 9200
#68行;取消注释,修改;集群发现通过单播实现,指定要发现的节点 node1、node2
discovery.zen.ping.unicast.hosts: ["node1", "node2"]

#检验配置
[root@node1 elasticsearch]# grep -v "^#" /etc/elasticsearch/elasticsearch.yml

#创建数据存放路径并授权
[root@node1 opt]# mkdir -p /data/elfk_data     #创建数据存放的目录
[root@node1 opt]# chown elasticsearch:elasticsearch /data/elfk_data/     #修改存放数据目录的属主和属组
#启动es查看端口是否成功开启
[root@node1 opt]# systemctl start elasticsearch.service     #开启服务
[root@node1 opt]# netstat -antp | grep 9200     #检查服务是否开启

使用浏览器查看节点信息:192.168.222.20:9200

检验集群健康状态:192.168.222.20:9200/_cluster/health?pretty

image-20220920231648625

查看集群状态:192.168.222.20:9200/_cluster/state?pretty

5.安装Elasticsearch-head插件步骤

#编译安装node组件依赖包
[root@node1 ~]# yum install gcc gcc-c++ make -y
[root@node1 ~]# cd /opt/
[root@node1 opt]# tar zxvf node-v8.2.1.tar.gz
[root@node1 opt]# cd node-v8.2.1/
[root@node1 node-v8.2.1]# ./configure && make && make install

#安装phantomjs,前端框架
[root@node1 opt]# tar jxvf phantomjs-2.1.1-linux-x86_64.tar.bz2 -C /usr/local/src/
[root@node1 opt]# cd /usr/local/src/phantomjs-2.1.1-linux-x86_64/bin/
[root@node1 bin]# cp phantomjs /usr/local/bin/

#安装elasticsearch-head,数据可视化工具
[root@node1 opt]# tar zxvf elasticsearch-head.tar.gz -C /usr/local/src/
[root@node1 opt]# cd /usr/local/src/elasticsearch-head
[root@node1 elasticsearch-head]# npm install


#修改主配置文件
[root@node1 ~]# vim /etc/elasticsearch/elasticsearch.yml
......
#-------末尾;添加以下内容--------
http.cors.enabled: true              #开启跨域访问支持,默认为 false
http.cors.allow-origin: "*"          #指定跨域访问允许的域名地址为所有
#重启es数据库服务
[root@node1 ~]# systemctl restart elasticsearch.service

#启动elasticsearch-head
[root@node1 elasticsearch-head]# npm run start &

使用elasticsearch-head插件查看集群状态:

访问:192.168.222.20:9100

在Elasticsearch 后面的栏目中输入:http://192.168.222.20:9200

#创建索引(在单台node上创建)
[root@node2 elasticsearch-head]# curl -X PUT 'localhost:9200/index-demo/test/1?pretty&pretty' -H 'content-Type: application/json' -d '{"user":"lcdb","mesg":"lichen youshoujiuxing"}'

打开浏览器输入地址,查看索引信息:192.168.222.20:9100,下图可以看见索引默认被分片5个,并且有一个副本。

image-20220921011440501

点击数据浏览–会发现在node上创建的索引为index-demo类型为test相关的信息

5.安装Logstash

#安装apache服务
[root@node3 ~]# yum install httpd -y
[root@node3 ~]# systemctl start httpd
#安装Logstash-rpm软件包(拖入logstash-5.5.1.rpm包)
[root@node3 opt]# rpm -ivh logstash-5.5.1.rpm
[root@node3 opt]# systemctl start logstash.service 
[root@node3 opt]# systemctl enable logstash.service 
[root@node3 opt]# ln -s /usr/share/logstash/bin/logstash /usr/local/bin/

测试Logstash命令
-f:通过这个选项可以指定 Logstash 的配置文件,根据配置文件配置 Logstash 的输入和输出流。

-e:从命令行中获取,输入、输出后面跟着字符串,该字符串可以被当作 Logstash 的配置(如果是空,则默认使用 stdin 作为输入,stdout 作为输出)。

-t:测试配置文件是否正确,然后退出。
1.输入采用标准输入,输出采用标准输出
[root@node3 opt]# logstash -e 'input { stdin{} } output { stdout{} }'		#默认端口为9600~9700

2.使用rubydebug显示详细输出,codec为一种编解码器
[root@node3 opt]# logstash -e 'input { stdin{} } output { stdout{ codec=>rubydebug } }'

3.使用Logstash将信息写入Elasticsearch中
[root@node3 opt]# logstash -e 'input { stdin{} } output { elasticsearch { hosts=>["192.168.222.20:9200"] } }'

4.在浏览器访问Elasticsearch-head——192.168.222.20:9100,查看索引信息,多出Logstash-日期

点击数据浏览查看相应的内容:

在Apache主机上做对接配置

Logstash配置文件主要由三个部分组成:input,output,filter(根据需要)

#给系统日志其他用户加“读”的权限
[root@node3 opt]# chmod o+r /var/log/messages

#便捷Logstash配置文件
[root@node3 opt]# vim /etc/logstash/conf.d/system.conf
input {
       file{
        path => "/var/log/messages"
        type => "system"
        start_position => "beginning"
        }
      }
output {
        elasticsearch {
          hosts => ["192.168.222.20:9200"]
          index => "system-%{+YYYY.MM.dd}"
          }
        }
        
#重启Logstash服务
[root@node3 opt]# systemctl restart logstash

在浏览器上访问Elasticsearch-head(192.168.222.20:9100),查看索引信息会多出system-日期

6.安装Kibana

#安装kibana-rpm包(拖入kibana-5.5.1-x86_64.rpm)
[root@node1 ~]# cd /opt
[root@node1 opt]# rpm -ivh kibana-5.5.1-x86_64.rpm
[root@node1 opt]# cd /etc/kibana/
[root@node1 kibana]# cp kibana.yml kibana.yml.bak
#修改配置文件kibana.yml
[root@node1 kibana]# vim kibana.yml
#2行;取消注释;kibana打开的端口(默认5601)
server.port: 5601
#7行;取消注释,修改;kibana侦听的地址为0.0.0.0
server.host: "0.0.0.0"
#21行;取消注释,修改;和elasticsearch建立联系
elasticsearch.url: "http://192.168.222.20:9200"
#30行;取消注释;在elasticsearch中添加.kibana索引
kibana.index: ".kibana"
#启动并设置开机自启动kibana
[root@node1 kibana]# systemctl enable --now kibana

在浏览器访问kibana服务(192.168.222.20:5601),首次登录创建一个索引名字:system-*(这是对接系统日志文件),然后点击最下面的create创建

点击左上角discover会发现system-*的信息

对接Apache主机的Apache日志文件,访问日志、错误日志(node3)
#编写对接apache服务的配置文件
[root@node3 ~]# cd /etc/logstash/conf.d/
[root@node3 conf.d]# vim apache_log.conf
input {
     file{
        path => "/etc/httpd/logs/access_log"
        type => "access"
        start_position => "beginning"
      }
     file{
        path => "/etc/httpd/logs/error_log"
        type => "error"
        start_position => "beginning"
      }
}
output {
    if [type] == "access" {
        elasticsearch {
          hosts => ["192.168.222.20:9200"]
          index => "apache_access-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "error" {
        elasticsearch {
          hosts => ["192.168.222.20:9200"]
          index => "apache_error-%{+YYYY.MM.dd}"
        }
    }
}

#使用logstash命令指定根据配置文件输入输出
[root@node3 conf.d]# /usr/share/logstash/bin/logstash -f apache_log.conf

访问httpd制造一点访问记录:

[root@node4 ~]# curl http://192.168.222.40

打开浏览器登录Elasticsearch-head(192.168.222.20:9100)查看索引信息,能发现apache_error-2022.09.21和apache_access-2022.09.21

打开浏览器登录kibana,点击左下角management—>index patterns—>create index pattern,分别创建apache_error-和 apache_access-的索引。

注意:如果没有在Logstash中正确编写对接apache服务的配置文件,在es-head中没有生成索引,或kibana中与要创建的索引名输入不一致,都会导致无法创建

可以选择discover查看信息:

注意:刚开始的时候会显示No results found,是因为此时测试环境还没有日志信息,可以扩大时间范围并多访问几次httpd创造日志

7.安装Fliebeat(Node4)

#安装filebeat(拖入filebeat-6.6.0-linux-x86_64.tar.gz)
[root@node4 ~]# cd /opt/
[root@node4 opt]# [root@node4 opt]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.5.1-x86_64.rpm
[root@node4 opt]# rpm -ivh filebeat-5.5.1-x86_64.rpm 

#修改配置文件filebeat.yml
[root@node4 opt]# cd /etc/filebeat/
[root@node4 filebeat]# ls
filebeat.full.yml            filebeat.template-es6x.json  filebeat.yml
filebeat.template-es2x.json  filebeat.template.json
[root@node4 filebeat]# cp filebeat.yml filebeat.yml.bak
[root@node4 filebeat]# vim filebeat.yml
…………
filebeat.prospectors:
- input_type: log
 
paths:
     - /var/log/messages	#添加此行
     - /var/log/*.log
#添加以下
output.kafka:
     enabled: true
     Array of hosts to connect to.
     hosts: ["192.168.222.20:9092","192.168.222.30:9092","192.168.222.40:9092","192.168.222.50:9092"]
     topic: "filebeat_test"
[root@node4 filebeat]# curl -XPUT 'http://192.168.222.20:9200/_template/filebeat?pretty' -d@/etc/filebeat/filebeat.template.json
[root@node4 filebeat]# /etc/init.d/filebeat start

#修改Logstash节点配置并启动(node3)
[root@node3 ~]# cd /etc/logstash/conf.d/
[root@node3 conf.d]# vim filebeat.conf
input {
    kafka {
        bootstrap_servers => "192.168.222.20:9092,192.168.222.30:9092,192.168.222.40:9092,192.168.222.50:9092"
        topics  => "filebeat_test"
        group_id => "test123"
        auto_offset_reset => "earliest"
    }
}

output {
    elasticsearch {
        hosts => ["192.168.222.20:9200"]
        index => "filebeat_test-%{+YYYY.MM.dd}"
    }
    stdout {
        codec => rubydebug
    }
}
#重启Logstash服务
[root@node3 conf.d]# systemctl restart logstash.service

8.访问测试

在浏览器先查看Elasticsearch-head(192.168.222.20:9100)是否存在filebeat_test索引

登录kibana(192.168.222.20:5601)创建filebeat_test索引

END

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐