redis概述

redis是什么?

Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

 免费和开源!是当下最热门的NoSQL技术之一!

Redis能干嘛?

1、内存存储、持久化,内存中数据是断电即失的,持久化很重要(rdb、aof)

2、效率高,可以用于高速缓存

3、发布订阅系统

4、地图信息发布

5、计时器、计数器(浏览量!)

6、......

特性

1、多样的数据类型

2、持久化

3、集群

4、事务

......

学习方式

1、官网 :https://redis.io/

2、中文官网:http://www.redis.cn/   

3、下载地址:在官网上下载(windows版本的在GitHub上下载)

windows安装

1、下载安装包:https://github.com/MicrosoftArchive/redis/releases

2、解压下载的压缩包到电脑的环境下就行了

 4、开启redis,双击redis-cli.exe即可!

Linux安装(源码安装)

1、官网下载最新的安装包

2、将下载的安装包上传至Linux服务器,使用tar命令解压redis安装包,进入redis解压目录

3、安装基本环境gcc、g++

4、使用make && make install 命令进行编译和安装

5、修改redis的redis.config文档

### 请参考以下内容,配置redis.conf文件中,相关参数值,如下:

###bind 127.0.0.1 -::1 只有本机可以访问

bind * -::* #允许所有的访问

####protected-mode yes 默认为yes,受保护状态

protected-mode no

# Accept connections on the specified port, default is 6379 (IANA #815344).

# If port 0 is specified Redis will not listen on a TCP socket.

port 6379

# Close the connection after a client is idle for N seconds (0 to disable)

#timeout 0 

timeout 120

###daemonize no 是否允许后台运行 默认为no

daemonize yes

# dbid is a number between 0 and 'databases'-1

databases 16

# warning (only very important / critical messages are logged) 日志记录等级

loglevel notice

# output for logging but daemonize, logs will be sent to /dev/null

logfile "logs.log"

#requirepass 设置密码,默认没有密码

requirepass Wlkj@12345

# maxclients 10000 设置最大连接数

maxclients 2000

# maxmemory <bytes>

maxmemory 2GB   ### 请根据实际内存大小来设置

# maxmemory-policy noeviction

maxmemory-policy volatile-lru

6、启动redis ,进入redis目录下的src目录执行./redis-server ../redis.conf

7、使用redis-cli连接redis,./redis-cli -h 127.0.0.1 -p 6379

[root@localhost src]# ./redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set word hello
OK
127.0.0.1:6379> get word
"hello"

性能测试

 redis-benchmark

基本参数,图片来自菜鸟教程:

 我们来简单测试下:

测试:100个并发连接10000请求
redis-benchmark -h 127.0.0.1 -p 6379 -c 100 -n 10000

如何查看这些分析?

 

 基础知识

redis默认有16个数据库,查看redis.conf可以看到

 默认使用第0个数据库

可以使用select进行数据库切换

 127.0.0.1:6379> select 3 #切换数据库
OK
127.0.0.1:6379[3]> dbsize #查看数据库大小
(integer) 0
127.0.0.1:6379[3]> keys * #查看数据库所有的key
1) "name"
127.0.0.1:6379[3]> flushdb #清空当前数据库
OK
127.0.0.1:6379[3]> keys *
(empty array)
127.0.0.1:6379[3]> flushall #清空所有数据库

Redis是单线程的!

明白Redis是很快的,官方表示,Redis是基于内存操作,CPU不是Redis性能瓶颈,Redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程来实现,就使用单线程了!所有就使用了单线程了!
Redis 是C语言写的,官方提供的数据为100000+的QPS,完全不比同样是使用key-vale的Memecache差!

Redis 为什么单线程还这么快?

CPU>内存>硬盘的速度
核心; redis 是将所有的数据全部放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换︰耗时的操作!!!),对于内存系统来说,如果没有上下文切换效率就是最高的!多次读写都是在一个CPU上的,在内存情况下,这个就是最佳的方案!

Redis五大数据类型

官方介绍

Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication)LUA脚本(Lua scripting), LRU驱动事件(LRU eviction)事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。

Redis-key

127.0.0.1:6379> keys * #查看所有的key
(empty array)
127.0.0.1:6379> set name 1 
OK
127.0.0.1:6379> EXISTS name #判断该key是否存在
(integer) 1
127.0.0.1:6379> EXPIRE name 5 #设置key的过期时间,单位秒
(integer) 1
127.0.0.1:6379> ttl name #查看当前key的剩余时间
(integer) 2
127.0.0.1:6379> ttl name
(integer) 1
127.0.0.1:6379> ttl name
(integer) -2
127.0.0.1:6379> get name
(nil)
127.0.0.1:6379> EXISTS name
(integer) 0
127.0.0.1:6379> set name 1
OK
127.0.0.1:6379> MOVE name 1 #移出key-value至库1
(integer) 1

127.0.0.1:6379> type name #查看key的类型
string
127.0.0.1:6379> type age
string

String类型(字符串)

######################################################

127.0.0.1:6379> set key1 v1
OK
127.0.0.1:6379> APPEND key1 "hello" #往key1中追加hello,若key1不存在,就相当于set
(integer) 7
127.0.0.1:6379> get key1
"v1hello"
127.0.0.1:6379> STRLEN key1 #获取字符串长度
(integer) 7
127.0.0.1:6379> APPEND key1 "string"
(integer) 13
127.0.0.1:6379> STRLEN key1
(integer) 13
127.0.0.1:6379> get key1
"v1hellostring"

######################################################
127.0.0.1:6379> set views 0
OK
127.0.0.1:6379> incr views #自增1
(integer) 1
127.0.0.1:6379> incr views 
(integer) 2
127.0.0.1:6379> get views
"2"
127.0.0.1:6379> DECR views #自减1
(integer) 1
127.0.0.1:6379> get views
"1"
127.0.0.1:6379> INCRBY views 10 #指定增10
(integer) 11
127.0.0.1:6379> DECRBY views 2#指定减2
(integer) 9

######################################################

127.0.0.1:6379> set key1 "hello,world"
OK
127.0.0.1:6379> GETRANGE key1 0 3 #截取字符串[0,3]
"hell"
127.0.0.1:6379> GETRANGE key1 0 -1 #获取所有的字符串
"hello,world"

127.0.0.1:6379> set key2 abcdefg
OK
127.0.0.1:6379> SETRANGE key2 1 xx #替换指定位置开的字符串
(integer) 7
127.0.0.1:6379> get key2
"axxdefg"
127.0.0.1:6379> 
######################################################

#setex (set with expire) #设置过期时间
#setnx (set if not exist)#不存在才能设置(在分布式锁中会使用)

127.0.0.1:6379> SETEX key3 30 1
OK
127.0.0.1:6379> ttl key3
(integer) 26
127.0.0.1:6379> get key3
"1"
127.0.0.1:6379> setnx mykey "redis" #如果mykey不存在,创建成功,否则失败
(integer) 1
127.0.0.1:6379> keys *
1) "mykey"
127.0.0.1:6379> setnx mykey "MongoDB"
(integer) 0
127.0.0.1:6379> get mykey
"redis"
######################################################

127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 #同时创建多个值
OK
127.0.0.1:6379> keys *
1) "k3"
2) "k2"
3) "k1"
127.0.0.1:6379> mget k1 k2 k3 #同时获取多个值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k4 v4 #原子性操作,同时成功同时失败
(integer) 0
127.0.0.1:6379> get k4
(nil)

###################################################### 

#对象

set user:1 {name:zhangsan,age:11} #设置一个user:1 对象 值为json字符串来保存一个对象!

#这里的key是一个巧妙的设计:user:{id}:{filed} ,如此设计在Redis中是完全OK了!

127.0.0.1:6379> mset user:1:name zhangsan user:1:age 2
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "2"
######################################################

127.0.0.1:6379> getset db redis
(nil)  
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mongodb  #先get再set
"redis"
127.0.0.1:6379> get db
"mongodb"

数据结构相同的!

List

在redis里面,我们可以把list玩成栈、队列、阻塞队列!

所有关于list的命令都是以l开头的

######################################################

127.0.0.1:6379> LPUSH list one #左边插入列表
(integer) 1
127.0.0.1:6379> LPUSH list two
(integer) 2
127.0.0.1:6379> LPUSH list three
(integer) 3
127.0.0.1:6379> LRANGE list 0 -1 #获取list中的值
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> LRANGE list 0 1
1) "three"
2) "two"

127.0.0.1:6379> RPUSH list right #右边插入列表
(integer) 4
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
3) "one"
4) "right"

######################################################

127.0.0.1:6379> lpop list  #移除list的第一个元素
"three"
127.0.0.1:6379> rpop list #移除list的最后一个元素
"right"
127.0.0.1:6379> LRANGE list 0 -1
1) "two"
2) "one"
######################################################

127.0.0.1:6379> LRANGE list 0 -1
1) "two"
2) "one"
127.0.0.1:6379> lindex list 1 #根据下标获取元素
"one"
127.0.0.1:6379> lindex list 0
"two"

######################################################

127.0.0.1:6379> LLEn list #返回列表长度
(integer) 2

######################################################

127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "three"
3) "two"
4) "one"
127.0.0.1:6379> lrem list 1 one #移除指定的元素 1个 one
(integer) 1
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "three"
3) "two"
127.0.0.1:6379> lrem list 1 three
(integer) 1
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
127.0.0.1:6379> lpush list three
(integer) 3
127.0.0.1:6379> lrem list 2 three
(integer) 2
127.0.0.1:6379> LRANGE list 0 -1
1) "two"

######################################################

127.0.0.1:6379> rpush list 1
(integer) 1
127.0.0.1:6379> rpush list 2
(integer) 2
127.0.0.1:6379> rpush list 3
(integer) 3
127.0.0.1:6379> rpush list 4
(integer) 4
127.0.0.1:6379> ltrim list 1 2 #通过下标藏取指定的长度。这个list已经被改变了,截断了只剩下截取的元素!
OK
127.0.0.1:6379> lrange list 0 -1
1) "2"
2) "3"

######################################################

127.0.0.1:6379> RPUSH list 1
(integer) 1
127.0.0.1:6379> RPUSH list 2
(integer) 2
127.0.0.1:6379> RPUSH list 3
(integer) 3
127.0.0.1:6379> RPOPLPUSH list otherlist #移除列表的最后一个元素,将他移动到新的列表中!
"3"
127.0.0.1:6379> lrange list 0 -1
1) "1"
2) "2"
127.0.0.1:6379> lrange otherlist 0 -1
1) "3"

######################################################

127.0.0.1:6379> EXISTS list
(integer) 0
127.0.0.1:6379> LSET list 0 1
(error) ERR no such key
127.0.0.1:6379> LPUSH list 1
(integer) 1
127.0.0.1:6379> LSET list 0 item #将列表中指定下标的值替换为另外一个值,更新操作
OK
127.0.0.1:6379> lrange list 0 -1
1) "item"
127.0.0.1:6379> LSET list 0 1
OK
127.0.0.1:6379> lrange list 0 -1
1) "1"

######################################################

127.0.0.1:6379> rpush list 1
(integer) 1
127.0.0.1:6379> rpush list 2
(integer) 2
127.0.0.1:6379> linsert list before "2" "1.5"  #将值插入到列表中某个值的前面
(integer) 3
127.0.0.1:6379> LRANGE list 0 -1
1) "1"
2) "1.5"
3) "2"
127.0.0.1:6379> linsert list after "2" "2.5" #将值插入到列表中某个值的后面
(integer) 4
127.0.0.1:6379> LRANGE list 0 -1
1) "1"
2) "1.5"
3) "2"
4) "2.5"

  • List实际上是一个链表,before Node after , left , right 都可以播入值·如果key不存在,创建新的链表
  • 如果key存在,新增内容
  • 如果移除了所有值,空链表,也代表不存在!
  • 在两边插入或者改动值,效率最高!中间元素,相对来说效率会低一点
  • ~消息排队!消息队列( Lpush Rpop )  栈(Lpush Lpop )

Set(集合)

set中的值无序唯一

################################################

127.0.0.1:6379> sadd myset hello #添加值
(integer) 1
127.0.0.1:6379> sadd myset word
(integer) 1
127.0.0.1:6379> sadd myset 1 2
(integer) 2
127.0.0.1:6379> SMEMBERS myset  #查看set中的值
1) "2"
2) "word"
3) "hello"
4) "1"
127.0.0.1:6379> SISMEMBER myset hello #判断set中值是否存在
(integer) 1
127.0.0.1:6379> SISMEMBER myset world
(integer) 0

127.0.0.1:6379> scard myset #获取set中元素个数
(integer) 4
################################################

127.0.0.1:6379> srem myset 1 #移除set中的指定元素
(integer) 1
127.0.0.1:6379> smembers myset 
1) "2"
2) "word"
3) "hello"

################################################

set 无序不重复集合,抽随机!

127.0.0.1:6379> smembers myset
1) "2"
2) "word"
3) "hello"
127.0.0.1:6379> SRANDMEMBER myset
"word"
127.0.0.1:6379> SRANDMEMBER myset
"2"
127.0.0.1:6379> SRANDMEMBER myset
"hello"
127.0.0.1:6379> SRANDMEMBER myset
"word"
127.0.0.1:6379> SRANDMEMBER myset
"hello"
127.0.0.1:6379> SRANDMEMBER myset 2 #随机抽取2个元素
1) "hello"
2) "word"
################################################

删除指定的key,随机删除key

127.0.0.1:6379> SMEMBERS myset
1) "2"
2) "word"
3) "hello"
127.0.0.1:6379> spop myset  #随机删除一个set中的元素
"word"
127.0.0.1:6379> spop myset
"2"
127.0.0.1:6379> spop myset
"hello"
127.0.0.1:6379> SMEMBERS myset
(empty array)

################################################

127.0.0.1:6379> sadd myset 1 2 3 4
(integer) 4
127.0.0.1:6379> sadd myset 5 6 7 8
(integer) 4
127.0.0.1:6379> smove myset myset2 1
(integer) 1
127.0.0.1:6379> SMEMBERS myset2
1) "1"
127.0.0.1:6379> 
################################################

数字集合类:

- 差集

- 交集

- 并集

127.0.0.1:6379> sadd key1 a b c d
(integer) 4
127.0.0.1:6379> sadd key2 a c
(integer) 2
127.0.0.1:6379> sdiff key1 key2  #差集
1) "b"
2) "d"
127.0.0.1:6379> sinter key1 key2  #交集
1) "c"
2) "a"
127.0.0.1:6379> sunion key1 key2 #并集
1) "a"
2) "d"
3) "b"
4) "c"

微博,A用户将所有关注的人放在一个set中!将他的粉丝页放在一个集合中!

共同关注,共同爱好,二度好友!(六度分割理论)

Hash(哈希)

map集合,key-Map集合!这个值是一个map集合!

set myhash 

############################################################

127.0.0.1:6379> hset myhash field1 hello #set一个具体的key-vlaue
(integer) 1
127.0.0.1:6379> hget myhash
(error) ERR wrong number of arguments for 'hget' command
127.0.0.1:6379> hget myhash field1
"hello"
127.0.0.1:6379> hset myhash field1 world field2 hello #set多个key-value
(integer) 1
127.0.0.1:6379> hmget myhash field1 field2 #获取多个字段值
1) "world"
2) "hello"
127.0.0.1:6379> hgetall myhash #获取全部的数据
1) "field1"
2) "world"
3) "field2"
4) "hello"

127.0.0.1:6379> hdel myhash field1 #删除hash指定可以字段
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "hello"

############################################################

127.0.0.1:6379> hmset myhash field1 world field3 tian field4 shi
OK
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "hello"
3) "field1"
4) "world"
5) "field3"
6) "tian"
7) "field4"
8) "shi"
127.0.0.1:6379> hlen myhash #获取hash中的字段数量
(integer) 4
############################################################

127.0.0.1:6379> HEXISTS myhash field1 #判断hash中的指定字段是否存在!
(integer) 1
127.0.0.1:6379> HEXISTS myhash field5
(integer) 0
############################################################

27.0.0.1:6379> hkeys myhash #获取所有字段
1) "field2"
2) "field1"
3) "field3"
4) "field4"
127.0.0.1:6379> hvals myhash #获取所有值
1) "hello"
2) "world"
3) "tian"
4) "shi"
############################################################

127.0.0.1:6379> HINCRBY myhash field5 1 #指定增量
(integer) 3
127.0.0.1:6379> HINCRBY myhash field5 -1
(integer) 2
127.0.0.1:6379> Hsetnx myhash field4 1 #如果存在则不能设置
(integer) 0
127.0.0.1:6379> HSETNX myhash field6 2 #如果不存在则能设置
(integer) 1

hash变更的数据 user name age,尤其是用户信息,经常变动的信息!hash跟适合对象的存储,String更适合字符串存储

Zset(有序集合)

在set的集成上,增加了一个值,set k1 v2 zset k1 score1 v1

127.0.0.1:6379> zadd zset 1 one #增加一个数
(integer) 1
127.0.0.1:6379> zadd zset 2 two 3 three
(integer) 2
127.0.0.1:6379> zrange zset 0 -1 #获取所有值
1) "one"
2) "two"
3) "three"
#######################################################

127.0.0.1:6379> zadd salary 2500 hu
(integer) 1
127.0.0.1:6379> zadd salary 5000 liu
(integer) 1
127.0.0.1:6379> zadd salary 300 liang
(integer) 1

#ZRANGEBYSCORE key min max
127.0.0.1:6379> ZRANGEBYScore salary -inf +inf   #显示全部用户 从小到大
1) "liang"
2) "hu"
3) "liu"

127.0.0.1:6379> ZREVRANGE salary 0 -1 #从大到小排序
1) "hu"
2) "liang"
127.0.0.1:6379> ZRANGEBYScore salary -inf +inf withscores #显示全部用户并附带成绩
1) "liang"
2) "300"
3) "hu"
4) "2500"
5) "liu"
6) "5000"
127.0.0.1:6379> ZRANGEBYScore salary -inf 2500 withscores #显示工资小于2500的升序
1) "liang"
2) "300"
3) "hu"
4) "2500"
#######################################################

127.0.0.1:6379> zrange salary 0 -1
1) "liang"
2) "hu"
3) "liu"
127.0.0.1:6379> zrem salary liu #移除有序集合中的指定元素
(integer) 1
127.0.0.1:6379> zrange salary 0 -1 
1) "liang"
2) "hu"
127.0.0.1:6379> ZCARD salary #获取有序集合中的个数
(integer) 2
#######################################################

127.0.0.1:6379> zadd myset 1 hello 2 world 3 qin
(integer) 3
127.0.0.1:6379> zcount myset 1 3 #获取指定区间的成员数量
(integer) 3
127.0.0.1:6379> zcount myset 1 2
(integer) 2

其余的api,可以根据需要,查看官方文档!

案例思路:set 排序 存储班级成绩表,工作表排序!

普通消息,1,重要消息2,带权重判断!

排行榜应用,Top N测试

三种特殊数据类型

geospatial 地理位置

定位,距离计算怎么实现?

Redis的Geo在Redis3.2版本推出!这个功能可以推算地理位置的信息,两地之间的距离,方圆附件的人!

可以查询一下测试数据:http://www.jsons.cn/lngcode/

 geoadd

#添加地理位置

#规则:两极无法直接添加,我们一般不会下载城市数据,直接通过java程序一次性导入

#

  • 有效的经度从-180度到180度。
  • 有效的纬度从-85.05112878度到85.05112878度。

#127.0.0.1:6379> geoadd china:city 39.90 116.40 beijing
(error) ERR invalid longitude,latitude pair 39.900000,116.400000
 

#参数 key 值(经度、纬度、名称)

127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 106.50 29.53 chongqi 114.05 22.52 shengzheng
(integer) 2
127.0.0.1:6379> geoadd china:city 120.16 30.24 hangzhou 108.96 34.26 xian
(integer) 2

geopos:获得当前定位:一定是一个坐标值

#获取指定的城市的经度和纬度

127.0.0.1:6379> geopos china:city beijing chongqi 
1) 1) "116.39999896287918091"
   2) "39.90000009167092543"
2) 1) "106.49999767541885376"
   2) "29.52999957900659211"

geodist

返回两个给定位置之间的距离。

如果两个位置之间的其中一个不存在, 那么命令返回空值。

指定单位的参数 unit 必须是以下单位的其中一个:

  • m 表示单位为米。
  • km 表示单位为千米。
  • mi 表示单位为英里。
  • ft 表示单位为英尺。

127.0.0.1:6379> geodist china:city beijing shanghai  
"1067378.7564"
127.0.0.1:6379> geodist china:city beijing shanghai km  #查看上海到北京的直线距离
"1067.3788"
127.0.0.1:6379> geodist china:city beijing chongqi km   #查看重庆到北京的直线距离
"1464.0708"

georadius

以给定的经纬度为中心, 返回键包含的位置元素当中, 与中心的距离不超过给定最大距离的所有位置元素。

范围可以使用以下其中一个单位:

  • m 表示单位为米。
  • km 表示单位为千米。
  • mi 表示单位为英里。
  • ft 表示单位为英尺。

我附近的人?(获取所有附近的人的定位)通过半径来查询!

获得指定数量的人, 200

所有数据应该都录入 : china:city ,才会让结果更加请求!

127.0.0.1:6379> georadius china:city 110 30 1000 km #以110,30这个经纬度为中心,寻找方圆1000km内的城市
1) "chongqi"
2) "xian"
3) "shengzheng"
4) "hangzhou"
127.0.0.1:6379> georadius china:city 110 30 500 km
1) "chongqi"
2) "xian"
127.0.0.1:6379> georadius china:city 110 30 500 km withdist #显示到中心距离的位置
1) 1) "chongqi"
   2) "341.9374"
2) 1) "xian"
   2) "483.8340"
127.0.0.1:6379> georadius china:city 110 30 500 km withcoord #显示他人的定位信息
1) 1) "chongqi"
   2) 1) "106.49999767541885376"
      2) "29.52999957900659211"
2) 1) "xian"
   2) 1) "108.96000176668167114"
      2) "34.25999964418929977"
127.0.0.1:6379> georadius china:city 110 30 500 km withdist withcoord count 1 #筛选出指定的结果
1) 1) "chongqi"
   2) "341.9374"
   3) 1) "106.49999767541885376"
      2) "29.52999957900659211"
127.0.0.1:6379> georadius china:city 110 30 500 km withdist withcoord count 2
1) 1) "chongqi"
   2) "341.9374"
   3) 1) "106.49999767541885376"
      2) "29.52999957900659211"
2) 1) "xian"
   2) "483.8340"
   3) 1) "108.96000176668167114"
      2) "34.25999964418929977"

georadiusbymember

#找出位于指定元素周围的其他元素

127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 1000 km
1) "beijing"
2) "xian"
127.0.0.1:6379> GEORADIUSBYMEMBER china:city shanghai 400 km
1) "hangzhou"
2) "shanghai"

geohash:返回一个或多个位置元素的geohash表示

该命令将返回11个字符的Geohash字符串

#将二维的经纬度装换为一维的字符串,如果两个字符串越接近,那么距离越近

127.0.0.1:6379> geohash china:city beijing chongqi
1) "wx4fbxxfke0"
2) "wm5xzrybty0"

GEO 底层实现原理其实就是Zset,我们可以使用Zset命令来操作geo

127.0.0.1:6379> zrange china:city 0 -1 #查看地图中所有的元素
1) "chongqi"
2) "xian"
3) "shengzheng"
4) "hangzhou"
5) "shanghai"
6) "beijing"
127.0.0.1:6379> zrem china:city beijing #移除地图中的beijing
(integer) 1
127.0.0.1:6379> zrange china:city 0 -1
1) "chongqi"
2) "xian"
3) "shengzheng"
4) "hangzhou"
5) "shanghai"

Hyperloglog

什么是基数?

A{1,3,5,7,8,9}

B{1,3,5,7,9}

基数(不重复的元素)= 5,可以接受误差!

简介

Redis2.8.9版本更新了Hyperloglog数据结构

Redis Hyperloglog基数统计的算法

优点:占用的内存是固定的,2^64不同的元素的技术,只需要12KB内存!如果从内存角度来比较的话Hyperloglog首选!

网页的UV(一个人访问一个网站多出,但是还是算作一个人!)

传统的方式,set保存用户号的id,然后就可以统计set中的元素数量作为标准判断

这个方式如果保存大量的用户id,比较麻烦,我们的目的是为了技术,而不是保存用户id;

存在0.81%错误率!统计UV任务,可以忽略不计的!

测试使用

127.0.0.1:6379> pfadd mykey a b c d e f g h i j #创建第一组元素  mykey
(integer) 1
127.0.0.1:6379> PFCOUNT mykey   # 统计mykey元素的基数数量
(integer) 10
127.0.0.1:6379> pfadd mykey2 i j z x c v b n m
(integer) 1
127.0.0.1:6379> PFCOUNT mykey2
(integer) 9
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2 # 合并两组 mykey mykey2 => mykey3 并集
OK
127.0.0.1:6379> PFCOUNT mykey3  
(integer) 15

如果允许容错,那么一定可以使用Hyperloglog!

如果不允许容错,就使用set或者自己的数据类型即可!

Bitmaps

存储

统计疫情感染人数:0 1 0 1 0 0……

统计用户信息,活跃,不活跃!登录,未登录!打卡,365打卡!两个状态的,都可以使用Bitmaps!

Bitmaps位图,数据结构!都是操作二进制位来进行记录,就只有0和1两个状态!

365 天 = 365 bit  1字节 = 8bit  大概需要46字节左右!

测试(使用bitmaps来记录周一到周日的打卡!)

127.0.0.1:6379> setbit sign 0 1 #第一位数字代表某天,第二位数字代表是否打卡
(integer) 0
127.0.0.1:6379> setbit sign 1 0
(integer) 0
127.0.0.1:6379> setbit sign 2 0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 1
(integer) 0
127.0.0.1:6379> setbit sign 5 0
(integer) 0
127.0.0.1:6379> setbit sign 6 0
(integer) 0
127.0.0.1:6379> getbit sign 3   #查看某一天是否打卡
(integer) 1
127.0.0.1:6379> getbit sign 6
(integer) 0
127.0.0.1:6379> bitcount sign  #统计这周的打卡记录,就可以看到是否全勤!
(integer) 3

事物

Redis事务本质:一组命令的集合!一个事务中的所有命令都会被序列化,在事务执行过程中,会安装顺序执行!

一次性、顺序性、排他性!执行一系列的命令!

----- 队列 set set set 执行-------

Redis事务没有隔离级别的概念!

所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行!Exec

Redis单条命令是保持原子性的,但是事务保证原子性!

redis的事务:

  • 开启事务( multi )
  • 命令入队(……)
  • 执行事务(exec)

127.0.0.1:6379> multi  #开启事务
OK

#命令入队
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> get k2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> exec  #执行事务
1) OK
2) OK
3) "v2"
4) OK

127.0.0.1:6379> MULTI #开启事务
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> set k4 k4
QUEUED
127.0.0.1:6379(TX)> DISCARD #取消事务
OK
127.0.0.1:6379> get k4 #事务队列中命令不会执行
(nil)

编写型远程(代码有问题!命令有错!),事务中所有的命令都不会执行!

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> getset k3 #错误的命令
(error) ERR wrong number of arguments for 'getset' command
127.0.0.1:6379(TX)> set k4 v4
QUEUED
127.0.0.1:6379(TX)> set k5 v5
QUEUED
127.0.0.1:6379(TX)> EXEC 
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k5 #所有命令都不会执行
(nil)

运行时异常(1/0),如果事务队列中存在语法性,那么执行命令的时候,其他命令可以正常执行,错误命令抛出异常!

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> set k1 "v1"
QUEUED
127.0.0.1:6379(TX)> incr k1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> get k3
QUEUED
127.0.0.1:6379(TX)> EXEC
1) OK
2) (error) ERR value is not an integer or out of range #虽然这条命令报错了,但是其他命令依然正常执行!
3) OK
4) OK
5) "v3"
127.0.0.1:6379> get k2
"v2"
127.0.0.1:6379> get k3
"v3"

监控!watch(面试常问)

悲观锁:

  • 很悲观,什么时候都会出问题,无论做什么都会加锁!

乐观锁:

  • 很乐观,认为什么时候都不会出问题,不会上锁!更新数据的时候去判断一下,在此期间是否有人修改过这周数据,
  • 获取version
  • 更新的时候比较version

Redis监听测试

正常执行成功!

127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money #监视money对象
OK
127.0.0.1:6379> MULTI   #事务正常结束,
OK
127.0.0.1:6379(TX)> DECRBY money 20
QUEUED
127.0.0.1:6379(TX)> INCRBY out 20 
QUEUED
127.0.0.1:6379(TX)> EXEC
1) (integer) 80
2) (integer) 20

测试多线程修改值,使用watch可以当做redis的乐观锁操作!

127.0.0.1:6379> WATCH money
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> DECRBY money 10
QUEUED
127.0.0.1:6379(TX)> INCRBY out 10
QUEUED
127.0.0.1:6379(TX)> exec #执行之前,另外一个线程修改了我们的值,这个时候,就会导致事务执行失败!
(nil)

如果修改失败,后勤最新的值就好

127.0.0.1:6379> unwatch #1、事务失败,先解锁
OK
127.0.0.1:6379> watch money #2、获取新值,再次监视
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> DECRBY money 10
QUEUED
127.0.0.1:6379(TX)> INCRBY out 10
QUEUED
127.0.0.1:6379(TX)> exec  #3、比对监视的值是否发生变化,如果没有变化,那么可以执行成功,否则执行失败
1) (integer) 990
2) (integer) 30

Jedis

Jedis是Redis官方推荐的java连接开发工具!使用java操作Redis中间件!如果要使用java操作redis,一定对Jedis十分的熟悉!

测试

1、导入对应的依赖

<!--导入jedis包-->
<dependencies>
    <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>4.3.0</version>
    </dependency>
    <!--fastjson-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

2、编码测试:

  • 连接数据库
  • 操作命令
  • 断开连接
//1、new Jedis 对象即可
Jedis jedis = new Jedis("192.168.168.142",6379);
//jedis 所有命令就是我们之前学习的所有指令
System.out.println(jedis.ping());

常用的API

String

List

Set

Hash

Zset

所有命令与redis中的命令基本一致

事务操作

//1、new Jedis 对象即可
Jedis jedis = new Jedis("192.168.168.142",6379);

JSONObject jsonObject = new JSONObject();
jsonObject.put("hello","world");
jsonObject.put("name","qzh");
jedis.flushDB();

//开启事务
Transaction multi = jedis.multi();
String s = jsonObject.toJSONString();
// jedis.watch(s);
try {
    multi.set("user1",s);
    multi.set("user2",s);
    int i = 1/0;//异常代码
    multi.exec();//执行事务
} catch (Exception e) {
    multi.discard();//放弃事务
    e.printStackTrace();
} finally {
    System.out.println(jedis.get("user1"));
    System.out.println(jedis.get("user2"));
    jedis.close();//关闭事务
}

SpringBoot整合

springboot操作数据:使用spring-data

在SpringBoot2.x之后,原来使用的jedis被替换为了lettuce
jedis :采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用jedis pool连接池,更像BIO模式

lettuce :采用netty,实例可以再多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据了,更像NIO模式
源码分析

@Bean
@ConditionalOnMissingBean(name = {"redisTemplate"}) //可以自定义一个redisTemplate来替换这个默认Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
    //默认的RedisTemplate,redis对象都是需要序列化
    //两个泛型都是object,object的类型,我们使用需要强制转换<String, Object>
    RedisTemplate<Object, Object> template = new RedisTemplate();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}

@Bean
@ConditionalOnMissingBean //String 是redis中最常用的类型,所以单独提出来一个Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
    StringRedisTemplate template = new StringRedisTemplate();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}

整合测试一下

1、导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、配置连接

spring.redis.host=192.168.168.142
spring.redis.port=6379

3、测试

   @Autowired
    private RedisTemplate redisTemplate;

    @Test
    void contextLoads() {

        //redisTemplate
        // opsForValue 操作字符串 类似String
        // opsForList 操作list 类似list
        // opsForSet
        // opsForHash
        // opsForZset
        // opsForGeo
        // opsForHyperLogLog

        //处理进行基本的操作,我们常用的方法都可以通过RedisTemplate操作,比如事务,和基本的CRUD

        // 获取redis的连接对象
        //        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        //        connection.flushAll();
        //        connection.flushDb();
        redisTemplate.opsForValue().set("mykey", "天天向上");
        System.out.println(redisTemplate.opsForValue().get("mykey"));
    }

 

编写自己的RedisConfig(包含使用redis做缓存部分配置)

package com.qzh.springboot.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.cache.CacheProperties;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.*;

import java.lang.reflect.Method;
import java.util.Arrays;

/**
 * @author qzh
 * @date 2022/10/13 上午 11:03
 * @description
 */
@Configuration
@EnableCaching //启用缓存
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * 自定义redisTemplate的序列化
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        // Json对象的序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        // String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // key采用String的序列化方式
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // hash 的key也采用String的序列化方式
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();

        return redisTemplate;
    }

    /**
     * 配置Redis缓存注解的value序列化方式,解决乱码
     */
    @Bean
    public RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {

        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();

        // 序列化值
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair
                .fromSerializer(new GenericJackson2JsonRedisSerializer()));

        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }
        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixKeysWith(redisProperties.getKeyPrefix());
        }
        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }
        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }

        return config;
    }


    /**
     * 自定义缓存key生成策略
     *
     * @return
     */
    @Bean
    @Override
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object o, Method method, Object... objects) {
                StringBuilder sb = new StringBuilder();
                sb.append(o.getClass().
                        getName()).append(":")
                        .append(method.getName());
                if (objects != null && objects.length != 0) {
                    sb.append(":" + Arrays.toString(objects));
                }

                return sb.toString();
            }
        };
    }
}

redis工具类RedisUtil

package com.qzh.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @author qzh
 * @date 2022/11/1 下午 04:06
 * @description redis工具类
 */
@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    //============================common=============================

    /**
     * 指定缓存失效时间
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire (String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
                return true;
            } else {
                throw new RuntimeException("缓存失效时间不能小于0");
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire (String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length != 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }

    // ============================String=============================

    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }


    /**
     * 普通缓存存放
     * @param key 键
     * @param value 值
     * @return true成功 false失败
     */
    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                redisTemplate.opsForValue().set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 递增
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().decrement(key, delta);
    }

    // ================================Map=================================

    /**
     * HashGet
     * @param key  键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<Object, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * HashSet 并设置时间
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<Object, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0 ) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }


    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /**
     * 根据key获取Set中的所有值
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================

    /**
     * 获取list缓存的内容
     *  @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     * @return
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将list放入缓存
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */

    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }

    }
}

Redis.conf详解

启动的时候,就是通过配置文件启动的

1、单位 (units)

 配置文件unit单位 对大小写不敏感

2、包含

 组合多个配置文件

3、网络(NETWORK)

bind 127.0.0.1  #绑定ip

protected-mode yes  #保护模式

port 6379  #端口设置

4、通用(GENERAL)

daemonize yes         # 以守护进程的方式运行,默认是no,我们需要自己打开为yes

pidfile /var/run/redis_6379.pid      #如果以后台方式运行,需要指定一个pid文件

#日志

# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably) 生成环境
# warning (only very important / critical messages are logged)
loglevel notice
logfile ""   #配置日志文件名

databases 16  #数据库数量,默认16

always-show-logo no  #是否显示logo

5、快照(SNAPSHOTTING)

持久化,在规定时间内,执行多少次操作,则会持久化到文件.rdb.aof

redis 是内存数据库,若没有持久化,会断电即失

# 如果900s内,至少有1 key进行了修改,会进行持久化操作

save 900 1

# 如果300s内,至少有10 key进行了修改,会进行持久化操作
save 300 10
save 60 10000

stop-writes-on-bgsave-error yes  #如果持久化出错,是否继续工作

rdbcompression yes   #是否压缩rdb文件

rdbchecksum yes  #保存rdb文件的是否,进行错误的检查校验

dir ./    #rdb文件保存的目录

6、REPLICATION复制,(主从复制)

7、SECURITY 安全

设置redis的密码,默认是没有密码

requirepass  #设置redis密码

8、CLIENTS限制客户端

maxclients 10000   #设置连接的最大客户端数

9、MEMORY MANAGEMENT  内存设置

maxmemory <bytes>   #redis配置最大的内存容量

maxmemory-policy noeviction  # 内存到达上限后的处理策略,

  1. volatile-lru:只对设置了过期时间的key进行LRU(默认值)
  2. allkeys-lru :删除:lru算法的key
  3. volatile-random:随机删除即将过期key
  4. allkeys-random:随机删除
  5. volatile-ttl :删除即将过期的
  6. noeviction :永不过期,返回错误

10、APPEND ONLY MODE模式  aof配置

appendonly no    #默认是不开启aof模式,默认使用rdb方式

appendfilename "appendonly.aof"  #持久化文件名

# appendfsync always    #每次修改都会sync同步
appendfsync everysec   #每秒执行一次sync,可能会丢失ls的数据
# appendfsync no   #不执行sync,操作系统自己同步数据

Redis持久化

RDB(Redis DataBase)

 在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。

Redis会单独创建( fork )一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。我们默认的就是RDB,一般情况下不需要修改这个配置!

生产环境会将rdb备份

rdb保存的文件是 dump.rdb 在配置文件中配置

 触发机制

1、save的规则满足的情况下,会自动触发rdb规则

2、执行flushall命令,也会触发我们的rdb规则

3、退出redis,也会产生rdb文件

恢复rdb文件

1、只需要将rdb文件放到我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb恢复数据

2、查看需要存在的位置

127.0.0.1:6379> config get dir
1) "dir"
2) "/"             #如果在这个目录下存在dump.rdb文件,启动就会自动恢复其中的数据

优点:

1、适合大规模的数据恢复!dump.rdb

2、对数据的完整性要求不高

缺点:

1、需要一定的时间间隔进程操作!如果redis宕机了,这个最后一次修改数据就没有了

2、fork进程的时候,会占用一定的内容空间

AOF (Append Only File)

将我们的所有命令都记录下来,history ,恢复的时候就把这个文件全部在执行一遍!

 以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作


aof保存的是appendonly.aof文件

 默认是不开启,我们需要将appendonly改为yes,开启aof模式。

重启,redis就生效!

如果这个aof文件有错误,这个时候redis启动不了,我们需要修复aof文件

可以使用redis的工具  redis-check-aof  --fix  恢复aof文件

重写规则

如果aof文件大于64m,太大了!fork一个新进程来将我们的文件进行重写

 优缺点

优点:

1、每一次修改都同步,文件的完整性更好

2、每秒同步远程,可能会丢失一秒数据

3、从不同步,效率最高

缺点:

1、相对于数据文件来说,aof远远大于rdb,修复速度也不rdb慢

2、aof运行效率比rdb更慢

小结扩展

1、RDB持久化方式能够在指定的时间间隔内对你的数据进行快照存储
2、AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大,
3、只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
4、同时开启两种持久化方式

  • 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
  • RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段。

5、性能建议

  • 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1这条规则。
  • 如果Enable AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
  • 如果不Enable AOF,仅靠Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave同时宕掉,会丢失十几分钟的数据,启动脚本也要比较两个Master/Slave 中的RDB文件,载入较新的那个,微博就是这种架构。

Redis订阅发布

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis客户端可以订阅任意数量的频道。

订阅/发布消息图:

 下图展示频道channel1,以及订阅这个频道的三个客户端之间的关系:

 

当有新消息通过publish命令发送给频道channel1时,这个消息会被发送给订阅它的三个客户端:

 

 

 命令

 测试

订阅端

127.0.0.1:6379> SUBSCRIBE qzh  #订阅一个频道 qzh
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "qzh"
3) (integer) 1

# 等待读取推送的消息
1) "message"  # 消息
2) "qzh"   #哪个频道的消息
3) "qzh"   #消息的具体内容
1) "message"
2) "qzh"
3) "hello,redis"

发送端

127.0.0.1:6379> PUBLISH qzh qzh  # 发布者发布消息到频道
(integer) 1
127.0.0.1:6379> PUBLISH qzh hello,redis   
(integer) 1

java实现发布订阅测试

Main方法

    public static void main(String[] args) {
        Jedis subscriberJedis = new Jedis(127.0.0.1, 6379);//订阅者连接
        Jedis publisherJedis = new Jedis(127.0.0.1, 6379);//发布者连接


        String channel = "channel";//频道

        SubscriberThread subscriberThread = new SubscriberThread(subscriberJedis, channel);//订阅线程
        PublisherThread publisherThread = new PublisherThread(publisherJedis, channel);//发布线程

        subscriberThread.start();
        publisherThread.start();
    }

Subscriber(订阅者)

//继承JedisPubSub类

public class Subscriber extends JedisPubSub {

    //收到消息时调用
    @Override  
    public void onMessage(String channel, String message) {
        System.out.printf("from channel \"%s\", message: %s\n", channel, message);
    }

    //订阅成功时调用

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.printf("Subscribe channel %s succeed, subscribed channels %d\n", channel, subscribedChannels);
    }

    //取消订阅时调用

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.printf("Unsubscribe %s succeed, subscribedChannels %d\n", channel, subscribedChannels);
    }
}

PublisherThread 发布者线程类

public class PublisherThread extends Thread {
    private Jedis jedis;
    private String channel;

    PublisherThread(Jedis jedis, String channel) {
        this.jedis = jedis;
        this.channel = channel;
    }

    @Override
    public void run() {
        try {
            while (true) {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));

                jedis.publish(channel, bufferedReader.readLine());//向频道发送消息
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

SubscriberThread 订阅者线程类

public class SubscriberThread extends Thread {

    private Jedis jedis;
    private String channel;

    public SubscriberThread(Jedis jedis, String channel) {
        this.jedis = jedis;
        this.channel = channel;
    }

    @Override
    public void run() {
        Subscriber subscriber = new Subscriber();
        jedis.subscribe(subscriber, channel);//订阅者订阅频道
    }
}

Redis是使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。

Redis 通过 PUBLISH、SUBSCRIBE 和PSUBSCRIBE等命令实现发布和订阅功能。


通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个个channel,而字典的值则是一个链表,链表中保存了所有订阅这个channel的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定channel的订阅链表中。

通过 PUBLISH 命令向订阅者发送消息,redis-server 会使用给定的频道作为键,在它所维护的channel 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。


Pub/Sub从字面上理解就是发布(Publish )与订阅( Subscribe ) ,在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
 

使用场景:

1、实时消息系统!

2、实时聊天!(频道当做聊天室)

3、订阅,关注系统

Redis主从复制

概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave以读为主。
默认情况下,每台Redis服务器都是主节点;

且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

主从复制的作用主要包括:
1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
2、故障恢复︰当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复﹔实际上是一种服务的冗余。
3、负载均衡∶在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
4、高可用(集群)基石∶除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。


一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的(宕机),原因如下︰
1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;
2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G

 
电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是""多读少写"。

对于这种场景,我们可以使如下这种架构:

 主从复制,读写分离!80%的情况下都是在进行读操作,减轻服务器的压力,架构中经常使用,最基本的情况下是一主二从!

只要在实际项目中,主从复制是必须使用的

主从复制环境配置

只配置从库,不用配置主库!

127.0.0.1:6379> info replication #查看当前库的信息
# Replication
role:master # 角色  master
connected_slaves:0  #从机数量0
master_failover_state:no-failover
master_replid:422817d3d15cdb40c1fafe97f8672af4b02ba743
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

复制3个配置文件,修改对应信息

1、端口

2、pid名字

3、log文件名字

4、dump.rdb名字

修改完毕后,启动3个redis服务器

 一主二从配置

默认情况下,每台Redis服务器都是主节点;我们一般情况下只用配置从机就好了!

认老大!一主(79)二从(80,81)

127.0.0.1:6380> Slaveof 127.0.0.1 6379  #slaveof host 6379 认老大
OK
127.0.0.1:6380> info replication
# Replication
role:slave  #当前角色时从机
master_host:127.0.0.1  #可以看到主机的信息
master_port:6379
master_link_status:up
master_last_io_seconds_ago:2
master_sync_in_progress:0
slave_read_repl_offset:14
slave_repl_offset:14
slave_priority:100
slave_read_only:1
replica_announced:1
connected_slaves:0
master_failover_state:no-failover
master_replid:96d8fffb5ad88a3bb977b6c10bd201c6570e2a77
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:14
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:14

#  在主机中查看从机信息

127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:1  #多了从机的配置
slave0:ip=127.0.0.1,port=6380,state=online,offset=28,lag=0 #从机的信息
master_failover_state:no-failover
master_replid:96d8fffb5ad88a3bb977b6c10bd201c6570e2a77
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:28
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:28

如果两个都配置完成了,就是有两个从机的

 真实的主从配置应该在配置文件中配置,这样才是永久的,目前使用的是命令配置的是暂时的。

配置文件配置:

 细节

主机可以写,从机不能写只能读!主机中所有的数据都会被从机保存

主机写:

 从机只能读:

 测试:主机断开连接,从机永久连接到主机的,但是没有写操作,这个时候,主机如果回来了,从机依旧可以直接获取到主机写的信息!

如果是使用命令行,来配置的主从,这个时候如果重启了,就会变为主机!只要变为从机,立马就会从主机中获取值!

复制原理

Slave启动成功连接到master后会发送一个sync同步命令
Master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave ,并完成一次完全同步。

全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中.

增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步但是只要是重新连接master,一次完全同步(全量复制)将被自动执行!我们的数据一定可以在从机中看到!

链路连接

上一个M连接下一个S,下一个S连接下一个S, 这时也可以完成我们的主从复制

如果没有老大了,这个时候能不能选择一个老大出来?手动设置

谋朝篡位主动版:如果主机断开了连接,我们可以使用SLAVEOF no one 让自己变成主机!其他节点就可以手动连接到最新的主节点(手动)!如果这个时候老大修复了,那就重新连接!

哨兵模式

(自动选举老大的模式)

概述

主从切换技术的方法是︰当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel (哨兵)架构来解决这个问题。

谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

 这里的哨兵有两个作用

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。

 假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线,当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

测试!

我们目前的状态是一主二从

1、配置哨兵配置文件sentinel.conf

# sentinel monitor 被监控的名称 host port 1

sentinel monitor myredis 127.0.0.1 6379 1

后面的这个数字1 ,代表主机挂了,slave投票看让谁来接替成为主机,数量最多的成为主机!

2、启动哨兵

[root@localhost bin]# redis-sentinel sentinel.conf 
81781:X 03 Nov 2022 00:55:23.276 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
81781:X 03 Nov 2022 00:55:23.276 # Redis version=6.2.7, bits=64, commit=00000000, modified=0, pid=81781, just started
81781:X 03 Nov 2022 00:55:23.276 # Configuration loaded
81781:X 03 Nov 2022 00:55:23.277 * monotonic clock: POSIX clock_gettime
81781:X 03 Nov 2022 00:55:23.278 # A key '__redis__compare_helper' was added to Lua globals which is not on the globals allow list nor listed on the deny list.
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 6.2.7 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                  
 (    '      ,       .-`  | `,    )     Running in sentinel mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 26379
 |    `-._   `._    /     _.-'    |     PID: 81781
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           https://redis.io       
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               

81781:X 03 Nov 2022 00:55:23.281 # Sentinel ID is 06776c034f2c3dc70343ea714217abc4c287b3b9
81781:X 03 Nov 2022 00:55:23.281 # +monitor master myredis 127.0.0.1 6379 quorum 1
81781:X 03 Nov 2022 00:55:23.281 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
81781:X 03 Nov 2022 00:55:23.284 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379

如果Master节点断开了,这个时候就会从从机中随机选择一个服务器!(这里有一个投票算法)

 哨兵日志

 如果主机此时回来了,只能归并到新的主机下,当做从机,这就是哨兵模式的规则

优缺点

优点:

1、哨兵集群,基于主从复制模式,所有主从配置优点,它都有

2、主从可以切换,故障可以转移,系统的可用性会更好

3、哨兵模式就是主从复制模式的升级,手动到自动,更加健壮!

缺点:

1、Redis不好在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦

2、实现哨兵模式的配置其实是很麻烦的,里面有很多选择!

哨兵模式的全部配置

# Example sentinel.conf


# 哨兵sentinel实例运行的端口 默认26379 

port 26379


# 哨兵sentinel的工作:目录

dir /tmp


# 哨兵sentinel监控的redis主节点的 ip port
# master-name可以自己命名的主节点名字只能由字衍A-z、数字0-9、这三个字符".-_""组成。

# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了

# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2


#当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码

#设置哨兵sentinel连接主从的密码注意必须为主从没置一样的验证密码
#sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd


#指定多少毫秒之后主节点没有应答哨兵sentinel此时哨兵主观上认为主节点下线 默认30秒#sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000
 

#这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行同步,这个数字越小。完成failover所需的时间就越长。
但是如果这个数字越大。就意味着越多的slave因为replication而不可用。
可以通过将这个值设为1来保证每次只有一个s1ave处于不能处理命令请求的状态。

#sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1

#故障转移的超时时间 failover-timeout 可以用在以下这些方面:

#1,同一个sentinel对同一个master两次failover之间的间隔时间。
#2,当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。

#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master。但是就不按parallel-syncs所配置的规则来了
#默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>

sentinel failover-timeout mymaster 180000
 

#SCRIPTS EXECUTION


#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。

#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10

#若脚本执行后返回2,或者比2更高的一个返回值。脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。


#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
#通知脚本
#sentinel notification-script <master-name> <script-path>

sentinel notification-script mymaster /var/redis/notify.sh

#客户端重新配置主节点参数脚本
#当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。

#以下参数将会在调用脚本时传给脚本:
#<master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>

#目前<state>总是""failover".
#<role>是"leader"或者observer""中的一个。
#参数from-ip,from-port,to-ip,to-port是用来和旧的master和新的master(即旧的slave)通信的

#这个脚本应该是通用的。能被多次测用,不是针对性的。
#sentinel client-reconfig-script <master-name> <script-path>

sentinel client-reconfig-script mymaster /var/redis/reconfig.sh    

Redis缓存穿透和雪崩(面试高频,工作常用)

服务的高可用问题!

Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题,其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。


缓存穿透(查不到)

概念

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

解决方案

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;

 缓存空对象

存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;

 但是这种方法会存在两个问题:
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。

缓存击穿(量太大,缓存过期!)

概念

这里需要注意和缓存击穿的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。

当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。

解决方案

设置热点数据永不过期

从缓存层面来看,没有设置过期时间,所以不会出现热点 key过期后产生的问题。

加互斥锁

分布式锁∶使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
 

缓存雪崩

概念

缓存雪崩,是指在菜一个时间段,缓存集中过期失效。Redis宕机!

产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

 其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形战的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。

解决方案

redis高可用

这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis ,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。(异地多活)

限流降级

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。

数据预热

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。

小结

资料获取:关注狂神说:Redis

redis视频链接:【狂神说Java】Redis最新超详细版教程通俗易懂_哔哩哔哩_bilibili

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐