公司规划了一个AIOT平台项目,希望将来可以提供SAAS服务,可设备接入、算法训练及算法接口发布。写此博文时,本人已经实现了IOT部分,多租户不同设备接入以及设备数据转发。本文着重介绍项目中多租户实现。主流的租户实现一般有两种方式,一种是所有表字段冗余租户ID字段,在增删改查时拦截sql语句进行拼接租户ID字段,达到不同租户展示不同数据的目录;另一种方式每个租户对应一个业务逻辑库,增删改查映射到租户对应的业务库,达到多租户效果。本人采用了第二种实现方式,不多说,直接上代码。

1、实现涉及技术及框架

springboot+nacos+redis+shardingjdbc(5.0.0-beta)+mysql

2、实现原理

a)项目实施:初始化iot库,为租户管理库,只存储租户相关信息,sysadmin可登录创建租户

b)项目启动:获取租户信息、动态创建已有租户数据源、存储有效数据源别名到redis,发布数源到nacos、创建数据源nacos监听器(从nacos监听数据源变化,更新最新数据源并别名写到redis,部署多个实例保持数据源一致)

b)新增租户:创建租户库并执行初始化脚本、插入租户库租户信息、更新shardingjdbc数据源、写入有效数据源别名到redis、发布数据源到nacos,其他实例监听nacos创建新的数据源、写入iot库租户信息

c)删除租户:删除iot库租户信息、删除redis缓存数据源别名、删除本地数据源、发布数据源到nacos,其他实例监听nacos删除数据源、删除租户库

d)租户登录增删改查:通过sharding-jdbc分库中间件,切面service,在执行sql时获取登录用户的租户ID,通过自定义的分库策略,切换到对应租户业务库执行sql

3、代码实现

1)pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>net.wfl</groupId>
        <artifactId>iot-user</artifactId>
        <version>1.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>iot-user-biz</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>net.wfl</groupId>
            <artifactId>iot-user-api</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>net.wfl</groupId>
            <artifactId>iot-user-auth</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.jeecgframework</groupId>
            <artifactId>autopoi</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.jeecgframework</groupId>
            <artifactId>autopoi-web</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.jeecgframework.boot</groupId>
            <artifactId>codegenerate</artifactId>
            <version>1.3.2</version>
        </dependency>
        <!-- druid 数据源,一定不能用druid-spring-boot-starter -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.8</version>
        </dependency>
        <!--一定不能引入dynamic-datasource-spring-boot-starter包-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>
        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
            <version>5.0.0-beta</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.9.6</version>
        </dependency>
        <dependency>
            <groupId>net.wfl.framework.boot</groupId>
            <artifactId>base-tools</artifactId>
            <version>1.0.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.hibernate.validator</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>6.1.6.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <dependency>
            <groupId>org.jeecgframework</groupId>
            <artifactId>jeewx-api</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>com.jcraft</groupId>
            <artifactId>jsch</artifactId>
            <version>0.1.54</version>
        </dependency>
        <dependency>
            <groupId>com.github.oshi</groupId>
            <artifactId>oshi-core</artifactId>
            <version>3.12.2</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna-platform</artifactId>
            <version>5.2.0</version>
        </dependency>
    </dependencies>
    <profiles>
         <profile>
            <id>dev</id>
            <activation>
                <!--默认激活配置-->
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <!--当前环境-->
                <profile.name>dev</profile.name>
                <!--配置文件前缀-->
                <prefix.name></prefix.name>
                <!--连接nacos用户名-->
                <config.nacos.username>nacos</config.nacos.username>
                <!--连接nacos密码-->
                <config.nacos.passwd>nacos</config.nacos.passwd>
                <!--Nacos配置中心地址-->
                <config.server-addr>127.0.0.1:8868</config.server-addr>
                <!--Nacos配置中心命名空间,用于支持多环境.这里必须使用ID,不能使用名称,默认为空-->
                <config.namespace>iot</config.namespace>
                <!--Nacos配置分组名称-->
                <config.group>DEFAULT_GROUP</config.group>
                <!--Nacos服务发现地址-->
                <discovery.server-addr>127.0.0.1:8868</discovery.server-addr>
            </properties>
        </profile>
    </profiles>
    <build>
        <finalName>iot-user-biz</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2)sharding.yml的分库配置

spring:
  shardingsphere:
    enabled: true
    datasource:
    #指定数据源 名称可以自定义,注意:名称要跟后面的配置一致
      names: ds0
      #ds0为租户管理库,租户库动态生成
      ds0:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-nam: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/iot?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
        username: root
        password: wfl_db
    rules:
      sharding:
        #绑定表规则列表,多个以逗号分割
        binding-tables[0]: biz_device,sys_area
        
        tables:
          tenant_user:
            actual-data-nodes: ds$->{0..999}.sys_tenant_user
            database-strategy:
              hint:
                sharding-algorithm-name: database-defaults
          device:
            actual-data-nodes: ds$->{0..999}.biz_device

        default-database-strategy:
          hint:
            sharding-algorithm-name: database-tenant
        default-table-strategy:
          none:

        #分库或者分表算法配置
        sharding-algorithms:
          database-tenant:
            type: CLASS_BASED
            props:
              strategy: HINT
              algorithmClassName: net.wfl.user.biz.modules.tenant.sharding.DatabaseShardingAlgorithm
          database-defaults:
            type: CLASS_BASED
            props:
              strategy: HINT
              algorithmClassName: net.wfl.user.biz.modules.tenant.sharding.DefaultShardingAlgorithm
    props:
      #是否输出sql
      sql-show: true

说明:涉及隐私,只截取两个表显示,使用自定义分库策略

3)bootstrap.yml

spring:
  profiles:
    active: @profile.name@
  application:
    name: iot-user
  cloud:
    nacos:
      discovery:
        server-addr: @config.server-addr@
        namespace: @config.namespace@
        group: @config.group@
        watch:
          enabled: false
      config:
        server-addr: @config.server-addr@
        namespace: @config.namespace@
        group: @config.group@
        file-extension: yaml
        prefix: ${spring.application.name}
        extension-configs:
          #分库分表实现
          - dataId: sharding.yaml
            refresh: true

4)NacosConfig.java

package net.wfl.user.biz.config;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.Properties;

/**
 * nacos配置
 *
 * @author wangfenglei
 */
@Slf4j
@Configuration
public class NacosConfig {
    @Value("${spring.cloud.nacos.config.server-addr}")
    private String serverAddr;
    @Value("${spring.cloud.nacos.config.namespace}")
    private String namespace;

    @Bean
    @Primary
    public ConfigService configService() {
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        properties.put("namespace", namespace);
        try {
            return NacosFactory.createConfigService(properties);
        } catch (NacosException e) {
            log.error(e.toString(), e);
        }

        return null;
    }
}

说明:创建ConfigService Bean,在项目中可以直接引用使用,进行nacos相关操作

 5)DataSourceInitConfig.java

package net.wfl.user.biz.config.init;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import lombok.extern.slf4j.Slf4j;
import net.wfl.framework.boot.tools.redis.RedisToolsUtil;
import net.wfl.user.biz.modules.system.util.AiotConstants;
import net.wfl.user.biz.modules.tenant.entity.SysTenantUser;
import net.wfl.user.biz.modules.tenant.service.ISysTenantUserService;
import net.wfl.user.biz.modules.util.AiotCommonUtil;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
 * 项目启动后数据源根据租户动态加载
 *
 * @author wangfenglei
 */
@Slf4j
@Component
public class DataSourceInitConfig implements ApplicationRunner {
    @Resource
    private ISysTenantUserService sysTenantUserService;
    @Resource
    private ShardingSphereDataSource dataSource;
    @Resource
    private RedisToolsUtil redisToolsUtil;
    @Resource
    private ConfigService configService;
    @Value("${spring.cloud.nacos.config.group}")
    private String groupId;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        Map<String, DataSource> dataSourceMap = dataSource.getDataSourceMap();
        List<SysTenantUser> tenantUserList = sysTenantUserService.list(new LambdaQueryWrapper<SysTenantUser>().gt(SysTenantUser::getTenantId, 0));
        List<Map<String, String>> datasourceNameList = new ArrayList<>();
        Map<String, String> datasourceNameInitMap = new HashMap<>(16);
        datasourceNameInitMap.put("name", AiotConstants.DATASOURCE_ALL);
        //超级管理员用到,默认租户为0
        datasourceNameInitMap.put("tenant", "0");
        datasourceNameList.add(datasourceNameInitMap);

        StringBuffer sb = new StringBuffer(AiotConstants.DATASOURCE_ALL);

        if (CollectionUtils.isNotEmpty(tenantUserList)) {
            for (SysTenantUser data : tenantUserList) {
                Map<String, String> datasourceNameMap = new HashMap<>(16);
                String ds = AiotConstants.DATASOURCE_PREFIX + data.getTenantId();
                datasourceNameMap.put("name", ds);
                datasourceNameMap.put("tenant", data.getTenantId().toString());

                datasourceNameList.add(datasourceNameMap);

                if (null == dataSourceMap.get(ds)) {
                    DruidDataSource dataSource = (DruidDataSource) dataSourceMap.get(AiotConstants.DATASOURCE_ALL);
                    DruidDataSource druidDataSource = new DruidDataSource();
                    druidDataSource.setName(ds);
                    druidDataSource.setUsername(dataSource.getUsername());
                    druidDataSource.setPassword(dataSource.getPassword());
                    druidDataSource.setUrl(AiotCommonUtil.getTenantDatabaseUrl(dataSource.getUrl(), data.getTenantId().toString()));
                    druidDataSource.setDriverClassName(dataSource.getDriverClassName());
                    dataSourceMap.put(ds, druidDataSource);
                    sb.append(AiotConstants.COMMA_SEPARATOR).append(ds);
                }
            }
        }


        //数据源名称存储到redis缓存
        redisToolsUtil.set(AiotConstants.DATASOURCE_REDIS_KEY, sb.toString());
        //发布内容
        configService.publishConfig(AiotConstants.DATA_SOURCE_DATA_ID, groupId, JSON.toJSONString(datasourceNameList));
        //监听数据源变化,后续新增租户时会添加新的数据源
        listenerNacosDatasource();
    }

    /**
     * 监听数据源变化
     *
     * @throws Exception 异常
     */
    private void listenerNacosDatasource() throws Exception {
        configService.addListener(AiotConstants.DATA_SOURCE_DATA_ID, groupId, new Listener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                //最新有效数据源
                List<Map> datasourceList = JSON.parseArray(configInfo).toJavaList(Map.class);
                Map<String, DataSource> dataSourceMap = dataSource.getDataSourceMap();
                List<String> dataSourceNameList = dataSourceMap.keySet().stream().collect(Collectors.toList());

                datasourceList.forEach(data -> {
                    String ds = data.get("name").toString();

                    //如果新增了数据源,则需要添加
                    if (null == dataSourceMap.get(ds)) {
                        DruidDataSource dataSource = (DruidDataSource) dataSourceMap.get(AiotConstants.DATASOURCE_ALL);
                        DruidDataSource druidDataSource = new DruidDataSource();
                        druidDataSource.setUsername(dataSource.getUsername());
                        druidDataSource.setPassword(dataSource.getPassword());
                        druidDataSource.setUrl(AiotCommonUtil.getTenantDatabaseUrl(dataSource.getUrl(), data.get("tenant").toString()));
                        druidDataSource.setDriverClassName(dataSource.getDriverClassName());
                        dataSourceMap.put(ds, druidDataSource);
                    }

                    dataSourceNameList.remove(ds);
                });

                //删除多余数据源
                if (!dataSourceNameList.isEmpty()) {
                    dataSourceNameList.forEach(data -> {
                        dataSourceMap.remove(data);
                    });
                }
            }

            @Override
            public Executor getExecutor() {
                return null;
            }
        });
    }
}

说明:项目启动,动态创建租户数据源,并监听nacos数据源变化,以便数据源发生变化实时更新。因为项目部署是多实例,通过nacos达到同步数据源的目的

6)TeantShardingAspect.java

package net.wfl.user.biz.aspect;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import lombok.extern.slf4j.Slf4j;
import net.wfl.framework.boot.model.vo.LoginUser;
import net.wfl.user.biz.modules.system.util.AiotConstants;
import net.wfl.user.biz.modules.thingsboard.data.StringUtils;
import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shiro.SecurityUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;

import java.util.Arrays;

/**
 * 租户切面,根据登录用户的租户ID,自动切换从数据源
 *
 * @author wangfenglei
 */
@Aspect
@Component
@Slf4j
public class TenantShardingAspect {
    /**
     * 数据源前缀
     */
    private static final String DATASOURCE_PREFIX = "ds";

    /**
     * 定义切点Pointcut
     */
    @Pointcut("execution(public * net..*.service..*.*(..)) || execution(public * com..*.service..*.*(..))")
    public void pointCut() {
    }

    /**
     * 切面执行方法
     *
     * @param point 切面
     * @return 执行结果
     * @throws Throwable 异常
     */
    @Around("pointCut()")
    public Object doAround(ProceedingJoinPoint point) throws Throwable {
        //如果已存在分库属性值说明service嵌套调用,不做切面,使用父接口的分库

        if (CollectionUtils.isNotEmpty(HintManager.getDatabaseShardingValues())) {
            log.info("=====sharding method:{}, database:{}", point.getTarget().getClass().getName() + ":" + point.getSignature().getName(), "use parent method database " + HintManager.getDatabaseShardingValues().toString());
            return point.proceed();
        }

        LoginUser loginUser = null;
        try {
            loginUser = (LoginUser) SecurityUtils.getSubject().getPrincipal();
        } catch (Exception e) {
            log.warn(e.getMessage());
        }

        HintManager hintManager = null;
        try {
            hintManager = HintManager.getInstance();
            //如果存在租户ID,则从租户逻辑库查询数据,否则查询所有库数据
            if (null != loginUser && !StringUtils.isEmpty(loginUser.getRelTenantIds())) {
                log.info("=====sharding method:{}, database:{}", point.getTarget().getClass().getName() + ":" + point.getSignature().getName(), DATASOURCE_PREFIX + loginUser.getRelTenantIds());
                hintManager.setDatabaseShardingValue(DATASOURCE_PREFIX + loginUser.getRelTenantIds());
            } else {
                hintManager.setDatabaseShardingValue(AiotConstants.DATASOURCE_ALL);
                log.info("=====sharding method:{}, database:{}", point.getTarget().getClass().getName() + ":" + point.getSignature().getName(), "++++ALL");
            }

            Object result = point.proceed();
            return result;
        } catch (Throwable e) {
            log.error("TenantShardingAspect failed!", e);
            throw e;
        } finally {
            if (null != hintManager) {
                hintManager.close();
            }
        }
    }
}

说明:拦截所有的service方法,从登录用户获取所属租户ID,利用shardingjdbc中间件切换到对应的租户业务库;如果service嵌套则使用父service库;如果没有登录用户信息则查询所有数据库

 7)DatabaseShardingAlgorithm.java

package net.wfl.user.biz.modules.tenant.sharding;

import net.wfl.framework.boot.tools.redis.RedisToolsUtil;
import net.wfl.framework.boot.util.util.SpringContextUtils;
import net.wfl.user.biz.modules.system.util.AiotConstants;
import org.apache.shardingsphere.sharding.api.sharding.hint.HintShardingAlgorithm;
import org.apache.shardingsphere.sharding.api.sharding.hint.HintShardingValue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

/**
 * 数据库强制理由分库算法
 *
 * @author wangfenglei
 */
public class DatabaseShardingAlgorithm implements HintShardingAlgorithm<String> {
    private RedisToolsUtil redisToolsUtil;

    @Override
    public Collection<String> doSharding(Collection<String> collection, HintShardingValue<String> hintShardingValue) {
        if (null == redisToolsUtil) {
            redisToolsUtil = SpringContextUtils.getBean("redisToolsUtil", RedisToolsUtil.class);
        }

        Collection<String> result = new ArrayList<>();

        for (String value : hintShardingValue.getValues()) {
            //如果分库值为ds0,默认查询所有有效数据库
            if (value.equals(AiotConstants.DATASOURCE_ALL)) {
                Object database = redisToolsUtil.get(AiotConstants.DATASOURCE_REDIS_KEY);

                //如果没有任何租户,则从默认库查询
                if (null == database || "".equals(database)) {
                    return Arrays.asList(AiotConstants.DATASOURCE_ALL);
                }

                //返回所有有效数据源
                return Arrays.asList(database.toString().split(AiotConstants.COMMA_SEPARATOR));
            }

            result.add(value);
        }

        return result;
    }

    @Override
    public void init() {

    }

    @Override
    public String getType() {
        return null;
    }
}

说明: 根据配置的表分库策略,获取hintManager.setDatabaseShardingValue的值,执行自定义分库策略

8)DefaultShardingAlgorithm.java

package net.wfl.user.biz.modules.tenant.sharding;

import net.wfl.user.biz.modules.system.util.AiotConstants;
import org.apache.shardingsphere.sharding.api.sharding.hint.HintShardingAlgorithm;
import org.apache.shardingsphere.sharding.api.sharding.hint.HintShardingValue;

import java.util.Arrays;
import java.util.Collection;

/**
 * 数据库强制理由分库算法
 *
 * @author wangfenglei
 */
public class DefaultShardingAlgorithm implements HintShardingAlgorithm<String> {

    @Override
    public Collection<String> doSharding(Collection<String> collection, HintShardingValue<String> hintShardingValue) {
        return Arrays.asList(AiotConstants.DATASOURCE_ALL);
    }

    @Override
    public void init() {

    }

    @Override
    public String getType() {
        return null;
    }
}

说明: AiotConstants.DATASOURCE_AL="ds0"

9) TenantUserServiceImpl.java

package net.wfl.user.biz.modules.tenant.service.impl;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.ConfigType;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import net.wfl.framework.boot.model.vo.LoginUser;
import net.wfl.framework.boot.tools.redis.RedisToolsUtil;
import net.wfl.framework.boot.util.util.DateUtils;
import net.wfl.framework.boot.util.util.OConvertUtils;
import net.wfl.framework.boot.util.util.PasswordUtil;
import net.wfl.user.biz.modules.system.util.AiotConstants;
import net.wfl.user.biz.modules.tenant.entity.SysTenantUser;
import net.wfl.user.biz.modules.tenant.mapper.SysTenantUserMapper;
import net.wfl.user.biz.modules.tenant.service.ISysTenantUserService;
import net.wfl.user.biz.modules.thingsboard.util.ThingsboardClientUtil;
import net.wfl.user.biz.modules.util.AiotCommonUtil;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shiro.SecurityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.stereotype.Service;

import javax.sql.DataSource;
import java.sql.PreparedStatement;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @Description: 租户用户管理员
 * @Author: wangfenglei
 */
@Service
public class TenantUserServiceImpl extends ServiceImpl<SysTenantUserMapper, SysTenantUser> implements ISysTenantUserService {
    @Autowired
    private ShardingSphereDataSource dataSource;
    @Autowired
    private RedisToolsUtil redisToolsUtil;
    @Value("classpath:sql/database.sql")
    private Resource dataBaseScript;
    @Autowired
    private ConfigService configService;
    @Value("${spring.cloud.nacos.config.group}")
    private String groupId;

    /**
     * 保存租户用户
     *
     * @param sysTenantUser 租户
     * @throws Exception 异常
     */
    @Override
    public void saveTenantUser(SysTenantUser sysTenantUser) throws Exception {
        LoginUser loginUser = (LoginUser) SecurityUtils.getSubject().getPrincipal();
        if (null == loginUser) {
            throw new Exception("User not login");
        }

        int tenantId = getMaxTenantId() + 1;
        sysTenantUser.setTenantId(tenantId);
        //添加新数据源
        addNewDatasource(sysTenantUser, tenantId);
        this.save(sysTenantUser);
    }
    
    /**
     * 修改租户
     *
     * @param sysTenantUser 租户信息
     * @throws Exception 异常
     */
    @Override
    public void editTenantUser(SysTenantUser sysTenantUser) throws Exception {
        SysTenantUser tenantUser = this.getById(sysTenantUser.getId());

        tenantUser.setTenantName(sysTenantUser.getTenantName());
        tenantUser.setPhone(sysTenantUser.getPhone());
        tenantUser.setDescription(sysTenantUser.getDescription());
        tenantUser.setStatus(sysTenantUser.getStatus());

        this.updateById(tenantUser);

        String sql = "update sys_user set status=" + sysTenantUser.getStatus() + " where username='" + sysTenantUser.getUsername() + "'";
        //更新结果
        PreparedStatement preparedStatement = dataSource.getDataSourceMap().get(AiotConstants.DATASOURCE_PREFIX + tenantUser.getTenantId()).getConnection().prepareStatement(sql);
        preparedStatement.executeUpdate();
    }

    /**
     * 删除租户
     *
     * @param id 租户ID
     * @throws Exception 异常
     */
    @Override
    public void deleteTenantUser(String id) throws Exception {
        SysTenantUser sysTenantUser = this.getById(id);
        //先删除用户,即使删除失败,该用户在系统中已经被删除
        this.removeById(id);

        String datasourceName = redisToolsUtil.get(AiotConstants.DATASOURCE_REDIS_KEY).toString();
        //直接arrays.asList无法add和remove元素,需要new一下
        List<String> datasourceNameList = Lists.newArrayList(Arrays.asList(datasourceName.split(AiotConstants.COMMA_SEPARATOR)));
        datasourceNameList.remove(AiotConstants.DATASOURCE_PREFIX + sysTenantUser.getTenantId());
        String newDatasourceName = datasourceNameList.stream().collect(Collectors.joining(AiotConstants.COMMA_SEPARATOR));
        //从redis缓存删除数据源名称
        redisToolsUtil.set(AiotConstants.DATASOURCE_REDIS_KEY, newDatasourceName);
        //删除本地数据源
        dataSource.getDataSourceMap().remove(AiotConstants.DATASOURCE_PREFIX + sysTenantUser.getTenantId());
        List<Map<String, String>> datasourceNacosList = new ArrayList<>();

        datasourceNameList.forEach(data -> {
            Map<String, String> datasourceNameMap = new HashMap<>(16);
            datasourceNameMap.put("name", data);
            //超级管理员用到,默认租户为0
            datasourceNameMap.put("tenant", data.substring(2, data.length()));
            datasourceNacosList.add(datasourceNameMap);
        });

        //发布内容到nacos,更新为删除后的最新数据源
        configService.publishConfig(AiotConstants.DATA_SOURCE_DATA_ID, groupId, JSON.toJSONString(datasourceNacosList));

        String sql = "DROP DATABASE IF EXISTS iot_" + sysTenantUser.getTenantId();
        //删除数据库
        PreparedStatement preparedStatement = dataSource.getDataSourceMap().get(AiotConstants.DATASOURCE_ALL).getConnection().prepareStatement(sql);
        preparedStatement.executeUpdate();
    }

    /**
     * 更新密码
     *
     * @param id          租户ID
     * @param newPassword 新密码
     * @throws Exception 异常
     */
    @Override
    public void updatePassword(String id, String newPassword) throws Exception {
        SysTenantUser sysTenantUser = this.getById(id);
        sysTenantUser.setPassword(newPassword);
        this.updateById(sysTenantUser);

        // 密码加密加盐
        String salt = OConvertUtils.randomGen(8);
        String passwordEncode = PasswordUtil.encrypt(sysTenantUser.getUsername(), newPassword, salt);
        DataSource druidDataSource = dataSource.getDataSourceMap().get(AiotConstants.DATASOURCE_PREFIX + sysTenantUser.getTenantId());
        String updatePasswordSql = "update sys_user set password='" + passwordEncode + "', salt='" + salt + "' where username='" + sysTenantUser.getUsername() + "'";
        //更新用户密码
        PreparedStatement preparedStatement = druidDataSource.getConnection().prepareStatement(updatePasswordSql);
        preparedStatement.executeUpdate();
    }

    /**
     * 获取最大租户ID
     *
     * @return 租户ID
     */
    @Override
    public Integer getMaxTenantId() {
        return this.baseMapper.getMaxTenantId();
    }

    /**
     * 添加新的数据源
     *
     * @param sysTenantUser 租户信息
     * @param tenantId      租户ID
     * @throws Exception 异常
     */
    private void addNewDatasource(SysTenantUser sysTenantUser, int tenantId) throws Exception {
        String newDatasourceName = AiotConstants.DATASOURCE_PREFIX + tenantId;
        String sql = "CREATE DATABASE IF NOT EXISTS iot_" + tenantId + " DEFAULT CHARACTER SET utf8mb4 DEFAULT COLLATE utf8mb4_general_ci";
        //创建数据库
        PreparedStatement preparedStatement = dataSource.getDataSourceMap().get(AiotConstants.DATASOURCE_ALL).getConnection().prepareStatement(sql);
        preparedStatement.executeUpdate();

        //添加数据源到sharding
        Map<String, DataSource> dataSourceMap = dataSource.getDataSourceMap();
        DruidDataSource dataSource = (DruidDataSource) dataSourceMap.get(AiotConstants.DATASOURCE_ALL);
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setUsername(dataSource.getUsername());
        druidDataSource.setPassword(dataSource.getPassword());
        druidDataSource.setUrl(AiotCommonUtil.getTenantDatabaseUrl(dataSource.getUrl(), String.valueOf(tenantId)));
        druidDataSource.setDriverClassName(dataSource.getDriverClassName());
        dataSourceMap.put(newDatasourceName, druidDataSource);

        //执行数据库脚本
        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(druidDataSource);
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScripts(dataBaseScript);
        initializer.setDatabasePopulator(populator);
        initializer.afterPropertiesSet();

        // 密码加密加盐
        String salt = OConvertUtils.randomGen(8);
        String passwordEncode = PasswordUtil.encrypt(sysTenantUser.getUsername(), AiotConstants.DEFAULT_TENANT_PASSWORD, salt);
        String userSql = "INSERT INTO `sys_user`(`id`, `username`, `realname`, `password`, `salt`, `avatar`, `birthday`, `sex`, `email`, `phone`, `org_code`, `status`, `del_flag`, `third_id`, `third_type`, `activiti_sync`, `work_no`, `post`, `telephone`, `create_by`, `create_time`, `update_by`, `update_time`, `user_identity`, `area_ids`, `dept_id`, `is_admin`, `rel_tenant_ids`, `client_id`) " +
                "VALUES ('" + IdWorker.getIdStr() + "', '" + sysTenantUser.getUsername() + "', '" + sysTenantUser.getUsername() + "', '" + passwordEncode + "', '" + salt + "', NULL, NULL, NULL, NULL, NULL, NULL, " + sysTenantUser.getStatus() + ", 0, NULL, NULL, 1, NULL, NULL, NULL,'" + sysTenantUser.getPhone() + "', '" + DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get()) + "', NULL, NULL, 1, '', NULL, 1, " + String.valueOf(tenantId) + ", NULL)";
        //插入租户管理员
        preparedStatement = druidDataSource.getConnection().prepareStatement(userSql);
        preparedStatement.executeUpdate();

        //设置数据源名称到redis内存
        String datasourceName = redisToolsUtil.get(AiotConstants.DATASOURCE_REDIS_KEY).toString();

        if (!datasourceName.contains(newDatasourceName)) {
            datasourceName = datasourceName + AiotConstants.COMMA_SEPARATOR + AiotConstants.DATASOURCE_PREFIX + tenantId;
            redisToolsUtil.set(AiotConstants.DATASOURCE_REDIS_KEY, datasourceName);
        }

        //发布到nacos
        configService.publishConfig(AiotConstants.DATA_SOURCE_DATA_ID, groupId, getDateSourceName(), ConfigType.JSON.getType());
    }

    /**
     * 获取设备数据源名称
     *
     * @return 数据名称
     */
    public String getDateSourceName() {
        Map<String, DataSource> dataSourceMap = dataSource.getDataSourceMap();
        List<Map<String, String>> dataSourceNameList = new ArrayList<>();

        dataSourceMap.keySet().forEach(data -> {
            Map<String, String> map = new HashMap<String, String>() {{
                put("name", data);
                put("tenant", data.substring(2, data.length()));
            }};

            dataSourceNameList.add(map);

        });

        return JSON.toJSONString(dataSourceNameList);
    }
}

说明:

1、此类是主要的租户业务逻辑实现类,实现了租户的增删改查,对应业务逻辑库的创建及数据源的创建

2、初始化租户库脚本存放路径

4、优化

 以上算是实现了多租户的功能,但是需要在yml文件中提前配置table的真实节点,如下。存在的问题是最多可支持999个租户,虽然可以再次改大配置支持更多租户。还有一个缺点就是项目启动时会在DataSource中缓存999个真实节点数据,但实际没有这么多节点。对于有代码洁癖或者追求完美的人来说,应该想实现的更完美点,下面将介绍动态更改表实际节点实现。

 device:
            actual-data-nodes: ds$->{0..999}.biz_device

代码实现如下:

 /**
     * 设置表真实节点和规则
     *
     * @param dataSource 数据源
     */
    @Override
    public void reSetTableActualConfig(ShardingSphereDataSource dataSource) {
        //如果少于两个数据源,说明没有租户,不做任何操作
        if (dataSource.getDataSourceMap().size() < 2) {
            return;
        }
        MetaDataContexts contexts = dataSource.getMetaDataContexts();
        List<RuleConfiguration> list = (List<RuleConfiguration>) contexts.getMetaDataMap().get("logic_db").getRuleMetaData().getConfigurations();
        String actualDatabaseIndex = dataSource.getDataSourceMap().keySet().stream().map(p -> p.substring(2)).collect(Collectors.joining(","));
        //获取sharding分库分表规则配置
        AlgorithmProvidedShardingRuleConfiguration configuration = (AlgorithmProvidedShardingRuleConfiguration) list.get(0);
        //获取表真实节点配置
        Collection<ShardingTableRuleConfiguration> tableList = configuration.getTables();
        LinkedList<ShardingTableRuleConfiguration> newTableList = new LinkedList<>();

        tableList.forEach(data -> {
            String actualTableNodes = getTableActualDataNodes(data.getActualDataNodes(), actualDatabaseIndex);
            //重写生成表规则配置
            ShardingTableRuleConfiguration tableRuleConfiguration = new ShardingTableRuleConfiguration(data.getLogicTable(), actualTableNodes);
            tableRuleConfiguration.setDatabaseShardingStrategy(data.getDatabaseShardingStrategy());
            tableRuleConfiguration.setTableShardingStrategy(data.getTableShardingStrategy());
            tableRuleConfiguration.setKeyGenerateStrategy(data.getKeyGenerateStrategy());

            newTableList.add(tableRuleConfiguration);
        });


        tableList.clear();
        //更新表真实节点配置
        tableList.addAll(newTableList);

        DatabaseType databaseType = new org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType();
        //重写生成表规则
        ShardingRule shardingRule = new ShardingRule(configuration, databaseType, dataSource.getDataSourceMap());
        //获取已有的表规则
        Collection<ShardingSphereRule> shardingRuleList = contexts.getMetaDataMap().get("logic_db").getRuleMetaData().getRules();
        shardingRuleList.clear();
        //刷新为真实的表规则
        shardingRuleList.add(shardingRule);

        //刷新SQL路由缓存的表节点实际配置和表规则
        OrderedServicesCache.cacheServices(shardingRuleList, SQLRouter.class, getRegisteredServices(shardingRuleList, SQLRouter.class));
        //刷新SQL重写缓存的表节点实际配置和表规则
        OrderedServicesCache.cacheServices(shardingRuleList, SQLRewriteContextDecorator.class, getRegisteredServices(shardingRuleList, SQLRewriteContextDecorator.class));
        //刷新ResultProcessEngine缓存的表节点实际配置和表规则
        OrderedServicesCache.cacheServices(shardingRuleList, ResultProcessEngine.class, getRegisteredServices(shardingRuleList, ResultProcessEngine.class));
    }

    /**
     * 获取shardingspere缓存的服务
     *
     * @param types           集合类型
     * @param orderedSPIClass class类型
     * @param <K>             k
     * @param <V>             v
     * @return 服务
     */
    private <K, V extends OrderedSPI<?>> Map<K, V> getRegisteredServices(Collection<K> types, Class<V> orderedSPIClass) {
        Collection<V> registeredServices = OrderedSPIRegistry.getRegisteredServices(orderedSPIClass);
        Map<K, V> result = new LinkedHashMap(registeredServices.size(), 1.0F);
        Iterator iterator = registeredServices.iterator();

        while (iterator.hasNext()) {
            V each = (V) iterator.next();
            types.stream().filter((type) -> {
                return each.getTypeClass() == type.getClass();
            }).forEach((type) -> {
                result.put(type, each);
            });
        }

        return result;
    }

    /**
     * 获取数据库表真实节点,有默认的0库,替换为真实的所有的逻辑库,如:ds$->{[0]}替换为ds$->{[0,1,2,3]}
     *
     * @param tableDateNodes 表节点
     * @return 数据库真实节点
     */
    private String getTableActualDataNodes(String tableDateNodes, String actualDatabaseIndex) {
        StringBuilder sb = new StringBuilder(tableDateNodes);
        Pattern p = Pattern.compile("\\[0.*\\]");
        Matcher matcher = p.matcher(tableDateNodes);

        if (matcher.find()) {
            sb.replace(matcher.start(), matcher.end(), "[" + actualDatabaseIndex + "]");
        }

        return sb.toString();
    }

 在数据源发生改变时调用reSetTableActualConfig即可,如项目启动动态生成数据源、nacos监听数据源发生改变、新增租户、删除租户四个地方。

yml配置改为如下 

rules:
      sharding:
        #绑定表规则列表,多个以逗号分割
        binding-tables[0]: biz_device,sys_area
        
        tables:
          tenant_user:
            actual-data-nodes: ds$->{[0]}.sys_tenant_user
            database-strategy:
              hint:
                sharding-algorithm-name: database-defaults
          device:
            actual-data-nodes: ds$->{[0]}.biz_device

项目启动时只加载ds0的表节点,项目启动成功或者数据源发生变化时会自动刷新表的真实节点和表规则,如此可以完美实现多租户功能,有多少数据源就更新为真实数据源。 

总结:动态创建数据源是拷贝替换,这样所有的业务逻辑库都在同一台服务器上,这样还是有局限的,后续可以优化在创建租户的时候,可以选择在哪台服务器上创建业务逻辑库或者写个均衡算法识别在哪个服务器创建业务库。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐