热点数据调度 - 使用大顶堆实现优先队列

Posted by jintang on 2018-11-21

最近做一个分布式对象云存储项目,中遇到”热点数据调度”的需求,觉得有必要做个记录。

需求

  1. 定时将下载次数最多的文件,冗余一份到最热门的10个节点上
  2. 把优先调度热点提供下载

这是一个典型的优先队列应用场景。我选择大顶堆(大根堆)作为基础实现。

堆 (heap)

特性

  1. 二叉堆是一棵完全二叉树
  2. 堆中的某个节点的值总是不大于其父节点的值(最大堆,大顶堆)
  3. 相应地定义出最小堆,小顶堆

用数组存储二叉堆

堆的操作

  1. 添加节点 - Add(做Siftup调整,时间复杂度为O(logn)

    • 根据最大堆的定义,当给堆添加一个元素,由于元素的大小有可能破坏堆的定义
    • 所以添加元素不是单纯在尾部添加就好,还要在添加之后做上浮(sift up)调整
      • 比对看看当前节点的值是否比父亲节点的值大
      • 是,则与父情节点交换位置
  2. 取出节点 - ExtractMax(做shiftdown调整,时间复杂度为O(logn)

    • 当从堆顶取出元素后,左右子树就形成两个子堆
    • 为了重新组合起来,我们可以把最后一个元素取出来,暂时替代原来堆顶的位置
    • 然后从堆顶节点开始做”下浮“调整:
      • 比对看看当前值是否小于左右子节点的值
      • 是,则选出最大的子节点,与其交换

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package heap

type MaxArrayHeap struct {
data []int
}

func (mah *MaxArrayHeap) GetSize() int {
return len(mah.data)
}

func (mah *MaxArrayHeap) IsEmpty() bool {
return mah.GetSize() != 0
}

func (mah *MaxArrayHeap) Add(e int) {
mah.data = append(mah.data, e)
mah.siftUp(len(mah.data)-1)
}

func (mah *MaxArrayHeap) ExtractMax() int {
ret := mah.FindMax()
mah.swap(0, len(mah.data)-1)
mah.data = mah.data[:len(mah.data)-1]
mah.siftDown(0)
return ret
}

func (mah *MaxArrayHeap) FindMax() int {
if mah.GetSize() == 0 {
panic("Can't findMax when heap")
}
return mah.data[0]
}

func (mah *MaxArrayHeap) Replace(e int) int {
ret := mah.FindMax()
mah.data[0] = e
mah.siftDown(0)
return ret
}

func (mah *MaxArrayHeap) parent(index int) int {
if index == 0 {
panic("index-0 doesn't have parent.")
}
return (index-1)/2
}

func (mah *MaxArrayHeap) leftChild(index int) int {
return index*2+1
}

func (mah *MaxArrayHeap) rightChild(index int) int {
return index*2+2
}

func (mah *MaxArrayHeap) siftUp(index int) {
// 首先当前节点必须大于零,因为一直上浮到堆顶,index是0,则退出
// 每一次都比较当前节点是否大于父亲节点
for index > 0 && mah.data[mah.parent(index)] < mah.data[index] {

// 与父亲节点交换
mah.swap(index, mah.parent(index))

// 然后把父亲节点当成当前节点,继续上浮
index = mah.parent(index)
}
}

func (mah *MaxArrayHeap) siftDown(index int) {
for mah.leftChild(index) < len(mah.data) {

max := mah.leftChild(index) // 先让左节点左最大值
right := mah.rightChild(index)

if right < len(mah.data) {
// 比较左右节点的大小, 取最大值得索引
if mah.data[right] > mah.data[max] {
max = right
}
}

// 当当前节点的值,比左右节点中最大值得节点都大,则没必要调整了
if mah.data[index] >= mah.data[max] {
break
}

// 否则交换当前节点与左右节点中的最大值节点
mah.swap(index, max)

// 然后接续下沉
index = max
}
}

func (mah *MaxArrayHeap) swap(i, j int) {
if i < 0 || i >= len(mah.data) || j < 0 || j >= len(mah.data) {
panic("index is illegal.")
}

t := mah.data[i]
mah.data[i] = mah.data[j]
mah.data[j] = t
}

func NewMaxArrayHeap(capacity int) *MaxArrayHeap {
return &MaxArrayHeap{
data: make([]int, capacity),
}
}

func NewMaxArrayHeapFromArr(arr []int) *MaxArrayHeap {
mah := &MaxArrayHeap{ data: arr }
for i := mah.parent(len(mah.data)-1); i >= 0; i-- {
mah.siftDown(i)
}
return mah
}