前言

谷粒商场高级篇时至今日已经看完了,一路跟下来感觉收获颇多,现打算对在其中学到的有用的知识点进行总结,以下内容着重于对一些技术栈的使用方法进行总结,像项目中涉及到的 ElasticSearch 、登录框架、支付、性能压测等第三方工具和接口的使用我们则不在此处再次介绍了。。。。想要了解的可以查看之前的专栏笔记。

如此接下来将会介绍在项目中学到的那些令我受益匪浅的技术或方法


一、Stream

Stream 是 Java8 中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操作。使用Stream API 对集合数据进行操作,就类似于使用 SQL 执行的数据库查询。也可以使用 Stream API 来并行执行操作。简而言之,Stream API 提供了一种高效且易于使用的处理数据的方式。

Stream 语法 在项目中使用的也比较多,常常结合 Lambda 表达式进行使用,如不太懂的话在做此项目的时候回一脸懵,我们来看看其具体使用:

其实对 Stream 的使用就是实现一个 filter-map-reduce 过程,产生一个最终结果,或者导致一个副作用

A、流创建

创建流的语法我们单独使用的很少,我们更多的是结合创建流使用其中间和终端操作

List<Stream> list = new ArrayList<>();
Stream<String> stream = list.stream();

B、中间操作

map: 它的作用就是把 InputStream 的每个元素映射成 OutputStream 的另外一个元素

例如:

List<Integer> nums = Arrays.asList(1, 2, 3, 4);
List<Integer> squareNums = nums.stream().map(n -> n * n).collect(Collectors.toList());

n 就相当于遍历集合时的每一个元素,后面是对每一个元素进行的操作。。这里是用了 lambda 表达式

lambda 表达式语法:

当参数个数为 1 时,() 可以省略
当操作数为1时,{} 可以省略

(parameters) -> { statements; }

filter: filter对原始Stream进行某项测试,通过测试的元素被留下来生成一个新Stream。

例如:

List<Integer> nums = Arrays.asList(1, 2, 3, 4);
List<Integer> newNums = nums.stream().filter(n -> n > 2).collect(Collectors.toList());

forEach: forEach方法接收一个Lambda表达式,然后在Stream的每一个元素上执行该表达式

例如:

// 对一个人员集合遍历,找出男性并打印姓名。
roster.stream().filter(p -> p.getGender() == Person.Sex.MALE).forEach(p -> System.out.println(p.getName()));

C、终止操作

主要就是前面用到的收集操作

.collect(Collectors.toList())

匹配聚合操作:

/*
匹配、聚合操作
	allMatch:接收一个 Predicate 函数,当流中每个元素都符合该断言时才返回true,否则返回false
	noneMatch:接收一个 Predicate 函数,当流中每个元素都不符合该断言时才返回true,否则返回false
	anyMatch:接收一个 Predicate 函数,只要流中有一个元素满足该断言则返回true,否则返回false
	findFirst:返回流中第一个元素
	findAny:返回流中的任意元素
	count:返回流中元素的总个数
	max:返回流中元素最大值
	min:返回流中元素最小值
*/

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

boolean allMatch = list.stream().allMatch(e -> e > 10); //false
boolean noneMatch = list.stream().noneMatch(e -> e > 10); //true
boolean anyMatch = list.stream().anyMatch(e -> e > 4);  //true

Integer findFirst = list.stream().findFirst().get(); //1
Integer findAny = list.stream().findAny().get(); //1

long count = list.stream().count(); //5
Integer max = list.stream().max(Integer::compareTo).get(); //5
Integer min = list.stream().min(Integer::compareTo).get(); //1

二、MP 相关操作

项目中持久层框架是用的 MyBatis Plus ,由于之前没有学过 MP 只学过 MyBatis,有的从数据库的查询操作也是看不懂的,在此特此记录。。。而且 MP 不需要写 mapper 映射文件的方式,也是十分方便的。

开发中使用 MP 要导入其相关依赖

<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1.tmp</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
</dependency>

A、常用注解

@TableName

  • 描述:表名注解,标识实体类对应的表
  • 使用位置:实体类
// 即此实体类对应 sys_user 这张表
@TableName("sys_user")
public class User {
    private Long id;
    private String name;
    private Integer age;
    private String email;
}

@TableId

  • 描述:主键注解,表明此字段是表中主键
  • 使用位置:实体类主键字段
@TableName("sys_user")
public class User {
    @TableId
    private Long id;
    private String name;
    private Integer age;
    private String email;
}

注解参数:

属性类型必须指定默认值描述
valueString“”主键字段名
typeEnumIdType.NONE指定主键类型

IdType 中包含主键自增策略

描述
AUTO数据库 ID 自增
NONE无状态,该类型为未设置主键类型(注解里等于跟随全局,全局里约等于 INPUT)
INPUTinsert 前自行 set 主键值
ASSIGN_ID分配 ID (主键类型为 Number(Long 和 Integer) 或 String) (since 3.3.0),使用接口 IdentifierGenerator 的方法 nextId (默认实现类为 DefaultIdentifierGenerato 雪花算法)
ASSIGN_UUID分配 UUID,主键类型为 String(since 3.3.0),使用接口 IdentifierGenerator 的方法 nextUUID (默认 default 方法)
ID_WORKER分布式全局唯一 ID 长整型类型(please use ASSIGN_ID)
UUID32 位 UUID 字符串(please use ASSIGN_UUID)
ID_WORKER_STR分布式全局唯一 ID 字符串类型(please use ASSIGN_ID)
package com.example.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.*;

//所有无参构造和get方法和set方法还有哈希库方法注解(lombok)
//有参构造方法注解(@AllArgsConstructor)
@Data
//设置实体类所对应的表名
//@TableName("t_user")
public class User {
    //将该属性对应的字段指定为主键
    @TableId(value = "uid",type = IdType.AUTO)
    //@TableId注解的value属性用于指定主键的字段
    //@TableId注解的type属性用于设置主键的生成策略(一定要开启自增)
    //最终插入的主键不再是雪花算法生成的
    private Long id;
    private String name;
    private Integer age;
    private String email;
}

@TableField

  • 描述:字段注解 (非主键), 可省略
@TableName("sys_user")
public class User {
    @TableId
    private Long id;
    @TableField("nickname")
    private String name;
    private Integer age;
    private String email;
    // 常用: 标注其为非数据库, 但又是必须使用的, 防止 BaseMapper 基础查询时误加上此条件
    @TableField(exist = false)
    private String phone;
}

另有:

@TableLogic:表字段逻辑处理注解(逻辑删除)

@Version:乐观锁注解

@OrderBy:内置 SQL 默认指定排序,优先级低于 wrapper 条件查询

B、常用方法

我们的 mapper 包下可以继承 MP 封装好的 BaseMapper 类,其提供了基础的 CRUD 方法

package mapper;

import bean.User;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

public interface UserMapper extends BaseMapper<User> {} 

BaseMapper 中常用方法为:

/**
 * Mapper 继承该接口后,无需编写 mapper.xml 文件,即可获得CRUD功能
 * 这个 Mapper 支持 id 泛型
 */
public interface BaseMapper<T> {
 
    /**
     * 插入一条记录
     * @param entity
     * 实体对象
     * @return int
     */
    Integer insert(T entity);
 
    /**
     * 根据 ID 删除
     * @param id
     * 主键ID
     * @return int
     */
    Integer deleteById(Serializable id);
 
    /**
     * 根据 columnMap 条件,删除记录
     * @param columnMap
     * 表字段 map 对象
     * @return int
     */
    Integer deleteByMap(@Param("cm") Map<String, Object> columnMap);
 
    /**
     * 根据 entity 条件,删除记录
     * @param wrapper
     * 实体对象封装操作类(可以为 null)
     * @return int
     */
    Integer delete(@Param("ew") Wrapper<T> wrapper);
 
    /**
     * 删除(根据ID 批量删除)
     * @param idList
     * 主键ID列表
     * @return int
     */
    Integer deleteBatchIds(List<? extends Serializable> idList);
 
    /**
     * 根据 ID 修改
     * @param entity
     * 实体对象
     * @return int
     */
    Integer updateById(T entity);
 
    /**
     * 根据 whereEntity 条件,更新记录
     * @param entity
     * 实体对象
     * @param wrapper
     * 实体对象封装操作类(可以为 null)
     * @return
     */
    Integer update(@Param("et") T entity, @Param("ew") Wrapper<T> wrapper);
 
    /**
     * 根据 ID 查询
     * @param id
     * 主键ID
     * @return T
     */
    T selectById(Serializable id);
 
    /**
     * 查询(根据ID 批量查询)
     * @param idList
     * 主键ID列表
     * @return List<T>
     */
    List<T> selectBatchIds(List<? extends Serializable> idList);
 
    /**
     * 查询(根据 columnMap 条件)
     * @param columnMap
     * 表字段 map 对象
     * @return List<T>
     */
    List<T> selectByMap(@Param("cm") Map<String, Object> columnMap);
 
    /**
     * 根据 entity 条件,查询一条记录
     * @param entity
     * 实体对象
     * @return T
     */
    T selectOne(@Param("ew") T entity);
 
    /**
     * 根据 Wrapper 条件,查询总记录数
     * @param wrapper
     * 实体对象
     * @return int
     */
    Integer selectCount(@Param("ew") Wrapper<T> wrapper);
 
    /**
     * 根据 entity 条件,查询全部记录
     * @param wrapper
     * 实体对象封装操作类(可以为 null)
     * @return List<T>
     */
    List<T> selectList(@Param("ew") Wrapper<T> wrapper);
 
    /**
     * 根据 Wrapper 条件,查询全部记录
     * @param wrapper
     * 实体对象封装操作类(可以为 null)
     * @return List<T>
     */
    List<Map<String, Object>> selectMaps(@Param("ew") Wrapper<T> wrapper);
 
    /**
     * 根据 Wrapper 条件,查询全部记录
     * @param wrapper
     * 实体对象封装操作类(可以为 null)
     * @return List<Object>
     */
    List<Object> selectObjs(@Param("ew") Wrapper<T> wrapper);
 
    /** 
     * 用法:(new RowBounds(offset, limit), ew);
     * 根据 entity 条件,查询全部记录(并翻页)
     * @param rowBounds
     * 分页查询条件(可以为 RowBounds.DEFAULT)
     * @param wrapper
     * 实体对象封装操作类(可以为 null)
     * @return List<T>
     */
    List<T> selectPage(RowBounds rowBounds, @Param("ew") Wrapper<T> wrapper);
 
    /** -- 不常用,
     * 根据 Wrapper 条件,查询全部记录(并翻页)
     * @param rowBounds
     * 分页查询条件(可以为 RowBounds.DEFAULT)
     * @param wrapper
     * 实体对象封装操作类
     * @return List<Map<String, Object>>
     */
    List<Map<String, Object>> selectMapsPage(RowBounds rowBounds, @Param("ew") Wrapper<T> wrapper);
}

使用:

@Test
public void testSelect() {
	User user = userMapper.selectById(1L);
	System.out.println(user);
}

在这里插入图片描述
C、条件构造器

通过使用条件构造器 QueryWrapper,我们可以与上面的 BaseMapper 中封装好的 CRUD 方法结合起来,作为方法的参数即可使用代码的方式进行数据库的相关操作。。。

queryWrapper.lt() ——小于
queryWrapper.le() ——小于等于
queryWrapper.gt() ——大于
queryWrapper.ge() ——大于等于
queryWrapper.eq() ——等于
queryWrapper.ne() ——不等于
queryWrapper.betweeen("age",10,20) ——age在值1020之间
queryWrapper.notBetweeen("age",10,20) ——age不在值1020之间
queryWrapper.like("属性", "值") ——模糊查询匹配值‘%%’
queryWrapper.notLike("属性","值") ——模糊查询不匹配值‘%%’
queryWrapper.likeLeft("属性","值") ——模糊查询匹配最后一位值‘%值’
queryWrapper.likeRight("属性", "值")——模糊查询匹配第一位值‘值%’
queryWrapper.isNull() ——值为空或null
queryWrapper.isNotNull() ——值不为空或null
queryWrapper.in("属性", 条件, 条件)——符合多个条件的值
queryWrapper.notIn("属性" , 条件, 条件) ——不符合多个条件的值
queryWrapper.or() ——或者
queryWrapper.and() ——和
queryWrapper.orderByAsc("属性")——根据属性升序排序
queryWrapper.orderByDesc("属性")——根据属性降序排序
@Test
public void squery() {
    //创建QueryWrapper 对象
    QueryWrapper<User> queryWrapper = new QueryWrapper<>();

    //ge gt le lt: 大于等于 大于 小于等于 小于
    queryWrapper.ge("age",29);
    List<User> users = userMapper.selectList(queryWrapper);
    System.out.println(users);
    
    //eq ne  等于 不等于

    //between 介于查询
    queryWrapper.between("age",28,30);
    List<User> users1 = userMapper.selectList(queryWrapper);
    System.out.println(users1);
    
    //模糊查询
    queryWrapper.like("nam","l");
    queryWrapper.select("id","nam");
    List<User> users2 = userMapper.selectList(queryWrapper);
    System.out.println(users2);
    
    // orderByDesc 降序查找
    queryWrapper.orderByDesc;
    
    //last 在sql语句后面拼接
    queryWrapper.last("limit 1");

}

在项目中使用到的例如:

AttrAttrgroupRelationEntity attrId = relationDao.selectOne(new QueryWrapper<AttrAttrgroupRelationEntity>().eq("attr_id", attrEntity.getAttrId()))

详解:

在这里插入图片描述

三、Redis 的使用

Redis 缓存中间件,在项目中的主要应用场景为请求频繁向数据库查询数据时,我们可以将数据存到 Redis 中,查询数据时先到 Redis 中查,如果没有则再到数据库去查。在使用 Redis 的时候,我们需要考虑缓存失效问题和数据一致性问题(数据库和 Redis 中数据一致)。当然如果不是分布式系统,我们可以做本地缓存来代替 Redis 的使用。。。

具体使用:

A、安装 Redis

1. 拉去镜像
docker pull redis

2. 创建外部挂载的目录和文件
mkdir -p /mydata/redis/conf
touch /mydata/redis/conf/redis.conf

3. 初始化redis镜像并启动, 同时配置了redis的外部挂载 
docker run -p 6379:6379 --name redis -v /mydata/redis/data:/data \
-v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server /etc/redis/redis.conf

4. 说明
-p 6379:6379: 将容器的6379端口映射到主机的6379端口
-v /mydata/redis/data:/data: 将日志文件挂载到主机
-v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf: 将配置文件挂载到主机
-d redis: 后台运行, redis为镜像名称
redis-server /etc/redis/redis.conf: 容器启动时使用的配置文件

B、添加依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

C、配置 redis

spring:
  redis:
	host: 192.168.56.10	
	port: 6379

D、Redis 存取数据

Redis 中有五种数据类型,例如 String、list、hash等,对于不同数据类型的操作,我们在使用 RedisTemplate 时应使用不同的方法,如 opsForValue、opsForList、opsForHash 等进行操作。

在项目中我们主要使用的是 RedisTemplate 的子类:StringRedisTemplate

关于两者的使用场景区别:

  • 当你的redis数据库里面本来存的是字符串数据或者你要存取的数据就是字符串类型数据的时候,那么你就使用StringRedisTemplate即可。

  • 但是如果你的数据是复杂的对象类型,而取出的时候又不想做任何的数据转换,直接从Redis里面取出一个对象,那么使用RedisTemplate是更好的选择。

@Autowired
StringRedisTemplate stringRedisTemplate;

public void redisString(){
    //存入数据
    stringRedisTemplate.opsForValue().set("key","value");
    //存入数据设置缓存时间 TimeUnit.SECONDS 单位:秒
    stringRedisTemplate.opsForValue().set("key","value",1200, TimeUnit.SECONDS);
    //如果不存在则插入,返回true为插入成功,false失败
    Boolean absent = stringRedisTemplate.opsForValue().setIfAbsent("key", "value");

    Map<String,String> map = new HashMap<>();
    map.put("key1", "value1");
    map.put("key2", "value2");
    //批量插入,key值存在会覆盖原值
    stringRedisTemplate.opsForValue().multiSet(map);

    //批量插入,如果里面的所有key都不存在,则全部插入,返回true,如果其中一个在redis中已存在,全不插入,返回false
    Boolean absent1 = stringRedisTemplate.opsForValue().multiSetIfAbsent(map);

    //获取值,key不存在返回null
    Object object = stringRedisTemplate.opsForValue().get("key");
    //批量获取,key不存在返回null
    List<String> list = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("key1", "key2"));

    //获取指定字符串的长度
    Long size = stringRedisTemplate.opsForValue().size("key");
    //原有的值基础上新增字符串到末尾 返回长度
    Integer integer = stringRedisTemplate.opsForValue().append("key", "lemon");

    //获取原来key键对应的值并重新赋新值 返回原来值
    String str =stringRedisTemplate.opsForValue().getAndSet("key","lemon");

    //获取指定key的值进行减1,如果value不是integer类型,会抛异常,如果key不存在会创建一个,默认value为0
    stringRedisTemplate.opsForValue().decrement("keyint");
    //获取指定key的值进行加1,如果value不是integer类型,会抛异常,如果key不存在会创建一个,默认value为0
    stringRedisTemplate.opsForValue().increment("keyint");
    //删除指定key,成功返回true,否则false
    Boolean delete = stringRedisTemplate.opsForValue().getOperations().delete("key");
    //删除多个key,返回删除key的个数
    Long deleteL = stringRedisTemplate.opsForValue().getOperations().delete(Arrays.asList("key1", "key2"));
}

E、设置存入数据的过期时间

stringRedisTemplate.opsForValue().set("userNum", "100", 60 * 10, TimeUnit.SECONDS);

F、分布式锁

使用分布式锁可以解决缓存击穿问题。。。

我们首先来看使用 StringRedisTemplate (RedisTemplate) 来操作分布式锁。。。。

public Map<String, String> testRedisLock() {
   String uuid = UUID.randomUUID().toString();
   Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
   if (lock) {
       try {
           // 业务逻辑代码
       } finally {
           // 删除锁
           String script = "if redis.call('get', KEYS[1]) == ARGV[1] thenreturn redis.call('del', KEYS[1]) else return 0 end";
           Long res = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
       }
       return ..;
   } else {
       //加锁失败...重试。synchronized ()
       //休眠 100ms 重试
       System.out.println("获取分布式锁失败...等待重试");
       try{
           Thread.sleep(200);
       } catch (Exception e){

       }
       return testRedisLock();//自旋的方式
   }
}

注:redisTemplate 操作分布式锁的关键是:加锁(占位+过期时间) 及 删锁 (判断+删除) 操作的完整性

前者我们使用了 setIfAbsent 来保证完整性,后者我们使用了 lua 脚本。至于为什么加过期时间,是如果服务器宕机导致删锁失败,锁一直占着,造成死锁问题。。而为什么进行判断然后进行删除,则是我们只有自己才能删除自己的锁。。。

但这样还是有很多问题,如锁的自动续期。。。其实我们可用 Redisson 来完成分布式锁

Redission 配置:

package com.fancy.gulimall.seckill.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
public class MyRedissonConfig {

    /**
     * 所有对Redisson的使用都是通过RedissonClient对象
     * @return
     * @throws IOException
     */
    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson(@Value("${spring.redis.host}") String url) throws IOException {
        //1、创建配置
        //Redis url should start with redis:// or rediss://
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+url+":6379");
        //2、根据Config创建出RedissonClient示例
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    }
}

使用:

@Autowired
RedissonClient redissonClient;

private final static String LOCK = "REDISSON_LOCK";
public void testRedisson() {
    //定义锁
    RLock lock = redissonClient.getLock(LOCK);
    // 加锁
    //lock.lock();
    try {
        //尝试加锁,最大等待时间300毫秒,上锁30毫秒自动解锁
        if (lock.tryLock(300, 30, TimeUnit.MILLISECONDS)) {

        }
    } catch (Exception e) {

    } finally {
        //释放锁
        lock.unlock();
    }
}

Redisson 锁的使用方式类似于 JUC 锁。。。。

四、SpringCache 的使用

使用 SpringCache 的相关注解,我们可以简化缓存的相关操作。。

A、引入依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-cache</artifactId>
</dependency>

B、配置 Redis 作为缓存

spring.cache.type=redis

C、开启缓存注解

主启动类上加上 @EnableCaching

D、使用

在想要处理缓存的方法上加上以下注解

@Cacheable 注解会先查询是否已经有缓存,有会使用缓存,没有则会执行方法并缓存。

注:放入缓存中的数据是方法的返回值

@Override
@Cacheable("menu")
public Menu findById(String id) {
    Menu menu = this.getById(id);
    if (menu != null){
        System.out.println("menu.name = " + menu.getName());
    }
    return menu;
}

在这个例子中,findById 方法与一个名为 menu 的缓存关联起来了。调用该方法时,会检查 menu 缓存,如果缓存中有结果,就不会去执行方法了。其实是对于参数相同的方法就不在执行,参数相同这一点应该是很容易理解的,因为缓存不关心方法的执在这里插入代码片行逻辑,它能确定的是:对于同一个方法,如果参数相同,那么返回结果也是相同的。

@CacheEvict 注解用于清除缓存

参数:

  • value:缓存位置名称,不能为空
  • key:缓存的 key,默认为空
  • condition:触发条件,只有满足条件的情况才会清除缓存,默认为空,支持SpEL
  • allEntries:true 表示清除 value 中的全部缓存,默认为 false
//清除掉指定key的缓存  
@CacheEvict(value="andCache",key="#user.userId + 'findById'")  
public void modifyUserRole(SystemUser user) {  
	System.out.println("hello andCache delete"+user.getUserId());  
}  

//清除掉全部缓存  
@CacheEvict(value="andCache",allEntries=true)  
public final void setReservedUsers(String[] reservedUsers) {  
	System.out.println("hello andCache deleteall");  
}

E、缓存失效问题解决

  • 缓存穿透:空结果缓存
spring.cache.redis.cache-null-values=true
  • 缓存击穿:@Cacheable 中 sync 参数设置为 true

  • 缓存雪崩:加上过期时间

spring.cache.redis.time-to-live=3600000

五、异步及线程池

在开发中我们使用多线程往往通过线程池的方式,使用线程池便于对多线程进行管理。

参数配置:

package com.fancy.gulimall.order.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties(prefix = "gulimall.thread")
@Component
@Data
public class ThreadPoolConfigProperties {
    private Integer coreSize;
    private Integer maxSize;
    private Integer keepAliveTime;
}

通过此文件我们将线程池的核心参数放到 application.properties 中进行配置。。

gulimall.thread.core-size=20
gulimall.thread.max-size=200
gulimall.thread.keep-alive-time=10

线程池配置:

package com.fancy.gulimall.order.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

//@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool){
       return new ThreadPoolExecutor(pool.getCoreSize(),
                pool.getMaxSize(),pool.getKeepAliveTime(),
                TimeUnit.SECONDS,new LinkedBlockingDeque<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }
}

使用:

@Autowired
ThreadPoolExecutor executor;
executor.execute(() -> {
   //业务代码
});

线程池常常配合异步编排来使用:

@Autowired
ThreadPoolExecutor executor;

// runAsync() 方法用于执行异步任务
CompletableFuture<Void> task1 = CompletableFuture.runAsync(()->{
	// 业务方法一
}, executor); 

CompletableFuture<Void> task2 = CompletableFuture.runAsync(()->{
	// 业务方法二
	// thenRunAsync 表示业务二执行完后再异步执行业务三, 同时忽略上一个线程的执行结果和逻辑
}, executor).thenRunAsync(()->{
   // 业务方法三
}, executor); 

// 所以异步任务执行完之后抛出异常
CompletableFuture.allOf(task1, task2).get();

六、RabbitMQ 的使用

在项目中我们用 RabbitMQ 作服务间通信解耦,在分布式事务中用于解决事务的最终的一致性,在高并发场景下用于削峰填谷。

我们来看其具体使用:

A、安装 RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

如果 docker 中没有 RabbitMQ 相关镜像,会先对镜像进行下载。。。

B、添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

C、配置文件

# RabbitMQ配置
spring.rabbitmq.host=192.168.38.130
spring.rabbitmq.port=5672

# 虚拟主机配置
spring.rabbitmq.virtual-host=/

# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true

# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true

# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true

# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

D、开启 RabbitMQ 功能注解

在主启动类上加上:@EnableRabbit

E、使用示例

路由键:

每个接收端在绑定交换机的时候可以设置相应路由键,每个发送端在发送消息的时候可以指明路由键,交换机可以根据路由键将数据发送到指定的队列中,这样接收端就能从指定的队列获取到相应的数据。

测试类中:

@Autowired
AmqpAdmin amqpAdmin;

@Autowired
RabbitTemplate  rabbitTemplate;


// 创建交换机
@Test
public void createExchange() {
    DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
    amqpAdmin.declareExchange(directExchange);
}

// 创建队列
@Test
public void createQueue() {
    Queue queue = new Queue("hello-java-queue", true, false, false);
    amqpAdmin.declareQueue(queue);
}

// 创建绑定关系
@Test
public void createBinding() {
    // String destination:目的地,
    // DestinationType destinationType: 目的地类型,
    // String exchange: 交换机,
    // String routingKey: 路由键,
    // Map<String, Object> arguments: 自定义参数
    // 将 exchange 指定的交换机和 destination 目的地进行绑定, 使用 routingKey 作为指定的路由键
    Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
    amqpAdmin.declareBinding(binding);
}


@Test
public void sendMessage() {
    User user = new User();
    user.setName("Mike");
    user.setAge(18);
    user.setEmail("1796952157@qq.com")
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", user, new CorrelationData(UUID.randomUUID().toString());
}

rabbitTemplate.convertAndSend()方法可以将对象格式发送到队列中,示例中的 User 实体类必须实现序列化。。。

接收消息:

在一个方法上添加 @RabbitListener 参数为队列的名,方法参数就是收到的消息。

手动签收:channel 的 basicAck 方法手动签收,通过 basicNack 方法拒绝签收。。。

@RabbitListener(queues = {"hello-java-queue"})
public void recieveMessage(User user, Message message, Channel channel) throws InterruptedException  {
    // 这里看到具体消息需要配置 JSON 序列化
	System.out.println("接收到了。。" + user);
    
    // 消息属性
    byte[] body = message.getBody();
    MessageProperties properties = message.getMessageProperties();

    // 获取之前我们传递过去的消息唯一标识
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
    // rabbitmq 默认是自动签收, 如在配置文件中配置了  spring.rabbitmq.listener.simple.acknowledge-mode=manual 则需要手动签收
    // 手动签收
    channel.basicAck(deliveryTag,false);
    // long deliveryTag, boolean multiple, boolean requeue 
    // 消息标识         积压消息批量处理         是否重新发送
    // 手动拒签
    channel.basicNack(deliveryTag,false,true);
}

使用 RabbitMQ 的延时队列可以实现定时任务的功能,这点我们在下面详细说明。。。

另外,关于RabbitMQ 异步、解耦及削峰三者的说明:

解耦:

A ----> B : A 服务远程调用B
A -----> MQ -----> B :A服务将消息发送到 MQ 中B服务接收执行

前者 B 服务 crash 掉 A 服务会受到影响(调用异常),后者的话B服务crash掉,由于A发送消息到队列中,因此A服务不受影响

异步:

用户点击支付 ----> 积分折扣 ----> 消费券 ----> 短信验证 ----->支付完成

中间三步不是串行的,可以异步并行处理

削峰:

双十一订单写入队列一条一条进行处理

同时 RabbitTemplate 可以进行定制化配置

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.annotation.PostConstruct;

@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }

    /**
     * 使用JSON序列化机制,进行消息转换
     */
    @Bean
    public MessageConverter messageConverter(){

        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务器收到消息就回调
     *      1、spring.rabbitmq.publisher-confirms=true
     *      2、设置确认回调ConfirmCallback
     * 2、消息正确抵达队列进行回调
     *      1、 spring.rabbitmq.publisher-returns=true
     *          spring.rabbitmq.template.mandatory=true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)。
     *      spring.rabbitmq.listener.simple.acknowledge-mode=manual 手动签收
     *      1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息
     *          问题:
     *              我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了。就会发生消息丢失;
     *              消费者手动确认模式。只要我们没有明确告诉MQ,货物被签收。没有Ack,
     *                  消息就一直是unacked状态。即使Consumer宕机。消息不会丢失,会重新变为Ready,下一次有新的Consumer连接进来就发给他
     *      2、如何签收:
     *          channel.basicAck(deliveryTag,false);签收;业务成功完成就应该签收
     *          channel.basicNack(deliveryTag,false,true);拒签;业务失败,拒签
     */
	//  @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate(){
        //设置确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *
             * 1、只要消息抵达Broker就ack=true
             * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
             * @param ack  消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {

                /**
                 * 1、做好消息确认机制(pulisher,consumer【手动ack】)
                 * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍
                 */
                //服务器收到了;
                //修改消息的状态
                System.out.println("confirm...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
            }
        });

        //设置消息抵达队列的确认回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就触发这个失败回调
             * @param message   投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange  当时这个消息发给哪个交换机
             * @param routingKey 当时这个消息用哪个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //报错误了。修改数据库当前消息的状态->错误。
                System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]===>exchange["+exchange+"]===>routingKey["+routingKey+"]");
            }
        });
    }
}

七、定时任务

实现定时任务我们主要用到两种方式: ① Spring Task 和 ②RabbitMQ 延时队列

1. Spring Task 的使用

A、开启功能注解

主启动类上加上@EnableScheduling 开启定时任务。。。。

B、添加定时任务

方法上加上 @Scheduled 注解标注 即可,参数是 cron 表达式

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component // 把此类托管给 Spring,不能省略
public class TaskUtils {    
    // 添加定时任务    
    @Scheduled(cron = "30 40 23 0 0 5") // cron表达式:每周一 23:40:30 执行    
    public void doTask(){        
        System.out.println("我是定时任务~");    
    }
}

Spring Boot 启动后会自动加载并执行定时任务,无需手动操作。

2. RabbitMQ 延时队列

场景:比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。

我们可以用定时任务轮询数据库的方式进行解决,但对系统资源的消耗及数据库压力会增大,我们可以采用 Rabbit 延时队列的方式来实现定时任务,即 消息的 TTL + 死信路由

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MyMQConfig {
    
    // 延时队列, 设置消息的存活时间, 死信交换机和死信路由
    @Bean
    public Queue orderDelayQueue() {

        Map<String,Object> arguments = new HashMap<>();
        /**
         * x-dead-letter-exchange: order-event-exchange
         * x-dead-letter-routing-key: order.release
         * x-message-ttl: 60000
         */
        arguments.put("x-dead-letter-exchange","order-event-exchange");
        arguments.put("x-dead-letter-routing-key","order.release");
        arguments.put("x-message-ttl",60000);
        Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
        return queue;
    }
    
    // 死信队列, 延时队列中消息过期后进入此队列。。。
    @Bean
    public Queue orderReleaseQueue() {
        Queue queue = new Queue("order.release.queue", true, false, false);
        return queue;
    }
 
    // 死信交换机, 死信经此转发。。。同时也充当普通交换机的功能
    @Bean
    public Exchange orderEventExchange() {
       return new TopicExchange("order-event-exchange", true, false);
    }


    // 消息(订单信息) 发布通过此绑定关系进行。。。, 然后消息进入延时队列
    @Bean
    public Binding  orderCreateBinding() {
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create",
                null);
    }
   
    // 消息过期之后因为此绑定关系进入死信队列
    @Bean
    public Binding orderReleaseBinding() {
        return new Binding("order.release.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release",
                null);
    }


    /**
     * 订单释放直接和库存释放进行绑定
     * @return
     */
    @Bean
    public Binding orderReleaseOtherBingding() {
        return new Binding("stock.release.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.#",
                null);
    }
}

使用延时队列完成定时任务:

当订单创建成功时 给 MQ 发送消息

 rabbitTemplate.convertAndSend("order-event-exchange", "order.create", order.getOrder());

这里传递的是订单的详细信息,一段时间后消息过期,订单信息进入死信队列。。

我们同时需要配置监听类(方法)来监听死信队列,获取进入死信队列的消息然后根据订单消息进行关单

import com.fancy.gulimall.order.entity.OrderEntity;
import com.fancy.gulimall.order.service.OrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

@RabbitListener(queues = "order.release.queue")
@Service
public class OrderCloseListener {

    @Autowired
    OrderService orderService;
    
    @RabbitHandler
    public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
        System.out.println("收到过期的订单信息:准备关闭订单" + entity.getOrderSn() + "==>" + entity.getId());
        try{
            orderService.closeOrder(entity);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
}

关单的操作可能是更改数据库订单状态,释放相应的库存等等。。。,总之是对数据库或内存的相关操作

八、分布式事务

1. 本地事务

在 SpringBoot 中我们可以使用注解轻松完成本地事务,步骤如下

A、开启事务功能注解

在主启动类上添加 @EnableTransactionManagement 注解

B、添加事务

在方法上添加 @Transactional,即可开启事务功能,当方法出现异常时,会进行回滚。但仅仅会对本地的业务方法进行回滚,向远程调用的方法,无法进行回滚。

2. 分布式事务

针对上述问题,我们要用分布式事务来解决,在此用 Seata 做分布式事务。。。

  • 每一个微服务先必须创建 undo_log
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  • 安装事务协调器:seata-server: https://github.com/seata/seata/releases

  • 导入依赖

<dependency>
   <groupId>com.alibaba.cloud</groupId>
   <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
  • 解压并启动 seata-server
    registry.conf: 注册中心配置。修改 registry type=nacos

  • 所有想要用到分布式事务的微服务使用seata DataSourceProxy 代理自己的数据源

import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;

@Configuration
public class MySeataConfig {

    @Autowired
    DataSourceProperties dataSourceProperties;

    @Bean
    public DataSource dataSource(DataSourceProperties dataSourceProperties){
        //properties.initializeDataSourceBuilder().type(type).build();
        HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
        if (StringUtils.hasText(dataSourceProperties.getName())) {
            dataSource.setPoolName(dataSourceProperties.getName());
        }
        return new DataSourceProxy(dataSource);
    }
}
  • 每个微服务,都必须导入
    registry.conf
    file.conf vgroup_mapping.{application.name}-fescar-service-group = “default”
  • 启动测试分布式事务
  • 给分布式大事务的入口标注@GlobalTransactional
  • 每一个远程的小事务用 @Transactional

事务的最终一致性:

在这里插入图片描述

九、拦截器

拦截器主要用于拦截用户的请求并做相应的处理,通常应用在权限验证、记录请求信息的日志、判断用户是否登录等功能上。

在 SpringBoot 中配置拦截器步骤:

package com.fancy.gulimall.order.interceptor;

import com.fancy.common.constant.AuthServerConstant;
import com.fancy.common.vo.MemberRespVo;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;


@Component
public class LoginUserInterceptor implements HandlerInterceptor {

   public static ThreadLocal<LoginUser> loginUser = new ThreadLocal<>();

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
       // 进入 controlller 之前调用 
    }
    
     @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView)    throws Exception {
       // 返回 ModelAndView 之前调用
    }

}

ThreadLocal :我们可以将 controller 中获取到的属性放到 ThreadLocal 中,便于同一线程内进行调用。

与 过滤器 、AOP 类比:

过滤器,拦截器拦截的是URL。AOP拦截的是类的元数据(包、类、方法名、参数等)。过滤器并没有定义业务用于执行逻辑前、后等,仅仅是请求到达就执行。 拦截器有三个方法,相对于过滤器更加细致,有被拦截逻辑执行前、后等。AOP 针对具体的代码,能够实现更加复杂的业务逻辑。 三者功能类似,但各有优势,从过滤器 -––> 拦截器 —–> 切面,拦截规则越来越细致。 执行顺序依次是过滤器、拦截器、切面。

添加拦截器请求过滤规则。。。。

import com.fancy.gulimall.order.interceptor.LoginUserInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class rWebConfiguration implements WebMvcConfigurer {


    @Autowired
    LoginUserInterceptor interceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(interceptor).addPathPatterns("/**");
    }
}

十、分布式信号量

信号量是一种锁,它可以让用户限制一项资源最多能够同时被多少个进程访问,通常用于限定能够同时使用的资源数量。你可以把 Redis 分布式锁里面创建的锁看作是只能被一个进程访问的信号量。在分布式场景下我们可以用信号量来做服务的限流。。。即分布式限流

我们用 Redisson 来做,以秒杀场景为例

// 前端传来的用户要秒杀商品的数量
int num = ...;

@Autowired
RedissonClient redissonClient;

public void save() {
   // 创建信号量
   RSemaphore semaphore = redissonClient.getSemaphore("sem");
   // 商品可以秒杀的数量作为信号量凭证个数
   semaphore.trySetPermits(seckillSkuVo.getSeckillCount());
   // 设置信号量过期时间为秒杀场景结束后 
   semaphore.expireAt(sesssion.getEndTime());
}

public void kill() {
   // 获取信号量
   RSemaphore semaphore = redissonClient.getSemaphore("sem");
   // 尝试获取 num 个凭证, num 值即放行线程的数量 
   boolean bool = semaphore.tryAcquire(num);
   if (bool) {
   	// 业务逻辑代码
   	// 比如说下单后会对相应的秒杀商品数量进行删除
   }
}

十一、SpringCloud (Alibaba) 组件简介

SpringCloud Alibaba Nacos:服务的注册和配置中心

使用说明:引入 Nacos 依赖,application.properties 配置 discovery.server-addr 、config.server-addr、application.name等,主启动类上加上 @EnableDiscoveryClient 开启服务的注册与发现

SpringCloud OpenFeign:服务之间的相互调用

使用说明:引入 OpenFeign 依赖,主启动类上添加 @EnableFeignClients 注解开启远程调用功能,接口上添加@FeignClient("调用服务名")表明要调用哪个服务的方法,接口內部需编写对应服务 Controller 中 方法签名。。。

SpringCloud Gateway:服务网关

使用说明:一般写成单独的一个微服务作为项目整体的入口,application.yml 中配置路由规则来确定路由到哪个服务 。

SpringCould Alibaba Sentinel:服务限流与熔断降级

使用说明:详见 谷粒商城 高级篇 (二十五) --------- SpringCloud Alibaba Sentinel

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐