kettle如何3秒内写入100万条数据到Redis
kettle如何3秒内写入100万条数据到Redis实现结果先来看下实现结果,如下图,本地写入100万数据,耗时2.3s,每秒44万。接下来说说如何实现:数据存储结构样例:生成记录用于生成测试数据:增加序列用于生成redis的key值Json输出用于将原始数据封装为一个json,存储到redis中:json输出:字段页签,用于说明json中包含的字段信息:Java 写入redis缓存主要使用到了P
·
kettle如何3秒内写入100万条数据到Redis
1.实现结果
先来看下实现结果,如下图,本地写入100万数据,耗时2.3s,每秒44万。接下来说说如何实现:
数据存储结构样例:
2.添加redis包放到kettle的lib目录
jedis-3.1.0.jar
3.生成记录步骤
用于生成测试数据:
4.增加序列步骤
用于生成redis的key值
5.Json输出
用于将原始数据封装为一个json,存储到redis中:
json输出:字段页签,用于说明json中包含的字段信息:
6.Java 写入redis缓存
主要使用到了Pipeline类,实现批量提交:
详细代码如下:
// etl-java-redis
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
private Jedis jedis=null;
private JedisPool pool=null;
Pipeline pipe = null;
int cache_size=10000; // 批量提交大小
int cur_size=0; // 当前数据缓存量
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
// connect to redis server
String redis_ip = getVariable("redis.ip", "127.0.0.1");
String redis_port = getVariable("redis.port", "6379");
String redis_password = getVariable("redis.password", "");
cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));
logBasic(redis_ip+":"+redis_port);
logBasic("redis_password:"+redis_password);
// 连接池方式
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(18);
pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);
jedis = pool.getResource();
jedis.select(1);// 切换数据库
pipe = jedis.pipelined(); // 创建pipeline 对象
logBasic("Server is running: " + jedis.ping());
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
pipe.sync();
jedis.close();
pool.close();
return false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
r = createOutputRow(r, data.outputRowMeta.size());
/*
Redis数据存储(Redis-String)
key : KEY
value : JsonData
*/
String key = get(Fields.In, "id").getString(r);
String value = get(Fields.In, "JsonData").getString(r);
logDebug(key + "\t" + value);
// 写入缓存
pipe.set(key, value);
cur_size++;
if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交
pipe.sync(); // 同步
cur_size=0; // 复位
}
// Send the row on to the next step.
putRow(data.outputRowMeta, r);
return true;
}
7.命名参数
将可变参数存储到命名参数中,方便迁移:
– 本文结束 –
更多推荐
已为社区贡献2条内容
所有评论(0)