Easy SortedSet

概念

我们有一堆数据,希望能够支持快速的插入、删除、查询以及范围查询
我们都知道数组进行排序后,可以通过二分查找的方式快速定位数据,时间复杂度是$O(LogN)$,但是如果数组经常发生插入、删除操作,就需要进行大量平移,时间复杂度是$O(N)$。
那用平衡树呢?AVL树、红黑树都是经过深度优化的动态数据结构,他们的插入删除查找都可以在$O(LogN)$的时间内完成,但是他们实现起来较为复杂,且对于范围查询,还需要进行中序遍历,但至少比数组好多了。
那链表呢,链表的插入删除是很方便,但关键查询呢,它能进行二分查找吗?链表无法随机访问,怎么二分?很显然,直接对普通链表进行二分查找是困难的。1990年,Pugh发表论文《Skip Lists: A Probabilistic Alternative to Balanced Trees》,创造性的提出了自己的解决方案:”如果不能在节点间跳跃,那就建造可以跳跃的桥梁吧!”,跳跃表(SkipList)正式被提出。
跳跃表是一种概率性的有序数据结构,通过建立多级索引来加速查找,可以看作是在链表基础上加了“快速通道”,变相了实现对链表的二分查找,并且,范围查找也能够很轻易的完成,只要找到目标,就可以沿着链条依次访问。
游戏里排行榜一般是采用Redis Sorted Set,它在数据规模较大时的底层数据结构就是跳跃表+哈希表。

  • 跳跃表:天然支持有序插入、删除和范围查询,时间复杂度O(log n)
  • 哈希表:用于快速查找特定用户的当前位置和分数(O(1))

示意图跳跃表的每个节点拥有一个前序指针和多层后继指针。每层后继指针都独立成链,每个节点在插入时,都会按照概率分配给它多少层后继指针(至少一层)。类似抛硬币,落到正面,就给它加一层后继指针,然后继续抛硬币;落到反面就结束。当然,硬币正面的概率不一定是50%(实际有论文提出,包括业内经验来看,25%的概率是对性能和内存占用权衡后的得到的较优选择)。实际上,作者最初考虑过确定性跳表,即第0层是全连接,第1层是隔1个连接,第2层是隔3个连接…但是这样会导致插入和删除需要进行大量调整,复杂度变高,从而引入随机性,不仅简化了插入和删除的操作,并且统计上仍然是接近二分的结构。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
// 简化的 SortedSet 实现,基于跳跃表
public class SimpleSortedSet<T> where T : IComparable<T>
{
private class SkipListNode
{
public T Value { get; set; }
public double Score { get; set; }
public SkipListNode Backward { get; set; } // 后退指针
public SkipListNode[] Forward { get; set; } // 前进指针数组
public int[] Span { get; set; } // 跨度数组

public SkipListNode(T value, double score, int level)
{
Value = value;
Score = score;
Forward = new SkipListNode[level];
Span = new int[level];
Backward = null;
}
}

private const int MAX_LEVEL = 32; // 最大层数
private const double PROBABILITY = 0.5; // 层数增长概率(论文和实践都表明0.25或许是更好的选择)

private SkipListNode _header; // 头节点(纯虚拟
private SkipListNode _tail; // 尾节点(指向真实存在的
private int _level; // 当前最大层数
private int _count; // 元素数量
private Random _random; // 随机数生成器
private Dictionary<T, double> _dict; // 字典:Value -> Score

public SimpleSortedSet()
{
_random = new Random();
_dict = new Dictionary<T, double>();
_level = 1;
_count = 0;

// 初始化头节点(分数为最小值)
_header = new SkipListNode(default, double.MinValue, MAX_LEVEL);
_tail = null;

// 初始化头节点的跨度
for (int i = 0; i < MAX_LEVEL; i++)
{
_header.Forward[i] = null;
_header.Span[i] = 0;
}
}

public bool Add(T value, double score)
{
// 如果已存在,返回false(简化实现,不支持更新)
if (_dict.ContainsKey(value))
return false;

SkipListNode[] update = new SkipListNode[MAX_LEVEL];
int[] rank = new int[MAX_LEVEL];
SkipListNode current = _header;

// 从最高层开始查找插入位置
for (int i = _level - 1; i >= 0; i--)
{
rank[i] = (i == _level - 1) ? 0 : rank[i + 1];

while (current.Forward[i] != null &&
(current.Forward[i].Score < score ||
(current.Forward[i].Score == score &&
current.Forward[i].Value.CompareTo(value) < 0)))
{
rank[i] += current.Span[i];
current = current.Forward[i];
}
update[i] = current;
}

// 随机生成层数
int newLevel = RandomLevel();
if (newLevel > _level)
{
for (int i = _level; i < newLevel; i++)
{
rank[i] = 0; // 新节点是通过虚拟head直接跳过来的,所以前面的累加span是0
update[i] = _header;
update[i].Span[i] = _count; // 后面会处理,新节点会进行中间截断
}
_level = newLevel;
}

// 创建新节点
SkipListNode newNode = new SkipListNode(value, score, newLevel);

// 更新指针和跨度
for (int i = 0; i < newLevel; i++)
{
// 普通的链表插入
newNode.Forward[i] = update[i].Forward[i];
update[i].Forward[i] = newNode;
// 更新跨度
newNode.Span[i] = update[i].Span[i] - (rank[0] - rank[i]);
update[i].Span[i] = (rank[0] - rank[i]) + 1;
}

// 更新更高层的跨度
for (int i = newLevel; i < _level; i++)
update[i].Span[i]++;

// 更新后退指针
newNode.Backward = (update[0] == _header) ? null : update[0];
if (newNode.Forward[0] != null)
newNode.Forward[0].Backward = newNode;
else
_tail = newNode; // 如果是最后一个节点,更新尾指针

_count++;
_dict[value] = score;
return true;
}

public bool Remove(T value)
{
if (!_dict.TryGetValue(value, out var score))
return false;

SkipListNode[] update = new SkipListNode[MAX_LEVEL];
SkipListNode current = _header;

for (int i = _level - 1; i >= 0; --i)
{
while (current.Forward[i] != null &&
(current.Forward[i].Score < score ||
(current.Forward[i].Score == score &&
current.Forward[i].Value.CompareTo(value) < 0)))
{
current = current.Forward[i];
}
update[i] = current;
}

var deleteNode = update[0].Forward[0];
if (deleteNode == null || !deleteNode.Value.Equals(value) || Math.Abs(deleteNode.Score - score) > 1e-10)
return false;

// 更新各层的指针和跨度
for (int i = 0; i < _level; ++i)
{
// 普通链表删除操作
if (update[i].Forward[i] == deleteNode)
{
update[i].Span[i] += (deleteNode.Span[i] - 1); // +=
update[i].Forward[i] = deleteNode.Forward[i];
}
// 该层链表不包含,仅需要改变span
else if (update[i].Span[i] > 0)
{
--update[i].Span[i];
}
}

// 更新后退指针
if (deleteNode.Forward[0] != null)
deleteNode.Forward[0].Backward = deleteNode.Backward;
else
_tail = deleteNode.Backward;

// 更新最高层数(循环)
while (_level > 1 && _header.Forward[_level - 1] == null)
--_level;

--_count;
_dict.Remove(value);
return true;
}

// 获取元素的排名(从0开始,分数从低到高)
public int? GetRank(T value)
{
if (!_dict.TryGetValue(value, out double score))
return null;

SkipListNode current = _header;
int rank = 0;

for (int i = _level - 1; i >= 0; i--)
{
while (current.Forward[i] != null &&
(current.Forward[i].Score < score ||
(current.Forward[i].Score == score &&
current.Forward[i].Value.CompareTo(value) <= 0)))
{
rank += current.Span[i];
current = current.Forward[i];
}

if (current.Value != null && current.Value.Equals(value))
return rank;
}
return null;
}

// 获取逆序排名(从0开始,分数从高到低)
public int? GetReverseRank(T value)
{
int? rank = GetRank(value);
return rank.HasValue ? _count - rank.Value + 1 : null;
}

// 获取前N名(分数从高到低)
public List<T> GetTopN(int n)
{
if (n <= 0) return new List<T>();
if (n > _count) n = _count;

List<T> result = new List<T>();
SkipListNode current = _tail;

// 从尾部向前遍历
for (int i = 0; i < n && current != null; i++)
{
result.Add(current.Value);
current = current.Backward;
}

return result;
}

// 获取分数范围内的元素(利用跳跃表特性,O(log N + M))
public List<T> GetRangeByScore(double minScore, double maxScore)
{
if (minScore > maxScore) return new List<T>();

List<T> result = new();
SkipListNode current = _header;

// 步骤1:使用高层索引快速定位到第一个 >= minScore 的节点 (O(log N))
for (int i = _level - 1; i >= 0; i--)
{
while (current.Forward[i] != null && current.Forward[i].Score < minScore)
{
current = current.Forward[i];
}
}

// 现在 current 是最后一个 < minScore 的节点,下一个就是第一个 >= minScore 的节点
current = current.Forward[0];

// 步骤2:从定位点开始顺序遍历,直到超过 maxScore (O(M))
while (current != null && current.Score <= maxScore)
{
result.Add(current.Value);
current = current.Forward[0];
}

return result;
}

public List<T> GetRangeByRank(int start, int end)
{
var ret = new List<T>();
if (start > end)
return ret;
if (start <= 0) start = 1;
if (end > _count) end = _count;

var rank = _count - start + 1;
var current = _header;
for (int i = _level - 1; i >= 0; i--)
{
while (current.Forward[i] != null && current.Span[i] <= rank)
{
rank -= current.Span[i];
current = current.Forward[i];
}

if (rank == 0)
break;
}

var cnt = end - start + 1;
while (cnt-- > 0 && current != null)
{
ret.Add(current.Value);
current = current.Backward;
}
return ret;
}

// 随机生成节点层数(抛硬币算法)
private int RandomLevel()
{
int level = 1;
while (_random.NextDouble() < PROBABILITY && level < MAX_LEVEL)
++level;
return level;
}

// 打印跳跃表结构(用于调试)
public void PrintStructure()
{
Console.WriteLine($"跳跃表结构 (元素数量: {_count}, 最大层数: {_level}):");

for (int i = _level - 1; i >= 0; i--)
{
StringBuilder sb = new();
sb.Append($"Layer{i}\t");
SkipListNode current = _header;
while (current != null)
{
sb.Append($"{current.Span[i]}\t");
if (current.Forward[i] != null)
sb.Append(new string('\t', current.Span[i] - 1));
else
sb.Append(new string('\t', current.Span[i]));
current = current.Forward[i];
}
Console.WriteLine(sb);

}
var cur = _header;
string o = "Data\t";
string s = "Score\t";
while (cur != null)
{
if (cur.Value == null)
{
o += "Head\t";
s += "Head\t";
}
else
{
o += cur.Value.ToString() + '\t';
s += cur.Score.ToString() + '\t';
}
cur = cur.Forward[0];
}
Console.WriteLine();
Console.WriteLine(o);
Console.WriteLine(s);
}
}

代码说明

更新跨度的代码:

1
2
3
4
5
6
for (int i = 0; i < newLevel; i++)
{
...
newNode.Span[i] = update[i].Span[i] - (rank[0] - rank[i]);
update[i].Span[i] = (rank[0] - rank[i]) + 1;
}

这里的rank[0]-rank[i]需要说明下,首先每个节点里的Span[i]存储的是当前层级下,该节点到下一个节点的跨度,是一个相对跨度。但是注意的是,rank数组是一个累加和跨度,rank[i]指的是,查找该目标时,在当前层级下走过的最远累计跨度。这个累计跨度是从高层级到最低层级一步步求来的,在较高层级时,由于层级节点的跨度都比较大,因此只能得到粗略的跨度,但是随着层级一点点降低,这个值会在rank[0]得到最终的实际跨度。因此也就能理解了:rank[0]-rank[i]代表的是在第i层,该层的前一个节点,到新节点的相对跨度是多少。
因此,newNode.Span[i] = update[i].Span[i] - (rank[0] - rank[i]);代表,新加入的节点,它自身的跨度是多少;update[i].Span[i] = (rank[0] - rank[i]) + 1;则代表该层的前一个节点,到新节点的跨度是多少,+1是因为增加了新节点。
插入示意图