1. Add Dependencies

<properties>
        <flink.version>1.13.6</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
        <fastjson.version>1.2.68</fastjson.version>
        <lombok.version>1.18.12</lombok.version>
        <hbase.version>2.1.0</hbase.version>
    </properties>
    <dependencies>
        <!-- HBase client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- Flink streaming environment -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2. HBase Connection Utility Class

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;

import java.util.List;

public class HbaseUtils extends RichSourceFunction<String> {

    // Flag flipped by cancel() so run() can stop emitting.
    private volatile boolean isRunning = true;

    private Connection connection = null;
    private ResultScanner rs = null;
    private Table table = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "IP:port");//192.168.23.39:2181
        hconf.set("zookeeper.znode.parent", "/hbase");

        //指定用户名为hbase的用户去访问hbase服务
        UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("username");
        connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));

        table = connection.getTable(TableName.valueOf("权限:表名"));//hbase:user
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        Scan scan = new Scan();
        // Optionally narrow the scan to a single family or column:
        // scan.addFamily(Bytes.toBytes("user_info"));
        // scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_name"));

        // Number of rows fetched per RPC to the region server.
        scan.setCaching(100);
        rs = table.getScanner(scan);
        
        for (Result result : rs) {
            if (!isRunning) {
                break;
            }
            StringBuilder sb = new StringBuilder();
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                sb.append(value).append("-");
            }
            // Drop the trailing "-" separator (guarding against empty rows).
            if (sb.length() > 0) {
                sb.setLength(sb.length() - 1);
            }
            sourceContext.collect(sb.toString());
        }
    }

    @Override
    public void cancel() {
        // Signal run() to stop when the job is cancelled.
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        if (rs != null) rs.close();
        if (table != null) table.close();
        if (connection != null) connection.close();
    }
}
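
If only part of the table is needed, the scan in run() can be narrowed before calling getScanner. A minimal sketch, assuming the user_info family from the commented-out example above and hypothetical row-key bounds:

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("user_info"));    // read only this family (assumed name)
scan.withStartRow(Bytes.toBytes("row-0001"));  // hypothetical start key, inclusive
scan.withStopRow(Bytes.toBytes("row-1000"));   // hypothetical stop key, exclusive
scan.setCaching(100);                          // rows per RPC, as in run() above

Narrowing the scan server-side keeps the region servers from shipping cells the job would only discard.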

3. Run the Job to Read HBase Data

import com.data.utils.HbaseUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkHbaseApp {
    public static void main(String[] args) throws Exception {
        // 1. Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read data from HBase
        DataStream<String> stream = env.addSource(new HbaseUtils());
        stream.print("==============");
        // 3. Submit the job and start execution
        env.execute();
    }
    }
}
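
Note that a plain RichSourceFunction is a non-parallel source, so Flink runs it with parallelism 1 no matter what the environment is configured with; only the downstream operators fan out. For local testing, pinning the whole job to one task keeps the printed rows in scan order — a minimal sketch:

// Force a single task so the print sink preserves the order rows were emitted in.
env.setParallelism(1);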

4. Error Logs and Solutions

4.1 AccessDeniedException

  • Error log

Access denied (AccessDeniedException stack trace; screenshot omitted).

  • Solution

By default, when the HBase Java API accesses the HBase service, it authenticates as the OS user that launched the Java program. In some cases we need to specify a different user explicitly:


// Access the HBase service as the user "hbase"
UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hbase");
connection = ConnectionFactory.createConnection(configuration, User.create(userGroupInformation));
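
To confirm which identity the client is presenting, you can log the current UGI before creating the connection — a minimal sketch using Hadoop's UserGroupInformation API (callable inside open(), which already declares throws Exception):

// Print the user the HBase client would authenticate as by default
// (normally the OS user that launched the JVM).
System.out.println("current user = " + UserGroupInformation.getCurrentUser().getUserName());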

4.2 NoSuchColumnFamilyException

  • Error log

NoSuchColumnFamilyException stack trace (screenshot omitted).

  • Solution

The error message is actually quite clear: the column family being read does not exist in the HBase table. After checking carefully, I found I had simply misspelled the column family name, and correcting it fixed the problem.

The two tables I was reading have no info column family at all; an earlier table did, and I carried the name over carelessly. Noting it here to avoid repeating the mistake.
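
A quick way to catch this earlier is to list the families that actually exist on the table via the Admin API — a minimal sketch, reusing the connection and the hbase:user table name from section 2 (Admin and TableDescriptor are covered by the org.apache.hadoop.hbase.client.* import already used there):

// List the column families defined on the table so the names passed to
// scan.addFamily()/addColumn() can be checked against what really exists.
try (Admin admin = connection.getAdmin()) {
    TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf("hbase:user"));
    for (byte[] family : descriptor.getColumnFamilyNames()) {
        System.out.println("column family: " + Bytes.toString(family));
    }
}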
