概述

在大型项目开发中,当数据量达到一定程度后,我们一般采用分库分表来实现数据层的拓展,面对复杂的存储层,相应的数据库中间件和多数据源切换的需求就应运而生。

这里我们重点讨论下多数据源切换,多数据源的动态切换可以方便我们快速的实现主从读写分离、异构纯粹多库等动态数据库操作。

常见的多数据源一般有以下两种解决方案:

1、通过显示模式声明多数据源,应用切换。优点是简单,缺点是不易于拓展,强耦合。

2、通过优先代理类实现动态切换,Spring的AbstractRoutingDataSource就是采用这种架构。

这里我们重点讨论第二种方式,我们会引入和Mybatis-Plus师出同门的dynamic-datasource-spring-boot-starter多数据源实现框架,通过样例逐步分析其实现原理,同时我们也将拓展分析讨论下Spring的AbstractRoutingDataSource的实现原理。

使用案例

下面我们将基于dynamic-datasource-spring-boot-starter实现动态多数据源切换。

1、引入动态数据源服务

<dependency>
  <groupId>com.baomidou</groupId>
  <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
  <version>${version}</version>
</dependency>

2、配置数据源

spring:
  datasource:
    dynamic:
      primary: master #设置默认的数据源或者数据源组,默认值即为master
      strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
      datasource:
        master:
          url: jdbc:mysql://xx.xx.xx.xx:3306/dynamic
          username: root
          password: 123456
          driver-class-name: com.mysql.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
        slave:
          url: jdbc:mysql://xx.xx.xx.xx:3307/dynamic
          username: root
          password: 123456
          driver-class-name: com.mysql.jdbc.Driver

其他的一主多从、多主多从、纯粹多库和混合配置等的配置项,请参考官网dynamic-datasouce

3、使用 @DS 切换数据源。

@DS 可以注解在方法上或类上,同时存在就近原则 方法上注解 优先于 类上注解。

注解

结果

没有@DS

默认数据源

@DS("dsName")

dsName可以为组名也可以为具体某个库的名称

项目还提供了常用的@Master和@Slave注解,

    /**
     * 从 主库 查询用户列表,等同于@DS("master")
     */
	@Master 
    @Override
    public List<User> selectMasterList() {

        LambdaQueryWrapper<User> queryWrapper = Wrappers.lambdaQuery();
        List<User> list = this.list(queryWrapper);
        return list;
    }

    /**
     * 从 从库 查询用户列表,等同于@DS("slave")
     */
    @Slave
    @Override
    public List<User> selectSlaveList() {
        LambdaQueryWrapper<User> queryWrapper = Wrappers.lambdaQuery();
        List<User> list = this.list(queryWrapper);
        return list;
    }

dynamic-datasource-spring-boot-starter源码分析

源码结构

 

源码分析

1、一般starter项目都是从META-INF/spring.factories指定的自动化配置文件开始入手

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceAutoConfiguration

2、自动配置类:DynamicDataSourceAutoConfiguration

/**
 * 动态数据源核心自动配置类
 *
 * @author TaoYu Kanyuxia
 * @see DynamicDataSourceProvider
 * @see DynamicDataSourceStrategy
 * @see DynamicRoutingDataSource
 * @since 1.0.0
 */
@Slf4j
@Configuration
@AllArgsConstructor
//自定义动态数据源配置项, 以spring.datasource.dynamic为前缀读取配置
@EnableConfigurationProperties(DynamicDataSourceProperties.class)
//声明需要再Spring Boot的DataSourceAutoConfiguration配置前执行
//详情参考:spring.boot.autoconfigure#DataSourceAutoConfiguration,实现了spring boot关于JDBC的自动化配置
@AutoConfigureBefore(value = DataSourceAutoConfiguration.class, name = "com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure")
@Import(value = {DruidDynamicDataSourceConfiguration.class, DynamicDataSourceCreatorAutoConfiguration.class, DynamicDataSourceHealthCheckConfiguration.class})
@ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class DynamicDataSourceAutoConfiguration {

    private final DynamicDataSourceProperties properties;

    
    // 多数据源加载接口,默认从yml中读取多数据源配置
    @Bean
    @ConditionalOnMissingBean
    public DynamicDataSourceProvider dynamicDataSourceProvider() {
        Map<String, DataSourceProperty> datasourceMap = properties.getDatasource();
        return new YmlDynamicDataSourceProvider(datasourceMap);
    }

    //创建动态多数据源DataSource
    @Bean
    @ConditionalOnMissingBean
    public DataSource dataSource(DynamicDataSourceProvider dynamicDataSourceProvider) {
        DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
        dataSource.setPrimary(properties.getPrimary());
        dataSource.setStrict(properties.getStrict());
        dataSource.setStrategy(properties.getStrategy());
        dataSource.setProvider(dynamicDataSourceProvider);
        dataSource.setP6spy(properties.getP6spy());
        dataSource.setSeata(properties.getSeata());
        return dataSource;
    }

     //AOP切面,对DS注解过的方法进行增强,达到切换数据源的目的
    @Role(value = BeanDefinition.ROLE_INFRASTRUCTURE)
    @Bean
    public Advisor dynamicDatasourceAnnotationAdvisor(DsProcessor dsProcessor) {
        DynamicDataSourceAnnotationInterceptor interceptor = new DynamicDataSourceAnnotationInterceptor(properties.isAllowedPublicOnly(), dsProcessor);
        DynamicDataSourceAnnotationAdvisor advisor = new DynamicDataSourceAnnotationAdvisor(interceptor);
        advisor.setOrder(properties.getOrder());
        return advisor;
    }

    //AOP切面,对DSTransactional注解过的方法进行增强,实现事务,基于seata
    @Role(value = BeanDefinition.ROLE_INFRASTRUCTURE)
    @ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "seata", havingValue = "false", matchIfMissing = true)
    @Bean
    public Advisor dynamicTransactionAdvisor() {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)");
        return new DefaultPointcutAdvisor(pointcut, new DynamicLocalTransactionAdvisor());
    }

    //动态参数解析器链
    @Bean
    @ConditionalOnMissingBean
    public DsProcessor dsProcessor(BeanFactory beanFactory) {
        DsHeaderProcessor headerProcessor = new DsHeaderProcessor();
        DsSessionProcessor sessionProcessor = new DsSessionProcessor();
        DsSpelExpressionProcessor spelExpressionProcessor = new DsSpelExpressionProcessor();
        spelExpressionProcessor.setBeanResolver(new BeanFactoryResolver(beanFactory));
        headerProcessor.setNextProcessor(sessionProcessor);
        sessionProcessor.setNextProcessor(spelExpressionProcessor);
        return headerProcessor;
    }
}

3、核心动态数据源组件:DynamicRoutingDataSource

/**
 * 核心动态数据源组件
 *
 * @author TaoYu Kanyuxia
 * @since 1.0.0
 */
@Slf4j
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {

    private static final String UNDERLINE = "_";
	
    /**
     * 所有数据库
     */
    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();
    /**
     * 分组数据库
     */ 
    private final Map<String, GroupDataSource> groupDataSources = new ConcurrentHashMap<>();
   /**
    * 动态DataSource提供器
    */
    @Setter
    private DynamicDataSourceProvider provider;
   /**
    * 动态DataSource切换策略
    */
    @Setter
    private Class<? extends DynamicDataSourceStrategy> strategy = LoadBalanceDynamicDataSourceStrategy.class;
    @Setter
    private String primary = "master";
    @Setter
    private Boolean strict = false;
    @Setter
    private Boolean p6spy = false;
    @Setter
    private Boolean seata = false;

    @Override
    public DataSource determineDataSource() {
        String dsKey = DynamicDataSourceContextHolder.peek();
        return getDataSource(dsKey);
    }

    private DataSource determinePrimaryDataSource() {
        log.debug("dynamic-datasource switch to the primary datasource");
        DataSource dataSource = dataSourceMap.get(primary);
        if (dataSource != null) {
            return dataSource;
        }
        GroupDataSource groupDataSource = groupDataSources.get(primary);
        if (groupDataSource != null) {
            return groupDataSource.determineDataSource();
        }
        throw new CannotFindDataSourceException("dynamic-datasource can not find primary datasource");
    }

省略...

    /**
     * 获取数据源
     *
     * @param ds 数据源名称
     * @return 数据源
     */
    public DataSource getDataSource(String ds) {
        if (StringUtils.isEmpty(ds)) {
            return determinePrimaryDataSource();
        } else if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return groupDataSources.get(ds).determineDataSource();
        } else if (dataSourceMap.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return dataSourceMap.get(ds);
        }
        if (strict) {
            throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
        }
        return determinePrimaryDataSource();
    }

    /**
     * 添加数据源
     *
     * @param ds         数据源名称
     * @param dataSource 数据源
     */
    public synchronized void addDataSource(String ds, DataSource dataSource) {
        DataSource oldDataSource = dataSourceMap.put(ds, dataSource);
        // 新数据源添加到分组
        this.addGroupDataSource(ds, dataSource);
        // 关闭老的数据源
        if (oldDataSource != null) {
            try {
                closeDataSource(oldDataSource);
            } catch (Exception e) {
                log.error("dynamic-datasource - remove the database named [{}]  failed", ds, e);
            }
        }
        log.info("dynamic-datasource - add a datasource named [{}] success", ds);
    }

    /**
     * 新数据源添加到分组
     *
     * @param ds         新数据源的名字
     * @param dataSource 新数据源
     */
    private void addGroupDataSource(String ds, DataSource dataSource) {
        if (ds.contains(UNDERLINE)) {
            String group = ds.split(UNDERLINE)[0];
            GroupDataSource groupDataSource = groupDataSources.get(group);
            if (groupDataSource == null) {
                try {
                    groupDataSource = new GroupDataSource(group, strategy.getDeclaredConstructor().newInstance());
                    groupDataSources.put(group, groupDataSource);
                } catch (Exception e) {
                    throw new RuntimeException("dynamic-datasource - add the datasource named " + ds + " error", e);
                }
            }
            groupDataSource.addDatasource(ds, dataSource);
        }
    }

省略...

    @Override
    public void afterPropertiesSet() throws Exception {
        // 检查开启了配置但没有相关依赖
        checkEnv();
        // 添加并分组数据源
        Map<String, DataSource> dataSources = provider.loadDataSources();
        for (Map.Entry<String, DataSource> dsItem : dataSources.entrySet()) {
            addDataSource(dsItem.getKey(), dsItem.getValue());
        }
        // 检测默认数据源是否设置
        if (groupDataSources.containsKey(primary)) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", dataSources.size(), primary);
        } else if (dataSourceMap.containsKey(primary)) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", dataSources.size(), primary);
        } else {
            log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", dataSources.size());
        }
    }
  
  省略...
}

DynamicRoutingDataSource类实现了InitializingBean接口,Bean初始化后调用afterPropertiesSet方法,把通过Provider配置生成的DataSource添加到Map中(DynamicDataSourceProvider.loadDataSources()),后续通过Map实现数据源的动态获取和切换。

4、数据源提供者:DynamicDataSourceProvider

Providert通过以下声明,内部同个多类型连接池的构造器,创建数据源:

    @Bean
    @ConditionalOnMissingBean
    public DynamicDataSourceProvider dynamicDataSourceProvider() {
        Map<String, DataSourceProperty> datasourceMap = properties.getDatasource();
        return new YmlDynamicDataSourceProvider(datasourceMap);
    }

dynamic-datasource支持多个连接池,如常用的Druid,HikariCp,BeeCp,Dbcp2等等,

 

5、抽象动态获取数据源:AbstractRoutingDataSource

DynamicRoutingDataSource类继承了AbstractRoutingDataSource(类似Spring实现),AbstractRoutingDataSource继承

springframework.jdbc.datasource.AbstractDataSource,分别重写getConnection和determineDataSource方法,实现根据DS Key动态获取DataSource,从而实现动态获取数据源的逻辑。

    public DataSource determineDataSource() {
        //从线程上下文获取ds key,并通过key获取DataSource
        String dsKey = DynamicDataSourceContextHolder.peek();
        return this.getDataSource(dsKey);
    }

6、@DS注解AOP实现

 

DS Key的获取和传递是通过AOP实现,具体核心逻辑如下,声明拦截器:

    @Role(value = BeanDefinition.ROLE_INFRASTRUCTURE)
    @Bean
    public Advisor dynamicDatasourceAnnotationAdvisor(DsProcessor dsProcessor) {
        DynamicDataSourceAnnotationInterceptor interceptor = new DynamicDataSourceAnnotationInterceptor(properties.isAllowedPublicOnly(), dsProcessor);
        DynamicDataSourceAnnotationAdvisor advisor = new DynamicDataSourceAnnotationAdvisor(interceptor);
        advisor.setOrder(properties.getOrder());
        return advisor;
    }

核心切面逻辑在DynamicDataSourceAnnotationInterceptor#invoke

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
       //获取注解的ds key,并保持到线程上下文中
        String dsKey = determineDatasourceKey(invocation);
        DynamicDataSourceContextHolder.push(dsKey);
        try {
            return invocation.proceed();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }

拓展:

在 Spring AOP 中,有 3 个常用的概念,Advices 、 Pointcut 、 Advisor ,解释如下:

Advices :表示一个 method 执行前或执行后的动作。

Pointcut :表示根据 method 的名字或者正则表达式等方式去拦截一个 method 。

Advisor : Advice 和 Pointcut 组成的独立的单元,并且能够传给 proxy factory 对象。

7、负载均衡

从getDataSource方法可以看出,数据组优先于单数据源。组数据源的DataSource实现为GroupDataSource,组数据源通过负载均衡策略动态获取数据源。

默认负载均衡算法为:

/**
 * 多数据源选择算法clazz,默认负载均衡算法
 */
private Class<? extends DynamicDataSourceStrategy> strategy = LoadBalanceDynamicDataSourceStrategy.class;

DynamicRoutingDataSource#getDataSouce


    public DataSource getDataSource(String ds) {
        //优先从Group中获取,不存在则从单数据源Map获取
        if (StringUtils.isEmpty(ds)) {
            return determinePrimaryDataSource();
        } else if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return groupDataSources.get(ds).determineDataSource();
        } else if (dataSourceMap.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return dataSourceMap.get(ds);
        }
        if (strict) {
            throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
        }
        return determinePrimaryDataSource();
    }

Spring-JDBC的DataSource实现案例

 

在 spring-jdbc 下,DataSource 最顶级的类是 AbstractDataSource(dynamic-datasouce-spring-boot-starter也是拓展这个抽象类),AbstractDataSource对 DataSource 的所有父接口方法都做了实现。但保留 getConnection() 方法由子类实现。

如果需要基于JDBC实现多数据源切换,一般可以通过拓展AbstractRoutingDataSource抽象类实现,AbstractRoutingDataSource是Spring提供的多数据源切换方案,相当于在项目运行时根据相应的Key值切换数据源DataSource,从而实现动态切换。我们看下AbstractRoutingDataSource的源码:

/**
 * Abstract {@link javax.sql.DataSource} implementation that routes {@link #getConnection()}
 * calls to one of various target DataSources based on a lookup key. The latter is usually
 * (but not necessarily) determined through some thread-bound transaction context.
 *
 * @author Juergen Hoeller
 * @since 2.0.1
 * @see #setTargetDataSources
 * @see #setDefaultTargetDataSource
 * @see #determineCurrentLookupKey()
 */
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean
 {

	@Nullable
	private Map<Object, Object> targetDataSources;

	@Nullable
	private Object defaultTargetDataSource;

	private boolean lenientFallback = true;

	private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();

	@Nullable
	private Map<Object, DataSource> resolvedDataSources;

	@Nullable
	private DataSource resolvedDefaultDataSource;

	省略...

	@Override
	public Connection getConnection() throws SQLException {
		return determineTargetDataSource().getConnection();
	}

	@Override
	public Connection getConnection(String username, String password) throws SQLException {
		return determineTargetDataSource().getConnection(username, password);
	}

	/**
	 * Retrieve the current target DataSource. Determines the
	 * {@link #determineCurrentLookupKey() current lookup key}, performs
	 * a lookup in the {@link #setTargetDataSources targetDataSources} map,
	 * falls back to the specified
	 * {@link #setDefaultTargetDataSource default target DataSource} if necessary.
	 * @see #determineCurrentLookupKey()
	 */
	protected DataSource determineTargetDataSource() {
		Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
		Object lookupKey = determineCurrentLookupKey();
		DataSource dataSource = this.resolvedDataSources.get(lookupKey);
		if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
			dataSource = this.resolvedDefaultDataSource;
		}
		if (dataSource == null) {
			throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
		}
		return dataSource;
	}

	/**
	 * Determine the current lookup key. This will typically be
	 * implemented to check a thread-bound transaction context.
	 * <p>Allows for arbitrary keys. The returned key needs
	 * to match the stored lookup key type, as resolved by the
	 * {@link #resolveSpecifiedLookupKey} method.
	 */
	@Nullable
	protected abstract Object determineCurrentLookupKey();
	
	省略...

}

从源码可以看出,AbstractRoutingDataSource继承了InitializingBean并实现了afterPropertiesSet,初始化默认DataSource。同时继承了AbstractDataSource,实现了getConnection方法,getConnection调用了determineTargetDataSource方法,determineTargetDataSource方法的主要逻辑是根据determineCurrentLookupKey方法获取数据源的Key,并通过Key从数据源组Map中获取对应的数据源,其中determineCurrentLookupKey为抽象方法,所以如果要实现动态数据源切换,只需拓展AbstractRoutingDataSource并实现determineCurrentLookupKey方法即可。例如:

/**
 * 获得数据源
 */
public class MultipleDataSource extends AbstractRoutingDataSource{

    @Override
    protected Object determineCurrentLookupKey() {         
        return DynamicDataSourceHolder.getRouteKey();
    }
}

关于DynamicDataSourceHolder的实现和如何实现多数据源配置和初始化,可以参考文末的链接。

设计思想都是基于AOP通过ThreadLocal实现数据源Key的传递,在调用getConnection时根据key动态获取不同的DataSource,实现多数据源切换。

参考资料

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐