python链接Elasticsearch
本文实现的是一个某个时间段nginx客户端IP访问次数告警功能环境:filebeat将nginx日志发送到ELKpython通过链接Elasticsearch获取数据python不太好,写的比较渣,大概意思如下:1、连接ES,这里的ES没有密码,所以使用了warnings模块屏蔽不安全链接的告警;2、定义一个现在时间变量和十五分钟前时间变量,对应ES中的时间应该是-8小时;3、定义查询语句,也就是
本文实现的是一个某个时间段nginx客户端IP访问次数告警功能
环境:
filebeat将nginx日志发送到ELK
python通过链接Elasticsearch获取数据
python不太好,写的比较渣,大概意思如下:
1、连接ES,需要安装elasticsearch模块,pip install elasticsearch ,这里的ES没有密码,所以使用了warnings模块屏蔽不安全链接的告警;
2、定义一个现在时间变量和十五分钟前时间变量,对应ES中的时间应该是-8小时;
3、定义查询语句,也就是代码中的 es查询规则;
4、定义一个函数client_ip_number,参数body,body就是es规则名称,函数中的功能是根据es查询语句,提取出客户端IP地址和IP地址对应的访问次数,如果次数大于2000则输出;
5、最后调用函数给定参数es_js。
# coding=utf-8
from elasticsearch import Elasticsearch
import time
import datetime
import warnings
'''
连接Elasticsearch
'''
es = Elasticsearch(["114.55.124.67:625"])
warnings.filterwarnings("ignore")
'''
搜索十五分钟内的信息
注意:时区为-8小时
'''
# 获取当前时间
t_new = datetime.datetime.now()
# 格式化时间为es时间戳格式
t_new2 = (t_new-datetime.timedelta(hours=8)).strftime('%Y-%m-%d') + "T" + (t_new-datetime.timedelta(hours=8)).strftime('%H:%M:%S') + ".000Z"
# 获取8小时15分钟前时间
t2 = (t_new-datetime.timedelta(hours=8, minutes=15)).strftime('%Y-%m-%d %H:%M:%S')
#转换为秒级时间戳
ts2 = time.mktime(time.strptime(t2, '%Y-%m-%d %H:%M:%S'))
# 格式化时间为es时间戳格式
t3 = (time.strftime('%Y-%m-%d' , time.localtime(ts2)) + "T" + time.strftime('%H:%M:%S', time.localtime(ts2))) + ".000Z"
'''
es查询规则
'''
es_js = {
"aggs": {
"86cde0c2-9312-4698-bc61-1097cb8e2af8": {
"terms": {
"field": "http_x_forwarded_for.keyword",
"order": {
"_count": "desc"
},
"size": 10
}
}
},
"size": 0,
"fields": [
{
"field": "@timestamp",
"format": "date_time"
}
],
"script_fields": {},
"stored_fields": [
"*"
],
"runtime_mappings": {},
"_source": {
"excludes": []
},
"query": {
"bool": {
"must": [],
"filter": [
{
"match_all": {}
},
{
"match_all": {}
},
{
"bool": {
"minimum_should_match": 1,
"should": [
{
"match_phrase": {
"log.file.path.keyword": "/data/nginx/logs/js/js.access.log"
}
},
]
}
},
{
"range": {
"@timestamp": {
"gte": t3,
"lte": t_new2,
"format": "strict_date_optional_time"
}
}
}
],
"should": [],
"must_not": [
{
"match_phrase": {
"method.keyword": "HEAD"
}
},
{
"match_phrase": {
"http_x_forwarded_for.keyword": "\"-\""
}
}
]
}
}
}
def client_ip_number(body):
print(body)
"""
因为有多种格式的es查询,所以将此段代码编写为函数
执行查询
index为索引
"""
# 索引后缀为当天日期
to_day = t_new.strftime('%Y.%m.%d')
query = es.search(index="logstash-nginx_log-" + to_day, body=body)
'''
字典取值
列表遍历,取出大于指定值的ip
'''
aaa = query['aggregations']
bbb = aaa['86cde0c2-9312-4698-bc61-1097cb8e2af8']
ccc = bbb['buckets']
for value1 in ccc:
dict2 = (value1['key'], value1['doc_count'])
client_number = (dict2[1])
client_ip = (dict2[0])
if client_number > 2000:
print('客户端IP:' + str(client_ip) + ' 十五分钟内访问次数:' + str(client_number) + '\r')
else:
continue
client_ip_number(es_js)
上面的py写的很一般,于我而言es查询语句才是难点,因为不会。。。
我是怎么投机取巧的呢?如下:
1、打开kibana
2、制作图表,也就是获取客户端排名的图
3、保存后回到查看页面如图点击,
视图改选为请求,点击请求就会出现查询语句,复制粘贴到代码中即可。。。
注意更改"gte": t3, "lte": t_new2, 这两个时间为上面编写的时间变量
以上就是我这次使用python调用ES的完整配置,还可以添加通过企业微信发送输出信息,便于知道是否有人恶意访问公司网站,当然这是被动的选择。
还可以加入淘宝地址库,直接查询出客户端IP地址归属地,因为有些服务是ToB的,可能存在很多人同时使用一个公网出口访问网站。
另外安装完elasticsearch模块后调用Elasticsearch时说找不到,最后发现是我自己的脚本名称定义成了elasticsearch.py 。。。改成其他的就行了。
最后,有没有人能告诉我代码中 aaa、bbb、ccc这三个递归字典应该咋写高大上一点,哭唧唧。。。
更多推荐
所有评论(0)