go操作redis

go-redis安装

  1. go-redis 三方库为我们封装了很多函数来执行 Redis 命令,而 redigo 三方库只有一个 Do 函数执行 Redis 命令,更接近使用 redis-cli 操作 Redis。
  2. go-redis 支持连接哨兵及集群模式的Redis
  3. 安装命令:go get github.com/go-redis/redis/v8

官方文档:https://redis.uptrace.dev/zh/guide/#%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%88%97%E8%A1%A8

连接方式

单机连接

rdb := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Password: "", // no password set
    DB:       0,  // use default DB
})

TLS连接模式

rdb := redis.NewClient(&redis.Options{
	TLSConfig: &tls.Config{
		MinVersion: tls.VersionTLS12,
		// Certificates: []tls.Certificate{cert},
    // ServerName: "your.domain.com",
	},
})

Redis 集群连接

rdb := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},

    // 若要根据延迟或随机路由命令,请启用以下命令之一
    // RouteByLatency: true,
    // RouteRandomly: true,
})

Redis 哨兵模式连接

rdb := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "master-name",
    SentinelAddrs: []string{":9126", ":9127", ":9128"},
})

基本使用

// doCommand go-redis基本使用示例
func doCommand() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	// 执行命令获取结果
	val, err := rdb.Get(ctx, "key").Result()
	fmt.Println(val, err)

	// 先获取到命令对象
	cmder := rdb.Get(ctx, "key")
	fmt.Println(cmder.Val()) // 获取值
	fmt.Println(cmder.Err()) // 获取错误

	// 直接执行命令获取错误
	err = rdb.Set(ctx, "key", 10, time.Hour).Err()

	// 直接执行命令获取值
	value := rdb.Get(ctx, "key").Val()
	fmt.Println(value)
}

go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行。

执行任意命令

go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行:

// doDemo rdb.Do 方法使用示例
func doDemo() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	// 直接执行命令获取错误
	err := rdb.Do(ctx, "set", "key", 10, "EX", 3600).Err()
	fmt.Println(err)

	// 执行命令获取结果
	val, err := rdb.Do(ctx, "get", "key").Result()
	fmt.Println(val, err)
}

redis.Nil

go-redis 库提供了一个 redis.Nil 错误来表示 Key 不存在的错误。因此在使用 go-redis 时需要注意对返回错误的判断。

在某些场景下应该区别处理 redis.Nil 和其他不为 nil 的错误。

// getValueFromRedis redis.Nil判断
func getValueFromRedis(key, defaultValue string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	val, err := rdb.Get(ctx, key).Result()
	if err != nil {
		// 如果返回的错误是key不存在
		if errors.Is(err, redis.Nil) {
			return defaultValue, nil
		}
		// 出其他错了
		return "", err
	}
	return val, nil
}

zset操作

// zsetDemo 操作zset示例
func zsetDemo() {
	// key
	zsetKey := "language_rank"
	// value
	languages := []*redis.Z{
		{Score: 90.0, Member: "Golang"},
		{Score: 98.0, Member: "Java"},
		{Score: 95.0, Member: "Python"},
		{Score: 97.0, Member: "JavaScript"},
		{Score: 99.0, Member: "C/C++"},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	// ZADD
	err := rdb.ZAdd(ctx, zsetKey, languages...).Err()
	if err != nil {
		fmt.Printf("zadd failed, err:%v\n", err)
		return
	}
	fmt.Println("zadd success")

	// 把Golang的分数加10
	newScore, err := rdb.ZIncrBy(ctx, zsetKey, 10.0, "Golang").Result()
	if err != nil {
		fmt.Printf("zincrby failed, err:%v\n", err)
		return
	}
	fmt.Printf("Golang's score is %f now.\n", newScore)

	// 取分数最高的3个
	ret := rdb.ZRevRangeWithScores(ctx, zsetKey, 0, 2).Val()
	for _, z := range ret {
		fmt.Println(z.Member, z.Score)
	}

	// 取95~100分的
	op := &redis.ZRangeBy{
		Min: "95",
		Max: "100",
	}
	ret, err = rdb.ZRangeByScoreWithScores(ctx, zsetKey, op).Result()
	if err != nil {
		fmt.Printf("zrangebyscore failed, err:%v\n", err)
		return
	}
	for _, z := range ret {
		fmt.Println(z.Member, z.Score)
	}
}

扫描或遍历所有key

获取所有key:

vals, err := rdb.Keys(ctx, "prefix*").Result()

但是如果需要扫描数百万的 key ,那速度就会比较慢,这种场景下可以使用Scan 命令来遍历所有符合要求的 key:

// scanKeysDemo2 按前缀扫描key示例
func scanKeysDemo2() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	// 按前缀扫描key
	iter := rdb.Scan(ctx, 0, "prefix:*", 0).Iterator()
	for iter.Next(ctx) {
		fmt.Println("keys", iter.Val())
	}
	if err := iter.Err(); err != nil {
		panic(err)
	}
}

可以写出一个将所有匹配指定模式的 key 删除的示例:

// delKeysByMatch 按match格式扫描所有key并删除
func delKeysByMatch(match string, timeout time.Duration) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	iter := rdb.Scan(ctx, 0, match, 0).Iterator()
	for iter.Next(ctx) {
		err := rdb.Del(ctx, iter.Val()).Err()
		if err != nil {
			panic(err)
		}
	}
	if err := iter.Err(); err != nil {
		panic(err)
	}
}

此外,对于 Redis 中的 set、hash、zset 数据类型,go-redis 也支持类似的遍历方法。

iter := rdb.SScan(ctx, "set-key", 0, "prefix:*", 0).Iterator()
iter := rdb.HScan(ctx, "hash-key", 0, "prefix:*", 0).Iterator()
iter := rdb.ZScan(ctx, "sorted-hash-key", 0, "prefix:*", 0).Iterator(

Pipeline

Redis Pipeline 允许通过使用单个 client-server-client 往返执行多个命令来提高性能。区别于一个接一个地执行100个命令,你可以将这些命令放入 pipeline 中,然后使用1次读写操作像执行单个命令一样执行它们。这样做的好处是节省了执行命令的网络往返时间(RTT)。

在下面的示例代码中演示了使用 pipeline 通过一个 write + read 操作来执行多个命令:

pipe := rdb.Pipeline()

incr := pipe.Incr(ctx, "pipeline_counter")
pipe.Expire(ctx, "pipeline_counter", time.Hour)

cmds, err := pipe.Exec(ctx)
if err != nil {
	panic(err)
}

// 在执行pipe.Exec之后才能获取到结果
fmt.Println(incr.Val())

上面的代码相当于将以下两个命令一次发给 Redis Server 端执行,与不使用 Pipeline 相比能减少一次RTT。或者,你也可以使用Pipelined 方法,它会在函数退出时调用 Exec。

var incr *redis.IntCmd

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
	incr = pipe.Incr(ctx, "pipelined_counter")
	pipe.Expire(ctx, "pipelined_counter", time.Hour)
	return nil
})
if err != nil {
	panic(err)
}

// 在pipeline执行后获取到结果
fmt.Println(incr.Val())

可以遍历 pipeline 命令的返回值依次获取每个命令的结果。下方的示例代码中使用pipiline一次执行了100个 Get 命令,在pipeline 执行后遍历取出100个命令的执行结果。

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
	for i := 0; i < 100; i++ {
		pipe.Get(ctx, fmt.Sprintf("key%d", i))
	}
	return nil
})
if err != nil {
	panic(err)
}

for _, cmd := range cmds {
    fmt.Println(cmd.(*redis.StringCmd).Val())
}

在那些我们需要一次性执行多个命令的场景下,就可以考虑使用 pipeline 来优化。

事务

Redis 是单线程执行命令的,因此单个命令始终是原子的,但是来自不同客户端的两个给定命令可以依次执行,例如在它们之间交替执行。但是,Multi/exec能够确保在multi/exec两个语句之间的命令之间没有其他客户端正在执行命令。

在这种场景我们需要使用 TxPipeline 或 TxPipelined 方法将 pipeline 命令使用 MULTI 和EXEC包裹起来:

// TxPipeline demo
pipe := rdb.TxPipeline()
incr := pipe.Incr(ctx, "tx_pipeline_counter")
pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
_, err := pipe.Exec(ctx)
fmt.Println(incr.Val(), err)

// TxPipelined demo
var incr2 *redis.IntCmd
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
	incr2 = pipe.Incr(ctx, "tx_pipeline_counter")
	pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
	return nil
})
fmt.Println(incr2.Val(), err)

上面代码相当于在一个RTT下执行了下面的redis命令:

MULTI
INCR pipeline_counter
EXPIRE pipeline_counts 3600
EXEC

Watch

通常搭配 WATCH命令来执行事务操作。从使用WATCH命令监视某个 key 开始,直到执行EXEC命令的这段时间里,如果有其他用户抢先对被监视的 key 进行了替换、更新、删除等操作,那么当用户尝试执行EXEC的时候,事务将失败并返回一个错误,用户可以根据这个错误选择重试事务或者放弃事务。

Watch方法接收一个函数和一个或多个key作为参数:

Watch(fn func(*Tx) error, keys ...string) error

下面的代码片段演示了 Watch 方法搭配 TxPipelined 的使用示例:

// watchDemo 在key值不变的情况下将其值+1
func watchDemo(ctx context.Context, key string) error {
	return rdb.Watch(ctx, func(tx *redis.Tx) error {
		n, err := tx.Get(ctx, key).Int()
		if err != nil && err != redis.Nil {
			return err
		}
		// 假设操作耗时5秒
		// 5秒内我们通过其他的客户端修改key,当前事务就会失败
		time.Sleep(5 * time.Second)
		_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Set(ctx, key, n+1, time.Hour)
			return nil
		})
		return err
	}, key)
}

将上面的函数执行并打印其返回值,如果我们在程序运行后的5秒内修改了被 watch 的 key 的值,那么该事务操作失败,返回redis: transaction failed错误。

配合redsync实现redis分布式锁

redsync是redis官网上的golang版本的分布式锁的实现!
在这里插入图片描述
首先,该包对外暴露了两个接口:Lock和Unlock。这也是锁最基本的两个操作原语。Lock接口的底层实现是代码中的acquire函数;Unlock接口的底层实现是代码中的release函数。

基于redis的setnx,实现互斥性。

func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
 conn, err := pool.Get(ctx)
 if err != nil {
  return false, err
 }
 defer conn.Close()
 reply, err := conn.SetNX(m.name, value, m.expiry)
 if err != nil {
  return false, err
 }
 return reply, nil
}

在这里插入图片描述
设置过期时间,防死锁:

在setnx的时候,还设置了过期时间,过期时间的设置是为了防止死锁的产生,设置过期时间还需要注意的一点就是需要保证setnx+expire是原子操作。因为在redis 2.8版本之前,setnx+expire是两个操作;从redis 2.8版本开始,setnx才支持同时设置expire。(这个和上面未设置过期时间的场景下产生死锁的原理相似。只不过是在执行了setnx之后,还没来的及执行expire操作,进程就崩溃了。也同样会导致死锁的产生。)

value值的随机性+唯一性验证,防误删:

加锁时setnx的value值的设置。该value值的产生是通过genValueFunc函数产生的。genValueFunc函数又是在初始化Mutex对象时指定的在genValue函数中产生的,默认是genValue函数。genValue函数的功能是随机生成了一个16字节的序列,然后通过base64进行编码成字符串。如下:
在这里插入图片描述
所以,value是一个随机值。因为随机性也就产生了唯一性,或者在一定时间范围内是唯一的。其作用就是为了防止被别的进程误删。

被误删的一个前提是锁的有效期到了,锁被自动释放了。以下是一个产生锁被误删的情景。假设线程a先获取了锁。当线程a执行完业务要去释放锁的时候,正巧赶上锁的过期时间也到了,这时锁自动被释放。同时,线程b获取了锁。然后线程a又做了释放锁的操作。这时如果是直接删除锁的话,就把线程b的锁给删除掉了。如下:

在这里插入图片描述
所以,在释放锁时,不是简单的对redis的key的删除。而是增加了对value值的校验判断。如下:

在这里插入图片描述

redsync中代码的实现如下:

在这里插入图片描述
预估业务可执行时间,防获取无效锁:
在redsync的获取锁的代码中,当执行完acuquire函数后,判断是否成功获取锁还有一个时间比较的条件。如下:
在这里插入图片描述
在锁的生命周期内其实是有 获取锁的时间+漂移时间+业务执行时间三部分组成的。
在这里插入图片描述
那么留给业务的执行时间就是:过期时间 - 获取锁的时间 - redis服务器漂移时间再用 当前时间 + 留给业务的时间 就能推导出业务执行的截止时间。如果当前时间已经超过了业务运行的截止时间,那么就说明锁已经过期了(比如获取锁的时间过长),就需要释放锁,并返回加锁失败。

重试机制,提高获取锁的效率:
在redsync包中,还增加了获取锁的重试机制。代码如下:
在这里插入图片描述
首先重试增加获取锁的稳定性。在分布式系统中,由于网络延迟等原因,获取锁的操作可能会失败。等待一段时间后再进行重试可以增加系统的稳定性,从而降低系统崩溃的概率。

其次,要防止频繁重试。如果在获取锁时发生错误,立即进行重试可能导致系统频繁重试,从而导致性能下降。因此,在等待一段时间后再进行重试可以减少这种情况的发生。

多redis节点支持,保证高可用性:
redsync包为了保证获取锁的高可用性,还支持了多redis节点。如下代码:
在这里插入图片描述
m.pools是一个redis实例的数组。在实例化Mutex的时候传入的。在获取锁时,依次向所有的redis节点发送加锁请求,当获取锁的redis节点数量超过预先设定的quorum值时(一般为redis总节点的1/2)才算加锁成功。

安装使用:

go get github.com/go-redsync/redsync/v4
package main

import (
    goredislib "github.com/go-redis/redis/v8"
    "github.com/go-redsync/redsync/v4"
    "github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
    // 创建一个redis的客户端连接
    client := goredislib.NewClient(&goredislib.Options{
        Addr: "localhost:6379",
    })
    // 创建redsync的客户端连接池
    pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

    // 创建redsync实例
    rs := redsync.New(pool)

    // 通过相同的key值名获取同一个互斥锁.
    mutexname := "my-global-mutex"
    //创建基于key的互斥锁
    mutex := rs.NewMutex(mutexname)

    // 对key进行
    if err := mutex.Lock(); err != nil {
        panic(err)
    }

    // 获取锁后的业务逻辑处理.

    // 释放互斥锁
    if ok, err := mutex.Unlock(); !ok || err != nil {
        panic("unlock failed")
    }
}

如果要想基于redis的集群模式,则在创建redis的客户端连接时使用NewClusterClient函数,如下:

    // 创建一个redis集群模式的客户端连接
    client := goredislib.NewClusterClient(&goredislib.ClusterOptions{
        Addr: []string{"localhost:6379"},
    })

go-redis 官方文档中使用 GET 、SET和WATCH命令实现一个 INCR 命令的完整示例。

const routineCount = 100

increment := func(key string) error {
	txf := func(tx *redis.Tx) error {
		// 获得当前值或零值
		n, err := tx.Get(key).Int()
		if err != nil && err != redis.Nil {
			return err
		}

		// 实际操作(乐观锁定中的本地操作)
		n++

		// 仅在监视的Key保持不变的情况下运行
		_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
			// pipe 处理错误情况
			pipe.Set(key, n, 0)
			return nil
		})
		return err
	}

	for retries := routineCount; retries > 0; retries-- {
		err := rdb.Watch(txf, key)
		if err != redis.TxFailedErr {
			return err
		}
		// 乐观锁丢失
	}
	return errors.New("increment reached maximum number of retries")
}

var wg sync.WaitGroup
wg.Add(routineCount)
for i := 0; i < routineCount; i++ {
	go func() {
		defer wg.Done()

		if err := increment("counter3"); err != nil {
			fmt.Println("increment error:", err)
		}
	}()
}
wg.Wait()

n, err := rdb.Get("counter3").Int()
fmt.Println("ended with", n, err)

在这个示例中使用了 redis.TxFailedErr 来检查事务是否失败。

注意事项

在这里插入图片描述

// KeepTTL是-1
log.Fatalln(rdb.Set(context.Background(),"name","1",redis.KeepTTL))

有些版本不支持-1设置不过期,

func (c cmdable) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd {
	args := make([]interface{}, 3, 5)
	args[0] = "set"
	args[1] = key
	args[2] = value
	if expiration > 0 {
		if usePrecise(expiration) {
			args = append(args, "px", formatMs(ctx, expiration))
		} else {
			args = append(args, "ex", formatSec(ctx, expiration))
		}
	} else if expiration == KeepTTL {
		args = append(args, "keepttl")
	}

	cmd := NewStatusCmd(ctx, args...)
	_ = c(ctx, cmd)
	return cmd
}

通过阅读源码,当exp为0时就可以跳过exp的分支判断,就可以设置不过期。

python操作redis

连接方式

单机连接

如果要连接单个Redis节点或Redis主从模式,可以使用redis-py库,redis-py是Redis官方提供的Python库之一,用于与Redis进行交互。

安装:pip install redis

import redis

# 普通连接
redis_conn = redis.Redis(host='127.0.0.1', port= 6379, password= 'password', db= 0)

# 连接池
redis_pool = redis.ConnectionPool(host='127.0.0.1', port= 6379, password= 'password', db= 0)
redis_conn = redis.Redis(connection_pool= redis_pool)

哨兵连接

from redis.sentinel import Sentinel


sentinel_list = [
			("192.168.233.1", "26379"),
			("192.168.233.2", "26379"),
			("192.168.233.3", "26379")
		]
mySentinel = Sentinel(sentinel_list)
master = mySentinel.master_for("mymaster", db=0)
slave = mySentinel.slave_for("mymaster", db=0)

集群连接

在Python中连接Redis集群,可以使用redis-py-cluster库。redis-py-cluster是Redis官方提供的Python库之一,专门用于与Redis集群进行交互。

redis-py-cluster库提供了与Redis集群通信所需的功能,包括节点发现、故障转移、数据分片等。它基于redis-py库,但添加了对Redis集群的支持。

安装:pip install redis-py-cluster

from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
startup_nodes = [
    {'host': '10.50.132.92', 'port': 6379},
    {'host': '10.50.132.93', 'port': 6379},
    {'host': '10.50.132.94', 'port': 6379},

]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx")
client = RedisCluster(connection_pool=pool)

对于高版本的python-redis库(版本3.0及以上),它已经支持了对Redis集群的连接和操作。在这种情况下,可以直接使用python-redis库来连接Redis集群:

import redis

startup_nodes = [
    {"host": "127.0.0.1", "port": "7000"},
    {"host": "127.0.0.1", "port": "7001"},
    {"host": "127.0.0.1", "port": "7002"}
]

rc = redis.RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

Django中使用redis

方式一:直接使用redis库

import redis
POOL = redis.ConnectionPool(host='127.0.0.1',port=6379,password='1234',max_connections=1000)


from django.shortcuts import render,HttpResponse
def index(request):
    conn = redis.Redis(connection_pool=POOL)
    conn.hset('kkk','age',18)
    return HttpResponse('设置成功')
    
def order(request):
    conn = redis.Redis(connection_pool=POOL)
    conn.hget('kkk','age')
    return HttpResponse('获取成功')

方式二:配置django的cache功能

# redis配置
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        #  redis默认是6379端口,第0的数据库,这里选择第5个数据库,
        "LOCATION": "redis://127.0.0.1:6379/5",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        }
    }
}
# 使用django_redis的get_redis_connect()获取连接
from django_redis import get_redis_connection
conn = get_redis_connection('default')

# 使用cache获取连接
from django.core.cache import cache
cache.get("key")

方式三:哨兵

CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://sentinel_host:26379',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis_sentinel.SentinelCache',
            'SENTINELS': [
                ('sentinel_host', 26379),  # 哨兵连接信息
                ('sentinel_host', 26380),  # 可以添加多个哨兵连接信息
            ],
            'SERVICE_NAME': 'mymaster',  # Redis主节点的服务名称
        }
    }
}

方式四:集群

CACHES = {
  'default': {
    'BACKEND': 'django_redis.cache.RedisCache',
    'LOCATION': [
                "redis://10.10.27.222:6380",
                "redis://10.10.27.222:6381",
                "redis://10.10.27.222:6382",
                "redis://10.10.27.222:6383",
                "redis://10.10.27.222:6384",
                "redis://10.10.27.222:6385",
        ],# redis://IP:PORT/db_index,数据库编号可为空,默认为0号
    'OPTIONS': {
       # 或 "CLIENT_CLASS": "django_redis.client.DefaultClient",
      'CLIENT_CLASS': 'rediscluster.RedisCluster',
      'CONNECTION_POOL_CLASS': 'rediscluster.connection.ClusterConnectionPool',
      "PASSWORD": "thisisacode",
      'CONNECTION_POOL_KWARGS': {
        'skip_full_coverage_check': True # AWS ElasticCache has disabled CONFIG commands
      }
    }
  }
}

Q&A

Redis中的pipeline和事务的区别是什么?

简单来讲:

  1. pipeline是客户端的行为,对于服务器来说是透明的,可以认为服务器无法区分客户端发送来的查询命令是以普通命令的形式还是以pipeline的形式发送到服务器的;

  2. 事务则是实现在服务器端的行为,用户执行MULTI命令时,服务器会将对应这个用户的客户端对象设置为一个特殊的状态,在这个状态下后续用户执行的查询命令不会被真的执行,而是被服务器缓存起来,直到用户执行EXEC命令为止,服务器会将这个用户对应的客户端对象中缓存的命令按照提交的顺序依次执行。

详细来讲:

  1. 在TCP连接之中,系统会在内核之中为每一条TCP连接分配属于它的输入缓冲区以及输出缓冲区。数据的发送方可以通过write系统调用将数据发送到连接之上,而当连接上有数据到达时,会先写入内核之中的输入缓冲区里,然后向应用程序通知这个连接当前可读。此时应用程序可以调用read系统调用将输入缓冲区之中的数据读取到用户空间的内存之中,进行解析与处理。

  2. 发送方发送的数据过大,会导致数据无法一次性地被发送到网络对端,这是就需要应用层缓冲区的概念了。当发送较大数据时,其中一部分数据会通过TCP发送到网络对端,对端应用程序调用read系统调用将数据从内核缓冲区转移到应用层缓冲区。此时被清空的内核缓冲区将有足够的空间来接收发送方的剩余数据。

  3. 具体到Redis之中,服务器处理客户端的查询命令是遵从如下的逻辑进行的:

    1. 用户的查询命令通过TCP连接发送给Redis服务器。
    2. Redis服务器接收到数据时,在该连接上触发可读事件,并从事件循环之中跳出。
    3. 服务器通过read系统调用将内核缓冲区之中的数据一次性地读入到该连接所对应的客户端对象的应用层缓冲区之中。
    4. Redis服务器会循环解析并处理应用层缓冲区之中的数据,将其解析成Redis命令,并执行查询逻辑,直到缓冲区中剩余的数据无法被解析成完整的Redis命令或者缓冲区之中的数据已被全部解析完成。
    5. 完成一个客户端对象数据的处理,Redis会继续应用步骤3、4中的逻辑处理其他客户端之中的内容。
  4. 在上面的步骤1之中,如果用户一次性地将多条查询命令发送到网络上,而不是收到一条的返回之后再发送第二条数据;那么在步骤4之中,服务器将一次性的处理这些命令,并且不会被其他用户的命令所打断。这种方式其实就是pipeline的机制,应用pipeline可以提服务器的吞吐能力,并提高Redis处理查询请求的能力。

  5. 但是这里存在一个问题,当通过pipeline提交的查询命令数据较少,可以被内核缓冲区所容纳时,Redis可以保证这些命令执行的原子性。然而一旦数据量过大,超过了内核缓冲区的接收大小,那么命令的执行将会被打断,原子性也就无法得到保证。

  6. 因此pipeline只是一种提升服务器吞吐能力的机制,如果想要命令以事务的方式原子性的被执行,那么请使用MULTI/EXEC的事务机制,或者使用更高级的脚本功能以及模块功能。

  7. 需要批量的将数据写入redis,允许一定比例的写入失败,那么这种场景就可以使用pipline了,比如10000条一下进入redis,可能失败了2条没关系,后期有补偿机制就行了,比如短信群发这种场景。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐