Big data - Elasticsearch - efficient import of CSV files
This section presents one approach for efficiently importing CSV data files larger than 100 MB into Elasticsearch.
fastcsv
FastCSV is an ultra-fast, simple CSV library for Java. It is RFC 4180 compliant and licensed under Apache 2.0.
Benchmarks against opencsv, Super CSV and other libraries report the following results (taken from FastCSV's own published tests; I have not verified them):
(Benchmark chart: http://res.limuqiao.com/images?imageName=tech.Tech-bigdata-elasticsearch-dscsv-benchmark.png)
To use the library, add the following dependency to your Maven project:
<dependency>
    <groupId>de.siegmar</groupId>
    <artifactId>fastcsv</artifactId>
    <version>1.0.3</version>
</dependency>
Elasticsearch client
Sending the data to Elasticsearch also requires the Elasticsearch Java client dependency:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.6.2</version>
</dependency>
Key code
- Reading the CSV file
import java.io.File;
import java.nio.charset.StandardCharsets;

import de.siegmar.fastcsv.reader.CsvContainer;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;

File file = new File("foo.csv");
CsvReader csvReader = new CsvReader();
csvReader.setContainsHeader(true); // treat the first row as the header
CsvContainer csv = csvReader.read(file, StandardCharsets.UTF_8);
for (CsvRow row : csv.getRows()) {
    System.out.println("name field of current row: " + row.getField("name"));
}
- Bulk indexing with the Elasticsearch BulkProcessor
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

public class ESProcessor {

    private RestHighLevelClient client;
    private BulkProcessor bulkProcessor;

    /**
     * @param args
     */
    public static void main(String[] args) {
        ESProcessor esp = new ESProcessor();
        esp.queryTest(); // queryTest() is defined elsewhere and not shown here
    }

    public ESProcessor() {
        // initialise the REST high-level client
        client = new RestHighLevelClient(RestClient.builder(new HttpHost(
                "127.0.0.1", 9200, "http")));
    }
    /**
     * Create the index
     *
     * @param indexName
     * @throws IOException
     */
    private static void createIndex(String indexName) throws IOException {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("es01", 9200, "http")));
        // ES index names must be lowercase, so convert the name here
        CreateIndexRequest requestIndex = new CreateIndexRequest(
                indexName.toLowerCase());
        // Note: setting the replica count to 0 and the refresh interval to -1
        // noticeably speeds up large bulk-indexing jobs
        requestIndex.settings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 0)
                .put("index.refresh_interval", "-1"));
        // CreateIndexResponse createIndexResponse =
        //         client.indices().create(requestIndex, RequestOptions.DEFAULT);
        client.close();
    }
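    /**
     * Companion sketch (not part of the original post): once the bulk import
     * has finished, the temporary settings above should normally be reverted.
     * The values "1s" and 1 replica below are assumptions for illustration;
     * use whatever your cluster actually needs. Requires
     * org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest.
     */
    private static void restoreIndexSettings(RestHighLevelClient client,
            String indexName) throws IOException {
        UpdateSettingsRequest request = new UpdateSettingsRequest(
                indexName.toLowerCase());
        request.settings(Settings.builder()
                .put("index.refresh_interval", "1s")
                .put("index.number_of_replicas", 1));
        client.indices().putSettings(request, RequestOptions.DEFAULT);
    }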
    /**
     * Create and initialise the bulkProcessor
     *
     * @return the configured BulkProcessor
     */
    public BulkProcessor createProcessor() {
        try {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    System.out.println("Try to insert data number : "
                            + request.numberOfActions());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                        BulkResponse response) {
                    System.out
                            .println("************** Success insert data number : "
                                    + request.numberOfActions()
                                    + " , id: "
                                    + executionId);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                        Throwable failure) {
                    System.out.println("Bulk failed : " + failure
                            + ", executionId: " + executionId);
                }
            };
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (
                    request, bulkListener) -> client.bulkAsync(request,
                    RequestOptions.DEFAULT, bulkListener);
            // bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
                    listener);
            // execute a bulk request after every 5000 added requests
            builder.setBulkActions(5000);
            // builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); // flush once the pending requests reach 5 MB
            // default is 1; the value is the number of bulk requests sent
            // concurrently while new requests accumulate, 0 means fully synchronous
            builder.setConcurrentRequests(10);
            // retry policy for EsRejectedExecutionException when ES runs short of
            // resources; the default is exponentialBackoff(50ms, 8)
            builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(
                    TimeValue.timeValueSeconds(1L), 3));
            bulkProcessor = builder.build();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return bulkProcessor;
    }
    public void closeProcessor() {
        try {
            bulkProcessor.awaitClose(70L, TimeUnit.SECONDS);
            client.close();
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}
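To tie the two parts together, below is a minimal sketch (not from the original post) that feeds the CSV rows into Elasticsearch through the BulkProcessor. The index name csv_data, the type _doc and the one-to-one mapping of a row to a document via getFieldMap() are assumptions for illustration; adapt them to your own schema.
import java.io.File;
import java.nio.charset.StandardCharsets;

import de.siegmar.fastcsv.reader.CsvContainer;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;

public class CsvImporter {
    public static void main(String[] args) throws Exception {
        ESProcessor esp = new ESProcessor();
        BulkProcessor bulkProcessor = esp.createProcessor();

        CsvReader csvReader = new CsvReader();
        csvReader.setContainsHeader(true);
        CsvContainer csv = csvReader.read(new File("foo.csv"), StandardCharsets.UTF_8);
        for (CsvRow row : csv.getRows()) {
            // each CSV row becomes one document; getFieldMap() maps header names to values
            bulkProcessor.add(new IndexRequest("csv_data", "_doc")
                    .source(row.getFieldMap()));
        }
        // flush any remaining requests and release the client
        esp.closeProcessor();
    }
}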
Fixing slow reads of data files larger than 100 MB
When reading is slow, first check memory and CPU usage. If memory usage has stopped climbing while the CPU stays above 90%, try adjusting the memory allocated to the Java application at startup:
java -Xms128m -Xmx3g ReaderApp
This sets the initial heap to 128 MB and the maximum heap to 3 GB. The JVM resizes the heap after each GC, so raising the ceiling set by -Xmx is usually all that is needed.
Note: for even larger files, try FastCSV's streaming read, sketched below.
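A rough sketch of that streaming approach with the FastCSV 1.x API follows. Unlike CsvContainer, which loads the whole file into memory, CsvParser pulls one row at a time, so memory usage stays flat regardless of file size (foo.csv and the "name" column are placeholders):
import java.io.File;
import java.nio.charset.StandardCharsets;

import de.siegmar.fastcsv.reader.CsvParser;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;

CsvReader csvReader = new CsvReader();
csvReader.setContainsHeader(true);
// parse() returns a CsvParser that reads rows lazily instead of a CsvContainer
try (CsvParser csvParser = csvReader.parse(new File("foo.csv"), StandardCharsets.UTF_8)) {
    CsvRow row;
    while ((row = csvParser.nextRow()) != null) {
        // handle one row at a time, e.g. hand it to the BulkProcessor
        System.out.println(row.getField("name"));
    }
}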
Disclaimer: this article is an original piece based on the author's own hands-on experience. Reposting is welcome with attribution; the original post on my personal blog is at:
https://tech.limuqiao.com/archives/12.html