Kafka:消息队列 Kafka 构建的高吞吐量、高可扩展性的分布式消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等场景,是大数据生态中不可或缺的产品之一。

Elasticsearch:搜索与分析引擎,致力于数据库加速、数据分析、信息检索、智能运维监控等场景服务。适用于写入 TPS 较高、写入流量波动较大和搜索 QPS 较低的时序数据分析场景,例如日志检索分析、Metric 监控分析、IoT 智能硬件数据收集及监控分析等。

===========================================

安装配置 nginx  
步骤1 安装 nginx:  yum -y install nginx
步骤2 修改 nginx 配置文件中的日志格式:
vim /etc/nginx/nginx.conf
设置日志打印格式如下:
log_format main '[\"$remote_addr\",\"$remote_user\",\"$time_iso8601\",\"$request\"'
',\"$status\",\"$body_bytes_sent\",\"$http_referer\"'
',\"$http_user_agent\",\"$http_x_forwarded_for\"]';

user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;

include /usr/share/nginx/modules/*.conf;

events {
    worker_connections 1024;
}
http {
    log_format main '[\"$remote_addr\",\"$remote_user\",\"$time_iso8601\",\"$request\"'
',\"$status\",\"$body_bytes_sent\",\"$http_referer\"'
',\"$http_user_agent\",\"$http_x_forwarded_for\"]';
    access_log /var/log/nginx/access.log main;
    
}
步骤3 启动 nginx,并检查启动状态。
/usr/sbin/nginx
ps -ef|grep nginx

============================================

安装配置 filebeat
步骤1 安装 filebeat
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.10.0-
x86_64.rpm
rpm -ivh filebeat-7.10.0-x86_64.rpm
编辑配置文件:vim /etc/filebeat/filebeat.yml,同步 nginx 日志到 kafka
步骤2 修改 filebeat.inputs 配置项。
将第 24 行的 false 修改为 true。在第 28 行设置 nginx 的日志完整路径
/var/log/nginx/access.log。

filebeat inputs

- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
步骤3 修改 Elasticsearch Output 配置项。
将 176 行至 186 行的内容注释掉。
步骤4 确认 Logstash Output 配置项。
确认 189 行至 201 行的内容已全部注释掉。
步骤5 插入 Kafka 等配置项
# ------------------------------ Kafka Output ---------------------------------
output.kafka:
hosts: ["192.168.30.209:9092","192.168.30.210:9092","192.168.30.208:9092"]
topic: nginx_log
version: 0.10.2
步骤6 修改 Processors 配置项。
注释掉原有内容,插入:
-
drop_fields:
fields: ["ecs","input","host","log","agent"]
步骤7 启动 filebeat:
service filebeat start
===========================================

安装压测工具
步骤1 安装 httpd-tools。
yum -y install httpd-tools
步骤2 测试访问 nginx:
ab -n100 -c10 http://localhost/
步骤3 登录 Kafka 控制台,检查消息是否已同步至 Kafka

==========================================

Elasticsearch创建索引模板
步骤1 创建索引模板。
命名模板名称为 nginx-log-template。
输入索引模式:nginx-* 和 nginx*
Settings 配置如下:
{
"index.mapping.total_fields.limit": "3000",
"index.translog.flush_threshold_size": "2gb",
"index.number_of_replicas": "1",
"index.translog.sync_interval": "100s",
"index.refresh_interval": "60s",
"index.translog.durability": "async",
"index.merge.policy.segments_per_tier": "10",
"index.routing.allocation.total_shards_per_node": "200",
"index.merge.policy.max_merged_segment": "512m",
"index.number_of_shards": "48"
}

==========================================

flink 解析Kafka到ES
CREATE TEMPORARY TABLE `kafka_table` (
msg varchar
) WITH (
'connector' = 'kafka',
'topic' = 'nginx_log',
'properties.bootstrap.servers' = 'xxxxx:9092,xxxxx:9092,xxxxxx:9092',
'properties.group.id' = 'point2',
'format' = 'raw'
);
CREATE TEMPORARY TABLE es_sink (
client VARCHAR,
users VARCHAR,
access_time TIMESTAMP,
request VARCHAR,
status int,
body_bytes_sent int,
http_referer VARCHAR,
http_user_agent VARCHAR,
http_x_forwarded_for VARCHAR,
`@timestamp` TIMESTAMP
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://xxxxxxxxxxx:9200',
'index' = 'nginx-{access_time|yyyy.MM.dd}',
'username' ='elastic',
'password' ='***'
);
INSERT into es_sink
select
json_value(json_value(msg,'$.message'),'$[0]'),
json_value(json_value(msg,'$.message'),'$[1]'),
cast(DATE_FORMAT(REGEXP_REPLACE(json_value(json_value(msg,'$.messag
e'),'$[2]'),'T',' '),'yyyy-MM-dd HH:mm:ss') as timestamp),
json_value(json_value(msg,'$.message'),'$[3]'),
cast(json_value(json_value(msg,'$.message'),'$[4]') as int),
cast(json_value(json_value(msg,'$.message'),'$[5]') as int),
json_value(json_value(msg,'$.message'),'$[6]'),
json_value(json_value(msg,'$.message'),'$[7]'),
json_value(json_value(msg,'$.message'),'$[8]'),
cast(DATE_FORMAT(REGEXP_REPLACE(json_value(json_value(msg,'$.messag
e'),'$[2]'),'T',' '),'yyyy-MM-dd HH:mm:ss') as timestamp)
from kafka_table;

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐