企业日志中心——ELFK+kafka+zookeeper部署
是一个基于Lucene的搜索服务器。提供搜索、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定、可靠、快速,安装使用方便。主机名IP地址安装软件配置要求node1Mem>=4Gnode2Me
文章目录
企业日志中心——ELFK+kafka+zookeeper部署
一、组件介绍
1.Elasticsearch
是一个基于Lucene的搜索服务器。提供搜索、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定、可靠、快速,安装使用方便。
2.Logstash
主要是用来日志的搜集、分析、过滤日志的工具。用于管理日志和事件的工具,你可以用它去收集日志、转换日志、解析日志并将他们作为数据提供给其他模块调用,例如搜索、存储等。
3.Kibana
是一个优秀的前端日志展示框架,他可以非常详细的将日志转化为各种图表,为用户提供强大的数据可视化支持,它能够搜索、展示存储在Elasticsearch中索引数据。使用它可以很方便的用图表、表格、地图展示和分析数据。
4.Kafka
数据缓冲队列。作为消息队列解耦了处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
- 发布和订阅记录流,类似于消息队列或企业消息传递系统。
- 以容错持久的方式存储记录流。
- 处理记录发生的流。
5.Filebeat
隶属于Beats,轻量级数据收集引擎。基于原先Logstash-fowarder的源码改造出来。换句话说:Filebeat就是新版的Logstash,也会是ELK Stack在Agent的第一选择,目前Beats包含四种工具:
- Packetbeat(搜索网络流量数据)
- Metricbeat(搜索系统、进程和文件系统级别的CPU和内存使用情况等数据。通过从操作系统和服务收集指标,帮助监控服务器及其托管的服务。)
- Filebeat(搜集文件数据)
- Winlogbeat(搜集windows事件日志数据)
二、环境介绍
主机名 | IP地址 | 安装软件 | 配置要求 |
---|---|---|---|
node1 | 192.168.222.20 | ES/zookeeper/kafka/kibana | Mem>=4G |
node2 | 192.168.222.30 | ES/zookeeper/kafka | Mem>=4G |
node3 | 192.168.222.40 | Logstash/zookeeper/kafka/apache | Mem>=4G |
node4 | 192.168.222.50 | Fliebeat/zookeeper/kafka | Mem>=4G |
三、搭建架构
官网地址:https://www.elastic.co/cn/
官方搭建文档:https://www.elastic.co/guide/index.html
这个架构图从左到右,总共分为5层,每层实现的功能和含义分别介绍如下:
第1层:数据采集层
数据采集层位于最左边的业务服务器集群上,在每个业务服务器上面安装了Filebeat做日志收集,然后把采集到的原始日志发送到Kafka+Zookeeper集群上。
第2层:消息队列层
原始日志发送到Kafka+Zookeeper集群上后,会进行集中存储,此时,Filbeat是消息的生产者,存储的消息可以随时被消费。
第3层:数据分析层
Logstash作为消费者,会去Kafka+Zookeeper集群节点实时拉取原始日志,然后将获取到的原始日志根据规则进行分析、清洗、过滤,最后将清洗好的日志转发至Elasticsearch集群。
第4层:数据持久化存储
Elasticsearch集群在接收到Logstash发送过来的数据后,执行写磁盘,建索引库等操作,最后将结构化的数据存储到Elasticsearch集群上。
第5层:数据查询、展示层
Kibana是一个可视化的数据展示平台,当有数据检索请求时,它从Elasticsearch集群上读取数据,然后进行可视化出图和多维度分析。
四、搭建ELFK+Kafka+Zookeeper
1.准备阶段
#关闭防火墙关闭核心防护
#同步时间源(全部节点操作)
[root@node1 opt]# ntpdate ntp1.aliyun.com
20 Sep 16:06:17 ntpdate[84829]: adjust time server 120.25.115.20 offset 0.008022 sec
#JDK的安装(全部节点操作)将软件包拖入/opt目录
[root@node1 opt]# tar zxf jdk-8u91-linux-x64.tar.gz -C /usr/local
[root@node1 opt]# cd /usr/local/
[root@node1 local]# mv jdk1.8.0_91/ jdk
[root@node1 local]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
[root@node1 local]# source /etc/profile
[root@node1 local]# java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
#配置hosts(全部节点)
[root@node1 local]# vim /etc/hosts
192.168.222.20 node1
192.168.222.30 node2
192.168.222.40 node3
192.168.222.50 node4
2.安装zookeeper
#拖入安装包并解压(全部节点)
[root@node1 opt]# tar zxf apache-zookeeper-3.5.7-bin.tar.gz -C /usr/local
#修改配置文件(全部节点)
[root@node1 apache-zookeeper-3.5.7-bin]# cd /usr/local/apache-zookeeper-3.5.7-bin/conf/
[root@node1 conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@node1 conf]# cp zoo_sample.cfg zoo.cfg
[root@node1 conf]# vim zoo.cfg
#2行,通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
tickTime=2000
#5行,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
initLimit=10
#8行,Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
syncLimit=5
#12行,修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataDir=/usr/local/apache-zookeeper-3.5.7-bin/data
#13行,添加,指定存放日志的目录,目录需要单独创建
dataLogDir=/usr/local/apache-zookeeper-3.5.7-bin/logs
#15行,客户端连接端口
clientPort=2181
#末行添加集群信息
server.1=192.168.222.20:3188:3288
server.2=192.168.222.30:3188:3288
server.3=192.168.222.40:3188:3288
server.4=192.168.222.50:3188:3288
注:server.A=B:C:D
A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
B是这个服务器的地址。
C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
#创建数据目录和日志目录(全部节点)
[root@node1 conf]# cd /usr/local/apache-zookeeper-3.5.7-bin/
[root@node1 apache-zookeeper-3.5.7-bin]# ls
bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.txt
[root@node1 apache-zookeeper-3.5.7-bin]# mkdir data
[root@node1 apache-zookeeper-3.5.7-bin]# mkdir logs
[root@node1 apache-zookeeper-3.5.7-bin]# ls
bin data lib logs README.md
conf docs LICENSE.txt NOTICE.txt README_packaging.txt
#创建myid文件(全部节点)
[root@node1 apache-zookeeper-3.5.7-bin]# echo 1 > /usr/local/apache-zookeeper-3.5.7-bin/data/myid
[root@node2 apache-zookeeper-3.5.7-bin]# echo 2 > /usr/local/apache-zookeeper-3.5.7-bin/data/myid
[root@node3 apache-zookeeper-3.5.7-bin]# echo 3 > /usr/local/apache-zookeeper-3.5.7-bin/data/myid
[root@node4 apache-zookeeper-3.5.7-bin]# echo 4 > /usr/local/apache-zookeeper-3.5.7-bin/data/myid
#创建zookeeper启动脚本(全部节点)
[root@node1 ~]# vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/apache-zookeeper-3.5.7-bin'
case $1 in
start)
echo "---------- zookeeper 启动 ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper 停止 ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper 重启 ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper 状态 ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
#设置开机自启并启动(全部节点)
[root@node1 ~]# chmod +x /etc/init.d/zookeeper
[root@node1 ~]# chkconfig --add zookeeper
[root@node1 ~]# systemctl start zookeeper
[root@node1 ~]# systemctl status zookeeper
3.安装Kafka
#安装kafka软件包
[root@node1 opt]# rz -E
rz waiting to receive.
[root@node1 opt]# ls
apache-zookeeper-3.5.7-bin.tar.gz kafka_2.13-2.8.1.tgz
jdk-8u91-linux-x64.tar.gz rh
[root@node1 opt]# tar zxf kafka_2.13-2.8.1.tgz -C /usr/local
#修改配置文件
[root@node1 opt]# cd /usr/local/kafka_2.13-2.7.1/config/
[root@node1 config]# cp server.properties server.properties.bak
[root@node1 config]# vim server.properties
##31行,指定监听的IP(本机的ip地址)和端口
listeners=PLAINTEXT://192.168.222.20:9092
##36行,指定监听的IP(本机的ip地址)和端口
advertised.listeners=PLAINTEXT://192.168.222.20:9092
##42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.network.threads=3
##45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
num.io.threads=8
##48行,发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
##51行,接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
##54行,请求套接字的缓冲区大小
socket.request.max.bytes=104857600
##60行,kafka运行日志存放的路径,也是数据存放的路径
log.dirs=/usr/local/kafka_2.13-2.8.1/logs
##65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.partitions=1
##69行,用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
##103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.retention.hours=168
##110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
log.segment.bytes=1073741824
##123行,配置连接Zookeeper集群地址
zookeeper.connect=192.168.222.20:2181,192.168.222.30:2181,192.168.222.40:2181,192.168.222.50:2181
#修改环境变量
[root@node1 ~]# echo "export KAFKA_HOME=/usr/local/kafka_2.13-2.8.1" >> /etc/profile
[root@node1 ~]# echo "export PATH=$PATH:$KAFKA_HOME/bin" >> /etc/profile
[root@node1 ~]# source /etc/profile
#配置kafka启动脚本
[root@node1 ~]# vim /etc/init.d/kafka
[root@node1 ~]# vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka_2.13-2.8.1'
case $1 in
start)
echo "---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
#开启服务并设置开机自启动
[root@node1 ~]# chmod +x /etc/init.d/kafka
[root@node1 ~]# chkconfig --add kafka
[root@node1 ~]# systemctl start kafka
[root@node1 ~]# systemctl status kafka
Kafka命令行操作(单节点)
3.1.创建topic
[root@node1 ~]# cd /usr/local/kafka_2.13-2.8.1/
[root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --create --zookeeper 192.168.222.20:2181,192.168.222.30:2181,192.168.222.40:2181,192.168.222.50:2181 --replication-factor 1 --partitions 3 --topic test
注:
--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
--replication-factor:定义分区副本数,1 代表单副本,不能超过broker(服务器)个数
--partitions:定义分区数(不定义将自动使用配置文件中的设置)
--topic:定义 topic 名称
3.2.查看当前topic列表
[root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --list --zookeeper 192.168.222.20
3.3.查看topic详细信息
[root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --describe --zookeeper 192.168.222.20
3.4.发布消息
[root@node1 kafka_2.13-2.8.1]# bin/kafka-console-producer.sh --broker-list 192.168.222.20:9092,192.168.222.30:9092,192.168.222.40:9092,192.168.222.50:9092 --topic test
>test
>123
>abcd
>^C
3.5.消费消息
[root@node1 kafka_2.13-2.8.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.222.20:9092,192.168.222.30:9092,192.168.222.40:9092,192.168.222.50:9092 --topic test --from-beginning
–from-beginning
:会把主题中以往所有的数据都读取出来
3.6.扩大分区
root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --zookeeper 192.168.222.20:2181 --alter --topic test --partitions 6
3.7.删除主题
[root@node1 kafka_2.13-2.8.1]# bin/kafka-topics.sh --delete --zookeeper 192.168.222.20:2181 --topic test
3.8.kafka-topics.sh脚本中的参数
参数 | 描述 |
---|---|
alter | 用于修改主题 |
config 键值对 | 创建或修改主题时,用于设置主题级别的参数 |
create | 创建主题 |
delete | 删除主题 |
delete-config 配置名称 | 删除主题级别被覆盖的配置 |
describe | 查看主题的详细信息 |
disable-rack-aware | 创建主题时不考虑机架信息 |
help | 打印帮助信息 |
if-exists | 修改或删除主题时使用,只有当主题存在时才会执行动作 |
if-not-exists | 创建主题时使用,只有主题不存在时才会执行动作 |
list | 列出所有可用的主题 |
partitons 分区数 | 创建主题或增加分区时指定的分区数 |
replica-assignment 分配方案 | 手工指定分区副本分配方案 |
replication-factor 副本数 | 创建主题时指定副本因子 |
topic 主题名称 | 指定主题名称 |
topics-with-overrides | 使用describe查看主题信息时,只展示包含覆盖配置的主题 |
unavailable-partitions | 使用describe查看主题信息时,只展示包含没有leader副本的分区 |
under-replicated-partitions | 使用describe查看主题信息时,只展示包含失效副本的分区 |
zookeeper | 指定连接的zk的地址,可以改成 --bootstrap-server |
4.安装Elasticsearch
#安装elasticsearch-rpm软件包
[root@node1 opt]# rpm -ivh elasticsearch-5.5.0.rpm
#加载系统服务
[root@node1 opt]# systemctl daemon-reload
[root@node1 opt]# systemctl enable elasticsearch.service
#更改Elasticsearch主配置文件
[root@node1 ~]# cd /etc/elasticsearch/
[root@node1 elasticsearch]# cp elasticsearch.yml elasticsearch.yml.bak
[root@node1 elasticsearch]# vim elasticsearch.yml
#17行;取消注释,修改;集群名字
cluster.name: my-elfk-cluster
#23行;取消注释,修改;节点名字(node2修改成node2)
node.name: node1
#33行;取消注释,修改;数据存放路径
path.data: /data/elfk_data
#37行;取消注释,修改;日志存放路径
path.logs: /var/log/elasticsearch
#43行;取消注释,修改;不在启动的时候锁定内存
bootstrap.memory_lock: false
#55行;取消注释,修改;提供服务绑定的IP地址,0.0.0.0代表所有地址
network.host: 0.0.0.0
#59行;取消注释;侦听端口为9200(默认)
http.port: 9200
#68行;取消注释,修改;集群发现通过单播实现,指定要发现的节点 node1、node2
discovery.zen.ping.unicast.hosts: ["node1", "node2"]
#检验配置
[root@node1 elasticsearch]# grep -v "^#" /etc/elasticsearch/elasticsearch.yml
#创建数据存放路径并授权
[root@node1 opt]# mkdir -p /data/elfk_data #创建数据存放的目录
[root@node1 opt]# chown elasticsearch:elasticsearch /data/elfk_data/ #修改存放数据目录的属主和属组
#启动es查看端口是否成功开启
[root@node1 opt]# systemctl start elasticsearch.service #开启服务
[root@node1 opt]# netstat -antp | grep 9200 #检查服务是否开启
使用浏览器查看节点信息:192.168.222.20:9200
检验集群健康状态:192.168.222.20:9200/_cluster/health?pretty
查看集群状态:192.168.222.20:9200/_cluster/state?pretty
5.安装Elasticsearch-head插件步骤
#编译安装node组件依赖包
[root@node1 ~]# yum install gcc gcc-c++ make -y
[root@node1 ~]# cd /opt/
[root@node1 opt]# tar zxvf node-v8.2.1.tar.gz
[root@node1 opt]# cd node-v8.2.1/
[root@node1 node-v8.2.1]# ./configure && make && make install
#安装phantomjs,前端框架
[root@node1 opt]# tar jxvf phantomjs-2.1.1-linux-x86_64.tar.bz2 -C /usr/local/src/
[root@node1 opt]# cd /usr/local/src/phantomjs-2.1.1-linux-x86_64/bin/
[root@node1 bin]# cp phantomjs /usr/local/bin/
#安装elasticsearch-head,数据可视化工具
[root@node1 opt]# tar zxvf elasticsearch-head.tar.gz -C /usr/local/src/
[root@node1 opt]# cd /usr/local/src/elasticsearch-head
[root@node1 elasticsearch-head]# npm install
#修改主配置文件
[root@node1 ~]# vim /etc/elasticsearch/elasticsearch.yml
......
#-------末尾;添加以下内容--------
http.cors.enabled: true #开启跨域访问支持,默认为 false
http.cors.allow-origin: "*" #指定跨域访问允许的域名地址为所有
#重启es数据库服务
[root@node1 ~]# systemctl restart elasticsearch.service
#启动elasticsearch-head
[root@node1 elasticsearch-head]# npm run start &
使用elasticsearch-head插件查看集群状态:
访问:192.168.222.20:9100
在Elasticsearch 后面的栏目中输入:http://192.168.222.20:9200
#创建索引(在单台node上创建)
[root@node2 elasticsearch-head]# curl -X PUT 'localhost:9200/index-demo/test/1?pretty&pretty' -H 'content-Type: application/json' -d '{"user":"lcdb","mesg":"lichen youshoujiuxing"}'
打开浏览器输入地址,查看索引信息:192.168.222.20:9100,下图可以看见索引默认被分片5个,并且有一个副本。
点击数据浏览–会发现在node上创建的索引为index-demo类型为test相关的信息
5.安装Logstash
#安装apache服务
[root@node3 ~]# yum install httpd -y
[root@node3 ~]# systemctl start httpd
#安装Logstash-rpm软件包(拖入logstash-5.5.1.rpm包)
[root@node3 opt]# rpm -ivh logstash-5.5.1.rpm
[root@node3 opt]# systemctl start logstash.service
[root@node3 opt]# systemctl enable logstash.service
[root@node3 opt]# ln -s /usr/share/logstash/bin/logstash /usr/local/bin/
测试Logstash命令
-f:通过这个选项可以指定 Logstash 的配置文件,根据配置文件配置 Logstash 的输入和输出流。
-e:从命令行中获取,输入、输出后面跟着字符串,该字符串可以被当作 Logstash 的配置(如果是空,则默认使用 stdin 作为输入,stdout 作为输出)。
-t:测试配置文件是否正确,然后退出。
1.输入采用标准输入,输出采用标准输出
[root@node3 opt]# logstash -e 'input { stdin{} } output { stdout{} }' #默认端口为9600~9700
2.使用rubydebug显示详细输出,codec为一种编解码器
[root@node3 opt]# logstash -e 'input { stdin{} } output { stdout{ codec=>rubydebug } }'
3.使用Logstash将信息写入Elasticsearch中
[root@node3 opt]# logstash -e 'input { stdin{} } output { elasticsearch { hosts=>["192.168.222.20:9200"] } }'
4.在浏览器访问Elasticsearch-head——192.168.222.20:9100,查看索引信息,多出Logstash-日期
点击数据浏览查看相应的内容:
在Apache主机上做对接配置
Logstash配置文件主要由三个部分组成:input,output,filter(根据需要)
#给系统日志其他用户加“读”的权限
[root@node3 opt]# chmod o+r /var/log/messages
#便捷Logstash配置文件
[root@node3 opt]# vim /etc/logstash/conf.d/system.conf
input {
file{
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => ["192.168.222.20:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
#重启Logstash服务
[root@node3 opt]# systemctl restart logstash
在浏览器上访问Elasticsearch-head(192.168.222.20:9100),查看索引信息会多出system-日期
6.安装Kibana
#安装kibana-rpm包(拖入kibana-5.5.1-x86_64.rpm)
[root@node1 ~]# cd /opt
[root@node1 opt]# rpm -ivh kibana-5.5.1-x86_64.rpm
[root@node1 opt]# cd /etc/kibana/
[root@node1 kibana]# cp kibana.yml kibana.yml.bak
#修改配置文件kibana.yml
[root@node1 kibana]# vim kibana.yml
#2行;取消注释;kibana打开的端口(默认5601)
server.port: 5601
#7行;取消注释,修改;kibana侦听的地址为0.0.0.0
server.host: "0.0.0.0"
#21行;取消注释,修改;和elasticsearch建立联系
elasticsearch.url: "http://192.168.222.20:9200"
#30行;取消注释;在elasticsearch中添加.kibana索引
kibana.index: ".kibana"
#启动并设置开机自启动kibana
[root@node1 kibana]# systemctl enable --now kibana
在浏览器访问kibana服务(192.168.222.20:5601),首次登录创建一个索引名字:system-*(这是对接系统日志文件),然后点击最下面的create创建
点击左上角discover会发现system-*的信息
对接Apache主机的Apache日志文件,访问日志、错误日志(node3)
#编写对接apache服务的配置文件
[root@node3 ~]# cd /etc/logstash/conf.d/
[root@node3 conf.d]# vim apache_log.conf
input {
file{
path => "/etc/httpd/logs/access_log"
type => "access"
start_position => "beginning"
}
file{
path => "/etc/httpd/logs/error_log"
type => "error"
start_position => "beginning"
}
}
output {
if [type] == "access" {
elasticsearch {
hosts => ["192.168.222.20:9200"]
index => "apache_access-%{+YYYY.MM.dd}"
}
}
if [type] == "error" {
elasticsearch {
hosts => ["192.168.222.20:9200"]
index => "apache_error-%{+YYYY.MM.dd}"
}
}
}
#使用logstash命令指定根据配置文件输入输出
[root@node3 conf.d]# /usr/share/logstash/bin/logstash -f apache_log.conf
访问httpd制造一点访问记录:
[root@node4 ~]# curl http://192.168.222.40
打开浏览器登录Elasticsearch-head(192.168.222.20:9100)查看索引信息,能发现apache_error-2022.09.21和apache_access-2022.09.21
打开浏览器登录kibana,点击左下角management—>index patterns—>create index pattern,分别创建apache_error-和 apache_access-的索引。
注意
:如果没有在Logstash中正确编写对接apache服务的配置文件,在es-head中没有生成索引,或kibana中与要创建的索引名输入不一致,都会导致无法创建
可以选择discover查看信息:
注意
:刚开始的时候会显示No results found,是因为此时测试环境还没有日志信息,可以扩大时间范围并多访问几次httpd创造日志
7.安装Fliebeat(Node4)
#安装filebeat(拖入filebeat-6.6.0-linux-x86_64.tar.gz)
[root@node4 ~]# cd /opt/
[root@node4 opt]# [root@node4 opt]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.5.1-x86_64.rpm
[root@node4 opt]# rpm -ivh filebeat-5.5.1-x86_64.rpm
#修改配置文件filebeat.yml
[root@node4 opt]# cd /etc/filebeat/
[root@node4 filebeat]# ls
filebeat.full.yml filebeat.template-es6x.json filebeat.yml
filebeat.template-es2x.json filebeat.template.json
[root@node4 filebeat]# cp filebeat.yml filebeat.yml.bak
[root@node4 filebeat]# vim filebeat.yml
…………
filebeat.prospectors:
- input_type: log
paths:
- /var/log/messages #添加此行
- /var/log/*.log
#添加以下
output.kafka:
enabled: true
Array of hosts to connect to.
hosts: ["192.168.222.20:9092","192.168.222.30:9092","192.168.222.40:9092","192.168.222.50:9092"]
topic: "filebeat_test"
[root@node4 filebeat]# curl -XPUT 'http://192.168.222.20:9200/_template/filebeat?pretty' -d@/etc/filebeat/filebeat.template.json
[root@node4 filebeat]# /etc/init.d/filebeat start
#修改Logstash节点配置并启动(node3)
[root@node3 ~]# cd /etc/logstash/conf.d/
[root@node3 conf.d]# vim filebeat.conf
input {
kafka {
bootstrap_servers => "192.168.222.20:9092,192.168.222.30:9092,192.168.222.40:9092,192.168.222.50:9092"
topics => "filebeat_test"
group_id => "test123"
auto_offset_reset => "earliest"
}
}
output {
elasticsearch {
hosts => ["192.168.222.20:9200"]
index => "filebeat_test-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
#重启Logstash服务
[root@node3 conf.d]# systemctl restart logstash.service
8.访问测试
在浏览器先查看Elasticsearch-head(192.168.222.20:9100)是否存在filebeat_test索引
登录kibana(192.168.222.20:5601)创建filebeat_test索引
END
更多推荐
所有评论(0)