一、压缩列表

1.介绍

压缩列表(ziplist)是列表键和哈希键的底层实现之一。当一个列表键只包含列表项,并且每个列表要么就是长度较短的字符串,要么就是小整数值,那么redis就会使用压缩列表作为列表键的底层实现。

压缩列表是redis为了节约而开发的,是一系列由特殊编码的连续内存块组成的顺序型数据结构。一个压缩列表可以包含任意多个节点,每个节点包含一个字节数组或者一个整数值。

压缩列表的布局<zlbytes><zltail><zllen><entry><entry><zlen>
其中zlbytes记录整个压缩列表占用的内存字节数,再对压缩列表进行内存重分配,或者计算zlend的位置时使用

zltail:记录压缩列表表尾节点距离压缩列表的起始地址有多少字节,通过这个偏移量,程序无需遍历整个压缩列表就可以确定表尾节点的地址
zllen:记录压缩列表包含的数量,小于65535时,这个属性的值就是压缩列表包含节点的数量;当这个值等于uintx_max,节点的真实数量需要遍历整个压缩列表才能计算得出。

entryX列表节点,压缩列表包含的各个节点,节点的长度由节点保存的内容决定。
zlend,特殊值oxFF,用于标记压缩列表的末端。

2.压缩列表节点的构成

typedef struct zlentry {

    // prevrawlen :前置节点的长度
    // prevrawlensize :编码 prevrawlen 所需的字节大小
    unsigned int prevrawlensize, prevrawlen;

    // len :当前节点值的长度
    // lensize :编码 len 所需的字节大小
    unsigned int lensize, len;

    // 当前节点 header 的大小
    // 等于 prevrawlensize + lensize
    unsigned int headersize;

    // 当前节点值所使用的编码类型
    unsigned char encoding;

    // 指向当前节点的指针
    unsigned char *p;

} zlentry;

每个压缩列表可以保存一个字节数组或者一个整数值,其中,字节数组可以是以下三种长度之一:

  • 长度小于等于63的字节数组
  • 长度小于等于16383的字节数组
  • 长度小于等于 2 32 − 1 2^{32}-1 2321 的字节数组

而整数可以是以下六种长度之一:

  • 4位长,介于0到12之间的无符号整数
  • 1字节长的有符号整数
  • 3字节长的有符号整数
  • int16_t类型整数
  • int32_t类型整数
  • int64_t类型整数

节点的prevrawlen属性以字节为单位,记录了压缩列表中前一个节点的长度。previous_entry_length属性的长度可以是1字节或者是5字节。如果前一节点的长度小于254字节,那么就是1字节,如果大于254字节,那就是5字节。

encoding 记录了节点content属性的类型以及长度

  • 一字节、两字节或者五字节长,值的最高位为00、01、或者是10的是字节数组编码:这种编码表示节点的content属性保存着字节数组,数组的长度由除去最高两位之后的其它位记录。
  • 一字节长,值的最高位以11开头的是整数编码,这种鞭名马表示节点的content属性保存着整数值,整数值的类型的长度由除去最高两位之后的其他位记录。

二、API

//创建一个压缩列表
unsigned char *ziplistNew(void);  
//创建一个包含给定值的节点,并将这个节点插入到给给定的压缩列表里
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where);
//返回压缩列表给点索引上的节点
unsigned char *ziplistIndex(unsigned char *zl, int index);
//返回给定节点的下一个节点
unsigned char *ziplistNext(unsigned char *zl, unsigned char *p);
//返回给定节点的上一个节点
unsigned char *ziplistPrev(unsigned char *zl, unsigned char *p);
//获取给定节点所保存的值
unsigned int ziplistGet(unsigned char *p, unsigned char **sval, unsigned int *slen, long long *lval);
//将包含给定值的新节点插入到给点节点之后
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen);
//从压缩列表删除指定的节点
unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p);
//删除指定范围的的节点
unsigned char *ziplistDeleteRange(unsigned char *zl, unsigned int index, unsigned int num);

unsigned int ziplistCompare(unsigned char *p, unsigned char *s, unsigned int slen);
//查找包含给定值的节点
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip);
//返回压缩列表包含的节点数量
unsigned int ziplistLen(unsigned char *zl);
//返回压缩列表包含的内存字节数
size_t ziplistBlobLen(unsigned char *zl);

三、源码

1.ziplistNew

//创建一个新的压缩列表,并初始化属性
unsigned char *ziplistNew(void) {

    // ZIPLIST_HEADER_SIZE 是 ziplist 表头的大小
    // 1 字节是表末端 ZIP_END 的大小
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;

    // 为表头和表末端分配空间
    unsigned char *zl = zmalloc(bytes);

    // 初始化表属性
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;

    // 设置表末端
    zl[bytes-1] = ZIP_END;

    return zl;
}

2.ziplistPush

//将字符串s推入到zl中
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {

    // 根据 where 参数的值,决定将值推入到表头还是表尾
    unsigned char *p;
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);

    // 返回添加新值后的 ziplist
    // T = O(N^2)
    return __ziplistInsert(zl,p,s,slen);
}

static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    // 记录当前 ziplist 的长度
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; 
    zlentry entry, tail;

    if (p[0] != ZIP_END) {
        // 如果 p[0] 不指向列表末端,说明列表非空,并且 p 正指向列表的其中一个节点
        // 那么取出 p 所指向节点的信息,并将它保存到 entry 结构中
        // 然后用 prevlen 变量记录前置节点的长度
        // (当插入新节点之后 p 所指向的节点就成了新节点的前置节点)
        // T = O(1)
        entry = zipEntry(p);
        prevlen = entry.prevrawlen;
    } else {
        // 如果 p 指向表尾末端,那么程序需要检查列表是否为:
        // 1)如果 ptail 也指向 ZIP_END ,那么列表为空;
        // 2)如果列表不为空,那么 ptail 将指向列表的最后一个节点。
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            // 表尾节点为新节点的前置节点

            // 取出表尾节点的长度
            // T = O(1)
            prevlen = zipRawEntryLength(ptail);
        }
    }

    // 尝试看能否将输入字符串转换为整数,如果成功的话:
    // 1)value 将保存转换后的整数值
    // 2)encoding 则保存适用于 value 的编码方式
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    // 计算编码前置节点的长度所需的大小
    // T = O(1)
    reqlen += zipPrevEncodeLength(NULL,prevlen);
    // 计算编码当前节点值所需的大小
    // T = O(1)
    reqlen += zipEncodeLength(NULL,encoding,slen);

    // 只要新节点不是被添加到列表末端,
    // 那么程序就需要检查看 p 所指向的节点(的 header)能否编码新节点的长度。
    // nextdiff 保存了新旧编码之间的字节大小差,如果这个值大于 0 
    // 那么说明需要对 p 所指向的节点(的 header )进行扩展
    // T = O(1)
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    /* Store offset because a realloc may change the address of zl. */
    // 因为重分配空间可能会改变 zl 的地址
    // 所以在分配之前,需要记录 zl 到 p 的偏移量,然后在分配之后依靠偏移量还原 p 
    offset = p-zl;
    // curlen 是 ziplist 原来的长度
    // reqlen 是整个新节点的长度
    // nextdiff 是新节点的后继节点扩展 header 的长度(要么 0 字节,要么 4 个字节)
    // T = O(N)
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        // 新元素之后还有节点,因为新元素的加入,需要对这些原有节点进行调整

        /* Subtract one because of the ZIP_END bytes */
        // 移动现有元素,为新元素的插入空间腾出位置
        // T = O(N)
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        // 将新节点的长度编码至后置节点
        // p+reqlen 定位到后置节点
        // reqlen 是新节点的长度
        // T = O(1)
        zipPrevEncodeLength(p+reqlen,reqlen);
    
        // 更新到达表尾的偏移量,将新节点的长度也算上
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

    
        // 如果新节点的后面有多于一个节点
        // 那么程序需要将 nextdiff 记录的字节数也计算到表尾偏移量中
        // 这样才能让表尾偏移量正确对齐表尾节点
        tail = zipEntry(p+reqlen);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        // 新元素是新的表尾节点
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    // 当 nextdiff != 0 时,新节点的后继节点的(header 部分)长度已经被改变,
    // 所以需要级联地更新后续的节点
    if (nextdiff != 0) {
        offset = p-zl;
        // T  = O(N^2)
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }


    // 一切搞定,将前置节点的长度写入新节点的 header
    p += zipPrevEncodeLength(p,prevlen);
    // 将节点值的长度写入新节点的 header
    p += zipEncodeLength(p,encoding,slen);
    // 写入节点值
    if (ZIP_IS_STR(encoding)) {
        // T = O(N)
        memcpy(p,s,slen);
    } else {
        // T = O(1)
        zipSaveInteger(p,value,encoding);
    }

    // 更新列表的节点数量计数器
    ZIPLIST_INCR_LENGTH(zl,1);

    return zl;
}

3.ziplistIndex

unsigned char *ziplistIndex(unsigned char *zl, int index) {

    unsigned char *p;

    zlentry entry;

    // 处理负数索引
    if (index < 0) {

        // 将索引转换为正数
        index = (-index)-1;
        
        // 定位到表尾节点
        p = ZIPLIST_ENTRY_TAIL(zl);

        // 如果列表不为空,那么。。。
        if (p[0] != ZIP_END) {

            // 从表尾向表头遍历
            entry = zipEntry(p);
            // T = O(N)
            while (entry.prevrawlen > 0 && index--) {
                // 前移指针
                p -= entry.prevrawlen;
                // T = O(1)
                entry = zipEntry(p);
            }
        }

    // 处理正数索引
    } else {

        // 定位到表头节点
        p = ZIPLIST_ENTRY_HEAD(zl);

        // T = O(N)
        while (p[0] != ZIP_END && index--) {
            // 后移指针
            // T = O(1)
            p += zipRawEntryLength(p);
        }
    }

    // 返回结果
    return (p[0] == ZIP_END || index > 0) ? NULL : p;
}

4.ziplistNext

unsigned char *ziplistNext(unsigned char *zl, unsigned char *p) {
    ((void) zl);

    // p 已经指向列表末端
    if (p[0] == ZIP_END) {
        return NULL;
    }

    // 指向后一节点
    p += zipRawEntryLength(p);
    if (p[0] == ZIP_END) {
        // p 已经是表尾节点,没有后置节点
        return NULL;
    }

    return p;
}

5.ziplistPrev


unsigned char *ziplistPrev(unsigned char *zl, unsigned char *p) {
    zlentry entry;
    
    // 如果 p 指向列表末端(列表为空,或者刚开始从表尾向表头迭代)
    // 那么尝试取出列表尾端节点
    if (p[0] == ZIP_END) {
        p = ZIPLIST_ENTRY_TAIL(zl);
        // 尾端节点也指向列表末端,那么列表为空
        return (p[0] == ZIP_END) ? NULL : p;
    
    // 如果 p 指向列表头,那么说明迭代已经完成
    } else if (p == ZIPLIST_ENTRY_HEAD(zl)) {
        return NULL;

    // 既不是表头也不是表尾,从表尾向表头移动指针
    } else {
        // 计算前一个节点的节点数
        entry = zipEntry(p);
        assert(entry.prevrawlen > 0);
        // 移动指针,指向前一个节点
        return p-entry.prevrawlen;
    }
}

6.ziplistGet

unsigned int ziplistGet(unsigned char *p, unsigned char **sstr, unsigned int *slen, long long *sval) {

    zlentry entry;
    if (p == NULL || p[0] == ZIP_END) return 0;
    if (sstr) *sstr = NULL;

    // 取出 p 所指向的节点的各项信息,并保存到结构 entry 中
    // T = O(1)
    entry = zipEntry(p);

    // 节点的值为字符串,将字符串长度保存到 *slen ,字符串保存到 *sstr
    // T = O(1)
    if (ZIP_IS_STR(entry.encoding)) {
        if (sstr) {
            *slen = entry.len;
            *sstr = p+entry.headersize;
        }
    
    // 节点的值为整数,解码值,并将值保存到 *sval
    // T = O(1)
    } else {
        if (sval) {
            *sval = zipLoadInteger(p+entry.headersize,entry.encoding);
        }
    }

    return 1;
}

7.ziplistDelete

unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {

    // 因为 __ziplistDelete 时会对 zl 进行内存重分配
    // 而内存充分配可能会改变 zl 的内存地址
    // 所以这里需要记录到达 *p 的偏移量
    // 这样在删除节点之后就可以通过偏移量来将 *p 还原到正确的位置
    size_t offset = *p-zl;
    zl = __ziplistDelete(zl,*p,1);

    *p = zl+offset;

    return zl;
}


8.ziplistDeleteRange

nsigned char *ziplistDeleteRange(unsigned char *zl, unsigned int index, unsigned int num) {

    // 根据索引定位到节点
    unsigned char *p = ziplistIndex(zl,index);

    // 连续删除 num 个节点
    return (p == NULL) ? zl : __ziplistDelete(zl,p,num);
}

9.ziplistcompare

unsigned int ziplistCompare(unsigned char *p, unsigned char *sstr, unsigned int slen) {
    zlentry entry;
    unsigned char sencoding;
    long long zval, sval;
    if (p[0] == ZIP_END) return 0;

    // 取出节点
    entry = zipEntry(p);
    if (ZIP_IS_STR(entry.encoding)) {

        // 节点值为字符串,进行字符串对比

        if (entry.len == slen) {
            return memcmp(p+entry.headersize,sstr,slen) == 0;
        } else {
            return 0;
        }
    } else {
        
        // 节点值为整数,进行整数对比

        if (zipTryEncoding(sstr,slen,&sval,&sencoding)) {
          // T = O(1)
          zval = zipLoadInteger(p+entry.headersize,entry.encoding);
          return zval == sval;
        }
    }

    return 0;
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐