感谢尚硅谷

视频地址:【尚硅谷】Redis 6 入门到精通 超详细 教程_哔哩哔哩_bilibili

一、Redis介绍

  • Redis 是一个开源key-value 存储系统。
  • 和  Memcached 类似,它支持存储的  value 类型相对更多,包括  string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和 hash(哈希类型)。
  • 这些数据类型都支持 push/pop、add/remove 及取交集并集和差集及更丰富的操作, 而且这些操作都是原子性的。
  • 在此基础上,Redis 支持各种不同方式的排序。
  • 与  memcached 一样,为了保证效率,数据都是缓存在内存中。
  • 区别的是 Redis 会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件。
  • 并且在此基础上实现了 master-slave(主从)同步

1、应用场景

1. 配合关系型数据库做高速缓存

  • 高频次,热门访问的数据,降低数据库  IO
  • 分布式架构,做  session 共享

2. 多样的数据结构存储持久化数据

2、NoSQL 数据库概述

NoSQL(NoSQL = Not Only SQL ),意即“不仅仅是  SQL”,泛指非关系型的数据库。 

NoSQL 不依赖业务逻辑方式存储,而以简单的 key-value 模式存储。因此大大的增加了数据库的扩展能力。

  • 不遵循  SQL 标准。
  • 不支持  ACID。
  • 远超于  SQL 的性能。 

NoSQL 适用场景

  • 对数据高并发的读写
  • 海量数据的读写
  • 对数据高可扩展性的

NoSQL 不适用场景

  • 需要事务支持
  • 基于 sql 的结构化查询存储,处理复杂的关系,需要即席查询。
  • (用不着 sql 的和用了 sql 也不行的情况,请考虑用 NoSql)

 

3、相关知识

  • 默认 16 个数据库,类似数组下标从  0 开始,初始默认使用 0 号库
  • 使用命令  select   <dbid>来切换数据库。如: select 8 
  • 统一密码管理,所有库同样密码。
  • dbsize 查看当前数据库的  key 的数量 
  • flushdb 清空当前库
  • flushall 通杀全部库

Redis 是单线程+多路  IO 复用技术

多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用 select 和 poll 函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)

串行  vs 多线程+锁(memcached)vs  单线程+多路  IO 复用(Redis)

(与 Memcache 三点不同: 支持多数据类型,支持持久化,单线程+多路 IO 复用)

 

二、key及常用五大数据类型 

1、Redis 键(key)

keys *
查看当前库所有 key,也可以单个匹配(keys  key) 

exists key
判断某个 key 是否存在,1 表示存在,0表示不存在

type key
查看你的 key 是什么类型

del key
删除指定的 key 数据

unlink key
根据 value 选择非阻塞删除,仅将 keys 从 keyspace 元数据中删除,真正的删除会在后续异步操作。

expire key 数字
为给定的 key 设置过期时间
例如:expire key 10:设置过期时间为10s

ttl key
查看还有多少秒过期,-1 表示永不过期,-2表示已过期

select 数字
命令切换数据库(默认在0数据库)
例如:select 1 :切换到1数据库

dbsize
查看当前数据库的 key 的数量

flushdb
清空当前库

flushall
通杀全部库

2、Redis  字符串(String) 

1)简介

  • String 类型是二进制安全的。意味着 Redis 的 String可以包含任何数据。比如 jpg图片或者序列化的对象。
  • 一个 Redis中字符串 value最多可以是 512M。

2)常用命令

set <key><value>
添加键值对,可以修改已存在的key的值
参数说明:
NX:只有 key 不存在时设置,与XX互斥
XX:仅在 key 存在时设置,可以修改value的值,key如果就没有却设置了此参数则无法创建key-value键值对
EX:key 指定的过期时间,单位 秒
PX:key 指定的过期时间,单位 毫秒,与EX互斥
例如:
# 设置key值为v1 10s 过期
set k1 v1 EX 10
#假设原本的k1值为v1,且key没有过期,则k1值将变为v2
set k1 v2 XX

get <key>
查询对应键值

append <key><value>
将给定的<value> 追加到原值的末尾

strlen <key>
获得值的长度

setnx <key><value>
只有在 key不存在时 设置 key 的值

incr <key>
将key中储存的数字值增1,只能对数字值操作。如果为空,新增值为 1 

decr <key>
将key中储存的数字值减1,只能对数字值操作,如果为空,新增值为-1 

incrby / decrby <key><步长>
将 key中储存的数字值增减。自定义步长
原子性,有一个失败则都失败 

mset <key1><value1><key2><value2> .....
同时设置一个或多个 key-value 对

mget <key1><key2><key3> .....
同时获取一个或多个 value

msetnx <key1><value1><key2><value2> .....
同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在。

Redis本身提供的所有API都是原子操作,所谓原子操作是指不会被线程调度机制打断的操作; 

1)在单线程中, 能够在单条指令中完成的操作都可以认为是"原子操作",因为中断只能发生于指令之间。
2)在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。Redis 单命令的原子性主要得益于 Redis的单线程

getrange <key><起始位置><结束位置>
获得值的范围,类似 java中的substring。前包,后包
#例如
get k1 //qwerty
getrange k1 1 3 //wer 

setrange <key><起始位置><value>
用 <value> 覆写<key>所储存的字符串值,从<起始位置>开始(索引从 0 开始)。

setex <key><过期时间><value>
设置键值的同时,设置过期时间,单位秒。 

getset <key><value>
以新换旧,设置了新值同时获得旧值。 

3)数据结构 

String 的数据结构为简单动态字符串,是可以修改的字符串,内部结构实现上类似于 Java 的 ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。

如图中所示,内部为当前字符串实际分配的空间 capacity一般要高于实际字符串长度len。当字符串长度小于 1M时,扩容都是加倍现有的空间,如果超过 1M,扩容时一次只会多扩 1M的空间。需要注意的是字符串最大长度为 512M。 

3、Redis列表(List)

1)简介

单键多值

Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。

它的底层实际是个双向链表对两端的操作性能很高,通过索引下标的操作中间的节
点性能会较差。

2)常用命令

lpush/rpush <key><value1><value2><value3> ....
从左边/右边插入一个或多个值。 

lpop/rpop <key>
从左边/右边吐出一个值。值在键在,值光键亡。 

rpoplpush <key1><key2>
从<key1>列表右边吐出一个值,插到<key2>列表左边。 

lrange <key><start><stop>
按照索引下标获得元素(从左到右)

lrange mylist 0 -1
0 左边第一个,-1 右边第一个,(0 -1 表示获取所有)

lindex <key><index>
按照索引下标获得元素(从左到右)

llen <key>
获得列表长度

linsert <key> before <value><newvalue>
在<value>的后面插入<newvalue>插入值

lrem <key><n><value>
从左边删除 n 个 value(从左到右)

lset<key><index><value>
将列表 key 下标为 index 的值替换成 value 

3)数据结构

List 的数据结构为快速链表 quickList

首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是 ziplist,也即是压缩列表, 它将所有的元素紧挨着一起存储,分配的是一块连续的内存。

当数据量比较多的时候才会改成 quicklist。

因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是 int类型的数据,结构上还需要两个额外的指针 prev和 next。

Redis将链表和 ziplist 结合起来组成了 quicklist。也就是将多个 ziplist 使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。  

4、Redis集合(Set)

1)简介

当你需要存储一个列表数据,又不希望出现重复数据时,set 是一个很好的选择,并且 set提供了判断某个成员是否在一个 set集合内的重要接口,这个也是 list所不能提供的。

Redis 的 Set 是 string类型的无序集合。它底层其实是一个 value 为 null 的 hash表,所以添加,删除,查找的复杂度都是 O(1)。

数据增加,查找数据的时间不变。

2)常用命令

sadd <key><value1><value2> .....
将一个或多个 member 元素加入到集合 key 中,已经存在的 member 元素将被忽略

smembers <key>
取出该集合的所有值。

sismember <key><value>
判断集合<key>是否为含有该<value>值,有 1,没有 0

scard<key>
返回该集合的元素个数。

srem <key><value1><value2> ....
删除集合中的某个元素。

spop <key>
随机从该集合中吐出一个值。

srandmember <key><n>
随机从该集合中取出 n 个值。不会从集合中删除 。

smove <source><destination><value>
把集合中一个值从一个集合移动到另一个集合

sinter <key1><key2>
返回两个集合的交集元素。

sunion <key1><key2>
返回两个集合的并集元素。

sdiff <key1><key2>
返回两个集合的差集元素(key1 中的,不包含 key2中的)

3)数据结构

Set 数据结构是 dict字典,字典是用哈希表实现的。

Java 中 HashSet 的内部实现使用的是 HashMap,只不过所有的 value 都指向同一个对象。Redis的 set结构也是一样,它的内部也使用 hash结构,所有的 value都指向同一个内部值。

5、Redis哈希(Hash) 

1)简介

Redis hash 是一个 string 类型的 field 和 value 的映射表,hash 特别适合用于存储对象。

类似 Java 里面的 Map<String,Object>,用户 ID 为查找的 key,存储的 value用户对象包含姓名,年龄,生日等信息。

如果用普通的 key/value结构来存储,主要有以下 2 种存储方式:

方式一:每次修改用户的某个属性需要,先反序列化改好后再序列化回去。开销较大。

方式二:用户 ID数据冗余 

使用hash后:通过 key(户 用户 ID) + field( 属性标签)  就可以操作对应属性数据了,既不需要重复存储数据,也不会带来序列化和并发修改控制的问题 

2)常用命令

hset <key><field><value>
给<key>集合中的 <field>键赋值<value>

hget <key1><field>
从<key1>集合<field>取出 value

hmset <key1><field1><value1><field2><value2>...
批量设置 hash 的值

hexists<key1><field>
查看哈希表 key 中,给定域 field 是否存在。

hkeys <key>
列出该 hash 集合的所有 field

hvals <key>
列出该 hash 集合的所有 value

hincrby <key><field><increment>
为哈希表 key 中的域 field 的值加上增量 1 -1

hsetnx <key><field><value>
将哈希表 key 中的域 field 的值设置为 value ,当且仅当域field 不存在。

3)数据结构

Hash 类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value 长度较短且个数较少时,使用 ziplist,否则使用 hashtable。 

6、Redis有序集合 Zset(sorted set)

1)简介

有序集合的每个成员都关联了一个评分(score),这个评分被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复的 。

因为元素是有序的, 所以你也可以很快的根据评分或者次序来获取一个范围的元素。

访问有序集合的中间元素也是非常快的,因此你能够使用有序集合作为一个没有重复成员的智能列表。

2)常用命令

zadd <key><score1><value1><score2><value2>…
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。

zrange <key><start><stop> [WITHSCORES]
返回有序集 key 中,下标在<start><stop>之间的元素。
带 WITHSCORES,可以让分数一起和值返回到结果集。

zrangebyscore key minmax [withscores] [limit offset count]
返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。
有序集成员按 score 值递增(从小到大)次序排列。

zrevrangebyscore key maxmin [withscores] [limit offset count]
同上,改为从大到小排列。

zincrby <key><increment><value>
为元素的 score加上增量

zrem <key><value>
删除该集合下,指定值的元素

zcount <key><min><max>
统计该集合,分数区间内的元素个数

zrank <key><value>
返回该值在集合中的排名,从 0 开始。

案例:如何利用 zset 实现一个文章访问量的排行榜? 

3)数据结构

zset 一方面它等价于 Java的数据结构 Map<String, Double>,可以给每一个元素 value 赋予一个权重 score,另一方面它又类似于 TreeSet,内部的元素会按照权重 score 进行排序,可以得到每个元素的名次,还可以通过 score 的范围来获取元素的列表。

zset 底层使用了两个数据结构

  • hash:hash 的作用就是关联元素 value 和权重 score,保障元素 value 的唯一性,可以通过元素 value 找到相应的 score 值。
  • 跳跃表:跳跃表的目的在于给元素 value 排序,根据 score 的范围获取元素列表。 

4)跳跃表(跳表) 

1. 简介

数组不便元素的插入、删除;平衡树或红黑树虽然效率高但结构复杂;链表查询需要遍历所有效率低。Redis采用的是跳跃表。跳跃表效率堪比红黑树,实现远比红黑树简单。

2. 实例

对比有序链表和跳跃表,从链表中查询出 51

(1) 有序链表

要查找值为 51 的元素,需要从第一个元素开始依次查找、比较才能找到。共需要 6 次比较。
(2) 跳跃表

从第 2 层开始,1 节点比 51 节点小,向后比较。
21 节点比 51 节点小,继续向后比较,后面就是 NULL 了,所以从 21 节点向下到第 1 层。
在第 1 层,41 节点比 51 节点小,继续向后,61 节点比 51 节点大,所以从 41 向下。
在第 0 层,51 节点为要查找的节点,节点被找到,共查找 4 次。

从此可以看出跳跃表比有序链表效率要高

三、Redis配置文件介绍

1、Units 单位

配置大小单位,开头定义了一些基本的度量单位,只支持 bytes,不支持 bit

大小写不敏感

2、INCLUDES 包含

类似 jsp 中的 include,多实例的情况可以把公用的配置文件提取出来

3、网络相关配置

3.1 bind

默认情况 bind=127.0.0.1 只能接受本机的访问请求

不写的情况下,无限制接受任何 ip 地址的访问

如果开启了 protected-mode,那么在没有设定 bind ip且没有设密码的情况下,Redis只允许接受本机的响应

保存配置,停止服务,重启启动查看进程,不再只是本机可以访问了。

3.2 protected-mode

将本机访问保护模式设置 no

3.3 Port

端口号,默认 6379

3.4  tcp-backlog

设置 tcp 的 backlog,backlog其实是一个连接队列,backlog队列总和=未完成三次握手队列 + 已经完成三次握手队列。

在高并发环境下你需要一个高 backlog 值来避免慢客户端连接问题。

3.5 timeout

一个空闲的客户端维持多少秒会关闭,0 表示关闭该功能。即永不关闭。 

3.6 tcp- - keepalive

对访问客户端的一种心跳检测,每隔 n 秒检测一次。

单位为秒,如果设置为 0,则不会进行 Keepalive检测,建议设置成 60 

4、GENERAL  通用

4.1 daemonize

是否为后台进程,设置为 yes

守护进程,后台启动

4.2 pidfile

存放 pid 文件的位置,每个实例会产生一个不同的 pid文件 

4.3 loglevel

指定日志记录级别,Redis 总共支持四个级别:debug、verbose、notice、warning,默认为notice

四个级别根据使用阶段来选择,生产环境选择 notice 或者 warning 

4.4 logfile

日志文件名称 

4.5 databases 16

设定库的数量默认16,默认数据库为 0,可以使用 SELECT <dbid>命令在连接上指定数据库 id

5、LIMITS 限制 

5.1 maxclients

  •  设置 redis 同时可以与多少个客户端进行连接。 
  • 如果达到了此限制,redis 则会拒绝新的连接请求,并且向这些连接请求方发出“max number of clients reached”以作回应。

 5.2 maxmemory

  • 建议必须设置,否则,将内存占满,造成服务器宕机
  • 设置 redis 可以使用的内存量。一旦到达内存使用上限,redis将会试图移除内部数据,移除规则可以通过 maxmemory-policy 来指定。
  • 如果 redis 无法根据移除规则来移除内存中的数据,或者设置了“不允许移除”,那么 redis则会针对那些需要申请内存的指令返回错误信息,比如 SET、LPUSH等。
  • 但是对于无内存申请的指令,仍然会正常响应,比如 GET 等。如果你的 redis是主redis(说明你的 redis有从 redis),那么在设置内存使用上限时,需要在系统中留出一些内存空间给同步队列缓存,只有在你设置的是“不移除”的情况下,才不用考虑这个因素。 

 5.3 maxmemory-policy

  • volatile-lru:使用 LRU算法移除 key,只对设置了过期时间的键;(最近最少使用)
  • allkeys-lru:在所有集合 key中,使用 LRU算法移除 key
  • volatile-random:在过期集合中移除随机的 key,只对设置了过期时间的键
  • allkeys-random:在所有集合 key中,移除随机的 key
  • volatile-ttl:移除那些 TTL值最小的 key,即那些最近要过期的 key
  • noeviction:不进行移除。针对写操作,只是返回错误信息

5.4 maxmemory-samples

  • 设置样本数量,LRU 算法和最小 TTL算法都并非是精确的算法,而是估算值,所以你可以设置样本的大小,redis 默认会检查这么多个 key 并选择其中 LRU 的那个。
  • 一般设置 3 到 7的数字,数值越小样本越不准确,但性能消耗越小。 

四、Redis 的发布和订阅

redis客户端可以订阅任意数量的频道

当给这个频道发布消息后,消息就会发送给订阅的客户端


发布订阅命令行实现

1、 打开一个客户端订阅 channel1


2、打开另一个客户端,给 channel1 发布消息 hello

返回的 1 是订阅者数量

3、打开第一个客户端可以看到发送的消息

 注:发布的消息没有持久化,只能收到订阅后发布的消息

五、Redis事务

1、Redis 事务定义

Redis 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis 事务的主要作用就是串联多个命令防止别的命令插队

2、Multi 、Exec 、discard

从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。

组队的过程中可以通过 discard 来放弃组队。

案例1

组队成功,提交成功 

案例2

 组队阶段报错,提交失败

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。

案例3

组队成功,提交有成功有失败情况,由于v1为字符串类型,incr执行失败。 

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

3、乐观锁

乐观锁,顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis 就是利用这种 check-and-set机制实现事务的。

4、事务冲突的例子

watch  key  [key ...]

在执行 multi 之前,先执行 watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

unwatch

取消 WATCH 命令对所有 key 的监视。

如果在执行 WATCH 命令之后,EXEC 命令或 DISCARD 命令先被执行了的话,那么就
不需要再执行 UNWATCH 了

案例 

客户端1> set k1 100
客户端1> watch k1

客户端2> watch k1 

客户端1> multi
客户端1> incrby k1 10

客户端2> multi
客户端2> incrby k1 20

客户端1> exec  #修改成功
客户端2> exec  #修改失败

该案例验证了redis使用的是乐观锁实现事务

5、Redis事务三特性

单独的隔离操作

  • 事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

没有隔离级别的概念

  • 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

不保证原子性

  • 事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

六、Redis持久化

1、Redis持久化之 RDB

1)是什么

在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的 Snapshot 快照,它恢复时是将快照文件直接读到内存里。

2)备份是如何执行的

Redis 会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何 IO 操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那 RDB方式要比 AOF方式更加的高效。

RDB 的缺点是最后一次持久化后的数据可能丢失。

3)Fork

  •   Fork 的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程
  •  在 Linux 程序中,fork()会产 生一个和父进程完全相同的子进程,但子进程在此后多会 exec系统调用,出于效率考虑,Linux中引入了“写时复制技术
  •  一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。

4)RDB 持久化流程

5)dump.rdb  文件

在 redis.conf 中配置文件名称,默认为 dump.rdb 

配置位置

rdb 文件的保存路径,也可以修改。默认为 Redis 启动时命令行所在的目录下 

注意: ./ 是相对目录 

6)如何触发 RDB 

6.1配置文件中默认的快照配置

格式:save 秒钟 写操作次数

默认是 1 分钟内改了 1 万次,或 5分钟内改了10  次,或 15分钟 内改了 1 次触发持久化。

如何禁用save?

不设置 save 指令,或者给 save 传入空字符串

6.2 命令save VS bgsave

save :save时只管保存,其它不管,全部阻塞。手动保存。不建议。
bgsave:Redis 会在后台异步进行快照操作, 快照同时还可以响应客户端请求。
可以通过 lastsave 命令获取最后一次成功执行快照的时间

6.3 flushall 命令

执行 flushall 命令,也会产生 dump.rdb文件,但里面是空的,无意义

6.4 stop-writes-on-bgsave-error

当 Redis 无法写入磁盘的话,直接关掉 Redis的写操作。推荐 yes.

6.5 rdbcompression 压缩文件 

对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis 会采用LZF 算法进行压缩。

如果你不想消耗 CPU 来进行压缩的话,可以设置为关闭此功能。推荐 yes. 

6.6 rdbchecksum 检查完整性

 在存储快照后,还可以让 redis 使用 CRC64算法来进行数据校验,但是这样做会增加大约 10%的性能 消耗,如果希望获取到最大的性能提升,可以关闭此功能。推荐 yes

7)rdb  的备份

先通过 config get dir 查询 rdb文件的目录,将*.rdb 的文件拷贝到别的地方

rdb 的恢复

  • 关闭 Redis
  • 把备份的文件拷贝到工作目录下 cp dump2.rdb dump.rdb
  • 启动 Redis, 备份数据会直接加载 

8)如何停止RDB

动态停止 RDB:redis-cli config set save ""(save 后给空值,表示禁用保存策略)

9)优势

  •   适合大规模的数据恢复
  •   对数据完整性和一致性要求不高更适合使用
  •   节省磁盘空间
  •   恢复速度快

10)劣势

  • Fork 的时候,内存中的数据被克隆了一份,大致 2倍的膨胀性需要考虑
  • 虽然 Redis 在 fork 时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
  • 在备份周期在一定间隔时间做一次备份,所以如果 Redis 意外 down 掉的话,就会丢失最后一次快照后的所有修改。 

11)总结

2、Redis之持久化之 AOF

1)是什么

以日志的形式来记录每个写操作(增量保存),将 Redis 执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis 启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

2)AOF  持久化流程

(1)客户端的请求写命令会被 append 追加到 AOF 缓冲区内;
(2)AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作同步到磁盘的AOF文件中;
(3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF 文件容量;(4)Redis 服务重启时,会重新 load 加载 AOF 文件中的写操作达到数据恢复的目的;

3)AOF  默认不开启

可以在 redis.conf 中配置文件名称,默认为 appendonly.aof

AOF 文件的保存路径,同 RDB的路径一致。

4)AOF和RDB 同时开启,redis 听谁的?

AOF 和 RDB 同时开启,系统默认取 AOF 的数据(数据不会存在丢失)

5)AOF启动/ 修复/ 恢复

AOF 的备份机制和性能虽然和 RDB不同, 但是备份和恢复的操作同 RDB一样,都是拷贝备份文件,需要恢复时再拷贝到 Redis 工作目录下,启动系统即加载。

正常恢复

  • 修改默认的 appendonly no,改为 yes
  • 将有数据的 aof 文件复制一份保存到对应目录(查看目录:config get dir)                                          
  • 重启 redis 然后重新加载

 异常恢复

  • 修改默认的 appendonly no,改为 yes
  • 如遇到 AOF 文件损坏,通过/usr/local/bin/redis-check-aof--fix appendonly.aof 进行恢复被写坏的 AOF 文件
  • 重启 redis,然后重新加载

6)AOF  同步频率设置

appendfsync always

  • 始终同步,每次 Redis 的写入都会立刻记入日志;性能较差但数据完整性比较好

appendfsync everysec

  • 每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。

appendfsync no

  • redis 不主动进行同步,把同步时机交给操作系统

7)优势

  •  备份机制更稳健,丢失数据概率更低。
  • 可读的日志文本,通过操作 AOF 稳健,可以处理误操作。

8)劣势

  •   比起 RDB 占用更多的磁盘空间。
  •   恢复备份速度要慢。
  •   每次读写都同步的话,有一定的性能压力。
  •   存在个别 Bug,造成恢复不能。

 9)总结

3、总结(Which one)

1)用哪个好

官方推荐两个都启用。

  • 如果对数据不敏感,可以选单独用 RDB。
  • 不建议单独用 AOF,因为可能会出现 Bug。
  • 如果只是做纯内存缓存,可以都不用。

2)官网建议

  •  RDB 持久化方式能够在指定的时间间隔能对你的数据进行快照存储
  •  AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF 命令以 redis 协议追加保存每次写的操作到文件末尾,Redis 还能对 AOF 文件进行后台重写,使得 AOF 文件的体积不至于过大
  • 只做缓存:如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化方式。
  • 同时开启两种持久化方式,在这种情况下,当 redis 重启的时候会优先载入 AOF 文件来恢复原始的数据,因为在通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集要完整。
  • RDB 的数据不实时,同时使用两者时服务器重启也只会找 AOF 文件。建议不要只使用 AOF,因为 RDB 更适合用于备份数据库(AOF在不断变化不好备份), 快速重启,而且不会有 AOF可能潜在的 bug,留着作为一个万一的手段。

性能建议

  • 因为 RDB文件只用作后备用途,建议只在 Slave 上持久化 RDB文件,而且只要 15分钟备份一次就够了,只保留 save 900 1 这条规则。
  • 如果使用 AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只 load自己的 AOF文件就可以了。代价,一是带来了持续的 IO,二是 AOF rewrite 的最后将 rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。
  • 只要硬盘许可,应该尽量减少 AOF rewrite 的频率,AOF重写的基础大小默认值 64M太小了,可以设到 5G 以上。
  • 默认超过原大小 100%大小时重写可以改到适当的数值。

七、Redis主从复制

1、介绍

数据更新后根据配置和策略, 自动同步到备机的 master/slaver 机制,Master 以写为主,Slave 以读为主。

可以实现:

  • 读写分离,性能扩展
  • 容灾快速恢复

2、实现一主二仆

在原始redis.conf配置文件中开启 daemonize yes

1)新建 redis6379.conf,填写以下内容

说明:include /myredis/redis.conf

将/myredis下的redis.conf配置文件的内容引入redis6379.conf中

2)新建 redis6380.conf,填写以下内容

3)新建 redis 6381.conf,填写以下内容

slave-priority 10

设置从机的优先级,值越小,优先级越高,用于选举主机时使用。默认 100

4)启动三台 redis 服务器

5)查看系统进程,看看三台服务器是否启动  

6)查看三台主机运行情况 

info replication
打印主从复制的相关信息

7)配从不配主 

slaveof <ip><port>
成为某个实例的从服务器

① 在 6380 和 6381 上执行:  slaveof 127.0.0.1 6379

 ② 在主机上写,在从机上可以读取数据

  在从机上写数据报错

③ 主机挂掉,重启就行,一切如初,从机还是从机

④ 从机重启需重设:slaveof 127.0.0.1 6379

可以将配置增加到文件中。永久生效。

3、薪火相传

上一个 Slave 可以是下一个 slave的 Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该 slave 作为了链条中下一个的 master, 可以有效减轻 master的写压力,去中心化降低风险。

  • 用 slaveof <ip><port>
  • 中途变更转向:会清除之前的数据,重新建立拷贝最新的
  • 风险是一旦某个 slave 宕机,后面的 slave 都没法备份
  • 主机挂了,从机还是从机,无法写数据了

4、反客为主

当一个 master 宕机后,后面的 slave可以立刻升为 master,其后面的 slave 不用做任何修改。

 slaveof no one  将从机变为主机。

注意:哨兵模式是反客为主的自动版 

5、复制原理

  • Slave 启动成功连接到 master后会发送一个 sync命令
  •  Master 接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令, 在后台进程执行完毕之后,master 将传送整个数据文件到 slave,以完成一次完全同步                   
  • 全量复制:而 slave 服务在接收到数据库文件数据后,将其存盘并加载到内存中。
  • 增量复制 :Master 继续将新的所有收集到的修改命令依次传给 slave,完成同步
  • 但是只要是重新连接 master,一次完全同步(全量复制)将被自动执行

八、Redis哨兵模式

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

搭建步骤

1. 调整为一主二仆模式,6379 带着 6380 、6381

2. 自定义的/myredis 目录下新建 sentinel.conf 文件,名字绝不能错

3. 配置哨兵, , 填写内容

sentinel  monitor mymaster 127.0.0.1 6379 1

其中 mymaster 为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量。

4. 启动哨兵

执行 redis-sentinel /myredis/sentinel.conf

5. 当主机挂掉,从机选举中产生新的主机

大概 10 秒左右可以看到哨兵窗口日志,切换了新的主机

哪个从机会被选举为主机呢?根据优先级别:slave-priority

原主机重启后会变为从机。

复制延时

由于所有的写操作都是先在 Master 上操作,然后同步更新到 Slave上,所以从 Master同步到 Slave 机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。

故障恢复

  • 优先级在 redis.conf 中默认:slave-priority 100,值越小优先级越高
  • 偏移量是指获得原主机数据最全的
  • 每个 redis 实例启动后都会随机生成一个 40位的 runid

九、Redis 缓存穿透,缓存击穿与缓存雪崩

1、缓存穿透

key 对应的数据在数据源并不存在,每次针对此 key 的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户 id 获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

解决方案

一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

  •  对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
  •  设置可访问的名单(白名单):使用 bitmaps 类型定义一个可以访问的名单,名单 id 作为 bitmaps 的偏移量,每次访问和 bitmap 里面的 id 进行比较,如果访问 id 不在 bitmaps 里面,进行拦截,不允许访问。
  • 采用布隆过滤器:(布隆过滤器(Bloom Filter)是 1970 年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。将所有可能存在的数据哈希到一个足够大的 bitmaps 中,一个一定不存在的数据会被 这个 bitmaps 拦截掉,从而避免了对底层存储系统的查询压力。
  • 进行实时监控:当发现 Redis 的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

2、缓存击穿

key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期 DB 加载数据并回设到缓存一般都会从后端,这个时候大并发的请求可能会瞬间把后端 DB 压垮。 

解决方案

key 可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

  • 预先设置热门数据:在 redis 高峰访问之前,把一些热门数据提前存入到redis 里面,加大这些热门数据 key 的时长
  • 实时调整:现场监控哪些数据热门,实时调整 key 的过期时长
  • 使用锁:

(1) 就是在缓存失效的时候(判断拿出来的值为空),不是立即去 load db。

(2) 先使用缓存工具的某些带成功操作返回值的操作(比如 Redis 的 SETNX)去 set 一个 mutex key;

(3) 当操作返回成功时,再进行 load db 的操作,并回设缓存,最后删除 mutex key;
(4) 当操作返回失败,证明有线程在 load db,当前线程睡眠一段时间再重试整个 get 缓存的方法。

3、缓存雪崩

key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。

缓存雪崩与缓存击穿的区别在于这里针对很多 key 缓存,前者则是某一个 key 

正常访问

缓存失效瞬间 

解决方案

缓存失效时的雪崩效应对底层系统的冲击非常可怕!

  • 构建多级缓存架构:nginx 缓存 + redis 缓存 +其他缓存(ehcache 等)
  • 使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
  • 设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际 key 的缓存。
  •  将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

十、SpringBoot整合redis

 1、pom.xml

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2、yml文件

spring:
  redis:
    host: 192.168.16.106
    database: 1
    port: 6379

3、创建实体类(要实现序列化接口,否则无法存入redis)

@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class User implements Serializable {
    private String name;
    private int age;
}

4、config配置文件,直接引入即可,根据情况可修改

存入redis的乱码问题配置文件中解决了(起初我用默认的redisTemplate,出现了乱码问题)

@EnableCaching
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template =  new RedisTemplate<>();
        RedisSerializer<String> redisSerializer =  new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer =  new
                Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om =  new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL , JsonAutoDetect.Visibility.ANY );
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL );
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
        //key 序列化方式
        template.setKeySerializer(redisSerializer);
        //value 序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap 序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer =  new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer =  new
                Jackson2JsonRedisSerializer(Object. class);
        //解决查询缓存转换异常的问题
        ObjectMapper om =  new ObjectMapper();
        om.setVisibility(PropertyAccessor. ALL , JsonAutoDetect.Visibility. ANY );
        om.enableDefaultTyping(ObjectMapper.DefaultTyping. NON_FINAL );
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化(解决乱码的问题),过期时间 600 秒
        RedisCacheConfiguration config =
                RedisCacheConfiguration.defaultCacheConfig ()
                        .entryTtl(Duration.ofSeconds (600))
                        .serializeKeysWith(RedisSerializationContext.SerializationPair
                                .fromSerializer (redisSerializer))
                        .serializeValuesWith(RedisSerializationContext.SerializationPair
                                .fromSerializer (jackson2JsonRedisSerializer))
                        .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder (factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}

5、创建启动类

@SpringBootApplication
public class RedisSpringbootApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisSpringbootApplication.class, args);
    }
}

6、具体操作

set操作

@RestController
public class UserController {

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("/set")
    public void set(){
        User user = new User("张三", 12);
        redisTemplate.opsForValue().set("user",user);
    }

}

get操作

 @GetMapping("/get/{key}")
 public String get(@PathVariable String key){
      Object value = redisTemplate.opsForValue().get(key);
      return value.toString();
 }

delete操作

@GetMapping("delete/{key}")
public void delete(@PathVariable String key){
     Boolean hasKey = redisTemplate.hasKey(key);
     if(hasKey){
        redisTemplate.delete(key);
     }
}

列表类型

@GetMapping("list")
public String list(){
    ListOperations listOperations = redisTemplate.opsForList();
    //从左边添加
    listOperations.leftPush("list","left1");
    listOperations.leftPush("list","left2");
    //从右边添加
    listOperations.rightPush("list", "right1");
    listOperations.rightPush("list", "right2");

    //利用下标来取数据,这里取left和left1
    List<String> list = listOperations.range("list", 0, 3);
    return list.toString();
}

集合类型

@GetMapping("/set")
public void setTest(){
    SetOperations<String, String> setOperations = redisTemplate.opsForSet();
    setOperations.add("k1", "v1");
    setOperations.add("k1", "v1");
    setOperations.add("k2", "v1");
    setOperations.add("k2", "v2");
    Set<String> k1 = setOperations.members("k1");
    Set<String> k2 = setOperations.members("k2");
    System.out.println(k1);
    System.out.println(k2);
}

有序集合类型

@GetMapping("/zset")
public void zsetTest(){
    ZSetOperations<String, String> zSetOperations = redisTemplate.opsForZSet();
    zSetOperations.add("k2", "v1", 3);
    zSetOperations.add("k2", "v2", 2);
    zSetOperations.add("k2", "v3", 1);

    Set<String> zset = zSetOperations.range("k2", 0, 2);
    System.out.println(zset);
}

 

hash类型 

@GetMapping("/hash")
public void hashTest(){
    HashOperations<String, String, String> hashOperations = redisTemplate.opsForHash();
    hashOperations.put("k3","hashKey1","v1");
    hashOperations.put("k3","hashKey2","v2");
    String s1 = hashOperations.get("k3", "hashKey1");
    String s2 = hashOperations.get("k3", "hashKey2");
    System.out.println(s1);
    System.out.println(s2);
}

 

十一、Redis实现分布式锁

1、问题描述

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁主流的实现方案:

  1. 基于数据库实现分布式锁
  2. 基于缓存(Redis 等)
  3. 基于 Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  1. 性能:redis 最高
  2. 可靠性:zookeeper 最高

2、实现方式

  • EX second :设置键的过期时间为 second 秒。
  • SET key value EX second 效果等同于SETEX key second value 。
  • NX :只在键不存在时,才对键进行设置操作。
  • SET key value NX 效果等同于 SETNX key value 。

  1. 多个客户端同时获取锁(setnx)
  2. 获取成功,执行业务逻辑{从 db 获取数据,放入缓存},执行完成释放锁(del)
  3. 其他客户端等待重试 
@GetMapping("testLock")
public void testLock(){
    //1 获取锁,setne
    Boolean lock =  redisTemplate.opsForValue().setIfAbsent( "lock",  "111");

    //2 获取锁成功、查询 num 的值
    if(lock){
        Object value =  redisTemplate.opsForValue().get( "num");
        int num = Integer.parseInt(value+ "");
        redisTemplate.opsForValue().set("num", ++num);
        //释放锁
        redisTemplate.delete( "lock");
    } else{
    //3 获取锁失败、每隔 0.1 秒再获取
        try {
            Thread. sleep (100);
            testLock();
        }  catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

重启,服务集群,通过redis自带的网关压力测试:

ab -n 1000 -c 100 http://192.168.140.1:8080/test/testLock

-n 1000次

-c 100个并发 

查看 redis 中 num 的值: 

问题:setnx 刚好获取到锁,业务逻辑出现异常,导致锁无法释放

3、优化之设置锁的过期时间

设置过期时间有两种方式

1. 首先想到通过 expire 设置过期时间(缺乏原子性:如果在 setnx 和 expire 之间出现异常,锁也无法释放)

setnx k1 v1
expire k1 100

2. 在 set 时指定过期时间(推荐)

set k1 v1 EX 100 NX

 

 场景:如果业务逻辑的执行时间是 7s。执行流程如下

  1. index1 业务逻辑没执行完,3 秒后锁被自动释放。
  2. index2 获取到锁,执行业务逻辑,3 秒后锁被自动释放。
  3. index3 获取到锁,执行业务逻辑
  4. index1 业务逻辑执行完成,开始调用 del 释放锁,这时释放的是 index3 的锁,

导致 index3 的业务只执行 1s 就被别人释放。最终等于没锁的情况。

4、优化之 UUID 防误删

场景:

  1. index1 执行删除时,查询到的 lock 值确实和 uuid 相等
  2. index1 执行删除前,lock 刚好过期时间已到,被 redis 自动释放
  3. index2 获取了 lock
  4. index1 执行删除,此时会把 index2 的 lock 删除

问题:删除操作缺乏原子性。

5、优化之 LUA 脚本保证删除的原子性 

@GetMapping( "testLockLua")
public void testLockLua() {
    String uuid = UUID. randomUUID ().toString();
    //定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
    String skuId =  "25";  // 访问 skuId 为 25 号的商品 100008348542
    String locKey =  "lock:" + skuId;  // 锁住的是每个商品的数据
    Boolean lock =  redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit. SECONDS );
    if (lock) {
        Object value =  redisTemplate.opsForValue().get( "num");
        int num = Integer. parseInt (value +  "");
        redisTemplate.opsForValue().set( "num", String. valueOf (++num));
        // 定义 lua 脚本
        String script =  "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用 redis 执行 lua 执行
        DefaultRedisScript<Long> redisScript =  new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 设置一下返回值类型 为 Long
        // 因为删除判断的时候,返回的 0,给其封装为数据类型。如果不封装那么默认返回 String 类型,
        // 那么返回字符串与 0 会有发生错误。
        redisScript.setResultType(Long. class);
        // 第一个script 脚本 ,第二个是 key,第三个就是 key 所对应的值。
        redisTemplate.execute(redisScript, Arrays.asList (locKey), uuid);
    }  else {
        try {
            Thread. sleep (1000);
            testLockLua();
        }  catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Lua 脚本详解:  

6、总结 

三步骤

  1. 加锁
  2. 使用 lua 释放锁
  3. 失败重试

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
  • 加锁和解锁必须具有原子性。
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐