The project had to connect to the Hive and HBase components of an external big-data platform (the Suyan platform). The platform enforces Kerberos authentication, but the other side provided no sample connection code.

Since this was our first time integrating with it, we hit a lot of problems during deployment and debugging; they are recorded here for reference.

(Errors encountered include: GSS initiate failed; Failed to find any Kerberos tgt; GSS initiate failed - no service creds; Unable to obtain password from user; Peer indicated failure: Unsupported mechanism type PLAIN; No rules applied to hive; Malformed representation of principal; and, when creating a Hive external table over HBase: org.apache.hadoop.hbase.TableNotFoundException.)
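When chasing these errors, the first thing that pays off is turning on the JDK's Kerberos debug output before the login runs; a minimal sketch (these are standard JDK switches, not project-specific settings):

    // equivalent JVM flags: -Dsun.security.krb5.debug=true -Dsun.security.jgss.debug=true
    System.setProperty("sun.security.krb5.debug", "true");
    System.setProperty("sun.security.jgss.debug", "true");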


1. Hive connection configuration code:


    @Value("${spring.datasource.hive.driver-class-name}")
    private String hiveDriverClassName;
    @Value("${spring.datasource.hive.url}")
    private String hiveJdbcUrl;
    @Value("${spring.datasource.hive.username}")
    private String hiveUsername;
    @Value("${spring.datasource.hive.password}")
    private String hivePassword;
    // whether Kerberos authentication is required
    @Value("${spring.datasource.kerberos.auth}")
    private Boolean auth;
    // authentication mechanism (e.g. "Kerberos")
    @Value("${spring.datasource.kerberos.authentication}")
    private String authentication;
    // principal name of the Kerberos user
    @Value("${spring.datasource.kerberos.principal}")
    private String principal;
    // the user's keytab file
    @Value("${spring.datasource.kerberos.keytab}")
    private String keytab;
    // the krb5 configuration file
    @Value("${spring.datasource.kerberos.krb5.conf}")
    private String krb5Conf;

    /**
     * Build the Hive Druid datasource, running the Kerberos login first when enabled.
     *
     * @return the Hive DataSource
     */
    @Bean(name = "datasourceHive")
    public DataSource getDataSourceHive() {
        // log in to Kerberos before the driver opens any connection
        if (auth) {
            log.info("------start Kerberos auth-----");
            kerberosLogin();
            log.info("------end Kerberos auth-----");
        }
        // standard Druid datasource setup
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName(hiveDriverClassName);
        dataSource.setUrl(hiveJdbcUrl);
        dataSource.setUsername(hiveUsername);
        dataSource.setPassword(hivePassword);
        dataSource.setTestWhileIdle(true);
        dataSource.setValidationQuery("SELECT 1");
        log.info("------------------------datasourceHive dataSource.getUrl(): {}", dataSource.getUrl());
        return dataSource;
    }

    /**
     * Log in to Kerberos from the configured keytab via Hadoop's UserGroupInformation.
     */
    public void kerberosLogin() {
        log.info("kerberos login info: {}, {}, {}, {}", authentication, principal, krb5Conf, keytab);
        // hadoop.security.authentication must be set before configuring UserGroupInformation
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("hadoop.security.authentication", authentication);
        System.setProperty("java.security.krb5.conf", krb5Conf);
        try {
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytab);
            log.info("the login user: {}", UserGroupInformation.getCurrentUser().getUserName());
        } catch (Exception e) {
            log.error("Kerberos login fail.", e);
        }
    }
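Kerberos tickets expire, so a long-running service should also refresh the TGT periodically. A minimal sketch built on the same UserGroupInformation API (hooking it to a scheduler is left as an assumption):

    /**
     * Refresh the Kerberos TGT when it is close to expiring.
     * checkTGTAndReloginFromKeytab() is a no-op while the ticket is still valid,
     * so it is safe to call on a schedule (e.g. hourly).
     */
    public void relogin() {
        try {
            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
        } catch (Exception e) {
            log.error("Kerberos relogin fail.", e);
        }
    }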

Related configuration:

# Hive configuration
spring.datasource.hive.driver-class-name=org.apache.hive.jdbc.HiveDriver
spring.datasource.hive.url=jdbc:hive2://7226:2181,4200-7227:2181,7228:2181/default;principal=hive/hivecluster@BCH;serviceDiscoveryMode=zookeeper;zookeeperNamespace=hiveserver2;sasl.qop=auth;auth=KERBEROS?mapreduce.job.queuename=root.aaa.xxxx
spring.datasource.hive.username=hive
spring.datasource.hive.password=hive
# Kerberos authentication
spring.datasource.kerberos.auth=true
spring.datasource.kerberos.authentication=Kerberos
spring.datasource.kerberos.principal=xxxx/bdoc@BCH
spring.datasource.kerberos.keytab=/home/xxxx/shmc.keytab
spring.datasource.kerberos.krb5.conf=/home/xxxx/krb5_liu.conf
spring.datasource.kerberos.hbase.principal=hbase/_HOST@BBBB
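A frequent cause of "unable to obtain password from user" is a keytab path that is wrong, unreadable, or missing the configured principal, so a fail-fast check before the login call can save a debugging round trip. A minimal sketch (a hypothetical helper inside the same config class, not part of the original project):

    // fail fast with a clear message instead of a cryptic JAAS error later
    private void checkKerberosFiles() {
        for (String path : new String[]{keytab, krb5Conf}) {
            java.io.File f = new java.io.File(path);
            if (!f.isFile() || !f.canRead()) {
                throw new IllegalStateException("Kerberos file missing or unreadable: " + path);
            }
        }
    }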

For the Hive multi-datasource setup itself, see the previous article: Spring Boot multi-datasource configuration (e.g. MySQL and Hive) with the Druid connection pool.
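Once the datasourceHive bean is registered, it can be queried like any other JDBC source; a minimal usage sketch (the JdbcTemplate wiring and the query are illustrative, not from the original project):

    // needs org.springframework.jdbc.core.JdbcTemplate
    // and org.springframework.beans.factory.annotation.Qualifier
    @Bean(name = "hiveJdbcTemplate")
    public JdbcTemplate hiveJdbcTemplate(@Qualifier("datasourceHive") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    // usage: hiveJdbcTemplate.queryForList("show tables")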


2. HBase connection configuration code:

package ····.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.hbase.HbaseTemplate;

/**
 * @author Lourier
 * @Description HBase connection configuration, with optional Kerberos authentication
 * @created 2020-01-09 17:40
 */
@Slf4j
@Configuration
public class HBaseConfig {

    @Value("${spring.hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @Value("${spring.hbase.zookeeper.property.clientPort}")
    private String clientPort;

    @Value("${spring.hbase.zookeeper.znode.parent}")
    private String znodeParent;

    // whether Kerberos authentication is required
    @Value("${spring.datasource.kerberos.auth}")
    private Boolean auth;
    // authentication mechanism (e.g. "Kerberos")
    @Value("${spring.datasource.kerberos.authentication}")
    private String authentication;
    // principal name of the Kerberos user
    @Value("${spring.datasource.kerberos.principal}")
    private String principal;
    // the user's keytab file
    @Value("${spring.datasource.kerberos.keytab}")
    private String keytab;
    // the krb5 configuration file
    @Value("${spring.datasource.kerberos.krb5.conf}")
    private String krb5Conf;
    // service principal of the HBase master/regionservers
    @Value("${spring.datasource.kerberos.hbase.principal}")
    private String hbasePrincipal;


    // HBase table used in the startup check below
    @Value("${spring.hbase.table.video}")
    private String videoTable;
    // shared HBase connection
    public static Connection conn = null;

    /**
     * Get the shared HBase connection, creating it lazily if needed.
     *
     * @return the HBase Connection
     */
    public Connection getConnection() {
        try {
            if (null == conn || conn.isClosed()) {
                log.warn("conn is null or closed...");
                conn = ConnectionFactory.createConnection(getConfiguration());
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return conn;
    }

    /**
     * Build the HBase client configuration, enabling Kerberos when configured.
     *
     * @return the HBase Configuration
     */
    public org.apache.hadoop.conf.Configuration getConfiguration() {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", clientPort);
        conf.set("zookeeper.znode.parent", znodeParent);
        // raise the client-side limit to 20 MB to avoid "KeyValue size too large" errors
        conf.set("hbase.client.keyvalue.maxsize", "20971520");

        if (auth) {
            log.info("......Hbase Kerberos auth start......");
            conf.set("hadoop.security.authentication", authentication);
            conf.set("hbase.security.authentication", authentication);
            conf.set("hbase.security.authorization", "true");
            conf.set("hbase.master.kerberos.principal", hbasePrincipal);
            conf.set("hbase.regionserver.kerberos.principal", hbasePrincipal);

            System.setProperty("java.security.krb5.conf", krb5Conf);
            try {
                UserGroupInformation.setConfiguration(conf);
                UserGroupInformation.loginUserFromKeytab(principal, keytab);
                log.info(" the login user: {}", UserGroupInformation.getCurrentUser().getUserName());
            } catch (Exception e) {
                log.error("Kerberos login fail.", e);
            }
            log.info("......Hbase Kerberos auth end......");
        }
        return conf;
    }

    /**
     * Test variant of getConfiguration() with hardcoded values, kept for reference.
     *
     * @return the HBase Configuration
     */
    public org.apache.hadoop.conf.Configuration getConfiguration11111() {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "xxx-apollo4200-7226,xxx-apollo4200-7227,xxx-apollo4200-7228");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("zookeeper.znode.parent", "/hbase-secure");
        // raise the client-side limit to 20 MB to avoid "KeyValue size too large" errors
        conf.set("hbase.client.keyvalue.maxsize", "20971520");

        if (auth) {
            log.info("......Hbase Kerberos auth start......");
            conf.set("hadoop.security.authentication", "Kerberos");
            conf.set("hbase.security.authentication", "Kerberos");
            conf.set("hbase.security.authorization", "true");
            conf.set("hbase.master.kerberos.principal", "hbase/_HOST@BCHKDC");
            conf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@BCHKDC");

            System.setProperty("java.security.krb5.conf", "/home/xxxx/krb5_liu.conf");
            try {
                UserGroupInformation.setConfiguration(conf);
                UserGroupInformation.loginUserFromKeytab("xxxx/bdoc@BCH", "/home/xxxx/xxxx.keytab");
                log.info(" the login user: {}", UserGroupInformation.getCurrentUser().getUserName());
            } catch (Exception e) {
                log.error("Kerberos login fail.", e);
            }
            log.info("......Hbase Kerberos auth end......");
        }
        return conf;
    }


    /**
     * Create the HbaseTemplate bean, running a small connection smoke test first.
     *
     * @return the HbaseTemplate
     */
    @Bean
    public HbaseTemplate hbaseTemplate() {

        org.apache.hadoop.conf.Configuration conf = getConfiguration();

        try {
            log.info("---------------test start Hbase ConnectionFactory----------");
            conn = ConnectionFactory.createConnection(conf);

            // startup smoke test: verify the configured table is visible
            Admin admin = conn.getAdmin();
            String tableNameStr = videoTable;
            TableName tableName = TableName.valueOf(tableNameStr);
            if (admin.tableExists(tableName)) {
                log.info("..............table exist: {}", tableNameStr);
            } else {
                log.warn("..............table NOT found: {}", tableNameStr);
            }
            // table descriptor
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            log.info("..............hTableDescriptor:" + hTableDescriptor);

//            Table table = conn.getTable(tableName);
//            ResultScanner scanner = table.getScanner(new Scan());
//            // print the rows in the table
//            int i = 0;
//            for (Result result : scanner) {
//                i++;
//                byte[] row = result.getRow();
//                System.out.println("row key is:" + new String(row));
//                List<Cell> listCells = result.listCells();
//                for (Cell cell : listCells) {
//                    byte[] familyArray = cell.getFamilyArray();
//                    byte[] qualifierArray = cell.getQualifierArray();
//                    byte[] valueArray = cell.getValueArray();
//                    System.out.println("row value is:" + new String(familyArray) +
//                            new String(qualifierArray) + new String(valueArray));
//                }
//                if (i == 10) {
//                    break;
//                }
//            }
            log.info("---------------test end Hbase ConnectionFactory----------");
        } catch (Exception e) {
            log.error("hbase ConnectionFactory error...", e);
        }

        // hand the same configuration to HbaseTemplate
        HbaseTemplate hbaseTemplate = new HbaseTemplate();
        hbaseTemplate.setConfiguration(conf);
        hbaseTemplate.setAutoFlush(true);
        return hbaseTemplate;
    }
}

The project does not use the Spring-wrapped HbaseTemplate for data access; it works with ConnectionFactory directly, because HbaseTemplate has a few pitfalls (details are easy to find online).
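For the same reason, writes also go through the raw Connection API; a minimal Put sketch (the helper method is illustrative, not from the original project; it needs org.apache.hadoop.hbase.client.Put in addition to the imports above):

    public void putHbaseData(String tableName, String rowKey, String columnFamily,
                             String qualifier, String value) {
        // try-with-resources closes the Table; the shared Connection stays open
        try (Table table = conn.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
        } catch (Exception e) {
            log.error("putHbaseData error:", e);
        }
    }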

Configuration:

# hbase
spring.hbase.zookeeper.quorum=7226,7227,7228
spring.hbase.zookeeper.property.clientPort=2181
spring.hbase.zookeeper.znode.parent=/hbase-secure
spring.hbase.table.video=xxxx:video_info_table
spring.hbase.table.video.columnFamily=info

HBase query code example:

// fragment of a @Slf4j service class accessing HBase through the shared Connection;
// needs org.apache.hadoop.hbase.TableName, org.apache.hadoop.hbase.client.{Connection, Get, Result, Table},
// org.apache.hadoop.hbase.util.Bytes, java.util.{HashMap, Map}, and an injected HBaseConfig hBaseConfig
    private static Connection conn = HBaseConfig.conn;


    public void initial() {
        if (null == conn || conn.isClosed()) {
            log.info("conn is null/close,,, initial hbase Connection...");
//            conn = HBaseConfig.conn;
            conn = hBaseConfig.getConnection();

            log.info("after getConnection, conn:{}", conn);
        }
    }


    public static void close() {
        if (conn != null) {
            try {
                conn.close();
                conn = null;
                log.info("close hbase Connection...");
            } catch (Exception e) {
                log.error("close error:", e);
            }
        }
    }

    /**
     * Query a single row through the shared Connection.
     *
     * @param tableName    full table name (e.g. "namespace:table")
     * @param rowKey       row key to fetch
     * @param columnFamily column family to read
     * @return map of column qualifier -> value for the row
     */
    public Map<String, String> queryHbaseData(String tableName, String rowKey, String columnFamily) {
        Table table = null;
        Map<String, String> map = new HashMap<>();
        try {
            table = conn.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);
            Map<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes(columnFamily));
            if (null != familyMap && !familyMap.isEmpty()) {
                for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) {
                    map.put(Bytes.toString(entry.getKey()), Bytes.toString(entry.getValue()));
                }
            }
        } catch (Exception e) {
            log.error("queryData1 error:", e);
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
            } catch (Exception e) {
                log.error("queryData1 error:", e);
            }
        }
        return map;
    }
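A typical call, using the table and column family from the configuration above ("someRowKey" is a placeholder):

    Map<String, String> row = queryHbaseData("xxxx:video_info_table", "someRowKey", "info");
    log.info("row columns: {}", row);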

Initialize the Hive and HBase connections (i.e. run the Kerberos logins) before executing any task; otherwise errors such as "GSS initiate failed" and "Failed to find any Kerberos tgt" will appear:

        log.info("start executeTask....");
        // authenticate
        hBaseService.initial();
        dataSourceConfig.kerberosLogin();
        // business logic
        videoInfoService.execute();
        hBaseService.close();
        log.info("end of executeTask....");

That's all; for reference only.
