kettle如何3秒内写入100万条数据到Redis

1.实现结果

先来看下实现结果,如下图,本地写入100万数据,耗时2.3s,每秒44万。接下来说说如何实现:
在这里插入图片描述
数据存储结构样例:
在这里插入图片描述

2.添加redis包放到kettle的lib目录

jedis-3.1.0.jar

3.生成记录步骤

用于生成测试数据:
在这里插入图片描述

4.增加序列步骤

用于生成redis的key值
在这里插入图片描述

5.Json输出

用于将原始数据封装为一个json,存储到redis中:
在这里插入图片描述
json输出:字段页签,用于说明json中包含的字段信息:
在这里插入图片描述

6.Java 写入redis缓存

主要使用到了Pipeline类,实现批量提交:
在这里插入图片描述
详细代码如下:

// etl-java-redis
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;

private Jedis jedis=null;
private JedisPool pool=null;
Pipeline pipe = null;
int cache_size=10000; // 批量提交大小
int cur_size=0; // 当前数据缓存量

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
	if (first) {
		first = false;
		// connect to redis server
		String redis_ip = getVariable("redis.ip", "127.0.0.1");
		String redis_port = getVariable("redis.port", "6379");
		String redis_password = getVariable("redis.password", "");
		cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));
		logBasic(redis_ip+":"+redis_port);
		logBasic("redis_password:"+redis_password);

		// 连接池方式
		JedisPoolConfig config = new JedisPoolConfig();
		config.setMaxIdle(8);
		config.setMaxTotal(18);
		pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);
		jedis = pool.getResource();		
		jedis.select(1);// 切换数据库
		pipe = jedis.pipelined(); // 创建pipeline 对象
		
		logBasic("Server is running: " + jedis.ping());
	}

	Object[] r = getRow();
	if (r == null) {
		setOutputDone();
		
		pipe.sync();
		jedis.close();
		pool.close();
		return false;
	}

	// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
	// enough to handle any new fields you are creating in this step.
	r = createOutputRow(r, data.outputRowMeta.size());

	/*
	Redis数据存储(Redis-String)
	key : KEY
	value : JsonData
	*/
	String key = get(Fields.In, "id").getString(r);
	String value = get(Fields.In, "JsonData").getString(r);
	logDebug(key + "\t" + value);

	// 写入缓存
	pipe.set(key, value);
	cur_size++;
	if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交
		pipe.sync(); // 同步
		cur_size=0; // 复位
	}
			
	// Send the row on to the next step.
	putRow(data.outputRowMeta, r);

	return true;
}

7.命名参数

将可变参数存储到命名参数中,方便迁移:
在这里插入图片描述

– 本文结束 –

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐