写在前面

本文是我毕业后入职的第一家公司的第一周入职培训所布置的任务,主要是考察一些java的基础知识,以及一些数据库的基本使用,文章过于入门,请大佬走开。

需求说明

阶段1需求

  1. 纯后台,不需要前端
  2. 增加接口,增加商品(商品名称、商品型号、生产厂家、添加时间、商品编号(主键)、商品状态[未售出、已售出]),其中商品编号用雪花算法生成
  3. 增加接口,商品查询(查询条件为 商品名称[商品型号][生产厂家]或根据商品编号直接查询),考虑复合索引
  4. 增加接口,售出商品(商品名称、商品型号、添加时间),编辑商品状态即可
  5. 使用jmeter测试效果,录入10W个商品,同时售出5W个商品
  6. 使用springboot+mybatis+mysql8+swagger

阶段1主要是springboot对mysql数据库的基础操作吧,这部分还是比较简单。

阶段2需求

  1. 纯后台,不需要前端将商品查询接口改为redis缓存,加入@cacheable注解
  2. 了解redis的五种数据结构及常用命令

阶段2就加几个注解,背会儿redis相关的东西吧

阶段3需求

  1. 将之前的系统改造为纯redis实现
  2. 引入lambda stream

阶段3就是由对mysql数据库操作改为对redis数据库的操作

阶段4需求

  1. 学习什么是MQ,什么是kafka、生产者、消费者组、消费者、topic、partition、单播广播,及kafka原理
  2. 增加商品改为kafka生产,多线程入库
  3. 售出商品改为kafka消费,多线程出库

这个阶段就是改成使用kafka来当做"数据库"了吧,除此之外经理还让我下去了解以下这几个问题,也记录上吧。。。
问题1:什么是MQ?
问题2:什么是kafka
问题3:生产者
问题4:消费者组
问题5:消费者
问题6:topic
问题7:partition
问题8:单播广播
问题9:kafka原理。

阶段5需求

  1. 了解分布式系统中session的解决方案(客户端或服务端)
  2. 增加登录接口,jwt方案
  3. 部署2-3个程序,前置nginx,验证jwt,给出验证方案

这个阶段我就属实看不懂了,在学校也都单机没研究过分布式问题。。。

阶段6需求

  1. 学习Elasticsearch
  2. 程序全改用ES,入10w卖5W

又改数据库了吧。。不过最后还是都了解到了redis,kafka主要作为中间件,作为缓存、暂时存放的数据库,ES作为主要用来查询的数据库。同样针对这个阶段的需求1,经理提出了以下问题
问题1:shard分片 replicas副本 index template索引模板 常见数据类型
问题2:luence 与 ES的关系
问题3:ELK是什么以及三个的关系
问题4:ES存储 字符串 有哪两种数据类型,这两种的区别以及原理【重要】
问题5:ES检索(query)与过滤(filter)的区别是什么(本质)(问题3关联)
问题6:DSL基本语法(K)–查询仓库中所有状态为已售出的数据10W(分页 浅?深?)
问题7:ES分页 浅分页 深分页 区别是什么,怎么用
问题8:bulk\bulk processor是什么,什么时候用)(刁钻问题 bulk和bulk processor的区别)
问题9:官方JAVA CLIENT(highLevel client/lowLevel client 区别)

实现

数据库设计

第一天,开始整吧,首先设计数据库,需求中方括号指的是可选条件。
一些基本类型,状态是枚举,主键使用雪花算法。之后是根据查询条件建立联合索引。

DROP TABLE IF EXISTSmy_stock`;

CREATE TABLE my_stock (
id bigint NOT NULL COMMENT ‘商品编号’,
goods_name varchar(100) DEFAULT NULL COMMENT ‘商品名称’,
goods_type varchar(100) DEFAULT NULL COMMENT ‘商品型号’,
manufacturer varchar(100) DEFAULT NULL COMMENT ‘生产厂家’,
add_date datetime DEFAULT NULL COMMENT ‘添加时间’,
status tinyint DEFAULT NULL COMMENT ‘商品状态’,
PRIMARY KEY (id),
UNIQUE KEY goods_unique_indeies (goods_name,goods_type,add_date) USING BTREE,
KEY goods_indeies (goods_name,goods_type,manufacturer) USING BTREE,
KEY goods_indeies1 (goods_name,manufacturer) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;`

之后是编写实体类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Goods implements Serializable {

    /**
     商品编号 */
    private Long id;

    /**
     商品名称 */
    private String goodsName;

    /**
     商品类型 */
    private String goodsType;

    /**
     生产厂家 */
    private String manufacturer;

    /**
     商品状态 */
    private Status status;

    /**
      添加时间 */
    private Date addDate;
}

商品状态是0未售出,1已售出的枚举,过于简单就不放代码了
不过主要是要实现TypeHandler来让数据库更方便的储存枚举(mybatisplus直接使用一个注解就好了)

@MappedJdbcTypes(JdbcType.TINYINT)
public class StatusHandle extends BaseTypeHandler<Status> {

    /**
      为非null的字段值赋值为0或者1 */
    @Override
    public void setNonNullParameter(PreparedStatement ps,int i,Status param,JdbcType jt) throws SQLException {
        ps.setInt(i,param.getStateCode());
    }

    /**
      通过0或者1得到未售出或已售出状态 */
    private Status getState(int code){
        if(code==0||code==1){
            return Status.getStateByCode(code);
        }
        return null;
    }

    /**
      通过返回集的字段名来得到状态 */
    @Override
    public Status getNullableResult(ResultSet rs,String columnName) throws SQLException{
        int statuscode=rs.getInt(columnName);
        return getState(statuscode);
    }

    /**
     通过返回集的0或者1来得到状态 */
    @Override
    public Status getNullableResult(ResultSet rs,int columnName) throws SQLException{
        int statuscode=rs.getInt(columnName);
        return getState(statuscode);
    }

    /**
     通过返回存储过程来得到状态 */
    @Override
    public Status getNullableResult(CallableStatement cs, int columnName) throws SQLException{
        int statuscode=cs.getInt(columnName);
        return getState(statuscode);
    }
}

其次是最好是类名与字段名与数据库的一致
最后是雪花算法,说实话当时看到需求根本不认识雪花算法,心里还咯噔一下。
之后了解到雪花算法其实就是指使用64位bit作为唯一id,因为在分布式模式中全局唯一id是非常重要的
其中第一位没有意义,统一为0,之后41位是时间戳,之后10位分别是机房的5位id和机器的5位id。然后一个机房的一个机器在一毫秒内可能生成多个id,所以最后12位就区分这种情况。
既然知道了原理,之后我们从网上copy一个雪花算法工具类就行了。。。。

接口编写

基本的增删改查。
首先是添加:
请求类DTO为

@Data
@ApiModel(description = "商品")
public class GoodsDTO implements {

    @ApiModelProperty(value = "商品名称")
    @NotBlank(message = "商品名称不能为空")
    private String goodsName;

    @ApiModelProperty(value = "商品类型")
    private String goodsType;

    @ApiModelProperty(value = "生产厂家")
    private String manufacturer;
}

id由内部雪花算法生成,添加时间为调用接口时间,售出状态刚添加肯定为未售出。如果有枚举作为入参的需求可以看这篇 枚举反序列化
之后是mapper的insert语句

    <insert id="insertGoods" parameterType="com.lyq.stock.entity.Goods">
        INSERT INTO my_stock
        VALUES (#{id}, #{goodsName}, #{goodsType}, #{manufacturer}, #{addDate}, #{status})
    </insert>

然后是查询商品,这个阶段最难的地方,主要是可以通过各种条件进行查询,所以当时写mapper还整了挺久的

    <resultMap id="fiedmap" type="com.lyq.stock.entity.Goods">
        <id property="id" column="id" jdbcType="BIGINT"/>
        <result property="goodsName" column="goods_name" jdbcType="VARCHAR" />
        <result property="goodsType" column="goods_type" jdbcType="VARCHAR" />
        <result property="manufacturer" column="manufacturer" jdbcType="VARCHAR"/>
        <result property="addDate" column="add_date"/>
        <result property="status" column="status" jdbcType="TINYINT"/>
    </resultMap>
<select id="getGoodsByCondition" resultMap="fiedmap">
        select *
        from my_stock
        <where>
            <choose>
                <when test="id != null and id != ''">
                    id=#{id}
                </when>
                <when test="goodsName != null and goodsName != ''">
                    and goods_name=#{goodsName}
                    <if test="goodsType != null and goodsType != ''">
                        and goods_type=#{goodsType}
                    </if>
                    <if test="manufacturer != null and manufacturer != ''">
                        and manufacturer=#{manufacturer}
                    </if>
                </when>
            </choose>
        </where>
    </select>

最后是售出商品,只是把售出状态改为已售出而已

   <update id="sellGood">
        UPDATE my_stock
        set status = 1 where goods_name = #{goodsName}
          and goods_type = #{goodsType}
          and add_date = #{addDate}
    </update>

jmeter测试

jmeter是啥???我直接问经理,经理告诉我就只是个压力测试工具,很简单的!(经理的口头禅),之前那个实习生10分钟就学会啦。
好吧,先康康咋用的

新建一个http请求,输入地址端口路径和参数,然后回到Thread Group选择线程数和循环数,需求是录入10w,那我就500个线程循环200次吧,之后打开navicat查询select count来看数量有10条,成功。
售出的话其他操作大差不差,但是问题是jmeter里id怎么给5w个?百度:使用csv。好吧,先用navicat查出5w个id并保存为csv,之后将csv存入jmeter(新建CSV Data Set Config,并设置csv文件路径和参数)之后就ok拉。

使用@cacheable注解

第二天,加入redis的缓存机制,在查询的方法上加入

@Cacheable(value = "goodsCache",key="'goods'+#goodsDTO.goodsName")

以及在启动类上加入@EnableCacheing
就实现第一次查询时会同时保存至redis数据库,之后就会先去redis里查数据了,redis作为缓存中间件。
最后经理说,写的不戳,可是你的redis配置在哪呢?
我才突然想起来我忘记配置redis了,可是数据确确实实保存到了redis了啊,我跟经理说:我不写配置就是默认使用本机地址和默认端口。经理说这个我知道,可是你redis依赖都没有呢。
对哦,那为啥会保存到redis了?经理叫我下去研究,我至今都不知道为啥,如果有人知道希望评论告诉我。
至于redis的常用命令建议看redis菜鸟教程

改为纯Redis

第三天。。
增删改查大差不差,就拿查询作为例子吧,简简单单的添加,可需求的联合索引查询条件咋办呢,redis有像mysql这样的联合索引吗,我是没有找到,所以我用了最笨的方法,插入一条数据时是对redis里插入了6条数据(分别对应6个查询条件)
代码:

    /**
     添加商品 */
    @Override
    public int addGoods(GoodsDTO goods){
        HashMap<String,String> goodsmap=new HashMap<>();
        goodsmap.put("id",goods.getId().toString());
        goodsmap.put("goodsName",goods.getGoodsName());
        goodsmap.put("goodsType",goods.getGoodsType());
        goodsmap.put("manufacturer",goods.getManufacturer());
        goodsmap.put("addDate",goods.getAddDate());
        goodsmap.put("Status",goods.getStatus().getStateString());
        try {
            Jedis jedis = new Jedis("localhost", 6379);
            Transaction transaction = jedis.multi();
            transaction.hmset(RedisKey.getGoodsKey(goods.getId().toString()), goodsmap);
            transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName()), goodsmap);
            transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName() + goods.getGoodsType()), goodsmap);
            transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName() + goods.getManufacturer()), goodsmap);
            transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName() + goods.getGoodsType() + goods.getManufacturer()), goodsmap);
            transaction.hmset(RedisKey.getGoodsKey(goods.getGoodsName() + goods.getGoodsType() + goods.getAddDate()), goodsmap);
            transaction.exec();
            log.info("添加完成");
            jedis.close();
            return 1;
        } catch (Exception e){
            log.error("添加事务执行失败",e);
            return 0;
        }
    }

记得使用事务,得保证同时插入6条。
查询是方便了,后果是售出商品的话,如果我售出一个商品,那么需要对6个redis的key修改为已售出。。。。
lambda学是学了,但没找到可以用的地方。

改为kafka

第四天。。
现在不需要查询商品了,首先是生产者添加商品
编写生产者工具类

@Component
@Slf4j
public class ProducerUtil {
    private final KafkaProducer<String,String> kafkaProducer;
    
    public ProducerUtil(){
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        kafkaProducer=new KafkaProducer<>(properties);
    }

    public void produce(String topic,String date){
        kafkaProducer.send(new ProducerRecord<>(topic,date));
        log.info("发送成功");
    }
}

只需要在调接口时,调用工具类的produce方法传入topic和数据就行了
之后是消费者售出商品,经理是让我使用jemeter添加10w条,然后再启一个程序运行来消费5w条,我直接用测试写了

@SpringBootTest
class StockManageApplicationTests {
    @Test
    void contextLoads() {
        String topic = "mytest";

        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_test");
        p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
        kafkaConsumer.subscribe(Arrays.asList(topic));// 订阅消息
        int count = 1;
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                record.value();
                System.out.println("已消费" + count + "条");
                count++;
            }
        }
    }
}

注意topic要保持一致。
关于提的问题:
问题1:什么是MQ?
回答1:消息队列

问题2:什么是kafka
回答2:分布式发布消息订阅系统

问题3:生产者
回答3:产生消息的个体

问题4:消费者组
回答4:多个消费者实例的组成

问题5:消费者
回答5:订阅消息的个体

问题6:topic
回答6:存放消息,指定消息被放入某一个队列里

问题7:partition
回答7:topic的数据分割为一个或多个的分区

问题8:单播广播
回答8:广播即将消息发送给所有消费者,单播即发送给某一个消费者

问题9:kafka原理。
回答9:生产者发布消息topic到broker上,每条信息追加顺序写入到不同分区,kafka将数据持久化到磁盘,消费者订阅某个topic的消息进行消费。

jwt方案

第五天。。。
首先是了解了分布式系统怎么统一session的问题的解决方案。
第一个是session复制:既一个用户发起请求,每台服务器都保存一个session,不过这样做服务器压力很大。
第二个是存储在客户端的cookie中,这样就可以不用存在服务器啦,不过这个方案缺点很严重,不仅需要用户宽带资源保存,cookie还有大小限制,而且还不安全
第三个是四层代理:通过反向代理用户ip来作hash,以此保证每个用户落在同一个服务器上。不过缺点是如果这台服务器拉闸了就需要用户重新登录了。
第四个是保存到数据库里。利用中间件redis作为缓存。缺点是增加一次网络请求。
最后一个就是本次要使用的方案,用客户端保存session,每次请求的时候传递session到服务端,服务端来验证session是否正确有效。

jwt只有该方案的一种实现方式,而token就是客户端用来登录的凭证。
token由三部分组成
header:定义token的类型以及签名signature的生成算法。
payload:除了jwt声明的exp(token有效期)等其他已经声明的信息外,还可以自己添加一些key,比如用户名,以此来获得发起请求的用户名。
signature:token的签名,请求时以此来判断token是否被修改,登录是否有效。

流程是,用户发起登录请求,服务端返回token给客户端,之后客户端再次发起请求时就会附带token而服务端会验证token判断登录。实际上验证token跟生成token是一个样的,都是生成一个token,只是验证时会判断附带的生成的是否一致。

了解完这些开始敲代码吧
首先是登录接口,就是简单的判断一下用户名密码和token
这是我写的token工具类

/**
  Token工具类 */
@Slf4j
public class TokenUtil {
    //设置过期时间
    private static final long EXPIRE_DATE=10000000;
    //token秘钥
    private static final String TOKEN_SECRET = "ZCfasfhuaUUHufguGuwu2021BQWE";

    public static String token (String userAccount){

        String token = "";
        //过期时间
        Date date = new Date(System.currentTimeMillis()+EXPIRE_DATE);
        //秘钥及加密算法
        Algorithm algorithm = Algorithm.HMAC256(TOKEN_SECRET);
        //设置头部信息
        Map<String,Object> header = new HashMap<>();
        header.put("typ","JWT");
        header.put("alg","HS256");
            //携带userId,username,password信息,生成签名
        token = JWT.create()
                .withHeader(header)
//                    .withClaim("userId",userId)
                .withClaim("userAccount",userAccount)
//                    .withClaim("userName",userName)
                .withExpiresAt(date)
                .sign(algorithm);
        return token;
    }


    /**
      验证方法,验证成功返回用户名 */
    public static String verify(String token) {
        try {
            JWTVerifier jwtVerifier = JWT.require(Algorithm.HMAC256(TOKEN_SECRET)).build();
            DecodedJWT decodedJWT = jwtVerifier.verify(token);
            return decodedJWT.getClaim("userAccount").asString();
        } catch (Exception e) {
            log.error("token验证方法错误",e);
            return "";
        }
    }
}

而在Service层就可以验证一下token

    public ApiResultDTO<String> verify(String token){
        log.info(token);
        if(token==null|| "".equals(token)){
            return ApiResultDTO.error("用户未登录");
        }
        else {
            String userAccount= TokenUtil.verify(token);
            return ApiResultDTO.success("现在的登录用户帐号为:"+userAccount);
        }
    }

nginx使用

来试验分布式案例,nginx作为均衡负载的服务器,分布式环境下还是得学的。
主要就是写nginx的配置文件

upstream api{ #定义自己需要代理的服务器
	server localhost:8080;
	server localhost:8081;
}
server {
	listen   80;  #端口
    server_name  localhost; #地址
    location / {
proxy_pass http://api/; #转向自己定义的api服务器列表
    }

配置好 后,启动nginx,打开自己配置的代理服务器,再启动2个springboot,端口为8080和8081,每次登录成功后就返回端口号,后面验证端口号一直在变,说明nginx代理成功了。

ES学习

第二周的第一天。这个需求给了我2天时间,还算充裕吧。首先是学习Elasticsearch,所以先把问题回答了吧。
问题1:shard分片 replicas副本 index template索引模板 常见数据类型
回答:分片是es中的最小数据单元,由一个index拆分为一个或多个分片,每一个分片都是一个luence索引实例。
副本是每一个主分片的复制。
通过模板定义好了mapping,只要index的名称被模板匹配到,那么该index的mapping就按照模板中定义的mapping自动创建。
text,keyword,integer,long,short,boolean,double,flaot,date

问题2:luence 与 ES的关系
回答:luence是信息检索工具jar包,而ES是基于lucene的搜索引擎。

问题3:ELK是什么以及三个的关系
回答:E是elasticsearch。
L是Logstash,作为ELK的中央数据流引擎,从各个目标收集不同格式的数据,经过过滤后输出到各个目地。
K是kibana,可以将es的数据可视化展示,并提供了分析的功能。

问题4:ES存储 字符串 有哪两种数据类型,这两种的区别以及原理【重要】
text,存储数据的时候会自动分词并生成索引,不支持排序、聚合等。
keyword,存储时不会分词,直接建立索引,支持排序和聚合。

问题5:ES检索(query)与过滤(filter)的区别是什么(本质)(问题3关联)
query在查询时不仅会判断是否包含查询条件,还会判断结果相关匹配程度,_score越高搜索时排名越靠前
而filter只会查询到是或不是满足查询条件的结果。

问题6:DSL基本语法(K)–查询仓库中所有状态为已售出的数据10W(分页 浅?深?)

POST /test/_search?scroll=3m
{
  "query": {
    "term": {
      "Status.keyword": {
        "value": "已售出"
      }
    }
  }
}

问题7:ES分页 浅分页 深分页 区别是什么,怎么用
浅分页指对数据量不大的数据进行分页
from+size的浅分页
scroll深分页
search_after深分页

问题8:bulk\bulk processor是什么,什么时候用(刁钻问题 bulk和bulk processor的区别)
bulk是Elasticsearch的restful语法中的一个方法。
bulk processor是java中ES中的一个类。
批量操作时使用

问题9:官方JAVA CLIENT(highLevel client/lowLevel client 区别)
低级客户端是用户自己处理参数的封装以及结果的解析。
高级客户端基于低级客户端,内部处理参数封装与结果的解析。

改为ES

第二天。
首先是添加商品:

    /**
     添加商品 */
    @Override
    public int addGoods(GoodsDTO goods){
        HashMap<String,String> goodsmap=new HashMap<>();
        goodsmap.put("id",goods.getId().toString());
        goodsmap.put("goodsName",goods.getGoodsName());
        goodsmap.put("goodsType",goods.getGoodsType());
        goodsmap.put("manufacturer",goods.getManufacturer());
        goodsmap.put("addDate",goods.getAddDate());
        goodsmap.put("Status",goods.getStatus().getStateString());
        
        BulkRequest bulkRequest=new BulkRequest();
        bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(goodsmap.remove("id"))).source(goodsmap));

        try {
            restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            return 1;
        } catch (IOException e){
            log.error("插入时错误",e);
            return 0;
        }
    }

这里需要注意下是由于source始终是要保存偶数个值,所以如果是奇数还会报错,所以尽量我都使用map来保存值。

难点是售出商品,由于ES最大查询数量是1w条,所以10w肯定不行,需要用的深分页,这里我选择的深分页方式是scroll。

    /**
     售出商品 */
    @Override
    public int sellGoods(){
    	//首先是得到1w个数据
        SearchRequest searchRequest=new SearchRequest(INDEX);

        SearchSourceBuilder sourceBuilder=new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchAllQuery());
        sourceBuilder.from(0);
        sourceBuilder.size(10000);

        searchRequest.source(sourceBuilder);
        searchRequest.scroll(TimeValue.timeValueMinutes(3L));

        SearchResponse searchResponse=null;
        try {
            searchResponse= restHighLevelClient.search(searchRequest,RequestOptions.DEFAULT);
        } catch (IOException e){
            log.error("得到数据时错误",e);
            return 0;
        }
		//然后是售出
        String scrollid=searchResponse.getScrollId();
        SearchHit[] searchHits=searchResponse.getHits().getHits();
        //循环5次,即5w个
        for (int i = 0; i < 5; i++) {
            for (SearchHit row:searchHits
            ) {
                update(row);
            }

            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollid);
            scrollRequest.scroll(SCROLL);
            try {
                searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            } catch (IOException e){
                log.error("scroll得到数据错误",e);
                return 0;
            }
            scrollid=searchResponse.getScrollId();
            searchHits=searchResponse.getHits().getHits();
        }
        return 1;
    }
    private void update(SearchHit hit){
        UpdateRequest updateRequest=new UpdateRequest(INDEX,hit.getId());
        updateRequest.doc("Status",Status.SOLD.getStateString());
        try {
            restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        }catch (IOException e){
            log.error("售出时错误",e);
        }
    }

that’s all,all finished!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐