Spark 读写 HBase 数据(put 写入、scan 读取)

Put

package huorong.utils

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName, client}
import org.apache.hadoop.hbase.client.{BufferedMutator, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{DataFrame, Row}

/**
 * Helpers for writing Spark DataFrames into HBase.
 *
 * Connection settings are read from `hbase.properties` on the classpath. The HBase
 * connection is created when this object is first initialised and is then reused by
 * every call to [[putHbase]].
 */
object HbaseUtils {
  // Settings used for HBase puts, loaded from hbase.properties.
  private val hbaseConfig: Config = ConfigFactory.load("hbase.properties")
  private val ZOOKEEPER_QUORUM_2: String = hbaseConfig.getString("hbase.zookeeper.quorum")
  private val ZOOKEEPER_QUORUM_CLIENT: String = hbaseConfig.getString("hbase.zookeeper.property.clientPort")
  private val MUTE_SIZE: String = hbaseConfig.getString("mute.bufferSize")
  private val HBASE_MASTER: String = hbaseConfig.getString("hbase.master")

  // Column family every cell is written to.
  private val ColumnFamily: Array[Byte] = Bytes.toBytes("i")
  // Columns stored as 8-byte longs; every other column is stored as a UTF-8 string.
  private val LongColumns: Set[String] = Set("engine_dbtime", "scan_time", "task_id")

  // The configuration must be fully populated BEFORE the connection is created;
  // settings applied afterwards are not guaranteed to be seen by the connection.
  val conn: Configuration = HBaseConfiguration.create()
  conn.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM_2)
  conn.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_QUORUM_CLIENT)
  conn.set("hbase.master", HBASE_MASTER)
  conn.set("hbase.client.write.buffer", MUTE_SIZE)
  val hconf: client.Connection = ConnectionFactory.createConnection(conn)

  /**
   * Writes every row of `dataFrame` to the HBase table `tableSchema:table`.
   *
   * The `rk` column is used as the row key and is not written as a cell. Every other
   * column becomes a cell in column family `i`, with the column name as qualifier and
   * the row's `scan_time` value as cell timestamp. `engine_dbtime`, `scan_time` and
   * `task_id` are encoded as longs; all remaining columns are encoded as strings.
   * String columns whose value is null are skipped instead of failing the task.
   *
   * @param dataFrame   rows to write; must contain a String column `rk` and a Long
   *                    column `scan_time`
   * @param tableSchema HBase namespace of the target table
   * @param table       name of the target table inside that namespace
   */
  def putHbase(dataFrame: DataFrame,
               tableSchema: String,
               table: String): Unit = {

    dataFrame.foreachPartition((partition: Iterator[Row]) => {
      // One buffered mutator per partition; it batches puts until flush/close.
      val mutate: BufferedMutator = hconf.getBufferedMutator(TableName.valueOf(s"$tableSchema:$table"))
      try {
        partition.foreach { row =>
          val scanTime: Long = row.getAs[Long]("scan_time")
          val put: Put = new Put(Bytes.toBytes(row.getAs[String]("rk")))
          row.schema.fieldNames
            .filter(_ != "rk")
            .foreach { field =>
              val qualifier: Array[Byte] = Bytes.toBytes(field)
              if (LongColumns.contains(field)) {
                put.addColumn(ColumnFamily, qualifier, scanTime, Bytes.toBytes(row.getAs[Long](field)))
              } else {
                // Bytes.toBytes(null: String) throws, so a null value is simply not written.
                Option(row.getAs[String](field)).foreach { value =>
                  put.addColumn(ColumnFamily, qualifier, scanTime, Bytes.toBytes(value))
                }
              }
            }
          put.setTimestamp(scanTime)
          mutate.mutate(put)
        }
        mutate.flush()
      } finally {
        mutate.close()
      }
    })
  }
}

scan

package huorong

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Example job: scans the HBase table `TMP:lzxtest` through `TableInputFormat`,
 * converts each result into a (rk, sha1, task_id, scan_time) row and prints the
 * resulting DataFrame.
 */
object sparkReadHbase {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local[*]").appName("readHbase").getOrCreate()

    try {
      val hconf: Configuration = HBaseConfiguration.create()
      hconf.set("hbase.zookeeper.property.clientPort", "2181")
      hconf.set("hbase.zookeeper.quorum", "xxx")

      // Table to scan, and the maximum number of values returned by each scanner next() call.
      hconf.set(TableInputFormat.INPUT_TABLE, "TMP:lzxtest")
      hconf.set(TableInputFormat.SCAN_BATCHSIZE, "100")

      val rdd: RDD[(ImmutableBytesWritable, Result)] = session.sparkContext.newAPIHadoopRDD(
        hconf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])
      import session.implicits._

      // Encode the family and qualifiers once instead of once per row.
      val family: Array[Byte] = Bytes.toBytes("i")
      val sha1Qualifier: Array[Byte] = Bytes.toBytes("sha1")
      val taskIdQualifier: Array[Byte] = Bytes.toBytes("task_id")
      val scanTimeQualifier: Array[Byte] = Bytes.toBytes("scan_time")

      val df: DataFrame = rdd.map { case (key, result) =>
        val rowkey: String = Bytes.toString(key.get())
        val sha1: String = Bytes.toString(result.getValue(family, sha1Qualifier))
        val taskId: Long = Bytes.toLong(result.getValue(family, taskIdQualifier))
        val scanTime: Long = Bytes.toLong(result.getValue(family, scanTimeQualifier))
        (rowkey, sha1, taskId, scanTime)
      }.toDF("rk", "sha1", "task_id", "scan_time")

      df.show(false)
    } finally {
      // Release the Spark context even when the job fails.
      session.stop()
    }
  }
}

TableInputFormat 的可选参数(可用于自定义扫描范围和读取方式)


  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use start keys of this table to split.
   * This is useful when you are preparing data for bulkload.
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /**
   * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row. */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row. */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column family to scan. */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space-delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  /** Specify if we have to shuffle the map tasks. */
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐