本文实现的是一个某个时间段nginx客户端IP访问次数告警功能

环境:

filebeat将nginx日志发送到ELK

python通过链接Elasticsearch获取数据

python不太好,写的比较渣,大概意思如下:

1、连接ES,需要安装elasticsearch模块,pip install elasticsearch ,这里的ES没有密码,所以使用了warnings模块屏蔽不安全链接的告警;

2、定义一个现在时间变量和十五分钟前时间变量,对应ES中的时间应该是-8小时;

3、定义查询语句,也就是代码中的 es查询规则;

4、定义一个函数client_ip_number,参数body,body就是es规则名称,函数中的功能是根据es查询语句,提取出客户端IP地址和IP地址对应的访问次数,如果次数大于2000则输出;

5、最后调用函数给定参数es_js。

# coding=utf-8

from elasticsearch import Elasticsearch
import time
import datetime
import warnings


'''
连接Elasticsearch
'''
es = Elasticsearch(["114.55.124.67:625"])
warnings.filterwarnings("ignore")

'''
搜索十五分钟内的信息
注意:时区为-8小时
'''
# 获取当前时间
t_new = datetime.datetime.now()
# 格式化时间为es时间戳格式
t_new2 = (t_new-datetime.timedelta(hours=8)).strftime('%Y-%m-%d') + "T" + (t_new-datetime.timedelta(hours=8)).strftime('%H:%M:%S') + ".000Z"
# 获取8小时15分钟前时间
t2 = (t_new-datetime.timedelta(hours=8, minutes=15)).strftime('%Y-%m-%d %H:%M:%S')
#转换为秒级时间戳
ts2 = time.mktime(time.strptime(t2, '%Y-%m-%d %H:%M:%S'))
# 格式化时间为es时间戳格式
t3 = (time.strftime('%Y-%m-%d' , time.localtime(ts2)) + "T" +  time.strftime('%H:%M:%S', time.localtime(ts2))) + ".000Z"


'''
es查询规则
'''
es_js = {
  "aggs": {
    "86cde0c2-9312-4698-bc61-1097cb8e2af8": {
      "terms": {
        "field": "http_x_forwarded_for.keyword",
        "order": {
          "_count": "desc"
        },
        "size": 10
      }
    }
  },
  "size": 0,
  "fields": [
    {
      "field": "@timestamp",
      "format": "date_time"
    }
  ],
  "script_fields": {},
  "stored_fields": [
    "*"
  ],
  "runtime_mappings": {},
  "_source": {
    "excludes": []
  },
  "query": {
    "bool": {
      "must": [],
      "filter": [
        {
          "match_all": {}
        },
        {
          "match_all": {}
        },
        {
          "bool": {
            "minimum_should_match": 1,
            "should": [
              {
                "match_phrase": {
                  "log.file.path.keyword": "/data/nginx/logs/js/js.access.log"
                }
              },
            ]
          }
        },
        {
          "range": {
            "@timestamp": {
              "gte": t3,
              "lte": t_new2,
              "format": "strict_date_optional_time"
            }
          }
        }
      ],
      "should": [],
      "must_not": [
        {
          "match_phrase": {
            "method.keyword": "HEAD"
          }
        },
        {
          "match_phrase": {
            "http_x_forwarded_for.keyword": "\"-\""
          }
        }
      ]
    }
  }
}


def client_ip_number(body):
    print(body)
    """
    因为有多种格式的es查询,所以将此段代码编写为函数
    执行查询
    index为索引
    """
    # 索引后缀为当天日期
    to_day = t_new.strftime('%Y.%m.%d')
    query = es.search(index="logstash-nginx_log-" + to_day, body=body)

    '''
    字典取值
    列表遍历,取出大于指定值的ip
    '''
    aaa = query['aggregations']
    bbb = aaa['86cde0c2-9312-4698-bc61-1097cb8e2af8']
    ccc = bbb['buckets']


    for value1 in ccc:
        dict2 = (value1['key'], value1['doc_count'])
        client_number = (dict2[1])
        client_ip = (dict2[0])
        if client_number > 2000:
            print('客户端IP:' + str(client_ip) + ' 十五分钟内访问次数:' + str(client_number) + '\r')
        else:
            continue



client_ip_number(es_js)




上面的py写的很一般,于我而言es查询语句才是难点,因为不会。。。

我是怎么投机取巧的呢?如下:

1、打开kibana

2、制作图表,也就是获取客户端排名的图

 3、保存后回到查看页面如图点击,

视图改选为请求,点击请求就会出现查询语句,复制粘贴到代码中即可。。。

注意更改"gte": t3,  "lte": t_new2, 这两个时间为上面编写的时间变量

   以上就是我这次使用python调用ES的完整配置,还可以添加通过企业微信发送输出信息,便于知道是否有人恶意访问公司网站,当然这是被动的选择。

  还可以加入淘宝地址库,直接查询出客户端IP地址归属地,因为有些服务是ToB的,可能存在很多人同时使用一个公网出口访问网站。

  另外安装完elasticsearch模块后调用Elasticsearch时说找不到,最后发现是我自己的脚本名称定义成了elasticsearch.py 。。。改成其他的就行了。

  最后,有没有人能告诉我代码中 aaa、bbb、ccc这三个递归字典应该咋写高大上一点,哭唧唧。。。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐