go语言批量bulk操作elasticsearch8报错解决:The bulk request must be terminated by a newline [\n],es8bulk批量插入问题解决
go语言批量bulk操作elasticsearch8报错解决The bulk request must be terminated by a newline [\n]问题
最近在用go做项目的时候,需要批量插入数据到es8进行测试。传统的单条数据插入太慢,效率太低,再看es8官网发现,es8其实是支持批量bulk操作的。详情请见es官网bulk操作
于是用go简单的实现了一下代码
func TestEsBulk(t *testing.T) {
AuctionTime := time.Now().UnixNano() / 1e6
BidTime := AuctionTime + 1800
TargetUrl := "http//192.168.56.103:9200/home_item_emb/_bulk"
for i := 0; i < 20; i++ {
RequestBodyArray := make([]interface{}, 0, 1000)
for j := 0; j < 500; j++ {
SingleItemData, SingleItemIndexData := GenerateBulkData(AuctionTime, BidTime)
RequestBodyArray = append(RequestBodyArray, SingleItemIndexData)
RequestBodyArray = append(RequestBodyArray, SingleItemData)
}
BulkData := BulkData{
Data: RequestBodyArray,
}
RequestBody, _ := json.Marshal(BulkData)
result, err := utils.FastHttpRequest(TargetUrl, RequestBody, "POST") //
if err == nil {
fmt.Printf("<----------------------------第%v组数据插入成功,已完成%v条数据插入---------------------------->", i+1, (i+1)*500)
}
}
}
从这段代码中可以知道我使用的普通的json数据格式,所以接下来es bulk操作响应返回就报错了:
map[error:map[reason:The bulk request must be terminated by a newline [\n] root_cause:[map[reason:The bulk request must be terminated by a newline [\n] type:illegal_argument_exception]] type:illegal_argument_exception] status:400]
大概的意思就是,bulk请求的时候,必须以换行符(\n)结束。这可让人一头雾水,然后我仔细看es8官网的bulk的相关文档,其中有一行引起了我的注意:
The actions are specified in the request body using a newline delimited JSON (NDJSON) structure
大概的意思就是,请求体是一种叫NDJSON的结构,然后搜索了一下什么是NDJSON。
-
行分隔符是 '\n'
这意味着 '\r\n' 也受支持,因为在解析 JSON 值时会忽略尾随空格。 -
每一行都是一个有效的JSON值
最常见的值是对象或数组,但允许使用任何 JSON 值。
看到这我相信大家也就清楚了,于是我想了一个办法就是,将批量操作的每一条数据先转成字符串,并在后面加上 \n 换行符,将最后所有的拼接成一个大字符串,因为go Fasthttp库请求体是[]byte 类型 ,最后将整个字符串转成 []byte 就可以啦 。以下新改的批量插入代码。
func TestEsBulk(t *testing.T) {
AuctionTime := time.Now().UnixNano() / 1e6
BidTime := AuctionTime + 1800
TargetUrl := "http//192.168.56.103:9200/home_item_emb/_bulk"
for i := 0; i < 10; i++ {
RequestBody := ""
for j := 0; j < 500; j++ {
SingleItemData, SingleItemIndexData := GenerateBulkData(AuctionTime, BidTime)
SingleItemIndexDataStr, _ := json.Marshal(SingleItemIndexData)
SingleItemDataStr, _ := json.Marshal(SingleItemData)
RequestBody = RequestBody + string(SingleItemIndexDataStr) + "\n" + string(SingleItemDataStr) + "\n"
}
result, err := utils.FastHttpRequest(TargetUrl, []byte(RequestBody), "POST")
if err == nil {
fmt.Printf("<----------------------------第%v组数据插入成功,已完成%v条数据插入---------------------------->", i+1, (i+1)*500)
}
}
}
截取一部分批量插入的结果看看,可以看出全是成功的操作,通过kibana面板查看,数据确实插入成功了。
map[errors:false items:[map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2779 _shards:map[failed:0 successful:1 total:3] _version:1 result:crea
ted status:201]] map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2780 _shards:map[failed:0 successful:1 total:3] _version:2 resul
t:updated status:200]] map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2781 _shards:map[failed:0 successful:1 total:3] _version:3
result:updated status:200]] map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2782 _shards:map[failed:0 successful:1 total:3] _ver
sion:4 result:updated status:200]] map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2783 _shards:map[failed:0 successful:1 total:3
] _version:5 result:updated status:200]] .............
以上就是es8批量bulk操作的一次问题解决,如果哪位小伙伴有更好的解决方案,可以评论区一起讨论。
更多推荐
所有评论(0)