FusionInsight平台——Spark连接MPPDB(LibrA、GuassDB)

前言

  • 目前国内大数据行业盛行,华为推出了自己的大数据发行版FusionInsight,属于国内领先的发行版。FusionInsight产品其中就包括了Hadoop、Hive、Spark、HBase、MPPDB等组件。对于多数大数据从业人员来说,对大数据主流产品都有一定了解,但这款MPPDB相对来说较为陌生,笔者也在这上面遇到了坑,特此分享。

  • MPPDB:简单来说,它一款分布式的关系型数据库,基于Postgres-XL框架,支持JDBC、ODBC

  • MPPDB已更名为LibrA(GuassDB)


遇到的问题

  • 在FusionInsight平台,Spark使用JDBC连接MPPDB失败
  • 报错内容大致为“Invalid username/password;logon denied”
  • 注意:在C80(不包含)以后的新版本已无该问题(但是不同的部署版本,还会存在jdbc驱动包的选择的问题,有问题请私信)

问题分析

  1. 起初一看,还以为是"用户名/密码"错了。笔者在同一环境下使用该"用户名/密码"尝试直接登录数据库、尝试使用纯Java(或Scala)方式用JDBC连接数据库、尝试使用第三方软件工具登录数据库,均无任何问题,都可以正常使用。看来问题出在了Spark上。
  2. 经询问华为相关工程师得知,FusionInsight平台还有一个DBService组件。而该组件要用到的JDBC包的Driver的包名(org.posetgresql.Driver)和MPPDB的JDBC包一致,但实际两个JDBC包是有区别的。
  3. 同时,为了便于访问DBService组件,Spark的lib下也提供了该JDBC的jar包,导致jar包冲突。故可知,用户在使用spark-submit提交任务时添加的jar包与Spark lib下的jar包发生了冲突,而Spark优先使用了lib下的JDBC包。

问题解决

  • 较差方案:删除集群中所有Spark节点下lib路径中的DBService的JDBC包,再由用户提交任务时指定需要用到的jar包。(该方案能够解决问题,但是会导致其他不知情的用户访问DBservice组件时出现问题,同时节点太多删除也很麻烦)

  • 良好方案:修改客户端spark/conf/spark-default.conf文件,或者在代码中的conf指定

    1. 修改spark.executor.userClassPathFirst = true
    2. 修改spark.driver.userClassPathFirst = true
    3. 最后在提交应用时指定jar包。(示例: spark-submit --jars gsjdbc.jar,jars前面是两个减号)
    4. 这样即可让jar包冲突时,优先使用用户提交的jar包
  • 未知方案:修改配置的lib加载的路径即可(让该路径不包括冲突的包)。例如YARN模式下,修改spark.yarn.lib指定的路径。

  • 相关源码(下面对应的是Driver的,Executor同理)

    // SparkSubmit中的方法
    private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
      // 省略部分代码
    
       // 决定使用的ClassLoader,由参数spark.driver.userClassPathFirst决定,默认为false
      val loader =
        if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
          // 该ClassLoader会优先使用用户提供的jar包
          new ChildFirstURLClassLoader(new Array[URL](0),
            Thread.currentThread.getContextClassLoader)
        } else {
            // 默认的ClassLoader
            new MutableURLClassLoader(new Array[URL](0),
            Thread.currentThread.getContextClassLoader)
        }
      Thread.currentThread.setContextClassLoader(loader)
    
      for (jar <- childClasspath) {
        addJarToClasspath(jar, loader)
      }
    
      // 省略部分代码
    
    }
    

Spark从数据库"读/写"的代码示例

注1:如果数据量很大, 速度较慢,推荐使用MPPDB的导入工具
注2:部分版本需要选择合适的jdbc驱动包(华为提供了2个jdbc驱动包,Driver全路径、填写的url都不同,有问题请私信)

  • Spark从MPPDB读取数据(其他关系型数据库类似)
import java.util.Properties

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark从关系型数据库读取数据
  *
  * @author ALion
  * @version 2018/1/7 18:06
  */
object ReadDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReadDemo")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc) // 或者SQLContext

    // 写好JDBC连接相关信息
    val url = "jdbc:postgresql://192.168.1.100/testDB"
    val table = "tb_test"
    val props = new Properties()
    props.setProperty("user", "jerry")
    props.setProperty("password", "123456")
    props.setProperty("driver", "org.postgresql.Driver")

    // 开始读取数据,并显示
    sqlContext.read.jdbc(url, table, props).show()

    sc.stop()
  }

}
  • Spark向MPPDB库写入数据(其他关系型数据库类似)
import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark向关系型数据库写入数据
  *
  * @author ALion
  * @version 2018/1/7 18:23
  */
object WriteDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WriteDemo ")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc) // 或者SQLContext

    // 首先,需要拿到一个DataFrame(可以自己创建、可以从其他RDD转换而来、可以从Hive库或关系型数据库中读取等)
    // 在这里,自己创建了一个RDD,并转换为DataFrame
    val personList = List(("xiaoming", 18, "shanghai"),
                          ("laowang", 30, "beijing"),
                          ("lilei", 22, "chongqing"))
    val personRDD = sc.parallelize(personList)
    case class Person(name: String, age: Int, addr: String) // 也可以写schema: StructType
    val personDF = sqlContext.createDataFrame(personRDD, Person.getClass)

    // 然后,写好JDBC连接相关信息
    val url = "jdbc:postgresql://192.168.1.100/testDB"
    val table = "tb_person"
    val props = new Properties()
    props.setProperty("user", "jerry")
    props.setProperty("password", "123456")
    props.setProperty("driver", "org.postgresql.Driver")

    // 最后,开始写入数据
   personDF.write.mode(SaveMode.Append).jdbc(url, table, props)
    
    sc.stop()
  }

}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐