一、重试次数和超时

"es.http.timeout" -> "5m"
"es.http.retries" -> "50"


这两个参数是控制http接口层面的超时及重试,覆盖读请求和写请求,默认值比较小,默认超时时间为1分钟,重试次数为3,建议调整为超时时间5分钟,重试次数50次。

二、Spark节点访问ES集群权限配置

"es.nodes.wan.only" -> "true"
"es.nodes.discovery" -> "false"


这两个参数是控制Spark节点访问ES集群的node,可能会报权限错误。

默认值第一个是false,第二个是true,采用默认配置时,Spark会通过访问es.nodes中指定的host(可以为多个) 得到ES集群所有开启HTTP服务节点的ip和port,后续对数据的访问会直接访问分片数据所在的节点上,这样的话需要保证ES集群所有节点都能够被Spark集群访问到。

采用以上实际配置时,Spark发送给ES的所有请求,都需要通过这个Spark程序的Executor节点进行转发,效率相对比低一些,但是实测数据写入耗时也并没有怎么增加,完美解决了Spark程序往Es集群写入数据时,探活机制导致的权限异常的错误。

三、指定ID

"es.mapping.id" -> "id"


指定ES中每条文档的唯一标识id,即_id用Json中哪个key的值来赋值,finalJson.put("id", singleId)

四、refresh

"es.batch.write.refresh" -> "false"


ES是一个准实时的搜索引擎,意味着当写入数据之后,只有当触发refresh操作后,写入的数据才能被搜索到。

这里的参数是控制,是否每次bulk操作后都进行refresh。 每次refresh后,ES会将当前内存中的数据生成一个新的segment。如果refresh速度过快,会产生大量的小segment,大量segment在进行合并时,会消耗磁盘的IO。

默认值为开启,如果写时查询要求没那么高,建议设置为false。

在索引的settings中通过refresh_interval配置项进行控制,可以根据业务的需求设置为30s或更长。

五、设置写入速度

"es.batch.size.bytes" -> "10mb"
"es.batch.size.entries" -> "1000"

二选一


这两个参数可以控制单次批量写入的数据量大小和条数,数据积累量先达到哪个参数设置,都会触发一次批量写入。

增大单次批量写入的数据,可以提高写入ES的整体吞吐。

因为ES的写入一般是顺序写入,在一次批量写入中,很多数据的写入处理逻辑可以合并,大量的IO操作也可以合并。

默认值设置的比较小,可以适当根据集群的规模调大这两个值,建议为20MB和2w条。

当每条数据比较均匀的时候,用es.batch.size.entries限制批量写入条数比较合适,但是当每条数据不均匀时,建议用es.batch.size.bytes限制每批次的写入数据量比较合适。

当然,bulk size不能无限的增大,会造成写入任务的积压。

六、写入类型

"es.write.operation" -> writeType


写入类型,index、upsert或者upgrade,index是相同_id的文档会直接覆盖,upser是相同_id的文档会在后面追加

七、控制单次批量写入

"es.batch.write.retry.count" -> "30"
"es.batch.write.retry.wait" -> "120s"


这两个参数会控制单次批量写入请求的重试次数,以及重试间隔。

当超过重试次数后,Yarn任务管理会将该任务标记为failed,造成整个写数据任务的失败。默认值为3,为了防止集群偶发的网络抖动或压力过大造成的集群短暂熔断,建议将这个值调大,设置为50。

八、Index是否自动创建

"es.index.auto.create" -> "false"
这个参数是在数据写入ES前会判断index是否存在,如果不存在则会自动创建。

需要关闭此功能,自定义创建索引,设置为false

九、设置es的节点列表和端口号

"es.nodes" -> ipAndPort.split(":", 2)(0)
"es.port" -> ipAndPort.split(":", 2)(1))

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐