最近在用go做项目的时候,需要批量插入数据到es8进行测试。传统的单条数据插入太慢,效率太低,再看es8官网发现,es8其实是支持批量bulk操作的。详情请见es官网bulk操作

于是用go简单的实现了一下代码

func TestEsBulk(t *testing.T) {
	AuctionTime := time.Now().UnixNano() / 1e6
	BidTime := AuctionTime + 1800
	TargetUrl := "http//192.168.56.103:9200/home_item_emb/_bulk"
	for i := 0; i < 20; i++ {
		RequestBodyArray := make([]interface{}, 0, 1000)
		for j := 0; j < 500; j++ {
			SingleItemData, SingleItemIndexData := GenerateBulkData(AuctionTime, BidTime)
			RequestBodyArray = append(RequestBodyArray, SingleItemIndexData)
			RequestBodyArray = append(RequestBodyArray, SingleItemData)
		}
		BulkData := BulkData{
			Data: RequestBodyArray,
		}
		RequestBody, _ := json.Marshal(BulkData)
		result, err := utils.FastHttpRequest(TargetUrl, RequestBody, "POST")  //
		if err == nil {
			fmt.Printf("<----------------------------第%v组数据插入成功,已完成%v条数据插入---------------------------->", i+1, (i+1)*500)
		}
	}
}

从这段代码中可以知道我使用的普通的json数据格式,所以接下来es bulk操作响应返回就报错了:

 map[error:map[reason:The bulk request must be terminated by a newline [\n] root_cause:[map[reason:The bulk request must be terminated by a newline [\n] type:illegal_argument_exception]] type:illegal_argument_exception] status:400]

大概的意思就是,bulk请求的时候,必须以换行符(\n)结束。这可让人一头雾水,然后我仔细看es8官网的bulk的相关文档,其中有一行引起了我的注意:

The actions are specified in the request body using a newline delimited JSON (NDJSON) structure

大概的意思就是,请求体是一种叫NDJSON的结构,然后搜索了一下什么是NDJSON。

  1. 行分隔符是 '\n'
    这意味着 '\r\n' 也受支持,因为在解析 JSON 值时会忽略尾随空格。

  2. 每一行都是一个有效的JSON值
    最常见的值是对象或数组,但允许使用任何 JSON 值。

    NDJSON官网

 看到这我相信大家也就清楚了,于是我想了一个办法就是,将批量操作的每一条数据先转成字符串,并在后面加上 \n 换行符,将最后所有的拼接成一个大字符串,因为go Fasthttp库请求体是[]byte 类型 ,最后将整个字符串转成 []byte  就可以啦 。以下新改的批量插入代码。

func TestEsBulk(t *testing.T) {
	AuctionTime := time.Now().UnixNano() / 1e6
	BidTime := AuctionTime + 1800
	TargetUrl := "http//192.168.56.103:9200/home_item_emb/_bulk"
	for i := 0; i < 10; i++ {
		RequestBody := ""
		for j := 0; j < 500; j++ {
			SingleItemData, SingleItemIndexData := GenerateBulkData(AuctionTime, BidTime)
			SingleItemIndexDataStr, _ := json.Marshal(SingleItemIndexData)
			SingleItemDataStr, _ := json.Marshal(SingleItemData)
			RequestBody = RequestBody + string(SingleItemIndexDataStr) + "\n" + string(SingleItemDataStr) + "\n"
		}
		result, err := utils.FastHttpRequest(TargetUrl, []byte(RequestBody), "POST")
		if err == nil {
			fmt.Printf("<----------------------------第%v组数据插入成功,已完成%v条数据插入---------------------------->", i+1, (i+1)*500)
		}
	}
}

截取一部分批量插入的结果看看,可以看出全是成功的操作,通过kibana面板查看,数据确实插入成功了。

map[errors:false items:[map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2779 _shards:map[failed:0 successful:1 total:3] _version:1 result:crea
ted status:201]] map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2780 _shards:map[failed:0 successful:1 total:3] _version:2 resul
t:updated status:200]] map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2781 _shards:map[failed:0 successful:1 total:3] _version:3
 result:updated status:200]] map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2782 _shards:map[failed:0 successful:1 total:3] _ver
sion:4 result:updated status:200]] map[index:map[_id:521080087 _index:home_item_emb _primary_term:2 _seq_no:2783 _shards:map[failed:0 successful:1 total:3
] _version:5 result:updated status:200]] .............

 以上就是es8批量bulk操作的一次问题解决,如果哪位小伙伴有更好的解决方案,可以评论区一起讨论。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐