写在前面

事务一致性一定要你的数据库引擎支持,我用的数据库是mysql,常见两种搜索引擎,MyISAM和InnoDb,关于它们的区别,网上很多人罗列了,我这里最重要的就是InnoDb支持事务,MyISAM不支持事务。在mysql里面,可以单独为每张表指定不同的搜索引擎,在执行下面的事务操作时,一定要确保你操作的表是支持事务的!!!(写在前面是因为,我因为一开始没有注意,所以整了两套方案,最后发现都不行,才去排查这个问题,大家不要犯跟我一样的错误呜呜呜呜呜~)。

下面开始正文:

小小的提一下关于分布式事务

事务嘛,大家都知道,一个大的操作由很多个小小的操作组成,执行事务的时候,要么一起成功,要么一起失败。
单应用单库的情况下,实现事务一致性很简单,开启事务,成功提交,失败回滚。
出现分布式事务有两种情况:

  1. 当数据量上来的时候,我们会考虑分库分表,当一个操作修改了多个库的时候,而且要保证数据库一致性,就要用到分布式事务。
  2. 微服务下的业务,一个业务要调用两个微服务,分别处理不同的库,这里我们也会用到分布式事务。

单应用多库(也就是第一种情况),即程序里面有多个数据源,使用原生jdbc的话,我们是可以进行手动操控的,当我们在spring里面,就需要做一些另外的调整。
我这里主要就是解决这样的情况,两套处理方案,一个是整合dynamic-datasource-spring-boot-starter,还有一个是自己使用注解加切面。

解决方案一使用dynamic-datasource-spring-boot-starter

我的架构是springboot+mybatisplus+dynamic-datasource
pom关键包

<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1</version>
</dependency>

yml 配置文件

server:
  port: 8012
spring:
  datasource:
    dynamic:
      primary: device   #指定默认数据库device
      datasource:
        device:
          url: xxxx
          username: xxxx
          password: xxxx
          driver-class-name: com.mysql.cj.jdbc.Driver
        emq:    #配置数据源 emq
          url: xxxx
          username: xxxx
          password: xxxx
          driver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #开启sql日志

文件夹框架
在这里插入图片描述
mybatisplus配置

@Configuration
//添加两个扫描路径
@MapperScan({"主库mapper扫描路径,从库mapper扫描路径"})
public class MybatisPlusConfig {
    @Bean
    @ConditionalOnClass(value = {PaginationInterceptor.class})
    public PaginationInterceptor paginationInterceptor() {
        PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
        return paginationInterceptor;
    }
}

测试代码

@Service
public class XiotDevDeviceServiceImpl extends ServiceImpl<XiotDevDeviceMapper, XiotDevDevice> implements XiotDevDeviceService {
    @Resource
    private MqttUserService mqttUserService;

    @Override
    //注意:多数据源实现事务一致性使用@DSTransactional注解而不是@Transactional注解,类上面也不要添加@Transactional,不然会导致数据源无法切换
    @DSTransactional
    //注意:@DSTransactional需要在方法上显示指定@DS不然也会报错找不到xxx
    @DS("device")
    public void test() {
        UpdateWrapper<XiotDevDevice> updateWrapper = new UpdateWrapper<>();
        updateWrapper.lambda().set(XiotDevDevice::getRemark, "62623")
                .eq(XiotDevDevice::getId, 2L);
        this.update(updateWrapper);
        MqttUser mqttUser = new MqttUser();
        mqttUser.setUsername("futy");
        mqttUser.setPassword("123456");
        mqttUser.setIsSuperuser(false);
        mqttUserService.insertUser(mqttUser);
        //throw new RuntimeException("故意抛出的异常");

    }
}
@Service
//切换数据源使用@DS("xxx") yml配置里面指定的名字,可以用在类上面也可以加在方法上面
@DS("emq")
public class MqttUserServiceImpl extends ServiceImpl<MqttUserMapper, MqttUser> implements MqttUserService {

    @Override
    public void insertUser(MqttUser mqttUser) {
        Assert.isTrue(CharSequenceUtil.isNotBlank(mqttUser.getUsername()), "mqtt用户名不能为空");
        Assert.isTrue(CharSequenceUtil.isNotBlank(mqttUser.getPassword()), "mqtt密钥不能为空");
        String salt = mqttUser.getSalt();
        mqttUser.setSalt(salt == null ? "" : salt);
        String secretPwd = SecureUtil.sha256(mqttUser.getPassword() + mqttUser.getSalt());
        mqttUser.setPassword(secretPwd);
        mqttUser.insert();
    }
}

代码正常运行打印日志
在这里插入图片描述

将异常取消注释打印日志
在这里插入图片描述
执行异常运行会发现数据库保持一致性

要点总结:

  1. dynamic-datasource-spring-boot-starter与mybatisplus版本一致(不做强制要求)
  2. dynamic-datasource-spring-boot-starter不支持开启了事务之后再切换数据源,比如A 服务使用a数据源,B服务使用b数据源,A的方法里面调用B的方法,如果A方法开启了事务,则B方法很可能无法切换数据源。
  3. 如果A需要事务支持,但是也要调用B的话,可以在B的方法上上面加注解 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW),使用REQUIRES_NEW的事务传播特性,强制开启新事务也可以切换数据源,但是有一个弊端就是,比如A调用B,然后A调用C,发生在B里面的异常A可以回滚,B也可以回滚,发生在B后面的异常,A可以回滚,B就无法回滚,就会造成事务不一致。
  4. 如果要求发生在B后面的异常也可以回滚,则需要再A的方法上面使用@DSTransactional注解(使用这个注解要显示调用@DS不然也会报错),不能与@Transactional一起使用,不然会违背第二条。然后B方法上面不用加任何注解。这个注解的事务不由spring进行管理,由多数据源内部自己进行维护,可以保证事务的统一提交和回滚。

解决方案二,使用aop切面和注解自己完成

这个工作量比较大, 原理就是使用aop动态代理进行方法的增强
我的架构springboot+druid+mybatis
关键jar包

<dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid-spring-boot-starter</artifactId>
   <version>1.2.8</version>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.0</version>
</dependency>

依旧是两套数据文件
在这里插入图片描述
在这里插入图片描述

yml 两套连接

spring:
  main:
    allow-bean-definition-overriding: true #当遇到同样名字的时候,是否允许覆盖注册
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    device:
      url: xxxx
      username: xxxx
      password: xxxx
      driver-class-name: com.mysql.cj.jdbc.Driver
    emq:    #配置数据源 emq
      url: xxxx
      username: xxxx
      password: xxxx
      driver-class-name: com.mysql.cj.jdbc.Driver
 #由于多数据源的一些特殊性,里面的mybatis配置只配置主数据源
mybatis-plus:
  global-config:
    db-config:
      logic-delete-value: 1 # 逻辑已删除值(默认为 1)
      logic-not-delete-value: 0 # 逻辑未删除值(默认为 0)
  mapper-locations: classpath*:/mapper/*.xml		#主数据源的mapper文件地址

多数据源配置

public class DataSourceConfig {
    private static DataSource createDataSource(String username, String password, String url, String driverClassName) {
        DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setUrl(url);
        dataSource.setDriverClassName(driverClassName);
        return dataSource;
    }

    @Configuration
    @MapperScan(basePackages = "主数据源mapper路径", sqlSessionTemplateRef = "sqlSessionTemplate")
    @Data
    @ConfigurationProperties(prefix = "spring.datasource.device")
    public static class Db1 {
        private String url;
        private String username;
        private String password;
        private String driverClassName;
        @Resource
        private MybatisPlusAutoConfiguration mybatisPlusAutoConfiguration;

        @Bean
        @Primary
        public DataSource deviceDatasource() {
            return createDataSource(username, password, url, driverClassName);
        }

        @Bean
        @Primary
        public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        //主数据源可以直接使用mybatisplus的入口
            return mybatisPlusAutoConfiguration.sqlSessionFactory(dataSource);
        }

        @Bean
        @Primary
        public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        //主数据源可以直接使用mybatisplus的入口
            return mybatisPlusAutoConfiguration.sqlSessionTemplate(sqlSessionFactory);
        }

        @Bean
        @Primary
        public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) {
            return new DataSourceTransactionManager(dataSource);
        }
    }

    @Configuration
    @MapperScan(basePackages = "其他数据源mapper路径", sqlSessionTemplateRef = "emqSqlSessionTemplate")
    @ConfigurationProperties(prefix = "spring.datasource.emq")
    @Data
    public static class DataSource2Config {
        private static final ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        @Resource
        private final ApplicationContext applicationContext;
        private String url;
        private String username;
        private String password;
        private String driverClassName;

        @Bean(name = "emqDatasource")
        public DataSource emqDatasource() {
            return createDataSource(username, password, url, driverClassName);
        }

        @Bean(name = "emqSqlSessionFactory")
        public SqlSessionFactory emqSqlSessionFactory(@Qualifier("emqDatasource") DataSource dataSource) throws Exception {
        //其他数据源的mybatis配置需要自己进行定义,不然数据源会使用混乱
            MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
            factory.setDataSource(dataSource);
            factory.setVfs(SpringBootVFS.class);
            //设置configuration
            MybatisConfiguration configuration = new MybatisConfiguration();
            configuration.setLogImpl(StdOutImpl.class);
            factory.setConfiguration(configuration);
            //设备 global-config
            GlobalConfig globalConfig = GlobalConfigUtils.defaults();
            this.getBeanThen(MetaObjectHandler.class, globalConfig::setMetaObjectHandler);
            this.getBeanThen(IKeyGenerator.class, i -> globalConfig.getDbConfig().setKeyGenerator(i));
            this.getBeanThen(ISqlInjector.class, globalConfig::setSqlInjector);
            this.getBeanThen(IdentifierGenerator.class, globalConfig::setIdentifierGenerator);
            factory.setGlobalConfig(globalConfig);
            //设置xml文件位置
            String[] mapperLocations = {"classpath*:/mapper/emq/*.xml"};
            factory.setMapperLocations(resolveMapperLocations(mapperLocations));
            return factory.getObject();
        }

        public org.springframework.core.io.Resource[] resolveMapperLocations(String[] mapperLocations) {
            return Stream.of(Optional.ofNullable(mapperLocations).orElse(new String[0]))
                    .flatMap(location -> Stream.of(getResources(location)))
                    .toArray(org.springframework.core.io.Resource[]::new);
        }

        private org.springframework.core.io.Resource[] getResources(String location) {
            try {
                return resourceResolver.getResources(location);
            } catch (IOException e) {
                return new org.springframework.core.io.Resource[0];
            }
        }


        private <T> void getBeanThen(Class<T> clazz, Consumer<T> consumer) {
            if (this.applicationContext.getBeanNamesForType(clazz, false, false).length > 0) {
                consumer.accept(this.applicationContext.getBean(clazz));
            }
        }

        @Bean(name = "emqTransactionManager")
        public DataSourceTransactionManager emqTransactionManager(@Qualifier("emqDatasource") DataSource dataSource) {
            return new DataSourceTransactionManager(dataSource);
        }

        @Bean(name = "emqSqlSessionTemplate")
        public SqlSessionTemplate emqSqlSessionTemplate(@Qualifier("emqSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
            return new SqlSessionTemplate(sqlSessionFactory);
        }
    }


}

至此已经可以进行多库的查询了,下面是实现多库的事务一致性(来自其他人的分享)
自定义注解

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface DataSourceTransactional {
    /**
     * 事务管理器数组
     */
    String[] transactionManagers();
}

切面

@Component
@Aspect
public class DataSourceTransactionAspect {
    /**
     * 线程本地变量:为什么使用栈?※为了达到后进先出的效果※
     */
    private static final ThreadLocal<Stack<Pair>> THREAD_LOCAL = new ThreadLocal<>();
    /**
     * 用于获取事务管理器
     */
    @Autowired
    private ApplicationContext applicationContext;
    /**
     * 事务声明
     */
    private DefaultTransactionDefinition def = new DefaultTransactionDefinition();

    {
        // 非只读模式
        def.setReadOnly(false);
        // 事务隔离级别:采用数据库的
        def.setIsolationLevel(TransactionDefinition.ISOLATION_DEFAULT);
        // 事务传播行为
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
    }

    /**
     * 切面
     */
    @Pointcut("@annotation(com.yeexun.xiot.device.srv.config.multi_datasource.DataSourceTransactional)")
    public void pointcut() {
    }

    /**
     * 声明事务
     *
     * @param transactional 注解
     */
    @Before("pointcut() && @annotation(transactional)")
    public void before(DataSourceTransactional transactional) {
        // 根据设置的事务名称按顺序声明,并放到ThreadLocal里
        String[] transactionManagerNames = transactional.transactionManagers();
        Stack<Pair> pairStack = new Stack<>();
        for (String transactionManagerName : transactionManagerNames) {
            DataSourceTransactionManager transactionManager = applicationContext.getBean(transactionManagerName, DataSourceTransactionManager.class);
            TransactionStatus transactionStatus = transactionManager.getTransaction(def);
            pairStack.push(new Pair(transactionManager, transactionStatus));
        }
        THREAD_LOCAL.set(pairStack);
    }

    /**
     * 提交事务
     */
    @AfterReturning("pointcut()")
    public void afterReturning() {
        // ※栈顶弹出(后进先出)
        Stack<Pair> pairStack = THREAD_LOCAL.get();
        while (!pairStack.empty()) {
            Pair pair = pairStack.pop();
            pair.dataSourceTransactionManager.commit(pair.transactionStatus);
        }
        THREAD_LOCAL.remove();
    }

    /**
     * 回滚事务
     */
    @AfterThrowing(value = "pointcut()")
    public void afterThrowing() {
        // ※栈顶弹出(后进先出)
        Stack<Pair> pairStack = THREAD_LOCAL.get();
        while (!pairStack.empty()) {
            Pair pair = pairStack.pop();
            pair.dataSourceTransactionManager.rollback(pair.transactionStatus);
        }
        THREAD_LOCAL.remove();
    }

    static class Pair {
        DataSourceTransactionManager dataSourceTransactionManager;
        TransactionStatus transactionStatus;

        public Pair(DataSourceTransactionManager dataSourceTransactionManager, TransactionStatus transactionStatus) {
            this.dataSourceTransactionManager = dataSourceTransactionManager;
            this.transactionStatus = transactionStatus;
        }
    }
}

代码使用
在这里插入图片描述

要点总结

  1. 准备两套数据文件
  2. yml定义两套连接url,
  3. 将两套数据源都配置到spring容器中(里面细节较多,主要是多个sqlSessionFactory和sqlSessionTemplate的定义容易冲突,上面的代码我试过了,可以正常运行。弊端是从库的mybatis使用一些自定义的功能时,这里需要做相应的代码改动,如果忘记,则可能新加爱的mybatis配置不生效)
  4. 自定义注解和切面(逻辑不难,使用队列先进后出)
  5. 使用时,添加注解到要使用的方法里面
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐