![cover](https://i-blog.csdnimg.cn/direct/5da96c5d44394968b36296919d2e5876.png)
ELK看这一篇就够!!!
是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。主要是用来日志的搜集、分析、过滤日志的工具。用于管理日志和事件的工
ELK
ELK+kafka+filebeat企业内部日志分析系统介绍
1、Elasticsearch: 是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。 2、Logstash: 主要是用来日志的搜集、分析、过滤日志的工具。用于管理日志和事件的工具,你可以用它去收集日志、转换日志、解析日志并将他们作为数据提供给其它模块调用,例如搜索、存储等。 3、Kibana: 是一个优秀的前端日志展示框架,它可以非常详细的将日志转化为各种图表,为用户提供强大的数据可视化支持,它能够搜索、展示存储在 Elasticsearch 中索引数据。使用它可以很方便的用图表、表格、地图展示和分析数据。 4、Kafka: 数据缓冲队列。作为消息队列解耦合处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 - 1.发布和订阅记录流,类似于消息队列或企业消息传递系统。 - 2.以容错持久的方式存储记录流。 - 3.处理记录发生的流。 5、Filebeat: 隶属于Beats,轻量级数据收集引擎。基于原先 Logstash-fowarder 的源码改造出来。换句话说:Filebeat就是新版的 Logstash-fowarder,也会是 ELK Stack 在 Agent 的第一选择,目前Beats包含四种工具: - 1.Packetbeat(搜集网络流量数据) - 2.Metricbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据。通过从操作系统和服务收集指标,帮助您监控服务器及其托管的服务。) - 3.Filebeat(搜集文件数据) - 4.Winlogbeat(搜集 Windows 事件日志数据)
架构图
环境介绍
安装软件 | 主机名 | IP地址 | 系统版本 | 配置 |
---|---|---|---|---|
Elasticsearch/Logstash/kibana | Elk | 10.3.145.14 | centos7.5.1804 | 2核4G |
Elasticsearch | Es1 | 10.3.145.56 | centos7.5.1804 | 2核3G |
Elasticsearch | Es2 | 10.3.145.57 | centos7.5.1804 | 2核3G |
zookeeper/kafka | Kafka1 | 10.3.145.41 | centos7.5.1804 | 1核2G |
zookeeper/kafka | Kafka2 | 10.3.145.42 | centos7.5.1804 | 1核2G |
zookeeper/kafka | Kafka3 | 10.3.145.43 | centos7.5.1804 | 1核2G |
Filebeat | file | 10.3.145.44 | centos7.5.1804 | 1核2G |
版本说明
Elasticsearch: 7.13.2 Logstash: 7.13.2 Kibana: 7.13.2 Kafka: 2.11-1 Filebeat: 7.13.2 相应的版本最好下载对应的插件
搭建架构
1、日志数据由filebate进行收集,定义日志位置,定义kafka集群,定义要传给kafka的那个topic 2、kafka接受到数据后,端口为9092,等待消费 3、logstash消费kafka中的数据,对数据进行搜集、分析,根据输入条件,过滤条件,输出条件处理后,将数据传输给es集群 4、es集群接受数据后,搜集、分析、存储 5、kibana提供可视化服务,将es中的数据展示。 官网地址:https://www.elastic.co 官网搭建:https://www.elastic.co/guide/index.html
elasticsearch集群部署
服务器
安装软件 | 主机名 | IP地址 | 系统版本 | 配置 |
---|---|---|---|---|
Elasticsearch | Elk | 10.3.145.14 | centos7.5.1804 | 2核4G |
Elasticsearch | Es1 | 10.3.145.56 | centos7.5.1804 | 2核3G |
Elasticsearch | Es2 | 10.3.145.57 | centos7.5.1804 | 2核3G |
开始部署
1、创建允许es的普通用户 [root@elk ~]# useradd es [root@elk ~]# echo "123" | passwd --stdin "es" 2、安装配置es 安装包在elk课件中,elasticsearch-7.13.2-linux-x86_64.tar.gz包名 [root@elk ~]# tar xvf elasticsearch-7.13.2-linux-x86_64.tar.gz -C /usr/local/ [root@elk ~]# mv /usr/local/elasticsearch-7.13.2/ /usr/local/es [root@elk ~]# vim /usr/local/es/config/elasticsearch.yml ------------------------------- cluster.name: bjbpe01-elk cluster.initial_master_nodes: ["192.168.1.101","192.168.1.102","192.168.1.103"] # 单节点模式这里的地址只填写本机地址 node.name: elk01 #各个节点名字不冲突即可 node.master: true node.data: true path.data: /data/elasticsearch/data path.logs: /data/elasticsearch/logs bootstrap.memory_lock: false bootstrap.system_call_filter: false network.host: 0.0.0.0 http.port: 9200 transport.tcp.port: 9300 # 单节点模式下,将discovery开头的行注释 discovery.seed_hosts: ["192.168.1.102","192.168.1.103"] #除本机以外的其他es节点ip discovery.zen.minimum_master_nodes: 2 discovery.zen.ping_timeout: 150s discovery.zen.fd.ping_retries: 10 client.transport.ping_timeout: 60s http.cors.enabled: true http.cors.allow-origin: "*" ------------------------------- 3、创建es数据及日志存储目录,赋权和设组 [root@elk ~]# mkdir -p /data/elasticsearch/{data,logs} [root@elk ~]# chown -R es.es /data/elasticsearch [root@elk ~]# chown -R es.es /usr/local/es 4、设置JVM堆大小 #7.0默认为4G [root@elk ~]# sed -i 's/## -Xms4g/-Xms4g/' /usr/local/es/config/jvm.options [root@elk ~]# sed -i 's/## -Xmx4g/-Xmx4g/' /usr/local/es/config/jvm.options 注意: 确保堆内存最小值(Xms)与最大值(Xmx)的大小相同,防止程序在运行时改变堆内存大小。 如果系统内存足够大,将堆内存最大和最小值设置为31G,因为有一个32G性能瓶颈问题。 堆内存大小不要超过系统内存的50% 5、系统优化 增加最大文件打开数 [root@elk ~]# echo "* soft nofile 65536" >> /etc/security/limits.conf 增加最大进程数 [root@elk ~]# echo "* soft nproc 65536" >> /etc/security/limits.conf 增加最大内存映射数 [root@elk ~]# echo "vm.max_map_count=262144" >> /etc/sysctl.conf [root@elk ~]# sysctl -p [root@elk ~]# vim /etc/security/limits.conf #更多的参数调整可以使用设置,上面三个已经够了,启动不起来的话再使用这个 ---------------------- * soft nofile 65536 * hard nofile 131072 * soft nproc 4096 * hard nproc 4096 ---------------------- 6、启动es [root@elk ~]# su es [es@elk ~]# /usr/local/es/bin/elasticsearch #先放到前台启动,如果没问题执行下列命令 [es@elk ~]# nohup /usr/local/es/bin/elasticsearch & #后台启动
启动如果报下列错误
memory locking requested for elasticsearch process but memory is not locked elasticsearch.yml文件 bootstrap.memory_lock : false /etc/sysctl.conf文件 vm.swappiness=0 错误: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536] 意思是elasticsearch用户拥有的客串建文件描述的权限太低,知道需要65536个 解决: 切换到root用户下面, [root@elk ~]# vim /etc/security/limits.conf 在最后添加 ---------------------- * hard nofile 65536 * hard nproc 65536 ---------------------- 重新启动elasticsearch,还是无效? 必须重新登录启动elasticsearch的账户才可以,例如我的账户名是elasticsearch,退出重新登录。 另外*也可以换为启动elasticsearch的账户也可以,* 代表所有,其实比较不合适 启动还会遇到另外一个问题,就是 max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144] 意思是:elasticsearch用户拥有的内存权限太小了,至少需要262114。这个比较简单,也不需要重启,直接执行 sysctl -w vm.max_map_count=262144 就可以了
/usr/local/es/config/elasticsearch.yml配置项含义:
cluster.name 集群名称,各节点配成相同的集群名称。 cluster.initial_master_nodes 集群ip,默认为空,如果为空则加入现有集群,第一次需配置 node.name 节点名称,各节点配置不同。 node.master 指示某个节点是否符合成为主节点的条件。 node.data 指示节点是否为数据节点。数据节点包含并管理索引的一部分。 path.data 数据存储目录。 path.logs 日志存储目录。 bootstrap.memory_lock 内存锁定,是否禁用交换,测试环境建议改为false。 bootstrap.system_call_filter 系统调用过滤器。 network.host 绑定节点IP。 http.port rest api端口。 discovery.seed_hosts 提供其他 Elasticsearch 服务节点的单点广播发现功能,这里填写除了本机的其他ip discovery.zen.minimum_master_nodes 集群中可工作的具有Master节点资格的最小数量,官方的推荐值是(N/2)+1,其中N是具有master资格的节点的数量。 discovery.zen.ping_timeout 节点在发现过程中的等待时间。 discovery.zen.fd.ping_retries 节点发现重试次数。 http.cors.enabled 是否允许跨源 REST 请求,用于允许head插件访问ES。 http.cors.allow-origin 允许的源地址。
安装配置head监控插件(只在其中一台es部署即可)
安装head插件,需要用到node前端打包工具,phantomjs是head必要的插件 将node-v10.0.0-linux-x64.tar.gz和phantomjs-2.1.1-linux-x86_64.tar.bz2上传到虚拟机中,压缩包都在elk课件中 1、安装node tar xvf node-v10.0.0-linux-x64.tar.gz -C /usr/local/ mv /usr/local/node-v10.0.0-linux-x64/ /usr/local/node echo " #添加环境变量 NODE_HOME=/usr/local/node PATH=\$NODE_HOME/bin:\$PATH export NODE_HOME PATH " >>/etc/profile source /etc/profile node -v #可以看到版本代表node部署成功 2、下载head插件 wget https://github.com/mobz/elasticsearch-head/archive/master.zip #下载head插件 unzip master.zip #解压完会有一个elasticsearch-head-master目录 mv elasticsearch-head-master/ /usr/local/ cd /usr/local/elasticsearch-head-master/ 3、安装grunt npm install -g grunt-cli #下载grunt-cli,启动需要用到这个工具 grunt -version #查看版本号 4、修改head源码 vim /usr/local/elasticsearch-head-master/Gruntfile.js +99 ---------------- keepalive: true, #在这一行后面加个,号 hostname: "*" #绑定本机的哪个ip ---------------- vim /usr/local/elasticsearch-head-master/_site/app.js +4388 ---------------- this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://(将该部分修改为es集群的节点ip):9200"; #绑定哪个节点作为head插件的服务器 ---------------- 5、下载head必要的文件 wget https://github.com/Medium/phantomjs/releases/download/v2.1.1/phantomjs-2.1.1-linux-x86_64.tar.bz2 #下载head必要的插件,实在GitHub上下载的,比较慢,压缩包也在elk课件中 mkdir /tmp/phantomjs #为phantomjs创建一个目录,该文件只能放到这个目录下 yum -y install bzip2 mv phantomjs-2.1.1-linux-x86_64.tar.bz2 /tmp/phantomjs/ chmod 777 /tmp/phantomjs -R 6、运行head cd /usr/local/elasticsearch-head-master/ npm install phantomjs-prebuilt@2.1.16 --ignore-scripts #取消版本限制 npm install #安装 grunt server #启动 nohup grunt server & #启动没有报错的话,放到后台运行 7、测试访问 head虚拟机ip:9100端口
logstash部署
压缩包在elk课件中,将logstash-7.13.2-linux-x86_64.tar压缩包传入到服务器中 tar zxf /usr/local/package/logstash-7.13.2.tar.gz -C /usr/local/ mv /usr/local/package/logstash-7.13.2.tar.gz /usr/local/package/logstash 就可以开始使用了,案例如下
案例一: 标准输入 => 标准输出
vim /usr/local/logstash/conf/stdin.conf --------------------------------------- input { stdin {} } output { stdout { codec => rubydebug #codec 输出编解码器,rubydebug 编解码器生成的字符输出到标准输出 } } --------------------------------------- /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf/stdin.conf #运行logstash
案例2: 标准输入 => 标准输出及es集群
vim /usr/local/logstash/conf/stdin.conf --------------------------------------- input { stdin {} } output { stdout { codec => rubydebug } elasticsearch { hosts => ["es1:9200","es2:9200","es3:9200"] #哪个es接受消息 index => 'jiaming.test.log' #文件名 } --------------------------------------- /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf/stdin.conf #运行logstash
案例3: 端口输入 => 字段匹配 => 标准输出及es集群
vim /usr/local/logstash/conf/file.conf --------------------------------------- input { tcp { port => 6688 } } filter { grok { match => { "message" => "%{GREEDYDATA:inet} %{IP:ip}/%{GREEDYDATA:netmask} %{GREEDYDATA:other}" } } } output { stdout { codec => rubydebug } elasticsearch { hosts => ["es1:9200","es2:9200","es3:9200"] index => 'test.log' } } --------------------------------------- /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf/stdin.conf #运行logstash 注:match => { "message" => "%{GREEDYDATA:inet} %{IP:ip}/%{GREEDYDATA:netmask} %{GREEDYDATA:other}" } #该部分有些问题,修改为 match => { "message" => "%{GREEDYDATA:inet} %{IP:ip}/%{INIT:netmask} %{GREEDYDATA:other}" } 这样netmask字段展示的将是子网掩码,由于修改投入成本太高,课件中的图片就不修改了 在另一台服务器上测试往logstash输入内容,logstash会输出内容,并将内容输入到es服务器中head和kibana中 yum install -y nc #测试命令 ip a | grep inet | grep ens33 | nc 192.168.223.190 6688 #将本机ip输入到logstash
案例4: 远程文件输入 => 字段匹配及修改时间格式修改 => es集群
vim /usr/local/logstash/conf/stdin.conf #专门为/var/log/messages日志写的格式 --------------------------------------- input { tcp { port => 6688 } } filter { grok { match => { "message" => "%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:service}%{GREEDYDATA:other}" } } } output { stdout { codec => rubydebug } elasticsearch { hosts => ["es1:9200","es2:9200","es3:9200"] index => 'message.test' } } --------------------------------------- /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf/stdin.conf #运行logstash 在另一台服务器上测试往logstash输入/var/log/messages日志,logstash会输出内容,并将内容输入到es服务器中head和kibana中 tail /var/log/messages | nc 192.168.223.190 6688
案例5: 将本地文件输入 => 字段匹配及修改时间格式修改 => es集群
vim /usr/local/logstash/conf/file.conf #将nginx错误日志输入到logstash --------------------------------------- input { file { type => "nginx-log" path => "/var/log/nginx/error.log" start_position => "beginning" } } filter { grok { match => { "message" => '%{DATESTAMP:date} \[%{WORD:level}\] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"' } } } output { stdout { codec => rubydebug } elasticsearch { hosts => ["es1:9200","es2:9200"] index => 'nginx-error2332.log' } } --------------------------------------- /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf/stdin.conf #运行logstash 可以在其他服务器上访问logstash节点服务器中的nginx页面,只要错误添加到日志中,logstash就会进行收集
案例6: filebate 传输给 logstash (nginx日志)
在logstash服务器上执行 vim /usr/local/logstash/conf/file.conf --------------------------------------- #logstash的配置 input { beats { port => 5000 } } filter { grok { match => {"message" => "%{IPV4:cip}"} } } output { elasticsearch { hosts => ["es1:9200","es2:9200"] index => 'filebeat.test' } stdout { codec => rubydebug } } --------------------------------------- /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf/stdin.conf #运行logstash 在file beat服务器上执行 yum install -y nginx nginx curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.2-x86_64.rpm yum install -y filebeat-7.13.2-x86_64.rpm vim /etc/filebeat/filebeat.yml --------------------------------------- #filebeat的配置 #在Elasticsearch Output区域中将 output.elasticsearch: #将这行注释掉,使用自己的配置 hosts: ["localhost:9200"] #将这行注释掉,使用自己的配置 filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/access.log output.logstash: hosts: ["es1:5000"] #logstash节点服务器 #在配置文件中的最底下添加,否则会运行不起来 seccomp: default_action: allow syscalls: - action: allow names: - rseq --------------------------------------- cd /etc/filebeat/ filebeat -e -c filebeat.yml #启动file beat,会发现本地/var/log/nginx/access.log文件会导入到es1:5000端口中logstash中
file beat配置
kibana部署
将head放到后台运行 ctrl +z bg 1、安装 将kibana-7.13.2-linux-x86_64.tar.gz压缩包传输到root下,压缩包在elk课件中 tar xvf kibana-7.13.2-linux-x86_64.tar.gz -C /usr/local/ 2、配置 vim /usr/local/kibana-7.13.2-linux-x86_64/config/kibana.yml ------------------------------------------- server.port: 5601 #kibana的端口 server.host: "192.168.253.141" #kibana绑定哪个主机ip elasticsearch.hosts: ["http://192.168.253.141:9200","http://192.168.253.140:9200","http://192.168.253.143:9200"] #连接哪个es节点,全写上,宕了有备用 kibana.index: ".kibana" #启动时创建的索引文件 i18n.locale: "zh-CN" #字符集 ------------------------------------------- 3、启动 /usr/local/kibana-7.13.2-linux-x86_64/bin/kibana --allow-root #启动,允许root用户启动 nohup /usr/local/kibana-7.13.2-linux-x86_64/bin/kibana --allow-root & #后台启动 web浏览器访问head端口,会发现kibana连接上head,页面发生改变
web浏览器访问绑定kibana的ip:5601端口 添加样例数据
根据关键字查找数据
查看详细字段
官方教程kibana添加其他服务日志
配置反向代理
yum install -y nginx vim /etc/nginx/nginx.conf -------------------------------- user nginx; worker_processes auto; error_log /var/log/nginx/error.log; pid /var/run/nginx.pid; worker_rlimit_nofile 65535; events { worker_connections 65535; use epoll; } http { include mime.types; default_type application/octet-stream; log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; access_log /var/log/nginx/access.log main; server_names_hash_bucket_size 128; autoindex on; sendfile on; tcp_nopush on; tcp_nodelay on; keepalive_timeout 120; fastcgi_connect_timeout 300; fastcgi_send_timeout 300; fastcgi_read_timeout 300; fastcgi_buffer_size 64k; fastcgi_buffers 4 64k; fastcgi_busy_buffers_size 128k; fastcgi_temp_file_write_size 128k; #gzip模块设置 gzip on; #开启gzip压缩输出 gzip_min_length 1k; #最小压缩文件大小 gzip_buffers 4 16k; #压缩缓冲区 gzip_http_version 1.0; #压缩版本(默认1.1,前端如果是squid2.5请使用1.0) gzip_comp_level 2; #压缩等级 gzip_types text/plain application/x-javascript text/css application/xml; #压缩类型,默认就已经包含textml,所以下面就不用再写了,写上 去也不会有问题,但是会有一个warn。 gzip_vary on; #开启限制IP连接数的时候需要使用 #limit_zone crawler $binary_remote_addr 10m; #tips: #upstream bakend{#定义负载均衡设备的Ip及设备状态}{ # ip_hash; # server 127.0.0.1:9090 down; # server 127.0.0.1:8080 weight=2; # server 127.0.0.1:6060; # server 127.0.0.1:7070 backup; #} #在需要使用负载均衡的server中增加 proxy_pass http://bakend/; server { listen 80; #charset koi8-r; # access_log /var/log/nginx/host.access.log main; access_log off; location / { auth_basic "Kibana"; #可以是string或off,任意string表示开启认证,off表示关闭认证。 auth_basic_user_file /etc/nginx/passwd.db; #指定存储用户名和密码的认证文件。 proxy_pass http://192.168.253.141:5601; #这里填kibana的ip:端口 proxy_set_header Host $host:5601; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Via "nginx"; } location /status { stub_status on; #开启网站监控状态 access_log /var/log/nginx/kibana_status.log; #监控日志 auth_basic "NginxStatus"; } location /head/{ auth_basic "head"; auth_basic_user_file /etc/nginx/passwd.db; proxy_pass http://192.168.253.141:9100/; #这里填head的ip:端口 proxy_set_header Host $host:9100; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Via "nginx"; } # redirect server error pages to the static page /50x.html error_page 500 502 503 504 /50x.html; location = /50x.html { } } } -------------------------------- yum install -y httpd-tools #创建用户密码的软件 htpasswd -cm /etc/nginx/passwd.db kibana #kibana用户 设置密码:123 htpasswd -m /etc/nginx/passwd.db head #head用户 设置密码:123 在web浏览器中直接访问负载均衡ip,则转发到kibana服务器的5601端口,输入用户kibana,密码123 在web浏览器中直接访问负载均衡ip/head,则转发到head服务器的9100端口,输入用户head,密码123
kafka部署
部署kafka集群需要三台虚拟机 因为kafka需要java环境,所以要先部署java环境,再部署kafka 将jdk-8u211-linux-x64.tar.gz和kafka_2.11-2.0.0.tgz的包传到虚拟机中,三台一样的操作 部署java环境,三台机器一样的操作 tar xvf jdk-8u211-linux-x64.tar.gz -C /usr/local/ mv /usr/local/jdk1.8.0_211/ /usr/local/java -------------------------- #命令行中写入Java的环境变量 echo ' JAVA_HOME=/usr/local/java PATH=$JAVA_HOME/bin:$PATH export JAVA_HOME PATH ' >>/etc/profile -------------------------- source /etc/profile java -version 部署kafka,三台机器一样的操作 tar xvf kafka_2.11-2.0.0.tgz -C /usr/local/ mv /usr/local/kafka_2.11-2.0.0/ /usr/local/kafka #可以在一台服务器上解压,scp到其他节点 scp -r /usr/local/kafka/ 192.168.253.149:/usr/local/ vim /usr/local/kafka/config/zookeeper.properties #将该文件的内容全删掉,使用该配置 --------------------------------- dataDir=/opt/data/zookeeper/data dataLogDir=/opt/data/zookeeper/logs clientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.253.140:2888:3888 #kafka集群IP:Port .1为id,3处要对应 server.2=192.168.253.143:2888:3888 server.3=192.168.253.149:2888:3888 --------------------------------- /usr/local/kafka/config/zookeeper.properties #配置详解 --------------------------------- dataDir ZK数据存放目录。 dataLogDir ZK日志存放目录。 clientPort 客户端连接ZK服务的端口。 tickTime ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。 initLimit 允许follower(相对于Leaderer言的“客户端”)连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。 syncLimit Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。 server.1=172.16.244.31:2888:3888 2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。 --------------------------------- mkdir -p /opt/data/zookeeper/{data,logs} #创建data、log目录 echo 1 > /opt/data/zookeeper/data/myid #创建myid文件,kafka集群中myid的数字不可以冲突 配置kafka vim /usr/local/kafka/config/server.properties #将该文件的内容全删掉,使用该配置 ------------------------------------------------------------------ broker.id=1 #kafka集群中,此处id不可以重复 listeners=PLAINTEXT://192.168.253.140:9092 #写本机ip即可 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/opt/data/kafka/logs num.partitions=6 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.253.140:2181,192.168.253.143:2181,192.168.253.149:2181 #都有哪些kafka集群 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 ------------------------------------------------------------------ /usr/local/kafka/config/server.properties #配置详解 ------------------------------------------------------------------ broker.id 每个server需要单独配置broker id,如果不配置系统会自动配置。 listeners 监听地址,格式PLAINTEXT://IP:端口。 num.network.threads 接收和发送网络信息的线程数。 num.io.threads 服务器用于处理请求的线程数,其中可能包括磁盘I/O。 socket.send.buffer.bytes 套接字服务器使用的发送缓冲区(SO_SNDBUF) socket.receive.buffer.bytes 套接字服务器使用的接收缓冲区(SO_RCVBUF) socket.request.max.bytes 套接字服务器将接受的请求的最大大小(防止OOM) log.dirs 日志文件目录。 num.partitions partition数量。 num.recovery.threads.per.data.dir 在启动时恢复日志、关闭时刷盘日志每个数据目录的线程的数量,默认1。 offsets.topic.replication.factor 偏移量话题的复制因子(设置更高保证可用),为了保证有效的复制,偏移话题的复制因子是可配置的,在偏移话题的第一次请求的时候可用的broker的数量至少为复制因子的大小,否则要么话题创建失败,要么复制因子取可用broker的数量和配置复制因子的最小值。 log.retention.hours 日志文件删除之前保留的时间(单位小时),默认168 log.segment.bytes 单个日志文件的大小,默认1073741824 log.retention.check.interval.ms 检查日志段以查看是否可以根据保留策略删除它们的时间间隔。 zookeeper.connect ZK主机地址,如果zookeeper是集群则以逗号隔开。 zookeeper.connection.timeout.ms 连接到Zookeeper的超时时间。 ------------------------------------------------------------------ mkdir -p /opt/data/kafka/logs #创建kafka日志目录 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties #启动zookeeper /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #前台确认没有报错,后台启动zookeeper yum install -y nmap #下载插件 echo conf | nc 127.0.0.1 2181 #如果可以看到如下配置,则代表没有问题 ---------------------------------- clientPort=2181 dataDir=/opt/data/zookeeper/data/version-2 dataLogDir=/opt/data/zookeeper/logs/version-2 tickTime=2000 maxClientCnxns=60 minSessionTimeout=4000 maxSessionTimeout=40000 serverId=1 initLimit=20 syncLimit=10 electionAlg=3 electionPort=3888 quorumPort=2888 peerType=0 ---------------------------------- echo stat | nc 192.168.253.140 2181 #查看所有zookeeper状态 ---------------------------------- Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT Clients: /127.0.0.1:39810[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x200000002 Mode: follower #follower 跟随者(代码集群的slave),leader 领导者(代表集群的master),也可以通过端口判断,有2888端口的就是master Node count: 4 ---------------------------------- 测试kafka是否部署成功 cd /usr/local/kafka bin/kafka-server-start.sh config/server.properties #前台启动,kafka启动需要依赖zookeeper启动,如果kafka启动报错,有可能是zookeeper没有启动,或者防火墙、selinux没有关,时间不同步等 nohup bin/kafka-server-start.sh config/server.properties & #后台启动,并将输出到内容写入到nohup中,启动的端口为9092 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ming #写入数据进行测试 输出:Created topic "ming". #代表消息成功写入 bin/kafka-topics.sh --zookeeper 192.168.253.143:2181 --list #查看数据是否成功写入,可以显示刚才创建的jiaming,则代表kafka消息队列创建成功,任意一个节点都可以看其他kafka集群的数据 bin/kafka-topics.sh --zookeeper 192.168.253.149:2181 --list #同上 模拟消息生产和消费 bin/kafka-console-producer.sh --broker-list 192.168.253.143:9092 --topic ming #生产消息,在这一台服务器上输入消息 bin/kafka-console-consumer.sh --bootstrap-server 192.168.253.149:9092 --topic ming --from-beginning #接受消息,在这一台上接受消息 结果如下图的话,代表kafka消息队列部署成功
filebeat部署
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.2-x86_64.rpm #下载安装包 yum install -y filebeat-7.13.2-x86_64.rpm #安装 vim /etc/filebeat/filebeat.yml #修改file beat配置文件 --------------------------------------- #在Elasticsearch Output区域中 hosts: ["es1:9200","es2:9200"] #添加es节点服务器 username: "es" #将这一行注释删掉,并写es服务器的用户,该用户密码是用于尝试连接的,所有被写入的服务器都会尝试使用该用户密码连接 password: "123" #to #在Kibana区域中 host: "es1:5601" #这一行写部署kibana的节点ip --------------------------------------- filebeat modules enable elasticsearch #启动elastic search服务 vim /etc/filebeat/modules.d/elasticsearch.yml #编写elastic search文件,都添加什么模块 --------------------------------------- var.paths: #将注释删掉,收集哪些文件 - /data/elasticsearch/logs/*.log - /data/elasticsearch/logs/_server.json gc: #垃圾收集日志 enabled: true #打开 # Set custom paths for the log files. If left empty, # Filebeat will choose the paths depending on your OS. var.paths: #将注释删除,并添加收集哪些日志 - /data/elasticsearch/logs/gc.log.[0.9]* - /data/elasticsearch/logs/gc.log audit: #自定义收集哪些日志 enabled: true # Set custom paths for the log files. If left empty, # Filebeat will choose the paths depending on your OS. var.paths: #同上 - /data/elasticsearch/logs/*_access.log - /data/elasticsearch/logs/*_audit.json slowlog: #慢查询日志 enabled: true # Set custom paths for the log files. If left empty, # Filebeat will choose the paths depending on your OS. var.paths: #同上 - /data/elasticsearch/logs/*_index_search_slowlog.log - /data/elasticsearch/logs/*_index_indexing_slowlog.log - /data/elasticsearch/logs/*_index_search_slowlog.json - /data/elasticsearch/logs/*_index_indexing_slowlog.json deprecation: #应用程序或服务的弃用警告日志 enabled: true # Set custom paths for the log files. If left empty, # Filebeat will choose the paths depending on your OS. var.paths: - /data/elasticsearch/logs/*_deprecation.log - /data/elasticsearch/logs/*_deprecation.json --------------------------------------- filebeat setup #启动file beat配置 如下图1:会发现在/etc/filebeat/modules.d/elasticsearch.yml添加的配置会在kibana中显示 systemctl start filebeat 启动file beat 如下图2:会发现elastic search模块z
补充:logstashz正则
grok
1、手动输入日志数据
一般为debug 方式,检测 ELK 集群是否健康,这种方法在 logstash 启动后可以直接手动数据数据,并将格式化后的数据打印出来。
数据链路
1、启动logstash
2、logstash启动后,直接进行数据输入
3、logstash处理后,直接进行返回
input { stdin {} } output { stdout { codec => rubydebug } }
2、手动输入数据,并存储到 es
数据链路
1、启动logstash
2、启动后直接在终端输入数据
3、数据会由logstash处理后返回并存储到es集群中
input { stdin {} } output { stdout { codec => rubydebug } elasticsearch { hosts => ["节点服务器的ip:"] index => 'logstash-debug-%{+YYYY-MM-dd}' } }
3、自定义日志1
数据链路
1、由tcp 的8888端口将日志发送到logstash
2、数据被grok进行正则匹配处理
3、处理后,数据将被打印到终端并存储到es
input { tcp { port => 8888 } } filter { grok { match => {"message" => "%{DATA:key} %{NUMBER:value:int}"} } } output { stdout { codec => rubydebug } elasticsearch { hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"] index => 'logstash-debug-%{+YYYY-MM-dd}' } } # yum install -y nc # free -m |awk 'NF==2{print $1,$3}' |nc logstash_ip 8888
4、自定义日志2
数据链路
1、由tcp 的8888端口将日志发送到logstash
2、数据被grok进行正则匹配处理
3、处理后,数据将被打印到终端
input { tcp { port => 8888 } } filter { grok { match => {"message" => "%{WORD:username}\:%{WORD:passwd}\:%{INT:uid}\:%{INT:gid}\:%{DATA:describe}\:%{DATA:home}\:%{GREEDYDATA:shell}"} } } output { stdout { codec => rubydebug } } # cat /etc/passwd | nc logstash_ip 8888
5、nginx access 日志
数据链路
1、在filebeat配置文件中,指定kafka集群ip [output.kafka] 的指定topic当中
2、在logstash配置文件中,input区域内指定kafka接口,并指定集群ip和相应topic
3、logstash 配置filter 对数据进行清洗
4、将数据通过 output 存储到es指定index当中
5、kibana 添加es 索引,展示数据
input { kafka { type => "audit_log" codec => "json" topics => "haha" #decorate_events => true #enable_auto_commit => true auto_offset_reset => "earliest" bootstrap_servers => ["192.168.52.129:9092,192.168.52.130:9092,192.168.52.131:9092"] } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}"} } date { match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ] } geoip { source => "lan_ip" } } output { if [type] == "audit_log" { stdout { codec => rubydebug } elasticsearch { hosts => ["192.168.52.129","192.168.52.130","192.168.52.131"] index => 'tt-%{+YYYY-MM-dd}' } } } #filebeat 配置 filebeat.prospectors: - input_type: log paths: - /opt/logs/server/nginx.log json.keys_under_root: true json.add_error_key: true json.message_key: log output.kafka: hosts: ["10.3.145.41:9092","10.3.145.42:9092","10.3.145.43:9092"] topic: 'nginx' # nginx 配置 log_format main '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sents":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}'; access_log /var/log/nginx/access.log main;
6、nginx error日志
数据链路
1、直接将本地的日志数据拉去到logstash当中
2、将日志进行处理后存储到es
input { file { type => "nginx-log" path => "/var/log/nginx/error.log" start_position => "beginning" } } filter { grok { match => { "message" => '%{DATESTAMP:date} [%{WORD:level}] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"'} } date { match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ] } } output { if [type] == "nginx-log" { elasticsearch { hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"] index => 'logstash-audit_log-%{+YYYY-MM-dd}' } } }
7、filebate 传输给 logstash
input { beats { port => 5000 } } filter { grok { match => {"message" => "%{IPV4:cip}"} } } output { elasticsearch { hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"] index => 'test-%{+YYYY-MM-dd}' } stdout { codec => rubydebug } } filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/access.log output.logstash: hosts: ["192.168.52.134:5000"]
filebeat 日志模板
filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/access.log output.kafka: hosts: ["192.168.52.129:9092","192.168.52.130:9092","192.168.52.131:9092"] topic: haha partition.round_robin: reachable_only: true required_acks: 1
logstash 使用细节
logstash 具有 input filter output 3个配置区域 input 负责管理输入到logstash的数据 filter 负责进行过滤、处理 output 负责将数据发出
input 支持的常用字段如下:
filter 支持字段如下
-
date: 用于处理日期和时间的过滤器或插件。它可能用于解析、转换或格式化日期字段。 drop: 通常是一个过滤器,用于从数据流中删除特定的事件或字段。 grok: 通常指的是Logstash中的grok过滤器。这是一个强大的正则表达式解析器,用于解析和匹配文本字段中的模式。 geoip: 用于地理位置信息的插件或过滤器。它可以根据IP地址或其他标识符提供地理位置数据。 mutate: 一个通用的过滤器或插件,用于执行各种转换操作,如重命名、替换、删除或修改字段。 split: 一个过滤器或插件,用于将字段分割成多个部分,并可能将这些部分作为新的字段添加到数据中。 translate: 一个过滤器或插件,用于根据某种映射或查找表将字段值转换为其他值。 useragent: 用于解析用户代理字符串的插件或过滤器。用户代理字符串通常包含在HTTP请求头中,并提供了关于发出请求的浏览器或客户端的信息。 uuid: 用于生成或验证UUID(通用唯一标识符)的插件或过滤器。UUID是一种常用的标识符,用于唯一地标识信息。 xml: 用于解析XML数据的插件或过滤器。 json: 用于解析或生成JSON数据的插件或过滤器。 json-encode: 特定于将数据结构或数据编码为JSON格式的插件或过滤器。
output 常用字段如下:
logstash grok 预定义字段
USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME} INT (?:[+-]?(?:[0-9]+)) BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))) NUMBER (?:%{BASE10NUM}) BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+)) BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b POSINT \b(?:[1-9][0-9]*)\b NONNEGINT \b(?:[0-9]+)\b WORD \b\w+\b NOTSPACE \S+ SPACE \s* DATA .*? GREEDYDATA .* QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``)) UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12} # Networking MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC}) CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4}) WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}) COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2}) IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)? IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9]) IP (?:%{IPV6}|%{IPV4}) HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b) HOST %{HOSTNAME} IPORHOST (?:%{HOSTNAME}|%{IP}) HOSTPORT %{IPORHOST}:%{POSINT} # paths PATH (?:%{UNIXPATH}|%{WINPATH}) UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+ TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+)) WINPATH (?>[A-Za-z]+:|\\)(?:\[^\\?*]*)+ URIPROTO [A-Za-z]+(\+[A-Za-z+]+)? URIHOST %{IPORHOST}(?::%{POSINT:port})? # uripath comes loosely from RFC1738, but mostly from what Firefox # doesn't turn into %XX URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+ #URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)? URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-[]]* URIPATHPARAM %{URIPATH}(?:%{URIPARAM})? URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})? # Months: January, Feb, 3, 03, 12, December MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b MONTHNUM (?:0?[1-9]|1[0-2]) MONTHNUM2 (?:0[1-9]|1[0-2]) MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) # Days: Monday, Tue, Thu, etc... DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?) # Years? YEAR (?>\d\d){1,2} HOUR (?:2[0123]|[01]?[0-9]) MINUTE (?:[0-5][0-9]) # '60' is a leap second in most time standards and thus is valid. SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?) TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) # datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it) DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR} DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR} ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE})) ISO8601_SECOND (?:%{SECOND}|60) TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? DATE %{DATE_US}|%{DATE_EU} DATESTAMP %{DATE}[- ]%{TIME} TZ (?:[PMCE][SD]T|UTC) DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ} DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE} DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR} DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND} # Syslog Dates: Month Day HH:MM:SS SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME} PROG (?:[\w._/%-]+) SYSLOGPROG %{PROG:program}(?:[%{POSINT:pid}])? SYSLOGHOST %{IPORHOST} SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}> HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT} # Shortcuts QS %{QUOTEDSTRING} # Log formats SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} # Log Levels LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
USERNAME [a-zA-Z0-9._-]+:匹配用户名,可以包含字母、数字、点、下划线和短划线。 示例:alice_123 john.doe user-name
USER %{USERNAME}:使用USERNAME表达式的一个示例。 示例:user123 jdoe some_user
INT (?:[+-]?(?:[0-9]+)):匹配整数,可以包含正负号。 示例:123 -456 789
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:.[0-9]+)?)|(?:.[0-9]+))):匹配十进制数,包括小数。 示例:123 -456.789
NUMBER (?:%{BASE10NUM}):使用BASE10NUM表达式的一个示例。 示例:123 -456.789
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+)):匹配十六进制数,可以包含正负号和0x前缀。 示例:0x1A -0x2F
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:.[0-9A-Fa-f]*)?)|(?:.[0-9A-Fa-f]+)))\b:匹配十六进制浮点数,可以包含正负号、0x前缀和小数部分。 示例:0x1A -0x2F.5
POSINT \b(?:[1-9][0-9]*)\b:匹配正整数。 示例:123 456
NONNEGINT \b(?:[0-9]+)\b:匹配非负整数。 示例:0 42
WORD \b\w+\b:匹配单词字符。 示例:hello world123
NOTSPACE \S+:匹配非空白字符。 示例:ThisIsSomeText
SPACE \s*:匹配空白字符。 示例:(空白)
DATA .*?:匹配任意字符(非贪婪模式)。 示例:Some data here.
*GREEDYDATA . **:匹配任意字符(贪婪模式)。 示例:This is a long text...
QUOTEDSTRING (?>(?<!\)(?>"(?>\.|[^\"]+)+"|""|(?>'(?>\.|[^\']+)+')|''|(?>(?>\\.|[^\\]+)+`)|``)):匹配带引号的字符串,可以是双引号、单引号或反引号。 示例:"Hello, world!" 'This is a test.'
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}:匹配 UUID(通用唯一标识符)。 示例:123e4567-e89b-12d3-a456-426655440000
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC}):匹配各种格式的 MAC 地址,如 Cisco、Windows 和通用格式。 示例:00:1A:2B:3C:4D:5E AA-BB-CC-DD-EE-FF 001A2B3C4D5E
IPV6 ...:匹配 IPv6 地址。 示例:2001:0db8:85a3:0000:0000:8a2e:0370:7334
IPV4 ...:匹配 IPv4 地址。 示例:192.168.1.100
HOSTNAME ...:匹配主机名。 示例:example.com subdomain.example.net
HOSTPORT %{IPORHOST}:%{POSINT}:匹配主机和端口号的组合。 示例:example.com:8080 localhost:80
URIPATH ...:匹配 URI 路径部分。 示例:/path/to/resource
MONTH ...:匹配月份的简写和全称。 示例:Jan December
MONTHNUM ...:匹配月份的数字表示。 示例:01 12
DAY ...:匹配星期几的简写。 示例:Mon
YEAR ...:匹配年份。 示例:2023
HOUR ...:匹配小时。 示例:01 23
MINUTE ...:匹配分钟。 示例:05 50
SECOND ...:匹配秒数。 示例:15 59
DATE ...:匹配日期,包括多种格式。 示例:2023-08-13 08/13/2023
DATESTAMP_RFC822 ...:匹配 RFC 822 格式的日期时间。 示例:Mon, 15 Aug 2023 12:34:56 +0000
DATESTAMP_RFC2822 ...:匹配 RFC 2822 格式的日期时间。 示例:Tue, 16 Aug 2023 15:45:30 +0300
SYSLOGTIMESTAMP ...:匹配 Syslog 格式的日期时间。 示例:Aug 13 23:59:59
SYSLOGPROG ...:匹配 Syslog 格式的程序名和进程 ID。 示例:program[12345]
SYSLOGHOST ...:匹配 Syslog 格式的主机名。 示例:192.168.1.100
SYSLOGFACILITY ...:匹配 Syslog 格式的设备号和优先级。 示例:4.2
HTTPDATE ...:匹配 HTTP 格式的日期时间。 示例:13/Aug/2023:08:30:45 +0000
QS %{QUOTEDSTRING}:匹配带引号的字符串,同 QUOTEDSTRING。 示例:"Quoted text"
SYSLOGBASE ...:匹配 Syslog 基本格式,包括时间戳、设备号、主机名和程序名。 示例:Aug 13 23:59:59 hostname program[12345]:
COMMONAPACHELOG ...:匹配常见的 Apache 日志格式,包括客户端 IP、标识、认证、时间戳、请求、响应等信息。 示例:192.168.1.100 - user [13/Aug/2023:08:30:45 +0000] "GET /index.html HTTP/1.1" 200 1234
COMBINEDAPACHELOG ...:匹配综合的 Apache 日志格式,包括常见格式的信息以及引用和代理信息。 示例:192.168.1.100 - user [13/Aug/2023:08:30:45 +0000] "GET /index.html HTTP/1.1" 200 1234 "http://referrer.com" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"
LOGLEVEL ...:匹配日志级别,包括各种大小写和缩写形式。 示例:DEBUG WARN ERR ···
logstash 配置grok自定义表达式
1.将自定义的正则模式写入到一个文件中 [root@localhost ~]# mkdir /data/elk/logstash/regex/ [root@localhost ~]# vim /data/elk/logstash/regex/patterns ID [0-9]{3,6}$ 2.配置logstash [root@localhost ~]# cat /data/elk/logstash/conf.d/test.conf filter { grok { patterns_dir => "/data/elk/logstash/regex/patterns" #指定正则模式文件所在的路径 match => { "message" => "%{IP:client_ip} %{WORD:request_type} %{URIPATHPARAM:url} %{NUMBER:bytes} %{NUMBER:response_time} %{ID:id}" #增加上我们自定义的正则模式 } } } 3.重载logstash [root@localhost ~]# ps aux | grep logstash | grep -v grep | awk '{print $2}' |xargs kill -HUP-HUP
完整部署elk架构
安装软件 | 主机名 | IP地址 | 系统版本 | 配置 |
---|---|---|---|---|
Elasticsearch/Logstash/kibana | elk | 10.3.145.14 | rocky9 | 2核4G |
Elasticsearch | es1 | 10.3.145.56 | rocky9 | 2核3G |
Elasticsearch | es2 | 10.3.145.57 | rocky9 | 2核3G |
zookeeper/kafka/file beat | Kafka1 | 10.3.145.41 | rocky9 | 1核2G |
zookeeper/kafka | Kafka2 | 10.3.145.42 | rocky9 | 1核2G |
zookeeper/kafka | Kafka3 | 10.3.145.43 | rocky9 | 1核2G |
1、环境准备
[root@elk ~]# systemctl disable firewalld --now #关闭防火墙 [root@elk ~]# setenforce 0 #关闭selinux [root@elk ~]# sed -i 's/SELINUX=.*/SELINUX=disable/g' /etc/selinux/config #配置文件中永久关闭selinux [root@elk ~]# hwclock -s #时间同步 [root@elk ~]# vim /etc/hosts #域名解析 ------------------------------- 10.35.186.141 elk 10.35.186.11 es1 10.35.186.231 es2 10.35.186.143 kafka1 10.35.186.146 kafka2 10.35.186.147 kafka3 ------------------------------- [root@elk ~]# scp /etc/hosts es1:/etc/hosts [root@elk ~]# scp /etc/hosts es2:/etc/hosts [root@elk ~]# scp /etc/hosts kafka1:/etc/hosts [root@elk ~]# scp /etc/hosts kafka2:/etc/hosts [root@elk ~]# scp /etc/hosts kafka3:/etc/hosts
2、部署es集群,三台一样的操作
1、创建允许es的普通用户 [root@elk ~]# useradd es #创建用户,用于启动es集群 [root@elk ~]# echo "123" | passwd --stdin "es" #给es用户密码 2、安装配置es 将elasticsearch-7.13.2-linux-x86_64.tar.gz包上传到elk服务器中 [root@elk ~]# tar xvf elasticsearch-7.13.2-linux-x86_64.tar.gz -C /usr/local/ #解压到 [root@elk ~]# mv /usr/local/elasticsearch-7.13.2/ /usr/local/es #改名 [root@elk ~]# scp -r /usr/local/es/ es1:/usr/local/ [root@elk ~]# scp -r /usr/local/es/ es2:/usr/local/ [root@elk ~]# vim /usr/local/es/config/elasticsearch.yml ------------------------------- cluster.name: es-test #es集群名字 cluster.initial_master_nodes: ["elk","es1","es2"] #所有es节点ip node.name: elk #该es节点名字,es节点中名字不能重复 node.master: true #该节点是否符合主节点 node.data: true #该节点是否为数据目录 path.data: /data/elasticsearch/data #数据存储目录 path.logs: /data/elasticsearch/logs #日志存储目录 bootstrap.memory_lock: false #内存锁定,是否禁用交换分区 bootstrap.system_call_filter: false #系统调用过滤器 network.host: 0.0.0.0 #绑定节点ip http.port: 9200 #es集群端口 transport.tcp.port: 9300 #给其他es节点通信的ip discovery.seed_hosts: ["es1","es2"] #除本机外其他es集群节点ip discovery.zen.minimum_master_nodes: 2 #集群中可工作具有master节点资格的最小数量 discovery.zen.ping_timeout: 150s #节点发现过程中等待的时间 discovery.zen.fd.ping_retries: 10 #节点发现重试次数 client.transport.ping_timeout: 60s #检查连接是否存在,60s没响应代表断开 http.cors.enabled: true #是否允许跨源reat请求,用于允许head插件访问es http.cors.allow-origin: "*" #允许的源地址 ------------------------------- [root@elk ~]# scp -r /usr/local/es/config/elasticsearch.yml es1:/usr/local/es/config/elasticsearch.yml [root@elk ~]# scp -r /usr/local/es/config/elasticsearch.yml es2:/usr/local/es/config/elasticsearch.yml #修改部分 #-------------------------------------------- #node.name: elk #每个es节点的名字不一样 #discovery.seed_hosts: ["es1","es2"] #除了本机以外的es节点 #-------------------------------------------- 3、创建es数据及日志存储目录,赋权和设组 [root@elk ~]# mkdir -p /data/elasticsearch/{data,logs} #创建es数据和日志目录 [root@elk ~]# chown -R es.es /data/elasticsearch #授权 [root@elk ~]# chown -R es.es /usr/local/es #授权 4、设置JVM堆大小 #7.0默认为4G [root@elk ~]# sed -i 's/## -Xms4g/-Xms4g/' /usr/local/es/config/jvm.options [root@elk ~]# sed -i 's/## -Xmx4g/-Xmx4g/' /usr/local/es/config/jvm.options 5、系统优化 [root@elk ~]# echo "* soft nofile 65536" >> /etc/security/limits.conf #增加最大文件打开数 [root@elk ~]# echo "* soft nproc 65536" >> /etc/security/limits.conf #增加最大进程数 [root@elk ~]# echo "vm.max_map_count=262144" >> /etc/sysctl.conf #增加最大内存映射数 [root@elk ~]# sysctl -p 备用:#[root@elk ~]# vim /etc/security/limits.conff #上面三个已经够了,启动不起来的话再使用这个 #---------------------- * soft nofile 65536 * hard nofile 131072 * soft nproc 4096 * hard nproc 4096 #---------------------- 6、启动es [root@elk ~]# su es #es不允许用root用户启动,需要切换到普通用户启动 [es@elk ~]# /usr/local/es/bin/elasticsearch #前台启动,没问题执行下列命令,后台启动 [es@elk ~]# nohup /usr/local/es/bin/elasticsearch & 7、浏览器中分别访es集群ip:9200端口,都可以访问且cluster_uuid正常显示则代表es集群部署完成,就可以放到后台运行了
3、安装head监控插件(任意一台el服务器部署即可)
将node-v10.0.0-linux-x64.tar.gz包、elasticsearch-head-master.zip包和phantomjs-2.1.1-linux-x86_64.tar.bz2包上传到elk服务器中 1、安装node [root@elk ~]# tar xvf node-v10.0.0-linux-x64.tar.gz -C /usr/local/ [root@elk ~]# mv /usr/local/node-v10.0.0-linux-x64/ /usr/local/node [root@elk ~]# echo " NODE_HOME=/usr/local/node PATH=\$NODE_HOME/bin:\$PATH export NODE_HOME PATH " >>/etc/profile [root@elk ~]# source /etc/profile [root@elk ~]# node -v #可以看到版本号则代表部署成功 2、下载head插件 [root@elk ~]# unzip elasticsearch-head-master.zip [root@elk ~]# mv elasticsearch-head-master /usr/local/ [root@elk ~]# cd /usr/local/elasticsearch-head-master/ 3、安装grunt,启动head需要用到这个工具 [root@elk elasticsearch-head-master]# npm install -g grunt-cli #安装grunt-cli命令 [root@elk elasticsearch-head-master]# grunt -version #可以看到版本号则代表部署成功 4、修改head源码 [root@elk elasticsearch-head-master]# vim /usr/local/elasticsearch-head-master/Gruntfile.js +99 ---------------- keepalive: true, #在这一行后面加个,号 hostname: "*" #允许绑定本机的哪个ip ---------------- [root@elk elasticsearch-head-master]# vim /usr/local/elasticsearch-head-master/_site/app.js +4374 ---------------- this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://(将该部分修改为es集群的节点ip):9200"; #绑定哪个节点作为head插件的服务器 ---------------- 5、下载head必要的文件 wget https://github.com/Medium/phantomjs/releases/download/v2.1.1/phantomjs-2.1.1-linux-x86_64.tar.bz2 #下载head必要的插件,在GitHub上下载的,比较慢,压缩包也在elk课件中 [root@elk ~]# mkdir /tmp/phantomjs #为phantomjs创建一个目录,该文件只能放到这个目录下 [root@elk ~]# yum install -y bzip2 #下载bzip2软件,否则tar解压不了 [root@elk ~]# tar xvf phantomjs-2.1.1-linux-x86_64.tar.bz2 -C /tmp/phantomjs/ [root@elk ~]# chmod 777 /tmp/phantomjs -R 6、运行head [root@elk ~]# cd /usr/local/elasticsearch-head-master/ [root@elk elasticsearch-head-master]# npm install phantomjs-prebuilt@2.1.16 --ignore-scripts #取消版本限制 [root@elk elasticsearch-head-master]# npm install #安装 [root@elk elasticsearch-head-master]# grunt server #启动 [root@elk elasticsearch-head-master]# nohup grunt server & #启动没有报错则后台运行 7、测试访问 head虚拟机ip:9100端口
4、kibana部署
1、安装kibana 将kibana-7.13.2-linux-x86_64.tar.gz包上传到elk服务器中 [root@elk ~]# tar xvf kibana-7.13.2-linux-x86_64.tar.gz -C /usr/local/ 2、修改配置文件,可以直接删除配置文件的内容,使用如下即可 [root@elk ~]# vim /usr/local/kibana-7.13.2-linux-x86_64/config/kibana.yml -------------------------------- server.port: 5601 server.host: "elk" elasticsearch.hosts: ["http://elk:9200","http://es1:9200","http://es2:9200"] kibana.index: ".kibana" #kibana在elasticsearch中使用索引来存储保存的searches i18n.locale: "zh-CN" -------------------------------- 3、启动 [root@elk ~]# /usr/local/kibana-7.13.2-linux-x86_64/bin/kibana --allow-root #启动,允许root用户启动 [root@elk ~]# nohup /usr/local/kibana-7.13.2-linux-x86_64/bin/kibana --allow-root & #后台启动 4、访问kibana的ip:5601端口,可以访问到页面代表部署成功
5、kafka集群部署
1、安装Java环境 将jdk-8u211-linux-x64.tar.gz包和kafka_2.11-2.0.0.tgz包上传到elk服务器中 [root@kafka1 ~]# tar xvf jdk-8u211-linux-x64.tar.gz -C /usr/local/ [root@kafka1 ~]# mv /usr/local/jdk1.8.0_211/ /usr/local/java [root@kafka1 ~]# echo " JAVA_HOME=/usr/local/java PATH=\$JAVA_HOME/bin:\$PATH export JAVA_HOME PATH " >>/etc/profile [root@kafka1 ~]# source /etc/profile [root@kafka1 ~]# java -version [root@kafka1 ~]# scp -r /usr/local/java/ kafka2:/usr/local/ #所有kafka节点相同操作 [root@kafka1 ~]# scp -r /usr/local/java/ kafka3:/usr/local/ 2、安装kafka [root@kafka1 ~]# tar xvf kafka_2.11-2.0.0.tgz -C /usr/local/ [root@kafka1 ~]# mv /usr/local/kafka_2.11-2.0.0/ /usr/local/kafka [root@kafka1 ~]# vim /usr/local/kafka/config/zookeeper.properties -------------------------------- dataDir=/opt/data/zookeeper/data #zookeeper数据存放目录 dataLogDir=/opt/data/zookeeper/logs #日志存放目录 clientPort=2181 #客户端连接zookeeper服务的端口 tickTime=2000 #zookeeper服务器与客户端之间维持心跳的时间间隔 initLimit=20 #允许follower连接并同步到leader的初始化连接时间 syncLimit=10 #leader与follower之间发送消息时,请求和应答时间长度 server.1=kafka1:2888:3888 #2888是follower与leader交换信息的端口 server.2=kafka2:2888:3888 #3888是当leader挂了时用来执行选举时服务器相互通信的端口 server.3=kafka3:2888:3888 -------------------------------- [root@kafka1 ~]# mkdir -p /opt/data/zookeeper/{data,logs} [root@kafka1 ~]# echo 1 > /opt/data/zookeeper/data/myid #每个kafka节点数字和配置server.*对应 [root@kafka1 ~]# vim /usr/local/kafka/config/server.properties -------------------------------- broker.id=1 #每个kafaka节点ip不同 listeners=PLAINTEXT://kafka1:9092 #监听地址 num.network.threads=3 #接受和发送网络信息的线程数 num.io.threads=8 #服务器用于处理请求的线程数,其中可能包括磁盘i/o socket.send.buffer.bytes=102400 #套接字服务器使用的发送缓冲区(SO_SNDBUF) socket.receive.buffer.bytes=102400 #套接字服务器使用的接受缓冲区(SO_RCVBUF) socket.request.max.bytes=104857600 #套接字服务器将接受的请求最大大小(防止OOM) log.dirs=/opt/data/kafka/logs #日志目录 num.partitions=6 #partition数量 num.recovery.threads.per.data.dir=1 #在启动时恢复日志,关闭时刷盘日志每个数据目录的线程数量 offsets.topic.replication.factor=2 #偏移量话题的复制因子(设置更保证高可用) transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 #日志文件删除之前保留的时间(单位小时) log.segment.bytes=536870912 #单个日志文件的大小,默认1073741824字节 log.retention.check.interval.ms=300000 #检查日志段以查看是否可以根据保留策略删除他们的时间间隔 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181 #zookeeper主机地址 zookeeper.connection.timeout.ms=6000 #连接到zookeeper的超时时间 group.initial.rebalance.delay.ms=0 #减少消费者刚启动或所有成员都离开时,由于网络延迟或配置导致的快速重新平衡 -------------------------------- [root@kafka1 ~]# scp -r /usr/local/kafka/ kafka2:/usr/local/ [root@kafka1 ~]# scp -r /usr/local/kafka/ kafka3:/usr/local/ #只需要修改server.properties配置即可 [root@kafka3 ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties #前台启动 [root@kafka1 ~]# nohup /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #后台启动 [root@kafka3 ~]# yum install -y nmap #插件测试 [root@kafka3 ~]# echo conf | nc 127.0.0.1 2181 #可以看到配置即可 [root@kafka3 ~]# echo stat | nc 192.168.253.140 2181 #查看所有zookeeper状态 5、测试kafka是否部署成功 [root@kafka3 ~]# cd /usr/local/kafka [root@kafka3 kafka]# bin/kafka-server-start.sh config/server.properties #前台启动 [root@kafka3 kafka]# nohup bin/kafka-server-start.sh config/server.properties & #后台启动 [root@kafka1 kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiaming #如果返回Created topic "jiaming".则代表创建成功 [root@kafka1 kafka]# bin/kafka-topics.sh --zookeeper kafka1:2181 --list #在任意节点测试队列是否创建成功,输出jiaming代表创建成功 6、模拟消息生产和消费 [root@kafka1 kafka]# bin/kafka-console-producer.sh --broker-list kafka2:9092 --topic jiaming #模拟生产,出现>后输入消息 [root@kafka2 kafkabin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic jiaming --from-beginninging #模拟输出,在生产环境中输入消息后,输出环境将会产生生产环境的消息
6、logstash部署
1、安装logstash 将logstash-7.13.2-linux-x86_64.tar.gz包上传到elk服务器中 [root@elk ~]# tar xvf logstash-7.13.2-linux-x86_64.tar.gz -C /usr/local/ [root@elk ~]# mv /usr/local/logstash-7.13.2/ /usr/local/logstash [root@elk ~]# cd /usr/local/logstash/ [root@elk logstash]# mkdir conf [root@elk logstash]# vim conf/filebeat.yml ------------------------------------- input { kafka { type => "audit_log" #type指定了事件的类型,类型将在后续的处理和输出中被引用 codec => "json" #意味着从kafka读取的数据将被当作json格式处理 topics => "access-nginx.log" #指定kafka中的主题,logstash将从这个主题读取数据 #decorate_events => true #enable_auto_commit => true auto_offset_reset => "earliest" #如果logstash没有从kafka中读取到任何偏移量,将从最早的可用消息开始读取 bootstrap_servers => ["kafka1:9092","kafka2:9092","kafka3:9092"] #kafka集群的ip:端口 } } filter { grok { #使用正则表达式解析日志消息 match => { "message" => "%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}"} } date { #尝试从事件中的timestamp字段解析日期,并设置其格式为下列格式 match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ] } geoip { #根据ip地址执行geoip,以添加地址位置信息到事件中 source => "lan_ip" } } output { if [type] == "audit_log" { #if条件判断,只有当事件的type字段等于audit_log时才会执行下面的输出配置 stdout { #将事件输出到标准输出,使用rubydebug编解码器以易于阅读的格式显示 codec => rubydebug } elasticsearch { #将事件发送到elastic search集群 hosts => ["elk:9200","es1:9200","es2:9200"] index => 'nginx-access-%{+YYYY-MM-dd}' } } } ------------------------------------- [root@elk logstash]# ./bin/logstash -f conf/filebeat.yml
7、部署file beat
1、安装nginx,收集日志 [root@kafka1 kafka]# yum install -y nginx [root@kafka1 kafka]# vim /etc/nginx/nginx.conf #将日志格式改为这种 -------------------------------------------------------------- log_format main '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sents":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}'; -------------------------------------------------------------- [root@kafka1 kafka]# nginx 2、安装file beat [root@kafka1 kafka]# curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.2-x86_64.rpm [root@kafka1 kafka]# yum install -y filebeat-7.13.2-x86_64.rpm 3、修改配置文件 [root@kafka1 kafka]# cd /etc/filebeat [root@kafka1 filebeat]# vim filebeat.yml -------------------------------- #将Filebeat inputs区域中 #- type: log下的 enabled: false 修改为 enable: true #paths:下的 - /var/log/*.log 修改为 /var/log/nginx/access.log #将Elasticsenarch Output区域中 output.elasticsearch: 修改为 #output.elasticsearch: hosts: ["localhost:9200"] 修改为 #hosts: ["localhost:9200"] #在Logstash Output区域的最下方添加 output.kafka: hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"] #将file beat的数据发送到kafka集群的ip:端口 topic: 'access-nginx.log' #jfile beat的哪个文件 #在配置文件的最下方添加,不加会运行不起来 seccomp: default_action: allow syscalls: - action: allow names: - rseq -------------------------------- 4、启动file beat [root@kafka1 filebeat]# filebeat -e -c filebeat.yml [root@kafka1 filebeat]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper kafka1:2181 --list #查看nginx-access.log
更多推荐
所有评论(0)