1. 向 Hive 表的指定分区写入数据

// Overwrite one day's partition of the Hive table pdr.T_PSSC_TRAN_PORTRAIT_CENTER with
// the contents of ResulToOra2, via a session-scoped temp view named "tmp".
// createOrReplaceTempView (instead of createTempView) lets the view be registered again in
// the same SparkSession, e.g. on a re-run, without a TempTableAlreadyExistsException.
// NOTE(review): `select *` maps columns by position, so ResulToOra2's column order must
// match the table's non-partition columns — confirm.
ResulToOra2.createOrReplaceTempView("tmp")
spark.sql(
  s"""
     |insert overwrite table pdr.T_PSSC_TRAN_PORTRAIT_CENTER partition(dt='${sdfDay.format(date)}') select * from  tmp
     |""".stripMargin)

println("write hive end")

2.写入HDFS

          // Dump ResulToES as CSV (with a header row) into a month-stamped HDFS directory,
          // replacing any previous output there. coalesce(1) merges all data into a single
          // partition so exactly one part file is written.
          ResulToES
            .coalesce(1)
            .write
            .mode("overwrite")
            .format("csv")
            .options(Map("header" -> "true", "encoding" -> "utf-8"))
            .save(s"hdfs://master01:8020/tmp/tran_es_${sdfMonth.format(date)}")

          println("write hdfs end")
2.1 这里 coalesce(1) 的作用是把数据合并到一个分区,从而只生成一个文件;如果数据量很大,单个分区会成为写入瓶颈,建议保留多个分区(例如改用 coalesce(n) 或 repartition(n))。

3.写入ES

 // elasticsearch-hadoop settings used to upsert ResulToES into a monthly index.
 // NOTE(review): es.mapping.id names the column "TRAN_ID", while the index template maps a
 // lowercase "tran_id" field; ES field names are case-sensitive — confirm which spelling the
 // documents actually carry.
 val cfg = Seq(
   "es.nodes" -> FileProperties.ES_NODES, // Elasticsearch node address(es)
   "es.port" -> FileProperties.ES_PORT, // Elasticsearch port
   // "es.nodes.wan.only" -> "true",
   // Target index; its name matches the "pssc_tran_portrait-*" template pattern.
   "es.resource" -> s"pssc_tran_portrait-${sdfMonth.format(date)}",
   "es.mapping.id" -> "TRAN_ID", // column used as the document _id (primary key)
   "es.write.operation" -> "upsert",
   // NOTE(review): placeholder credentials — load them from secured config, don't hard-code.
   "es.net.http.auth.user" -> "账号",
   "es.net.http.auth.pass" -> "密码"
 ).toMap

 // Write the DataFrame to ES.
 EsSparkSQL.saveToEs(ResulToES, cfg)
 println("write es end")
3.1 DataFrame 写入 ES 只需要准备一个配置 Map(即上面的 cfg)
3.2 导入 import org.elasticsearch.spark.sql.EsSparkSQL
3.3 调用 EsSparkSQL.saveToEs(ResulToES, cfg) 完成写入
3.4 前 3 步都在代码中完成;此外还需要在 ES 中预先创建一个索引模板,使新建的 pssc_tran_portrait-* 索引自动套用下面的 settings 和 mappings。下面是我工作中写过的一个模板,可供参考:
PUT /_template/pssc_tran_portrait_template
{
    "index_patterns": "pssc_tran_portrait-*", 
    "settings": {
        "number_of_shards": 3
    }, 
    "mappings": {
        "_source": {
            "enabled": true
        }, 
        "properties": {
            "10102": {
                "type": "keyword", 
                "store": true
            }, 
            "10107": {
                "type": "keyword", 
                "store": true
            }, 
            "10108": {
                "type": "keyword", 
                "store": true
            }, 
            "10109": {
                "type": "keyword", 
                "store": true
            }, 
            "10110": {
                "type": "keyword", 
                "store": true
            }, 
            "10111": {
                "type": "keyword", 
                "store": true
            }, 
            "10112": {
                "type": "keyword", 
                "store": true
            }, 
            "10113": {
                "type": "keyword", 
                "store": true
            }, 
            "10114": {
                "type": "keyword", 
                "store": true
            }, 
            "10115": {
                "type": "keyword", 
                "store": true
            }, 
            "10116": {
                "type": "keyword", 
                "store": true
            }, 
            "10117": {
                "type": "keyword", 
                "store": true
            }, 
            "10118": {
                "type": "keyword", 
                "store": true
            }, 
            "10119": {
                "type": "keyword", 
                "store": true
            }, 
            "10120": {
                "type": "keyword", 
                "store": true
            }, 
            "10121": {
                "type": "keyword", 
                "store": true
            }, 
            "10210": {
                "type": "keyword", 
                "store": true
            }, 
            "10213": {
                "type": "keyword", 
                "store": true
            }, 
            "10214": {
                "type": "keyword", 
                "store": true
            }, 
            "10215": {
                "type": "keyword", 
                "store": true
            }, 
            "10216": {
                "type": "keyword", 
                "store": true
            }, 
            "10217": {
                "type": "keyword", 
                "store": true
            }, 
            "10220": {
                "type": "keyword", 
                "store": true
            }, 
            "10221": {
                "type": "keyword", 
                "store": true
            }, 
            "30102": {
                "type": "keyword", 
                "store": true
            }, 
            "30107": {
                "type": "keyword", 
                "store": true
            }, 
            "tran_id": {
                "type": "keyword", 
                "store": true, 
                "ignore_above": 80
            }, 
            "tran_name": {
                "type": "keyword", 
                "store": true
            }, 
            "currYearPQ": {
                "type": "nested", 
                "properties": {
                    "year_month": {
                        "type": "integer", 
                        "store": true
                    }, 
                    "P_MONTH_MAX": {
                        "type": "double", 
                        "store": true
                    }, 
                    "Q_MONTH_MAX": {
                        "type": "double", 
                        "store": true
                    }
                }
            }, 
            "lastYearPQ": {
                "type": "nested", 
                "properties": {
                    "last_year_month": {
                        "type": "integer", 
                        "store": true
                    }, 
                    "LAST_P_MONTH_MAX": {
                        "type": "double", 
                        "store": true
                    }, 
                    "LAST_Q_MONTH_MAX": {
                        "type": "double", 
                        "store": true
                    }
                }
            }
        }
    }, 
    "aliases": {
        "search-pssc_tran_portrait": { }
    }
}

4.写入oracle

4.1 公司中一般会把删除 Oracle 数据的操作封装成一个方法
/**
 * Deletes the rows of `tabName` that match `condition` in the default PDR Oracle database
 * (connection settings `ORA_PDR_URL` / `ORA_PDR_CONNP`).
 *
 * No explicit commit is issued; the delete relies on the connection's auto-commit mode
 * (on by default for new JDBC connections).
 *
 * Failures are best-effort: the stack trace is printed and the exception is NOT rethrown,
 * so callers cannot tell from the return value whether the delete succeeded.
 *
 * NOTE(review): `tabName` and `condition` are spliced verbatim into the SQL text, so they
 * must come from trusted code and never from user input.
 *
 * @param tabName   table to delete from
 * @param condition SQL predicate placed after `where`
 */
def delAnyTable(tabName: String, condition: String): Unit = {
  import scala.util.control.NonFatal

  val delSql = s"delete  $tabName where $condition "
  println("delSql:" + delSql)
  var conn: Connection = null
  var delPS: PreparedStatement = null
  try {
    Class.forName(FileProperties.oracleDriver)
    conn = DriverManager.getConnection(ORA_PDR_URL, ORA_PDR_CONNP)
    delPS = conn.prepareStatement(delSql)
    delPS.execute()
    println(s"====================删除 $tabName 完毕!")
  } catch {
    case NonFatal(e) => e.printStackTrace()
  } finally {
    // Nested so the connection is still closed if closing the statement throws;
    // otherwise a failing statement close would leak the connection.
    try {
      if (delPS != null) delPS.close()
    } finally {
      if (conn != null) conn.close()
    }
  }
}
4.2 上面的删除方法把 Oracle 的 url 和连接配置写死了,调用时只需传入表名和过滤条件就能对表进行删除操作,但无法删除其他 Oracle 数据库中的数据。你也可以增加 url 和 Properties 两个参数,从而对不同的数据库执行删除操作:
/**
 * Deletes the rows of `tabName` that match `condition` in the Oracle database reached
 * through `url` / `connp`.
 *
 * No explicit commit is issued; the delete relies on the connection's auto-commit mode
 * (on by default for new JDBC connections).
 *
 * Failures are best-effort: the stack trace is printed and the exception is NOT rethrown,
 * so callers cannot tell from the return value whether the delete succeeded.
 *
 * NOTE(review): `tabName` and `condition` are spliced verbatim into the SQL text, so they
 * must come from trusted code and never from user input.
 *
 * @param tabName   table to delete from
 * @param condition SQL predicate placed after `where`
 * @param url       JDBC URL of the target Oracle database
 * @param connp     connection properties passed to `DriverManager.getConnection`
 */
def delAnyTable(tabName: String, condition: String, url: String, connp: Properties): Unit = {
  import scala.util.control.NonFatal

  val delSql = s"delete  $tabName where $condition "
  println("delSql:" + delSql)
  var conn: Connection = null
  var delPS: PreparedStatement = null
  try {
    Class.forName(FileProperties.oracleDriver)
    conn = DriverManager.getConnection(url, connp)
    delPS = conn.prepareStatement(delSql)
    delPS.execute()
    println(s"====================删除 $tabName 完毕!")
  } catch {
    case NonFatal(e) => e.printStackTrace()
  } finally {
    // Nested so the connection is still closed if closing the statement throws;
    // otherwise a failing statement close would leak the connection.
    try {
      if (delPS != null) delPS.close()
    } finally {
      if (conn != null) conn.close()
    }
  }
}
4.3 同样,把向 Oracle 插入数据的操作也封装成方法

//private def getType(df: DataFrame) = df.schema.fields.map(_.dataType.typeName).map(r => if (r.startsWith("decimal")) "decimal" else r)

//public static final Integer BATCH_COMMIT_CNT = 10000;

//public static final Integer OVER_COMMIT_CNT = 1000000;

/**
 * Batch-inserts every row of `df` into the Oracle table `tableName` of the default PDR
 * database (connection settings `ORA_PDR_URL` / `ORA_PDR_CONNP`).
 *
 * The DataFrame is repartitioned into 16 partitions. Each partition opens its own JDBC
 * connection with auto-commit disabled and does the following:
 *  - sends rows in JDBC batches of `FileProperties.BATCH_COMMIT_CNT`;
 *  - commits every `FileProperties.OVER_COMMIT_CNT` rows;
 *  - commits once more after its last row.
 * Parameter binding per row is delegated to `ExecPS` using the type names from `getType(df)`.
 *
 * Column names come from `df.schema.fieldNames`, so they must match the table's columns.
 *
 * On failure, the partition's not-yet-committed work is rolled back and the error is
 * rethrown, which fails the Spark task. Rows already committed at earlier
 * OVER_COMMIT_CNT checkpoints stay in the table.
 *
 * @param df        rows to insert
 * @param tableName target table; spliced into the SQL text, so it must be trusted
 */
def toOracleInsert(df: DataFrame, tableName: String): Unit = {
  import scala.util.control.NonFatal

  val schema: Array[String] = getType(df)
  val fields = df.schema.fieldNames.mkString(",")
  val fields1 = schema.map(_ => "?").mkString(",")
  val sql = s"""insert into $tableName ($fields) values($fields1)"""

  df.repartition(16).foreachPartition(rows => {
    var conn: Connection = null
    var ps: PreparedStatement = null

    try {
      Class.forName(FileProperties.oracleDriver)
      conn = DriverManager.getConnection(ORA_PDR_URL, ORA_PDR_CONNP)
      ps = conn.prepareStatement(sql)
      conn.setAutoCommit(false)
      var iCnt: Int = 0
      rows.foreach(data => {
        iCnt += 1 // incremented first, so iCnt >= 1 in both checks below
        ExecPS(data, ps, schema)
        ps.addBatch()

        if (iCnt % FileProperties.BATCH_COMMIT_CNT == 0) {
          println(s"============================$iCnt")
          ps.executeBatch()
          ps.clearBatch()
        }
        if (iCnt % FileProperties.OVER_COMMIT_CNT == 0) {
          conn.commit()
        }
      })

      // Flush the remaining (possibly empty) batch and commit the tail of the partition.
      println(s"============================$iCnt")
      ps.executeBatch()
      ps.clearBatch()
      conn.commit()
    } catch {
      case NonFatal(e) =>
        // With auto-commit off, the Oracle JDBC driver implicitly COMMITs pending work when
        // the connection is closed, so roll back explicitly. Otherwise a failed partition
        // would persist a partial batch.
        if (conn != null) {
          try conn.rollback()
          catch { case NonFatal(rollbackError) => e.addSuppressed(rollbackError) }
        }
        throw e
    } finally {
      // Nested so the connection is still closed if closing the statement throws.
      try {
        if (ps != null) ps.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  })
}
4.4 同样也可以增加 url 和 Properties 参数,从而向不同的 Oracle 数据库插入数据:
//private def getType(df: DataFrame) = df.schema.fields.map(_.dataType.typeName).map(r => if (r.startsWith("decimal")) "decimal" else r)

//public static final Integer BATCH_COMMIT_CNT = 10000;

//public static final Integer OVER_COMMIT_CNT = 1000000;


/**
 * Batch-inserts every row of `df` into the Oracle table `tableName` of the database
 * reached through `url` / `connp`.
 *
 * The DataFrame is repartitioned into 16 partitions. Each partition opens its own JDBC
 * connection with auto-commit disabled and does the following:
 *  - sends rows in JDBC batches of `FileProperties.BATCH_COMMIT_CNT`;
 *  - commits every `FileProperties.OVER_COMMIT_CNT` rows;
 *  - commits once more after its last row.
 * Parameter binding per row is delegated to `ExecPS` using the type names from `getType(df)`.
 *
 * Column names come from `df.schema.fieldNames`, so they must match the table's columns.
 *
 * On failure, the partition's not-yet-committed work is rolled back and the error is
 * rethrown, which fails the Spark task. Rows already committed at earlier
 * OVER_COMMIT_CNT checkpoints stay in the table.
 *
 * @param df        rows to insert
 * @param tableName target table; spliced into the SQL text, so it must be trusted
 * @param url       JDBC URL of the target Oracle database
 * @param connp     connection properties passed to `DriverManager.getConnection`
 */
def toOracleInsert(df: DataFrame, tableName: String, url: String, connp: Properties): Unit = {
  import scala.util.control.NonFatal

  val schema = getType(df)
  val fields = df.schema.fieldNames.mkString(",")
  val fields1 = schema.map(_ => "?").mkString(",")
  val sql = s"""insert into $tableName ($fields) values($fields1)"""

  df.repartition(16).foreachPartition(rows => {
    var conn: Connection = null
    var ps: PreparedStatement = null

    try {
      Class.forName(FileProperties.oracleDriver)
      conn = DriverManager.getConnection(url, connp)
      ps = conn.prepareStatement(sql)
      conn.setAutoCommit(false)
      var iCnt: Int = 0
      rows.foreach(data => {
        iCnt += 1 // incremented first, so iCnt >= 1 in both checks below
        ExecPS(data, ps, schema)
        ps.addBatch()

        if (iCnt % FileProperties.BATCH_COMMIT_CNT == 0) {
          println(s"============================$iCnt")
          ps.executeBatch()
          ps.clearBatch()
        }
        if (iCnt % FileProperties.OVER_COMMIT_CNT == 0) {
          conn.commit()
        }
      })

      // Flush the remaining (possibly empty) batch and commit the tail of the partition.
      println(s"============================$iCnt")
      ps.executeBatch()
      ps.clearBatch()
      conn.commit()
    } catch {
      case NonFatal(e) =>
        // With auto-commit off, the Oracle JDBC driver implicitly COMMITs pending work when
        // the connection is closed, so roll back explicitly. Otherwise a failed partition
        // would persist a partial batch.
        if (conn != null) {
          try conn.rollback()
          catch { case NonFatal(rollbackError) => e.addSuppressed(rollbackError) }
        }
        throw e
    } finally {
      // Nested so the connection is still closed if closing the statement throws.
      try {
        if (ps != null) ps.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  })
}
4.5 以上方法一般配合调用:先删除目标表中对应统计日期(STAT_DATE)的旧数据,再插入新数据,这样任务重跑时可以避免产生重复数据。
// Delete-then-insert into the same Oracle table: first remove this STAT_DATE's rows, then
// insert the freshly computed ResulToOra2 rows. Presumably this makes a re-run replace the
// period's data rather than duplicate it.
// NOTE(review): delAnyTable catches exceptions and only prints them, so the insert still
// runs if the delete fails — check the log for a stack trace after each run.
// NOTE(review): the STAT_DATE value is not quoted; if STAT_DATE is a character column,
// Oracle falls back to implicit conversion — confirm the column type.
ResaveToOracle.delAnyTable("T_PSSC_TRAN_PORTRAIT_CENTER", s"STAT_DATE=${sdfMonth.format(date)}")
ResaveToOracle.toOracleInsert(ResulToOra2, "T_PSSC_TRAN_PORTRAIT_CENTER")

5.总结

以上代码均来自企业实战,可用于把计算结果回写到 Hive、HDFS、ES 和 Oracle。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐