Redis Pitfalls Series (Part 1): Fixing the Spark Redis Connection Pool Error "Pool not open"
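Batch-writing large volumes of HDFS data into Redis with Spark
Requirements
- Read offline data from HDFS in batch
- Write the data into Redis in batches, one batch per partition
- The Redis cluster runs in Sentinel mode, and writes go through the pipelined method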
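In the code that follows, each partition is written through one pipeline and sync() is called only once at the end, so the client holds a pending response for every command in the partition until that final sync(). If partitions are very large, one option is to sync in fixed-size chunks instead. The following is a minimal, dependency-free sketch of that idea; `BatchedPartitionWrite`, `writePartition`, `batchSize` and `flushBatch` are hypothetical names, and `flushBatch` only stands in for the real Redis calls.

```scala
// Minimal sketch (hypothetical names): write one partition in fixed-size
// batches so a single pipeline never waits on the whole partition at once.
// `flushBatch` stands in for "setex each row on the pipeline, then sync()".
object BatchedPartitionWrite {
  def writePartition(
      rows: Iterator[(String, String)],
      batchSize: Int,
      flushBatch: Seq[(String, String)] => Unit): Int = {
    require(batchSize > 0, "batchSize must be positive")
    var batches = 0
    // grouped(n) lazily yields consecutive chunks of at most n rows
    rows.grouped(batchSize).foreach { batch =>
      flushBatch(batch)
      batches += 1
    }
    batches
  }

  def main(args: Array[String]): Unit = {
    val rows = (1 to 10).iterator.map(i => (s"key$i", s"value$i"))
    val sizes = scala.collection.mutable.ArrayBuffer.empty[Int]
    val n = writePartition(rows, 4, batch => sizes += batch.size)
    println(s"batches=$n sizes=${sizes.mkString(",")}") // batches=3 sizes=4,4,2
  }
}
```

In the Spark job, the callback body would be the existing per-row `redisPipeLine.setex(...)` loop followed by `redisPipeLine.sync()`. The batch size is a tuning assumption, not a value from the original job.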
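Code
- Note: do not write it like save2RedisBatchBad below. That version fetches the pool and the connection in two separate steps and then calls destroy() on the pool when its partition finishes. RedisUtils keeps a single JedisSentinelPool per executor JVM, so that destroy() makes the pool unusable for every other partition in the same executor. And because redisSentinel is still non-null afterwards, getRedisInstanceSentinel keeps handing out the closed pool instead of creating a new one. For example, when the file sat on a single machine in HDFS, after repartition(2) both partitions still ran in the same executor, so only one redisSentinelPool instance was actually initialized. When the first partition finished and called redisSentinelPool.destroy(), getResource in the other partition failed with Caused by: java.lang.IllegalStateException: Pool not open. The full error is shown below.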
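The failure described above can be reproduced without Spark or Redis. The sketch below is a hypothetical toy model, not the Jedis or commons-pool2 API: `ToyPool` stands in for the per-executor JedisSentinelPool, and `badPartition`/`goodPartition` mirror the per-partition bodies of save2RedisBatchBad and save2RedisBatch, with two partitions run one after the other as they would be inside a single executor.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical toy pool, NOT the Jedis or commons-pool2 API. Like
// RedisUtils.redisSentinel, one instance is shared by every partition
// that runs in the same executor JVM.
final class ToyPool {
  private val open = new AtomicBoolean(true)

  // Same message the stack trace shows once the real pool is closed
  def getResource(): String = {
    if (!open.get()) throw new IllegalStateException("Pool not open")
    "connection"
  }

  def destroy(): Unit = open.set(false)
}

object PoolNotOpenDemo {
  // Mirrors save2RedisBatchBad: borrow, write, then destroy the SHARED pool
  def badPartition(shared: ToyPool, id: Int): String = {
    val conn = shared.getResource()
    shared.destroy() // closes the pool for every later partition too
    s"partition $id used $conn"
  }

  // Mirrors save2RedisBatch: borrow and write, leave the pool open
  def goodPartition(shared: ToyPool, id: Int): String = {
    val conn = shared.getResource()
    s"partition $id used $conn"
  }

  def main(args: Array[String]): Unit = {
    // Two partitions run one after another in the same executor
    val shared = new ToyPool
    println(badPartition(shared, 0)) // partition 0 used connection
    try println(badPartition(shared, 1))
    catch {
      case e: IllegalStateException => println(s"partition 1 failed: ${e.getMessage}")
    } // partition 1 failed: Pool not open
  }
}
```

The second partition fails only because the first one destroyed an object they shared. That is why the pool should be destroyed once per JVM (in a shutdown hook), not once per partition.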
import java.util.Properties
import java.util
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.rdd.RDD
import redis.clients.jedis.{HostAndPort, Jedis, JedisSentinelPool}
object RedisUtils extends Serializable {
  // One pool per executor JVM, shared by every task (partition) running in it
  @volatile private var redisSentinel: JedisSentinelPool = _

  def getRedisInstanceSentinel(props: Properties): JedisSentinelPool = {
    if (redisSentinel == null) {
      RedisUtils.synchronized {
        if (redisSentinel == null) {
          val gPoolConfig = new GenericObjectPoolConfig
          gPoolConfig.setMaxTotal(props.getProperty("redis.maxTotal").toInt)
          gPoolConfig.setMaxIdle(props.getProperty("redis.maxIdle").toInt)
          gPoolConfig.setMaxWaitMillis(props.getProperty("redis.maxWaitMillis").toLong)
          gPoolConfig.setJmxEnabled(true)
          gPoolConfig.setTestWhileIdle(true)
          gPoolConfig.setTestOnBorrow(true)
          gPoolConfig.setTestOnReturn(true)
          gPoolConfig.setTimeBetweenEvictionRunsMillis(30000)
          // "redis.host" is a comma-separated list of sentinel host:port pairs
          val sentinels = new util.LinkedHashSet[String]
          val hosts = props.getProperty("redis.host").split(",")
          for (host <- hosts) {
            val hostAndPort = host.split(":")
            sentinels.add(new HostAndPort(hostAndPort(0), hostAndPort(1).toInt).toString)
          }
          redisSentinel = new JedisSentinelPool(props.getProperty("redis.masterName"), sentinels, gPoolConfig, props.getProperty("redis.password"))
          // Release the pool before the executor JVM shuts down; never forget this.
          // This is the only place the pool should be destroyed.
          sys.addShutdownHook {
            redisSentinel.destroy()
          }
        }
      }
    }
    redisSentinel
  }

  // Borrows a Jedis from the shared pool; the caller must close() it to return it to the pool
  def getRedisInstance(props: Properties): Jedis = {
    getRedisInstanceSentinel(props).getResource
  }
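
  // Batch-write code actually used: borrow one connection from the shared pool per
  // partition and return it when the partition is done. The pool itself is never
  // destroyed here; the shutdown hook takes care of that.
  def save2RedisBatch(props: Properties, dbName: String, tableName: String, rdd: RDD[(String, String)]): Unit = {
    val redisKeyPre: String = dbName + ":" + tableName + ":"
    rdd.foreachPartition(partition => {
      val jedis = RedisUtils.getRedisInstance(props)
      try {
        val redisPipeLine = jedis.pipelined()
        partition.foreach(row => {
          redisPipeLine.setex(redisKeyPre + row._1, 60 * 60 * 24 * 1, row._2) // TTL: 1 day
        })
        redisPipeLine.sync()
        redisPipeLine.close()
      } finally {
        jedis.close() // returns the connection to the pool; the pool stays open
      }
    })
  }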

  // Do NOT write it like this. getRedisInstanceSentinel returns one pool instance shared by
  // every partition in the same executor, so destroying it at the end of one partition makes
  // it unusable for the others. For example, if the file sits on a single HDFS machine, after
  // repartition(2) only one redisSentinelPool is actually initialized; once the first partition
  // finishes and calls redisSentinelPool.destroy(), getResource in the other partition fails with
  // Caused by: java.lang.IllegalStateException: Pool not open (full error below)
  private def save2RedisBatchBad(props: Properties, dbName: String, tableName: String, rdd: RDD[(String, String)]): Unit = {
    val redisKeyPre: String = dbName + ":" + tableName + ":"
    rdd.foreachPartition(partition => {
      val redisSentinelPool = RedisUtils.getRedisInstanceSentinel(props)
      val redisPipeLine = redisSentinelPool.getResource.pipelined()
      partition.foreach(row => {
        redisPipeLine.setex(redisKeyPre + row._1, 60 * 60 * 24 * 1, row._2)
      })
      redisPipeLine.sync()
      redisPipeLine.close()
      redisSentinelPool.destroy() // WRONG: closes the shared pool for every other partition
    })
  }
}
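save2RedisBatch borrows one Jedis connection per partition. If that Jedis is never handed back (closing only the Pipeline does not return it to the pool), every finished partition keeps a connection checked out; once redis.maxTotal connections are out, getResource typically waits up to redis.maxWaitMillis and then fails with "Could not get a resource from the pool". A safe shape is the loan pattern: borrow, use, return in `finally`, and destroy the pool only at JVM shutdown. The sketch below models this with a hypothetical `BoundedToyPool` (not the Jedis API) so it runs without Redis; with Jedis 2.x, "give back" corresponds to calling close() on the Jedis obtained from getResource.

```scala
// Hypothetical bounded pool (NOT Jedis): at most maxTotal connections may be
// borrowed at once, similar in spirit to the redis.maxTotal setting above.
final class BoundedToyPool(maxTotal: Int) {
  private var borrowed = 0

  def borrow(): String = synchronized {
    if (borrowed >= maxTotal) throw new IllegalStateException("pool exhausted")
    borrowed += 1
    s"conn-$borrowed"
  }

  def giveBack(conn: String): Unit = synchronized { borrowed -= 1 }

  def inUse: Int = synchronized { borrowed }
}

object LoanPattern {
  // Borrow a connection, run `body`, and always give it back, even if body throws.
  // Destroying the pool itself is left to a JVM shutdown hook.
  def withConnection[T](pool: BoundedToyPool)(body: String => T): T = {
    val conn = pool.borrow()
    try body(conn)
    finally pool.giveBack(conn)
  }

  def main(args: Array[String]): Unit = {
    val pool = new BoundedToyPool(maxTotal = 2)
    // Five partitions on one executor share a pool of two connections
    val results = (0 until 5).map(p => withConnection(pool)(conn => s"partition $p via $conn"))
    results.foreach(println)
    println(s"in use after all partitions: ${pool.inUse}") // in use after all partitions: 0
  }
}
```

Without the `finally`, the third partition would hit "pool exhausted" here, which is the toy counterpart of the real pool timing out.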
Error message
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 12.0 failed 4 times, most recent failure: Lost task 1.3 in stage 12.0 (TID 61, slave24.ai.cloud.test.we, executor 2): redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisSentinelPool.getResource(JedisSentinelPool.java:214)
at cn.test.client.RedisUtils$$anonfun$save2RedisBatch$1.apply(RedisUtils.scala:87)
at cn.test.client.RedisUtils$$anonfun$save2RedisBatch$1.apply(RedisUtils.scala:85)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Pool not open
at org.apache.commons.pool2.impl.BaseGenericObjectPool.assertOpen(BaseGenericObjectPool.java:704)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:410)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:361)
at redis.clients.util.Pool.getResource(Pool.java:49)
... 15 more
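In the log above, the top-level JedisConnectionException ("Could not get a resource from the pool") is only a wrapper; the actual reason is the Caused by line, where commons-pool2's BaseGenericObjectPool.assertOpen rejects the borrow because the pool has already been closed. When such exceptions are handled in application code, a small helper that follows getCause() makes that root cause easy to log. This is a hypothetical helper, not part of Jedis or Spark; the sample uses RuntimeException as a stand-in for JedisConnectionException so it needs no Jedis dependency.

```scala
// Hypothetical helper: walk the getCause() chain to the innermost exception.
object RootCause {
  def rootCause(t: Throwable): Throwable = {
    val seen = scala.collection.mutable.Set.empty[Throwable] // guards against cause cycles
    var current = t
    while (current.getCause != null && seen.add(current)) {
      current = current.getCause
    }
    current
  }

  def main(args: Array[String]): Unit = {
    // RuntimeException stands in for JedisConnectionException
    val wrapped = new RuntimeException(
      "Could not get a resource from the pool",
      new IllegalStateException("Pool not open"))
    val root = rootCause(wrapped)
    println(s"${root.getClass.getSimpleName}: ${root.getMessage}")
    // IllegalStateException: Pool not open
  }
}
```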