____tz_zs

一、查询所有键 :函数 keys 与 scan

(一) keys

获取所有符合规则的键
keys(pattern=’*’)
pattern:匹配规则

keys操作在大规模redis集群中会导致redis崩溃 https://mp.weixin.qq.com/s/SGOyGGfA6GOzxwD5S91hLw
Redis 是单线程程序,是按照顺序执行指令的,如果说我们现在正在执行 keys 命令,那么其它指令必须等到当前的 keys 指令执行完了才可以继续,再加上 keys 操作是遍历算法,复杂度是 O (n) 。

(二)scan

scan = redis_client.scan(cursor=0, match=None, count=None, _type=None)
Scan 命令用于迭代数据库中的数据库键。SCAN 命令是一个基于游标的迭代器,每次被调用之后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为下一个 SCAN 命令的游标参数输入,以此来延续之前的迭代过程。

参数:

  • cursor - 游标。默认从0开始。
  • pattern - 匹配的模式。用于筛选过滤 key。(对元素的模式匹配工作是在命令从数据集中取出元素之后, 向客户端返回元素之前的这段时间内进行的, 所以如果被迭代的数据集中只有少量元素和模式相匹配, 那么迭代命令或许会在多次执行中都不返回任何元素。)
  • count - 指定查询元素数量,默认值为 10。(count 不是限定返回结果的数量,而是限定服务器单次遍历的字典槽位数量,所以返回数量可能因为 pattern 过滤而返回少于 count)
  • _type - 按特定的Redis类型过滤返回的值。 Redis实例允许的类型:HASH,LIST,SET,STREAM,STRING,ZSET。

返回值:

  • 返回包含两个元素的元组,第一个元素是用于进行下一次迭代的新游标(如果新游标返回 0 表示迭代已结束), 第二个元素则是一个数组, 这个数组中包含了所有被迭代的元素。

特点:

  • 复杂度虽然也是 O (n),但是它是通过游标分步进行的,不会阻塞线程;
  • 提供 limit 参数,可以控制每次返回结果的最大条数,limit 只是对增量式迭代命令的一种提示 (hint),返回的结果可多可少;
  • 同 keys 一样,它也提供模式匹配功能;
  • 服务器不需要为游标保存状态,游标的唯一状态就是 scan 返回给客户端的游标整数;
  • 返回的结果可能会有重复,需要客户端去重复,这点非常重要;
  • 遍历的过程中如果有数据修改,改动后的数据能不能遍历到是不确定的;
  • 单次返回的结果是空的并不意味着遍历结束,而要看返回的游标值是否为零

二、批量操作 pipeline

管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回。
pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间。将多个命令缓存起来,缓冲区满了就发送(将多条命令打包发送);有点像“请求合并”。服务端接受一组命令集合,切分后逐个执行返回。

Redis 客户端与 server 通信,使用的是客户端-服务器(CS)模式;每次交互,都是完整的请求/响应模式。
https://zhuanlan.zhihu.com/p/102045642
这意味着通常情况下一个请求会遵循以下步骤:

  • 客户端连接服务端,基于特定的端口,发送一个命令,并监听Socket返回,通常是以阻塞模式,等待服务端响应。
  • 服务端处理命令,并将结果返回给客户端。
  • 每个命令底层建立TCP连接的时间是省不掉的,即使我们都是在内网使用Redis,内网快但请求/响应的往返时间是不会减少的。
  • 当需要对一组kv进行批量操作时,这组命令的耗时=sum(N*(建立连接时间+发送命令、返回结果的往返时间RTT)),随批量操作的key越多,时间累加呈线性增长。

pipeline 的局限性

pipeline 只能用于执行连续且无相关性的命令,当某个命令的生成需要依赖于前一个命令的返回时(或需要一起执行时),就无法使用 pipeline 了。

三、使用例子


import time
from redis import StrictRedis
import json


redis_client = StrictRedis(host="ip地址", port=端口号, password="密钥", db=0)

# 得到此 db 中的所有 key 组成的 list。
redis_client_keys = redis_client.scan_iter(match=None, count=None, _type=None)
print(redis_client_keys)  # list

key_list = []  # 选择自己需要的 key
for k in redis_client_keys:
    k = k.decode("utf8")
    if k.startswith("tz_zs_"):
        key_list.append(k)
print(key_list)

pipeline = redis_client.pipeline()

while True:
    for k in key_list:
        pipeline.get(k) #

    ts_d_list = []
    pipeline_execute_results = pipeline.execute()
    now_t = time.time()
    for i in range(len(pipeline_execute_results)):
        one_result = json.loads(pipeline_execute_results[i])
        ts_d = now_t - one_result.get("TS")
        ts_d_list.append(ts_d)
        if ts_d > 3:
            print("%s 延迟超过!%s" % (key_list[i], ts_d))
    print(ts_d_list)
    time.sleep(5)


四、参考

https://www.runoob.com/redis/keys-scan.html
https://learnku.com/articles/25892
http://doc.redisfans.com/key/scan.html
https://zhuanlan.zhihu.com/p/102045642

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐