Logstash to Elasticsearch: Pitfall Notes
1. Troubleshooting "Marking url as dead"
[2021-10-21T23:40:21,387][WARN ][logstash.outputs.elasticsearch][main] Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://xxxxxx:xxxxxx@192.168.2.120:9200/][Manticore::SocketTimeout] Read timed out {:url=>http://xxxxxx:xxxxxx@192.168.2.120:9200/, :error_message=>"Elasticsearch Unreachable: [http://xxxxxx:xxxxxx@192.168.2.120:9200/][Manticore::SocketTimeout] Read timed out", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError"}
[2021-10-21T23:40:21,388][ERROR][logstash.outputs.elasticsearch][main] Attempted to send a bulk request to elasticsearch' but Elasticsearch appears to be unreachable or down! {:error_message=>"Elasticsearch Unreachable: [http://xxxxxx:xxxxxx@192.168.2.120:9200/][Manticore::SocketTimeout] Read timed out", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :will_retry_in_seconds=>2}
[2021-10-21T23:40:23,401][ERROR][logstash.outputs.elasticsearch][main] Attempted to send a bulk request to elasticsearch, but no there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down? {:error_message=>"No Available connections", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError", :will_retry_in_seconds=>4}
[2021-10-21T23:40:23,566][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://xxxxxx:xxxxxx@192.168.2.120:9200/"}
[2021-10-21T23:55:45,795][WARN ][logstash.outputs.elasticsearch][main] Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://xxxxxx:xxxxxx@192.168.2.120:9200/][Manticore::SocketTimeout] Read timed out {:url=>http://xxxxxx:xxxxxx@192.168.2.120:9200/, :error_message=>"Elasticsearch Unreachable: [http://xxxxxx:xxxxxx@192.168.2.120:9200/][Manticore::SocketTimeout] Read timed out", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError"}
[2021-10-21T23:55:45,795][ERROR][logstash.outputs.elasticsearch][main] Attempted to send a bulk request to elasticsearch' but Elasticsearch appears to be unreachable or down! {:error_message=>"Elasticsearch Unreachable: [http://xxxxxx:xxxxxx@192.168.2.120:9200/][Manticore::SocketTimeout] Read timed out", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :will_retry_in_seconds=>2}
[2021-10-21T23:55:47,806][ERROR][logstash.outputs.elasticsearch][main] Attempted to send a bulk request to elasticsearch, but no there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down? {:error_message=>"No Available connections", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError", :will_retry_in_seconds=>4}
[2021-10-21T23:55:48,776][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://xxxxxx:xxxxxx@192.168.2.120:9200/"}
[2021-10-22T01:22:21,932][WARN ][logstash.runner ] SIGTERM received. Shutting down.
[2021-10-22T01:22:22,286][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-3, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-3-64c64126-7aa4-4c4f-abfd-5adab1e4f92d sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,286][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-7, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-7-cee59e6e-a168-49df-b82b-28714c9cd63b sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,287][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-4, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-4-d8b42f2f-664b-47e5-a721-f044aafdbd8f sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,288][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-6, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-6-1a7a7683-c2cd-43e8-bf06-df32588f9c32 sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,288][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-13, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-13-9563b4c1-ccc8-4be6-8194-67a9bf110dfe sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,290][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-5, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-5-45932c5d-7dd9-4058-9497-a4ecf7b0ef91 sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,291][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-9, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-9-f780b7e0-ac0e-4aef-b83e-2637d8be34cf sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,291][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-8, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-8-ca6c115f-36e8-412c-908e-2a9ad27e9112 sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,293][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-2, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-2-cb953ada-87af-40d5-9e27-6bd3efa05723 sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,294][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-14, groupId=appsflyer_simplified_to_es_v2] Member wechat_info_to_es-14-835d5a91-dac3-4399-a108-9a127ab9e6a8 sending LeaveGroup request to coordinator 192.168.98.100:9092 (id: 2147483642 rack: null)
[2021-10-22T01:22:22,710][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-1, groupId=appsflyer_simplified_to_es_v2] Attempt to heartbeat failed since group is rebalancing
[2021-10-22T01:22:22,861][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-12, groupId=appsflyer_simplified_to_es_v2] Attempt to heartbeat failed since group is rebalancing
[2021-10-22T01:22:23,011][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=wechat_info_to_es-10, groupId=appsflyer_simplified_to_es_v2] Attempt to heartbeat failed since group is rebalancing
The logs suggest that Elasticsearch writes were timing out, so events could not be written; the failures repeated, and the resulting consumer stalls in turn caused Kafka to trigger consumer-group rebalances.
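If the timeouts are transient, one mitigation is to give Elasticsearch more time per bulk request by raising the elasticsearch output plugin's `timeout` option (the request timeout, 60 seconds by default). A minimal sketch; the host is the one from the logs above and the value is illustrative:

```
output {
  elasticsearch {
    hosts   => ["http://192.168.2.120:9200"]
    # Wait up to 120 s for a bulk response before the URL is marked dead
    timeout => 120
  }
}
```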
If the root cause is simply that the business data volume is too large, throttle Logstash's write rate by editing config/logstash.yml: reduce the number of pipeline workers from 32 to 16, change the batch size from 1000 to 500, change the delay from 200 to 1000, and reduce output.workers from 16 to 8.
pipeline:
  workers: 32
  batch:
    size: 200
    delay: 100
output:
  workers: 8
unsafe_shutdown: false
queue:
  type: persisted
  page_capacity: 250mb
  max_events: 10000
  max_bytes: 1gb
  checkpoint:
    acks: 10000
    writes: 10000
    interval: 1000
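A throttled version of the pipeline section, reflecting the changes described above, might look like this (a sketch; the exact numbers should be tuned against your own throughput):

```yaml
# logstash.yml — reduced write pressure on Elasticsearch (illustrative values)
pipeline:
  workers: 16      # halve the number of pipeline worker threads
  batch:
    size: 500      # fewer events per bulk request
    delay: 1000    # wait longer (ms) before flushing a partial batch
output:
  workers: 8
```

Smaller batches and fewer workers mean smaller, less frequent bulk requests, which gives an overloaded cluster room to catch up at the cost of lower peak throughput.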
If instead an Elasticsearch cluster node is unhealthy, the faulty node itself needs to be dealt with.
2. Validation Failed: 1: this action would add [6] shards, but this cluster currently has [5000]/[5000] maximum normal shards open;
[2022-05-06T02:46:07,469][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"game-loginlog-2020.11.18", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x657817af>], :response=>{"index"=>{"_index"=>"user-2020.11.18", "_type"=>"_doc", "_id"=>nil, "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Validation Failed: 1: this action would add [6] shards, but this cluster currently has [5000]/[5000] maximum normal shards open;"}}}}
Elasticsearch limits the number of shards per data node (cluster.max_shards_per_node, configurable in elasticsearch.yml). Once the cluster's open shard count reaches that limit, any write that would create new shards fails. The fix is to raise the max-shards-per-node setting (or reduce the number of shards).
Check the cluster's current shard count:
GET /_cat/health?v
epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1652063713 02:35:13 my-elastic green 10 5 5000 2500 0 0 0 0 - 100.0%
The cluster has hit its ceiling: with 5 data nodes and the default cluster.max_shards_per_node of 1000, the limit is 5 × 1000 = 5000 open shards, and the health output shows all 5000 in use.
[Temporary fix]
Raise max_shards_per_node via the cluster settings API. This is temporary because transient settings are lost on a full cluster restart:
PUT /_cluster/settings
{
  "transient": {
    "cluster": {
      "max_shards_per_node": 1200
    }
  }
}
Verify the cluster settings:
GET /_cluster/settings?pretty
{
  "persistent" : { },
  "transient" : {
    "cluster" : {
      "max_shards_per_node" : "1200"
    }
  }
}
[Permanent fix]
Set the limit in elasticsearch.yml and restart the nodes, or periodically delete indices you no longer need.
Before:
cluster.max_shards_per_node: 1000
After:
cluster.max_shards_per_node: 1200
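Alternatively, a restart can be avoided by storing the value as a persistent cluster setting, which survives full cluster restarts (same API as above, with "persistent" instead of "transient"):

```
PUT /_cluster/settings
{
  "persistent": {
    "cluster.max_shards_per_node": 1200
  }
}
```

Note that raising the shard limit only buys headroom; large numbers of small shards carry real overhead, so cleaning up old indices remains the healthier long-term option.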
3. Logstash to Elasticsearch: data not persisted locally
Logstash had the persistent queue enabled, yet events that failed with an HTTP 400 were not persisted locally; they were simply dropped.
The root cause was problem 2 above. Consulting the official Elasticsearch output plugin documentation:
HTTP 400 and 404 are non-retryable response codes, returned for things like mapping conflicts or failed index creation. Events rejected with these codes never re-enter the persistent queue; they belong in the dead letter queue, so dead_letter_queue must be enabled:
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
dead_letter_queue.enable: true
By default, dead letter queue data is stored under path.data/dead_letter_queue/. The location can be changed, but different Logstash instances must never be configured with the same dead_letter_queue path:
path.dead_letter_queue: "path/to/data/dead_letter_queue"
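The queue is also size-capped; once it is full, new dead-letter entries are dropped. If you expect bursts of rejected events, the cap can be raised in logstash.yml (the default is 1024mb; the value below is illustrative):

```yaml
# logstash.yml — enlarge the dead letter queue so bursts of 400s are not lost
dead_letter_queue.max_bytes: 2048mb
```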
To process the events accumulated in the dead letter queue, use the dead_letter_queue input plugin; see the official documentation for the full list of parameters.
input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue"
    commit_offsets => true
    pipeline_id => "main"
  }
}
output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}
Note:
- Currently only the elasticsearch output plugin writes to the dead letter queue, so the dead_letter_queue input plugin can only replay events dropped by that output.
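In practice you usually want to repair the failed events and send them back to Elasticsearch rather than just print them. A sketch of such a reprocessing pipeline; the mutate rule, field name, and target index are hypothetical and should be adapted to whatever mapping error caused the 400:

```
input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue"
    commit_offsets => true
    pipeline_id => "main"
  }
}
filter {
  # Hypothetical fix: the 400 came from a field whose type conflicted
  # with the index mapping, so cast it back to a string before retrying.
  mutate {
    convert => { "user_id" => "string" }
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.2.120:9200"]
    index => "user-reprocessed-%{+YYYY.MM.dd}"
  }
}
```

With commit_offsets => true the input records its position, so already-replayed events are not reprocessed when the pipeline restarts.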