This article presents a solution for efficiently importing large CSV files (100 MB and up) into Elasticsearch.

FastCSV

FastCSV is an ultra-fast, simple CSV library for Java, RFC 4180 compliant and licensed under the Apache License 2.0.
For reference, benchmarks against opencsv, Super CSV, and similar libraries look as follows (these come from the FastCSV project's own tests; I have not verified them):
[Figure: benchmark of FastCSV vs opencsv and Super CSV — http://res.limuqiao.com/images?imageName=tech.Tech-bigdata-elasticsearch-dscsv-benchmark.png]
To use the library, add the following dependency to your Maven project:

<dependency>
	<groupId>de.siegmar</groupId>
	<artifactId>fastcsv</artifactId>
	<version>1.0.3</version>
</dependency>

Elasticsearch client

Shipping the data to Elasticsearch also requires the ES Java client as a dependency:

<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-high-level-client</artifactId>
	<version>6.6.2</version>
</dependency>

Key code

  1. Reading the CSV file

import java.io.File;
import java.nio.charset.StandardCharsets;

import de.siegmar.fastcsv.reader.CsvContainer;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;

File file = new File("foo.csv");
CsvReader csvReader = new CsvReader();
csvReader.setContainsHeader(true); // treat the first row as a header

CsvContainer csv = csvReader.read(file, StandardCharsets.UTF_8);
for (CsvRow row : csv.getRows()) {
    System.out.println("First column of line: " + row.getField("name"));
}
  2. Bulk indexing with the ES BulkProcessor
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

public class ESProcessor {

	private RestHighLevelClient client;
	private BulkProcessor bulkProcessor;

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ESProcessor esp = new ESProcessor();
		esp.createProcessor();
		// ... add index requests to the processor here ...
		esp.closeProcessor();
	}

	public ESProcessor() {
		client = new RestHighLevelClient(RestClient.builder(new HttpHost(
				"127.0.0.1", 9200, "http"))); // initialize the REST client
	}

	/**
	 * Create the index.
	 * 
	 * @param indexName
	 * @throws IOException
	 */
	private static void createIndex(String indexName) throws IOException {
		RestHighLevelClient client = new RestHighLevelClient(
				RestClient.builder(new HttpHost("es01", 9200, "http")));
		// ES index names must be lowercase, so convert the name here
		CreateIndexRequest requestIndex = new CreateIndexRequest(
				indexName.toLowerCase());
		// Note: a replica count of 0 and a refresh interval of -1 give a
		// considerable speedup when bulk-indexing large amounts of data
		requestIndex.settings(Settings.builder()
				.put("index.number_of_shards", 5)
				.put("index.number_of_replicas", 0)
				.put("index.refresh_interval", "-1"));

		client.indices().create(requestIndex, RequestOptions.DEFAULT);
		client.close();
	}
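
	/**
	 * Sketch (my addition, not from the original article): once the bulk
	 * import has finished, re-enable the refreshes that createIndex()
	 * disabled so the imported documents become searchable.
	 */
	private static void restoreRefresh(String indexName) throws IOException {
		RestHighLevelClient client = new RestHighLevelClient(
				RestClient.builder(new HttpHost("es01", 9200, "http")));
		UpdateSettingsRequest request = new UpdateSettingsRequest(
				indexName.toLowerCase());
		request.settings(Settings.builder()
				.put("index.refresh_interval", "1s")); // default refresh rate
		client.indices().putSettings(request, RequestOptions.DEFAULT);
		client.close();
	}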

	/**
	 * Create and initialize the BulkProcessor.
	 * 
	 * @return the configured BulkProcessor
	 */
	public BulkProcessor createProcessor() {
		try {

			BulkProcessor.Listener listener = new BulkProcessor.Listener() {
				@Override
				public void beforeBulk(long executionId, BulkRequest request) {
					System.out.println("Try to insert data number : "
							+ request.numberOfActions());
				}

			@Override
				public void afterBulk(long executionId, BulkRequest request,
						BulkResponse response) {
					// Fires for completed bulks, including partially failed
					// ones, so report response.hasFailures() too
					System.out.println("************** Bulk finished, actions: "
							+ request.numberOfActions()
							+ ", hasFailures: " + response.hasFailures()
							+ ", id: " + executionId);
				}

			@Override
				public void afterBulk(long executionId, BulkRequest request,
						Throwable failure) {
					System.out.println("Bulk request failed: " + failure
							+ ", executionId: " + executionId);
				}
			};

			BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (
					request, bulkListener) -> client.bulkAsync(request,
					RequestOptions.DEFAULT, bulkListener);

			BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
					listener);
			// Flush a bulk request after every 5000 added actions
			builder.setBulkActions(5000);
			// builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)); // or flush once 5 MB of requests have accumulated
			// Number of bulk requests allowed in flight concurrently. The default
			// is 1 (accumulating and sending are asynchronous); 0 makes them synchronous
			builder.setConcurrentRequests(10);
			// Retry policy when ES rejects a bulk due to resource exhaustion
			// (EsRejectedExecutionException); the default is (50ms, 8 retries)
			builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(
					TimeValue.timeValueSeconds(1L), 3));

			bulkProcessor = builder.build();

		} catch (Exception e) {
			e.printStackTrace();
		}

		return bulkProcessor;
	}

	public void closeProcessor() {
		try {
			// Flush pending requests and wait up to 70s for them to complete
			bulkProcessor.awaitClose(70L, TimeUnit.SECONDS);
			client.close();
		} catch (Exception e1) {
			e1.printStackTrace();
		}
	}
}
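
Wiring the two snippets together is straightforward; a minimal sketch (the index name mycsv and type _doc are placeholder assumptions, and it additionally needs org.elasticsearch.action.index.IndexRequest plus the FastCSV imports from the first snippet):

// Feed the FastCSV rows into the BulkProcessor created above.
// Index name "mycsv" and type "_doc" are placeholder assumptions.
ESProcessor esp = new ESProcessor();
BulkProcessor processor = esp.createProcessor();

CsvReader csvReader = new CsvReader();
csvReader.setContainsHeader(true);
CsvContainer csv = csvReader.read(new File("foo.csv"), StandardCharsets.UTF_8);

for (CsvRow row : csv.getRows()) {
	// getFieldMap() returns a header-name-to-value map for the row,
	// which IndexRequest.source(Map) accepts directly
	processor.add(new IndexRequest("mycsv", "_doc").source(row.getFieldMap()));
}
esp.closeProcessor();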

How to handle slow reads of 100 MB+ data files

If reading is slow, first check memory and CPU usage. If memory usage has stopped growing while the CPU stays above 90%, the JVM is likely spending most of its time in garbage collection; try adjusting the heap allocation when launching the Java application:

java -Xms128m -Xmx3g ReaderApp

This sets the initial heap to 128 MB and the maximum heap to 3 GB. The JVM resizes the heap after each GC, so simply raising the -Xmx ceiling is usually enough.

Note: for even larger files, consider FastCSV's streaming read API, sketched below.
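
A minimal streaming sketch against the FastCSV 1.x API used above (foo.csv and the name column are carried over from the earlier example):

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import de.siegmar.fastcsv.reader.CsvParser;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;

public class StreamingReaderApp {

	public static void main(String[] args) throws IOException {
		CsvReader csvReader = new CsvReader();
		csvReader.setContainsHeader(true);

		// parse() pulls rows one at a time instead of loading the whole
		// file into memory the way read() does
		try (CsvParser parser = csvReader.parse(new File("foo.csv"),
				StandardCharsets.UTF_8)) {
			CsvRow row;
			while ((row = parser.nextRow()) != null) {
				System.out.println("name: " + row.getField("name"));
			}
		}
	}
}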

Disclaimer: this article is the author's original work based on personal experience. Reposting is welcome with attribution; the original post is at:
https://tech.limuqiao.com/archives/12.html
