Redis的设计(1)

此系列是阅读《Redis设计与实现》(机械工业出版社)所做的笔记,还参考了其他的一些资料。笔记的内容较简略,仅供面试前临时抱佛脚使用 ^o^。

本文内容:

  1. 动态字符串实现   2. 字典的实现(2.2.2内容参考自:http://blog.csdn.net/ordeder/article/details/12836017)。列表与传统的没啥区别故省略。

动态字符串

由于 Redis 的字符串需要修改,所以自己构建了一个名为简单动态字符串(Simple Dynamic String)。SDS还被用作缓冲区AOF中。

RPUSH fruits “apple” “banana” “cherry”

值是一个列表对象,列表中每个元素都是SDS。

struct sdshrd {
     int len;  //C字符串取长度复杂度为O(n),故保存长度
     int free;
     char buff[];
}

在结尾额外添加一个”(兼容C里面的大部分字符串处理函数)。buff直接采用二进制的形式存放数据,因此 Redis 实际上可以存储任何类型的数据。由于记录了len,因此不需要”判断是否是结尾。

存储空间分配:
扩容:如果新增后小于1MB,分配新增后*2(另加上结尾的”)。如果超过1MB,分配1MB。
惰性空间释放:不主动回收未使用空间。


字典

typedef struct dict {
     dictType *type;
     void* privdata;  //保存需要传给特定函数的可选参数
     dictht ht[2];

     int rehashindx; // 标记是否正在进行rehash
} dict;

ht[2]保存2份哈希表(第二个在rehash的时候使用)。

typedef struct dictType {
     unsigned int (*hashFunction)(const void* key);
     void* (*keyDup)(void* privdata, const void* key);  //复制键的函数
     void* (*valDup)(void* privdata, const void* obj);
     int (*keyCompare)(void* privdata, const void* key1, const void* key2);
     void (*keyDestructor)(void* privdata, void* key);
     void (*valDestructor)(void* privdata, void* obj);
}

typedef struct dictht {
     dictEntry **table;
     unsigned long size;
     unsinged long sizemask;  // 用于计算索引值,总是等于size-1
     unsinged long used;
} dictht;

typedef struct dictEntry {
     void* key;
     union {
          void* val;
          uint64 _tu64;
          int64 _ts64;
     } v;
     
     struct dictEntry *next;  // 用于解决冲突的链表
} dictEntry;

struct dictEntry 结构对redis对象在hashtable上进行了很好的抽象,在rehash中,从ht[0]只修改该Hash表中的对象的dictEntry指针即可实现对象的转移(从ht[0]到ht[1])。

哈希算法

Redis 使用 MurmurHash2 算法计算。

hash = dict->type->hashFunction(key);
index = hash & dict->ht[x].sizemask;

rehash

当load factor过大或过小时,进行rehash。此时为ht[1]分配空间,将ht[0]的内容rehash到ht[1]上,迁移完毕后释放ht[0]并将ht[0]指向ht[1]。

执行扩展操作时,ht[1]的大小为第一个大于2^n的 ht[0].used * 2。

执行收缩操作时,ht[1]为第一个大于2^n ht[0].used。

如果服务器没有执行BGSAVE或者BGREWRITEAOF且负载因子大于等于1,或者在执行这些操作且负载因子大于等于5,则开始进行扩展操作。若负载因子小于0.1则开始执行收缩操作。

在执行BGSAVE或BGREWRITEAOF时,Redis 需要创建当前服务器进行的子进程,而大多操作系统都采用写时复制(copy-on-write),所在在子进程存在期间,服务器会尽可能避免不必要的内存写入操作以节约内存。

渐进式rehash

在rehash期间服务器停止响应,所以需要渐进式完成。

分配ht[1]的空间;维持一个索引计数器变量rehashidx=0;

在rehash期间,每次对字典的修改或查询操作,除了执行指定操作外,顺带将ht[0]

在rehashidx索引上的所有键值对rehash到ht[1],当rehash完成后将rehashidx加一。

所有键值对rehash完成后,rehashidx = -1。

在此过程中,字典的操作会在两个哈希表上进行。如果在ht[0]没有找到指定键会在ht[1]进行查找。新增加的键值对一律添加到ht[1],ht[0]随着操作的进行最终变成空表。

Redis 2.2.2 的实现方法:对于每个字典在1ms的限定时间内,每次转移100个元素,直到转移完成。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
     /*记录sever调用serverCron的总次数
     *对于需要协调的不同的任务,可以依据loops%n的方式设置不同的频率     */
     int loops = server.cronloops 
     。。。
     /* We don't want to resize the hash tables while a bacground saving
     * is in progress: the saving child is created using fork() that is
     * implemented with a copy-on-write semantic in most modern systems, so
     * if we resize the HT while there is the saving child at work actually
     * a lot of memory movements in the parent will cause a lot of pages
     * copied. 后台没有对数据进行操作的程序...*/
    if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) {
          /*loops % 10 : 
          serverCron没循环10次,进行一次tryResizeHashTables检查*/
        if (!(loops % 10)) tryResizeHashTables();
               //下面的for是tryResizeHashTables源码
               >for (j = 0; j < server.dbnum; j++) {
                    //htNeedsResize:检查used*100/size < REDIS_HT_MINFILL(设定的阈值)
                    if (htNeedsResize(server.db[j].dict)) 
                         dictResize(server.db[j].dict);//见下文分析 扩容Resize
                    if (htNeedsResize(server.db[j].expires))
                         dictResize(server.db[j].expires);
               >}
        if (server.activerehashing) 
               incrementallyRehash();//见下文分析 Rehash
    }

------------------------扩容Resize------------------------------------------

/* Resize the table to the minimal size that contains all the elements,
* but with the invariant of a USER/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
    int minimal;

    if (!dict_can_resize || dictIsRehashing(d)/*rehashidx ?= -1*/) return DICT_ERR;
    minimal = d->ht[0].used;
    if (minimal < DICT_HT_INITIAL_SIZE)
        minimal = DICT_HT_INITIAL_SIZE;
     //minimal = Max(d->ht[0].used,DICT_HT_INITIAL_SIZE)
     //原来的容量为size,现要扩充到used或DICT_HT_INITIAL_SIZE
    return dictExpand(d, minimal);
}

/* Expand or create the hashtable */
int dictExpand(dict *d, unsigned long size)
{
    dictht n; /* the new hashtable */
     //将size扩展到2^n : while(1) { if (i >= size) return i; i *= 2; }
    unsigned long realsize = _dictNextPower(size);

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hashtable */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    /* Allocate the new hashtable and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* 如果是在dict的第一次申请空间?那么就直接将n赋给d->ht[0],而且不需要rehash */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;     // rehashidx! = -1; 表示d进入了rehash阶段
    return DICT_OK;
}


-------------------------Rehash-----------------------------------------
/*记得前文:serverCron作为一个定时器事件的处理函数,定时的时间为1ms
在serverCron会调用incrementallyRehash()函数去不断的完成rehash任务。
这里说“不断的"的意思是,rehash的任务不是一次性的函数调用完成的,可能需要
serverCron调用多次incrementallyRehash()来完成。
下文就是incrementallyRehash()函数和子函数的额调用关系,incrementallyRehash()
的执行限时为1ms,在这时间内,dictRehash()会以一定量任务(100)为单元进行d->ht的
转移。
*/

incrementallyRehash(void)
     遍历server.db[],对db中的dict进行rehash,每个dict的限定时间为1ms
     dictRehashMilliseconds(server.db[j].dict,1)
          while(dictRehash的执行时间<1ms)
               dictRehash(d,100)//详见下文源码
                    将dict->ht[1]取出100个元素(dictEntry) Rehash到dict->ht[0]
     

/* Performs N steps of incremental rehashing. Returns 1 if there are still
* keys to move from the old to the new hash table, otherwise 0 is returned.
* Note that a rehashing step consists in moving a bucket (that may have more
* thank one key as we use chaining) from the old to the new hash table. */

/*将依次将d->ht[1].table[]中的元素搬到d->ht[0].table[],修改相关的used。
d->rehashidx:记录着当前的hashtable中搬了那一条链表的索引下标
d->ht[0].table[d->rehashidx] => d->ht[0].table[d->rehashidx]                    
完成一个dict的Rehash后,重置d->rehashidx = -1 */
int dictRehash(dict *d, int n) {
    if (!dictIsRehashing(d)) return 0;

    while(n--) {
        dictEntry *de, *nextde;

        /* Check if we already rehashed the whole table... */
          //d->ht[0].used == 0 : 说明d->ht[0] 已经全部搬到 d->ht[1]
        if (d->ht[0].used == 0) {
            zfree(d->ht[0].table);
            d->ht[0] = d->ht[1];
            _dictReset(&d->ht[1]);
            d->rehashidx = -1;     //该dict的Rehash结束 设置为 -1
            return 0;
        }

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            unsigned int h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;     ////////
            d->ht[1].used++;     ////////
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    return 1;
}
✏️ 有任何想法?欢迎发邮件告诉老夫:daozhihun@outlook.com