Batch-Writing Large Amounts of HDFS Data into Redis with Spark

Requirements

  1. Read the offline data from HDFS in bulk
  2. Write the data into Redis in batches, partition by partition
  3. The Redis cluster runs in sentinel mode; use the pipelined API

Code

  • Note: the save2RedisBatchBad variant below does not work. Inside foreachPartition, do not split obtaining a Redis connection into two steps (get the pool, then getResource) and destroy the pool afterwards: once one partition finishes and calls destroy(), the shared JedisSentinelPool in that executor becomes unusable and every other partition fails. For example, if the file lives on a single HDFS machine, then after repartition(2) both partitions may still run in the same executor, so only one redisSentinelPool instance is ever initialized; when the first partition finishes and calls redisSentinelPool.destroy(), the other partition's getResource throws Caused by: java.lang.IllegalStateException: Pool not open (full stack trace below).
import java.util.Properties
import java.util
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.rdd.RDD
import redis.clients.jedis.{HostAndPort, Jedis, JedisSentinelPool}

object RedisUtils extends Serializable {
  @volatile private var redisSentinel: JedisSentinelPool = _
  def getRedisInstanceSentinel(props: Properties): JedisSentinelPool = {
    if (redisSentinel == null) {
      RedisUtils.synchronized {
        if (redisSentinel == null) {
          val gPoolConfig = new GenericObjectPoolConfig
          gPoolConfig.setMaxTotal(props.getProperty("redis.maxTotal").toInt)
          gPoolConfig.setMaxIdle(props.getProperty("redis.maxIdle").toInt)
          gPoolConfig.setMaxWaitMillis(props.getProperty("redis.maxWaitMillis").toInt)
          gPoolConfig.setJmxEnabled(true)
          gPoolConfig.setTestWhileIdle(true)
          gPoolConfig.setTestOnBorrow(true)
          gPoolConfig.setTestOnReturn(true)
          gPoolConfig.setTimeBetweenEvictionRunsMillis(30000)

          val sentinels = new util.LinkedHashSet[String]
          val hosts = props.getProperty("redis.host").split(",")
          for (host <- hosts) {
            val hostAndPort = host.split(":")
            sentinels.add(new HostAndPort(hostAndPort(0), hostAndPort(1).toInt).toString)
          }
          redisSentinel = new JedisSentinelPool(props.getProperty("redis.masterName"), sentinels, gPoolConfig, props.getProperty("redis.password"))
          // Release the pool when the executor JVM shuts down -- and only then. Don't forget this.
          sys.addShutdownHook {
            redisSentinel.destroy()
          }
        }
      }
    }
    redisSentinel
  }

  def getRedisInstance(props: Properties): Jedis = {
    getRedisInstanceSentinel(props).getResource
  }
  // The batch-write method actually used (the correct pattern)
  def save2RedisBatch(props: Properties, dbName: String, tableName: String, rdd: RDD[(String, String)]): Unit = {
    val redisKeyPre: String = dbName + ":" + tableName + ":"
    rdd.foreachPartition(partition => {
      // Borrow one connection per partition and pipeline all writes through it.
      val jedis = RedisUtils.getRedisInstance(props)
      val redisPipeLine = jedis.pipelined()
      partition.foreach(row => {
        redisPipeLine.setex(redisKeyPre + row._1, 60 * 60 * 24 * 1, row._2) // TTL: one day
      })
      redisPipeLine.sync()
      redisPipeLine.close()
      jedis.close() // return the connection to the pool; never destroy the pool here
    })
  }
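
  // A variant sketch (not in the original post): the pipeline buffers every reply in
  // memory until sync(), so for very large partitions flush every `chunkSize` commands.
  // The method name and the `chunkSize` default are illustrative assumptions.
  private def save2RedisBatchChunked(props: Properties, dbName: String, tableName: String,
                                     rdd: RDD[(String, String)], chunkSize: Int = 10000): Unit = {
    val redisKeyPre: String = dbName + ":" + tableName + ":"
    rdd.foreachPartition(partition => {
      val jedis = RedisUtils.getRedisInstance(props)
      val redisPipeLine = jedis.pipelined()
      var count = 0
      partition.foreach(row => {
        redisPipeLine.setex(redisKeyPre + row._1, 60 * 60 * 24 * 1, row._2)
        count += 1
        if (count % chunkSize == 0) redisPipeLine.sync() // flush buffered commands and replies
      })
      redisPipeLine.sync()
      jedis.close() // return the connection to the pool; never destroy the pool here
    })
  }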

  // Note: the pattern below does NOT work. Getting a Redis connection must not be split into
  // two steps inside foreachPartition: when one partition finishes, destroy() makes the shared
  // pool in that executor unusable and the remaining partitions fail. E.g. with the file on one
  // HDFS machine, after repartition(2) both partitions run in the same executor and share a
  // single redisSentinelPool instance; once the first partition calls redisSentinelPool.destroy(),
  // the other partition's getResource throws
  // Caused by: java.lang.IllegalStateException: Pool not open -- see the stack trace below.
  private def save2RedisBatchBad(props: Properties, dbName: String, tableName: String, rdd: RDD[(String, String)]): Unit = {
    val redisKeyPre: String = dbName + ":" + tableName + ":"
    rdd.foreachPartition(partition => {
      val redisSentinelPool = RedisUtils.getRedisInstanceSentinel(props)
      val redisPipeLine = redisSentinelPool.getResource.pipelined()
      partition.foreach(row => {
        redisPipeLine.setex(redisKeyPre + row._1, 60 * 60 * 24 * 1, row._2)
      })
      redisPipeLine.sync()
      redisPipeLine.close()
      redisSentinelPool.destroy() // BUG: destroys the executor-wide shared pool; the next partition's getResource then fails
    })
  }
}
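
For completeness, here is a minimal driver sketch showing how this might be wired up end to end. The HDFS path, tab separator, partition count, property values, and the Hdfs2RedisJob/appName identifiers are illustrative assumptions, not part of the original post:

import java.util.Properties
import org.apache.spark.sql.SparkSession

object Hdfs2RedisJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hdfs2redis").getOrCreate()

    // Property keys match what RedisUtils reads; all values here are placeholders.
    val props = new Properties()
    props.setProperty("redis.host", "sentinel1:26379,sentinel2:26379,sentinel3:26379")
    props.setProperty("redis.masterName", "mymaster")
    props.setProperty("redis.password", "changeme")
    props.setProperty("redis.maxTotal", "32")
    props.setProperty("redis.maxIdle", "8")
    props.setProperty("redis.maxWaitMillis", "10000")

    // Hypothetical input: tab-separated key/value lines on HDFS.
    val rdd = spark.sparkContext
      .textFile("hdfs:///data/offline/kv/*")
      .map(_.split("\t", 2))
      .filter(_.length == 2)
      .map(arr => (arr(0), arr(1)))
      .repartition(8) // tune to executor cores and what the Redis master can absorb

    RedisUtils.save2RedisBatch(props, "mydb", "mytable", rdd)
    spark.stop()
  }
}

Each executor initializes the JedisSentinelPool at most once through the double-checked getRedisInstanceSentinel, and the shutdown hook releases it only when the executor JVM exits.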

Error message

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 12.0 failed 4 times, most recent failure: Lost task 1.3 in stage 12.0 (TID 61, slave24.ai.cloud.test.we, executor 2): redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisSentinelPool.getResource(JedisSentinelPool.java:214)
at cn.test.client.RedisUtils$$anonfun$save2RedisBatch$1.apply(RedisUtils.scala:87)
at cn.test.client.RedisUtils$$anonfun$save2RedisBatch$1.apply(RedisUtils.scala:85)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Pool not open
at org.apache.commons.pool2.impl.BaseGenericObjectPool.assertOpen(BaseGenericObjectPool.java:704)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:410)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:361)
at redis.clients.util.Pool.getResource(Pool.java:49)
... 15 more
