一、背景介绍

        项目中需要按需连接不同的数据源,原项目默认为MySQL连接,现在要添加HBase、Impala、Oracle三种数据源,以支持现有的查询(属于报表子系统)。MySQL、Oracle、Impala是关系型数据库,可以用bean来配置连接池,方便、快捷地完成数据库访问。而HBase属于列存储数据库,是一个NoSQL数据库,可存储大量非关系型数据。我们这里用Phoenix来连接HBase,它相当于一个Java中间件,帮助开发者像使用JDBC访问关系型数据库一样访问NoSQL数据库HBase。参考Phoenix的官方文档发现,它的连接属于轻量级连接,完全不需要连接池(官方参考地址:http://phoenix.apache.org/faq.html#Should_I_pool_Phoenix_JDBC_Connections)。

        下面我们按上述方式分别连接Oracle、Impala、HBase这三个数据源;MySQL数据库直接整合了MyBatis,这里不过多叙述。

方式一:使用传统的JDBC方式连接

1、Oracle数据连接池

<!-- Druid connection pool for the Oracle data source; connection settings and pool sizes come from jdbc.* placeholders. -->
<bean name="oracledataSource" class="com.alibaba.druid.pool.DruidDataSource"
        init-method="init" destroy-method="close">
        <!-- The driver class may be omitted: by default Druid detects the DriverClass from the URL -->
        <property name="driverClassName" value="${jdbc.oracleDriverClass}" />
        <property name="url" value="${jdbc.oracleurl}" />
        <property name="username" value="${jdbc.oracleusername}" />
        <property name="password" value="${jdbc.oraclepassword}" />
        <!-- Initial, minimum idle and maximum active pool sizes -->
        <property name="initialSize" value="${jdbc.initialSize}" />
        <property name="minIdle" value="${jdbc.minIdle}" />
        <property name="maxActive" value="${jdbc.maxActive}" />
        <!-- Timeout when waiting to obtain a connection -->
        <property name="maxWait" value="60000" />
        <!-- Interval, in milliseconds, between checks that detect idle connections to be closed -->
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <!-- Minimum time, in milliseconds, a connection lives in the pool -->
        <property name="minEvictableIdleTimeMillis" value="300000" />
        <!-- Validation query; connections are tested while idle, not on borrow or return -->
        <property name="validationQuery" value="SELECT 'x' FROM DUAL" />
        <property name="testWhileIdle" value="true" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <!-- Enable PSCache and set the PSCache size per connection -->
        <property name="poolPreparedStatements" value="true" />
        <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />
        <!-- Filters used for monitoring statistics -->
        <property name="filters" value="stat" />
    </bean>

2、Impala数据连接池 

<!-- Impala data source provided by the Cloudera JDBC driver; URL and credentials come from jdbc.impal* placeholders. -->
<bean id="impaldataSource" class="com.cloudera.impala.jdbc4.DataSource">
    <!-- NOTE(review): the JDBC DataSource contract defines loginTimeout in seconds, so 30000 may have been meant as milliseconds; confirm against the driver documentation. -->
    <property name="loginTimeout" value="30000"/>
    <property name="URL" value="${jdbc.impalurl}"/>
    <property name="userID" value="${jdbc.impalusername}"/>
    <property name="password" value="${jdbc.impalpassword}"/>
</bean>


3、HBase通过Phoenix连接,用法和普通的MySQL JDBC连接一样,下面有案例供参考。

这里把三个数据源获取连接的方法做了整合,具体代码如下:

package com.xxx.xxx;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import javax.sql.DataSource; 
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.BeanHandler;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.aspectj.lang.JoinPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.DataSourceUtils; 
import com.alibaba.druid.pool.DruidDataSource;
import com.ymdd.galaxy.report.utils.system.Param;
 
/**
 * Static helpers that obtain JDBC connections for the Oracle, Impala and HBase (Phoenix)
 * data sources and run bean-mapped queries on them through Commons DbUtils.
 *
 * <p>Query failures are logged and reported to the caller as {@code null}; callers must
 * null-check every result.
 */
public final class DataBaseHelper {

    private static final Logger LOGGER = LoggerFactory.getLogger(DataBaseHelper.class);
    // Runs the SQL and hands row mapping to a pluggable ResultSetHandler.
    private static final QueryRunner QUERY_RUNNER = new QueryRunner();
    // Spring bean ids of the pooled data sources.
    private static final String DATA_SOURCE_A = "oracledataSource";
    private static final String DATA_SOURCE_B = "impaldataSource";
    // JDBC driver class of the Phoenix client used to query HBase.
    private static final String HBASE_DRIVER = "org.apache.phoenix.queryserver.client.Driver";

    /**
     * Obtains a connection from the Impala data source bean.
     *
     * @return a connection obtained through Spring's {@code DataSourceUtils}
     */
    public static Connection getImpalaConn() {
        return DataSourceUtils.getConnection(impalaDataSource());
    }

    /**
     * Obtains a connection from the Oracle data source bean.
     *
     * @return a connection obtained through Spring's {@code DataSourceUtils}
     */
    public static Connection getOracleConn() {
        return DataSourceUtils.getConnection(oracleDataSource());
    }

    /**
     * Opens a new, un-pooled connection to HBase through the Phoenix JDBC driver.
     *
     * @return the connection, or {@code null} when the driver class cannot be loaded or the
     *         connection cannot be opened (the failure is logged)
     */
    public static Connection getHbaseConn() {
        try {
            Class.forName(HBASE_DRIVER);
            return DriverManager.getConnection(Param.HbaswUrl);
        } catch (SQLException | ClassNotFoundException e) {
            LOGGER.error("Failed to open Phoenix connection with driver {}", HBASE_DRIVER, e);
            return null;
        }
    }

    /**
     * Closes a connection directly, logging (not propagating) any failure.
     *
     * @param conn the connection to close; {@code null} is ignored
     */
    public static void closeConn(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            LOGGER.error("Failed to close JDBC connection", e);
        }
    }

    /**
     * Releases a connection that was obtained from {@code dataSource} through
     * {@code DataSourceUtils}.
     *
     * <p>The connection is handed to {@code DataSourceUtils.releaseConnection} only. Closing it
     * first and releasing it afterwards would close it twice, and would close a connection that
     * is still bound to the current thread.
     *
     * @param conn       the connection to release; may be {@code null}
     * @param dataSource the data source the connection came from
     */
    public static void closeConn(Connection conn, DataSource dataSource) {
        DataSourceUtils.releaseConnection(conn, dataSource);
    }

    /**
     * Runs a query mapped to the given bean class on a caller-supplied connection and closes
     * that connection afterwards.
     *
     * @param conn        connection to query on; closed before this method returns
     * @param entityClass bean class each row is mapped to
     * @param sql         SQL text with {@code ?} placeholders
     * @param params      values bound to the placeholders
     * @return the mapped rows, or {@code null} when {@code conn} is {@code null} or the query fails
     */
    public static <T> List<T> QueryEntityList(Connection conn, Class<T> entityClass, String sql, Object... params) {
        if (conn == null) {
            return null;
        }
        try {
            return timedQueryForBeanList(conn, entityClass, sql, params);
        } catch (Exception e) {
            LOGGER.error("查询失败", e);
            return null;
        } finally {
            closeConn(conn);
        }
    }

    /**
     * Runs a query mapped to a single bean on a caller-supplied connection and closes that
     * connection afterwards.
     *
     * @param conn        connection to query on; closed before this method returns
     * @param entityClass bean class the result is mapped to
     * @param sql         SQL text with {@code ?} placeholders
     * @param params      values bound to the placeholders
     * @return the mapped bean, or {@code null} when {@code conn} is {@code null} or the query fails
     */
    public static <T> T QueryEntity(Connection conn, Class<T> entityClass, String sql, Object... params) {
        if (conn == null) {
            return null;
        }
        try {
            return timedQueryForBean(conn, entityClass, sql, params);
        } catch (Exception e) {
            LOGGER.error("查询失败", e);
            return null;
        } finally {
            closeConn(conn);
        }
    }

    /**
     * Queries a list of beans from Impala.
     *
     * @return the mapped rows, or {@code null} when the query fails
     */
    public static <T> List<T> QueryEntityList(Class<T> entityClass, String sql, Object... params) {
        return queryListAndRelease(impalaDataSource(), entityClass, sql, params);
    }

    /**
     * Queries a single bean from Impala.
     *
     * @return the mapped bean, or {@code null} when the query fails
     */
    public static <T> T QueryEntity(Class<T> entityClass, String sql, Object... params) {
        return queryBeanAndRelease(impalaDataSource(), entityClass, sql, params);
    }

    /**
     * Queries a list of beans from Oracle.
     *
     * @return the mapped rows, or {@code null} when the query fails
     */
    public static <T> List<T> QueryEntityListOracle(Class<T> entityClass, String sql, Object... params) {
        return queryListAndRelease(oracleDataSource(), entityClass, sql, params);
    }

    /**
     * Queries a single bean from Oracle.
     *
     * @return the mapped bean, or {@code null} when the query fails
     */
    public static <T> T QueryEntityOracle(Class<T> entityClass, String sql, Object... params) {
        return queryBeanAndRelease(oracleDataSource(), entityClass, sql, params);
    }

    /**
     * Queries a single bean from HBase through Phoenix.
     *
     * @return the mapped bean, or {@code null} when no connection could be opened or the query fails
     */
    public static <T> T QueryEntityHbase(Class<T> entityClass, String sql, Object... params) {
        return QueryEntity(getHbaseConn(), entityClass, sql, params);
    }

    /**
     * Queries a list of beans from HBase through Phoenix.
     *
     * @return the mapped rows, or {@code null} when no connection could be opened or the query fails
     */
    public static <T> List<T> QueryEntityListHbase(Class<T> entityClass, String sql, Object... params) {
        return QueryEntityList(getHbaseConn(), entityClass, sql, params);
    }

    /** Looks up the Impala data source bean by its id. */
    private static DataSource impalaDataSource() {
        return (DataSource) ApplicationContextHelper.getBean(DATA_SOURCE_B);
    }

    /** Looks up the Oracle data source bean by its id. */
    private static DataSource oracleDataSource() {
        return (DataSource) ApplicationContextHelper.getBean(DATA_SOURCE_A);
    }

    /** Queries a bean list on a connection from {@code dataSource} and releases it through Spring. */
    private static <T> List<T> queryListAndRelease(DataSource dataSource, Class<T> entityClass, String sql,
            Object... params) {
        Connection conn = DataSourceUtils.getConnection(dataSource);
        try {
            return timedQueryForBeanList(conn, entityClass, sql, params);
        } catch (Exception e) {
            LOGGER.error("查询失败", e);
            return null;
        } finally {
            // Release through the same utility that handed the connection out.
            closeConn(conn, dataSource);
        }
    }

    /** Queries a single bean on a connection from {@code dataSource} and releases it through Spring. */
    private static <T> T queryBeanAndRelease(DataSource dataSource, Class<T> entityClass, String sql,
            Object... params) {
        Connection conn = DataSourceUtils.getConnection(dataSource);
        try {
            return timedQueryForBean(conn, entityClass, sql, params);
        } catch (Exception e) {
            LOGGER.error("查询失败", e);
            return null;
        } finally {
            // Release through the same utility that handed the connection out.
            closeConn(conn, dataSource);
        }
    }

    /** Runs a single-bean query and logs the elapsed time in milliseconds. */
    private static <T> T timedQueryForBean(Connection conn, Class<T> entityClass, String sql, Object... params)
            throws SQLException {
        long startTime = System.currentTimeMillis();
        T entity = QUERY_RUNNER.query(conn, sql, new BeanHandler<T>(entityClass), params);
        LOGGER.info("【执行查询ByBeanHanlder-SQL总用时】: {}ms", System.currentTimeMillis() - startTime);
        return entity;
    }

    /** Runs a bean-list query and logs the elapsed time in milliseconds. */
    private static <T> List<T> timedQueryForBeanList(Connection conn, Class<T> entityClass, String sql,
            Object... params) throws SQLException {
        long startTime = System.currentTimeMillis();
        List<T> entityList = QUERY_RUNNER.query(conn, sql, new BeanListHandler<T>(entityClass), params);
        LOGGER.info("【执行查询ByBeanListHandler-SQL总用时】: {}ms", System.currentTimeMillis() - startTime);
        return entityList;
    }

}

有了这个类之后,在代码里可以直接通过写SQL的形式来完成对数据库的查询以及结果映射,具体用法如下:

      以Hbase连接为例(上面类中的方法即可查询到你想要的数据):

// Single result mapped onto CargoEntity; deptCode is passed as the query parameter.
// The helper returns null when the connection or the query fails, so check before use.
CargoEntity assessmentMonthDayVo = DataBaseHelper.QueryEntityHbase(CargoEntity.class, sql.toString(), deptCode);
// Same query mapped to a list of CargoEntity; also null on failure.
List<CargoEntity> assessmentDayRankList = DataBaseHelper.QueryEntityListHbase(CargoEntity.class, sql.toString(), deptCode);

方式二:在Spring Cloud上集成HBase和MySQL

1、其他Spring Cloud所需依赖自行添加即可

2、datasource的相关配置:

  # Two named data sources. The Java configuration binds them with the prefixes
  # spring.datasource.mysql and spring.datasource.phoenix, so this block presumably
  # sits under a top-level `spring:` key that is not shown here.
  datasource:
    mysql:
      jdbcUrl: jdbc:mysql://127.0.0.1:3306/clouddo?useUnicode=true&characterEncoding=utf8&useJDBCCompliantTimezoneShift=true&serverTimezone=UTC
      # NOTE(review): com.mysql.jdbc.Driver is the legacy Connector/J class name; newer drivers use com.mysql.cj.jdbc.Driver. Confirm against the driver version in use.
      driverClassName: com.mysql.jdbc.Driver
      username: root
      password: root
 
    phoenix:
      # NOTE(review): nothing in the configuration classes shown reads `enable`; confirm whether it is used.
      enable: true
      jdbcUrl: jdbc:phoenix:lyzk01:2181
      driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
      # Credentials are intentionally left empty.
      username:
      password:
      default-auto-commit: true

3、创建datasource配置类 :MysqlDataSource、PhoenixDataSource

注意:一定要指定一个datasource为@Primary,否则存在多个DataSource时,按类型注入无法确定使用哪一个

package com.ly.lydataphoenix.dataSource;

/**
 * Spring configuration for the MySQL data source.
 *
 * <p>Declares the MySQL {@code DataSource}, the MyBatis {@code SqlSessionFactory} used by the
 * mappers under {@code com.ly.lydataphoenix.dao.mysql}, and a {@code JdbcTemplate}. The data
 * source and the session factory are marked {@code @Primary}.
 */
@Configuration
@MapperScan(basePackages = "com.ly.lydataphoenix.dao.mysql", sqlSessionFactoryRef = "mysqlSessionFactory")
public class MysqlDataSource {
 
    /** Builds the MySQL data source from the {@code spring.datasource.mysql.*} properties. */
    @Bean(name = "mysqlJdbcDataSource")
    @Qualifier("mysqlJdbcDataSource")
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.mysql")
    public DataSource dataSource() {
        return DataSourceBuilder.create().build();
    }
 
    /**
     * Creates the MyBatis session factory bound to the MySQL data source.
     *
     * @param datasource the {@code mysqlJdbcDataSource} bean
     * @return the session factory referenced by this class's {@code @MapperScan}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean(name = "mysqlSessionFactory")
    @Primary
    public SqlSessionFactory test1SqlSessionFactory(@Qualifier("mysqlJdbcDataSource") DataSource datasource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(datasource);
        bean.setMapperLocations(
                // Location of the MyBatis mapper XML files
                new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/mysql/*.xml"));
        return bean.getObject();
    }
 
 
    /** Exposes a {@code JdbcTemplate} backed by the MySQL data source. */
    @Bean(name = "mysqlJdbcTemplate")
    public JdbcTemplate mysqlJdbcTemplate(@Qualifier("mysqlJdbcDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    } 
}
package com.ly.lydataphoenix.dataSource;

/**
 * Spring configuration for the Phoenix (HBase) data source.
 *
 * <p>Declares the Phoenix {@code DataSource}, the MyBatis {@code SqlSessionFactory} used by the
 * mappers under {@code com.ly.lydataphoenix.dao.hbase}, and a {@code JdbcTemplate}. None of
 * these beans is marked {@code @Primary}, so they must be selected by name or qualifier.
 */
 
@Configuration
@MapperScan(basePackages = "com.ly.lydataphoenix.dao.hbase", sqlSessionFactoryRef = "hbaseSessionFactory")
public class PhoenixDataSource {
 
    /** Builds the Phoenix data source from the {@code spring.datasource.phoenix.*} properties. */
    @Bean(name = "phoenixJdbcDataSource")
    @Qualifier("phoenixJdbcDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.phoenix")
    public DataSource dataSource() {
        return DataSourceBuilder.create().build();
    }
 
    /**
     * Creates the MyBatis session factory bound to the Phoenix data source.
     *
     * @param datasource the {@code phoenixJdbcDataSource} bean
     * @return the session factory referenced by this class's {@code @MapperScan}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean(name = "hbaseSessionFactory")
    public SqlSessionFactory test1SqlSessionFactory(@Qualifier("phoenixJdbcDataSource") DataSource datasource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(datasource);
        bean.setMapperLocations(
                // Location of the MyBatis mapper XML files
                new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/hbase/*.xml"));
        return bean.getObject();
    }
 
    /** Exposes a {@code JdbcTemplate} backed by the Phoenix data source. */
    @Bean(name = "phoenixJdbcTemplate")
    public JdbcTemplate phoenixJdbcTemplate(@Qualifier("phoenixJdbcDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
} 

4、Dao接口所在的包要与@MapperScan的basePackages对应,mapper.xml所在的目录要与setMapperLocations中配置的路径对应:

 

UserInfoMapper

package com.ly.lydataphoenix.dao.hbase;
 
import com.ly.lydataphoenix.domain.UserInfo;
import org.apache.ibatis.annotations.Mapper;

/**
 * MyBatis mapper for user records in the {@code dao.hbase} package.
 */
@Mapper
public interface UserInfoMapper {

    /**
     * Looks up a user by id.
     *
     * @param userId id of the user to fetch
     * @return the matching user
     */
    UserInfo getUserById(int userId);
} 

DeptMapper

package com.ly.lydataphoenix.dao.mysql;
 
import com.ly.lydataphoenix.domain.DeptDO;
import org.apache.ibatis.annotations.Mapper;

/** MyBatis mapper for department management. */
@Mapper
public interface DeptMapper {
	/**
	 * Looks up a department by id.
	 *
	 * @param deptId id of the department to fetch
	 * @return the matching department
	 */
	DeptDO get(int deptId);
}  

UserInfoMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 
    <!-- SQL statements for the UserInfoMapper interface; the namespace must equal its fully qualified name. -->
    <mapper namespace="com.ly.lydataphoenix.dao.hbase.UserInfoMapper">
 
    <!-- Fetches the TEST.USER_INFO row whose ID equals the userId argument and maps it to UserInfo. -->
    <select id="getUserById" resultType="com.ly.lydataphoenix.domain.UserInfo">
        select * from TEST.USER_INFO WHERE ID=#{userId}
    </select>
</mapper>

DeptMapper.xml

<!--<?xml version="1.0" encoding="UTF-8"?>-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 
<!-- SQL statements for the DeptMapper interface; the namespace must equal its fully qualified name. -->
<mapper namespace="com.ly.lydataphoenix.dao.mysql.DeptMapper">
	<!-- Fetches one sys_dept row by dept_id and maps it to DeptDO. -->
	<!-- NOTE(review): the placeholder is named "value" while the mapper parameter is deptId; this presumably relies on MyBatis accepting any name for a single simple parameter. Confirm. -->
	<select id="get" resultType="com.ly.lydataphoenix.domain.DeptDO">
		select
		`dept_id`,`parent_id`,`name`,`order_num`,`del_flag` from sys_dept
		where dept_id = #{value}
	</select>
</mapper>

DeptDO


/** Department data object returned by DeptMapper. Getters and setters are elided in this listing. */
public class DeptDO implements Serializable {
	private static final long serialVersionUID = 1L;
	
	// Department ID
	private Long deptId;
	// Parent department ID; 0 for a top-level department
	private Long parentId;
	// Department name
	private String name;
	// Sort order
	private Integer orderNum;
	// Deletion flag: -1 = deleted, 0 = normal
	private Integer delFlag;
………………set   get
 }

UserInfo


public class UserInfo { 
    private int id;
    private String name;

5、编写单元测试类

 
import com.ly.lydataphoenix.dao.hbase.UserInfoMapper;
import com.ly.lydataphoenix.dao.mysql.DeptMapper;
import com.ly.lydataphoenix.domain.DeptDO;
import com.ly.lydataphoenix.domain.UserInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; 
import javax.annotation.Resource;
 
/**
 * Spring Boot test that reads one row through each mapper: the MySQL-backed
 * {@code DeptMapper} and the HBase-backed {@code UserInfoMapper}. Results are printed to
 * standard output.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class MainTest {

    // Alternative wiring that queries through the two JdbcTemplate beans, kept for reference.
//    @Autowired
//    @Qualifier("mysqlJdbcTemplate")
//    JdbcTemplate mysqlJdbcTemplate;
//
//    @Autowired
//    @Qualifier("phoenixJdbcTemplate")
//    JdbcTemplate phoenixJdbcTemplate;

    @Resource
    private DeptMapper deptMapper;

    @Resource
    private UserInfoMapper userInfoMapper;

    /** Loads department 6 through the MySQL mapper and prints its name. */
    @Test
    public void mysqlTest() {
        DeptDO dept = deptMapper.get(6);
        String line = "name===" + dept.getName();
        System.out.println(line);
    }

    /** Loads user 1 through the HBase mapper and prints its id and name. */
    @Test
    public void hbaseTest() {
        UserInfo user = userInfoMapper.getUserById(1);
        String line = String.format("ID=%s;NAME=%s", user.getId(), user.getName());
        System.out.println(line);
    }

//    @Test
//    public void DataSourceTest() {
//        String name = mysqlJdbcTemplate.queryForObject("select name from sys_user where user_id=1", String.class);
//        System.out.printf("name===" + name);
//    }
//
//    @Test
//    public void DataSourceTest2() {
//        String name = phoenixJdbcTemplate.queryForObject("select name from TEST.USER_INFO WHERE ID=1", String.class);
//        System.out.printf("name===" + name);
//    } 
}

mysqlTest测试结果

hbaseTest测试结果

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐