redis 跳跃表
跳跃表
跳跃表skiplist 是一种有序数据结构,通过在每个节点维持多个指向其他节点的指针,从而达到快速访问的目的。
跳跃表可以在O(logn)、最坏O(N)时间复杂度下查找节点,还有顺序性操作批量处理节点。
跳跃表的实现
跳跃表由server.h/zskiplistNode(第883行)和server.h/zskiplist(893行)两个结构定义,其中zskiplistNode表示跳跃表节点,zskiplist保存跳跃表节点的信息,如节点数量,指向表头节点和表尾节点的指针等等。
zskiplistNode结构如下:
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
sds ele; // 成员. sds对象,一个元素内成员唯一,分值可相同
double score; // 分值。从小到大排序,相同的按照成员在字典中的大小排序。
struct zskiplistNode *backward; // 后退指针。指向当前节点的前一个节点。从表尾向表头遍历。
struct zskiplistLevel {
struct zskiplistNode *forward; // 前进指针。用于访问表尾方向的其他节点。当从表头向表尾遍历时,会沿着层的前进指针进行。
unsigned long span; // 跨度。记录了当前前进指针指向的节点与当前节点的距离。
} level[]; // 层。每层有两个属性,前进指针和跨度。
} zskiplistNode;
1.层。level数组包含多个元素,每个元素是包含执向其他节点的指针和到该节点的跨度,通过层能加快访问其他节点的速度,层越多访问速度越快。 每次创建节点时,根据幂次定律(power law,越大的数出现几率越小)随机生成一个1~32之间的值作为数组的大小,即层的高度。
2.前进指针。用于访问表尾方向的其他节点。当从表头向表尾遍历时,会沿着层的前进指针进行。图5-3 用虚线表示从表头向表尾进行遍历
3.跨度。level[i].span
属性记录两节点之间的距离。
- 两个节点的跨度越大,距离越远
- 指向NULL的前进指针跨度为0,因为没有连接节点
跨度的作用和遍历操作无关,跨度的作用是计算排位的:将访问过的节点的跨度加起来,就是目标节点在跳跃表中的排位。图5-4用虚线表示了查找分值为3.0、对象为o3的节点时,经过的层:一层且跨度为3,所以该节点的排位是3.
4.后退指针。节点的backward
属性。用于从表尾向表头访问,和前进指针不同,每个节点只有一个后退指针,所以不同跳过多个节点,只能一个一个遍历。图5-6展示了后退指针遍历过程:首先通过跳跃表的tail
指针访问表尾节点,然后通过后退指针往前遍历,直到遇到NULL的后退指针。
zskiplist结构如下:
typedef struct zskiplist {
struct zskiplistNode *header, *tail; // 指向表头和表尾
unsigned long length; // 跳跃表的长度,即当前节点的数量,不包含表头节点
int level; // 记录表内层数最大的节点的层数,不包含表头节点
} zskiplist;
跳跃表和平衡树的比较
- skiplist和各种平衡树(如AVL、红黑树等)的元素是有序排列的,而哈希表不是有序的。因此,在哈希表上只能做单个key的查找,不适宜做范围查找。所谓范围查找,指的是查找那些大小在指定的两个值之间的所有节点。
- 在做范围查找的时候,平衡树比skiplist操作要复杂。在平衡树上,我们找到指定范围的小值之后,还需要以中序遍历的顺序继续寻找其它不超过大值的节点。如果不对平衡树进行一定的改造,这里的中序遍历并不容易实现。而在skiplist上进行范围查找就非常简单,只需要在找到小值之后,对第1层链表进行若干步的遍历就可以实现。
- 平衡树的插入和删除操作可能引发子树的调整,逻辑复杂,而skiplist的插入和删除只需要修改相邻节点的指针,操作简单又快速。
- 从内存占用上来说,skiplist比平衡树更灵活一些。一般来说,平衡树每个节点包含2个指针(分别指向左右子树),而skiplist每个节点包含的指针数目平均为1/(1-p),具体取决于参数p的大小。如果像Redis里的实现一样,取p=1/4,那么平均每个节点包含1.33个指针,比平衡树更有优势。
- 查找单个key,skiplist和平衡树的时间复杂度都为O(logn),大体相当;而哈希表在保持较低的哈希值冲突概率的前提下,查找时间复杂度接近O(1),性能更高一些。所以我们平常使用的各种Map或dictionary结构,大都是基于哈希表实现的。
- 从算法实现难度上来比较,skiplist比平衡树要简单得多。
跳跃表API
插入
跳跃表插入节点通过zslInsert
方法,将包含指定成员和分值的新节点添加到跳跃表。
过程:
具体实现:
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
//在跳表中寻找合适的位置并插入元素
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
//获取元素所在的随机层数
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
//为新创建的元素创建数据节点
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
//获取层数
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
//最大层数定义
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
查找
跳跃表可以是查找给定节点的排位,也可以是查找给定排位上的节点。
查找过程如图:
查找给定节点的排位:
zslGetRank
. 如果不存在,返回0;
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}
查找给定排位上的节点:
zslGetElementByRank
如果没找到,返回NULL
/* Finds an element by its rank. The rank argument needs to be 1-based. */
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}