最近做一个分布式对象云存储项目,中遇到”热点数据调度”的需求,觉得有必要做个记录。
需求
定时将下载次数最多的文件,冗余一份到最热门的10个节点上
把优先调度热点提供下载
这是一个典型的优先队列应用场景。我选择大顶堆(大根堆)作为基础实现。
堆 (heap) 特性
二叉堆是一棵完全二叉树
堆中的某个节点的值总是不大于其父节点的值(最大堆,大顶堆)
相应地定义出最小堆,小顶堆
用数组存储二叉堆
堆的操作
添加节点 - Add(做Siftup调整,时间复杂度为O(logn) )
根据最大堆的定义,当给堆添加一个元素,由于元素的大小有可能破坏堆的定义
所以添加元素不是单纯在尾部添加就好,还要在添加之后做上浮(sift up)调整
比对看看当前节点的值是否比父亲节点的值大
是,则与父情节点交换位置
取出节点 - ExtractMax(做shiftdown调整,时间复杂度为O(logn) )
当从堆顶取出元素后,左右子树就形成两个子堆
为了重新组合起来,我们可以把最后一个元素取出来,暂时替代原来堆顶的位置
然后从堆顶节点开始做”下浮“调整:
比对看看当前值是否小于左右子节点的值
是,则选出最大的子节点,与其交换
实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 package heaptype MaxArrayHeap struct { data []int } func (mah *MaxArrayHeap) GetSize () int { return len (mah.data) } func (mah *MaxArrayHeap) IsEmpty () bool { return mah.GetSize() != 0 } func (mah *MaxArrayHeap) Add (e int ) { mah.data = append (mah.data, e) mah.siftUp(len (mah.data)-1 ) } func (mah *MaxArrayHeap) ExtractMax () int { ret := mah.FindMax() mah.swap(0 , len (mah.data)-1 ) mah.data = mah.data[:len (mah.data)-1 ] mah.siftDown(0 ) return ret } func (mah *MaxArrayHeap) FindMax () int { if mah.GetSize() == 0 { panic ("Can't findMax when heap" ) } return mah.data[0 ] } func (mah *MaxArrayHeap) Replace (e int ) int { ret := mah.FindMax() mah.data[0 ] = e mah.siftDown(0 ) return ret } func (mah *MaxArrayHeap) parent (index int ) int { if index == 0 { panic ("index-0 doesn't have parent." ) } return (index-1 )/2 } func (mah *MaxArrayHeap) leftChild (index int ) int { return index*2 +1 } func (mah *MaxArrayHeap) rightChild (index int ) int { return index*2 +2 } func (mah *MaxArrayHeap) siftUp (index int ) { for index > 0 && mah.data[mah.parent(index)] < mah.data[index] { mah.swap(index, mah.parent(index)) index = mah.parent(index) } } func (mah *MaxArrayHeap) siftDown (index int ) { for mah.leftChild(index) < len (mah.data) { max := mah.leftChild(index) right := mah.rightChild(index) if right < len (mah.data) { if mah.data[right] > mah.data[max] { max = right } } if mah.data[index] >= mah.data[max] { break } mah.swap(index, max) index = max } } func (mah *MaxArrayHeap) swap (i, j int ) { if i < 0 || i >= len (mah.data) || j < 0 || j >= len (mah.data) { panic ("index is illegal." ) } t := mah.data[i] mah.data[i] = mah.data[j] mah.data[j] = t } func NewMaxArrayHeap (capacity int ) *MaxArrayHeap { return &MaxArrayHeap{ data: make ([]int , capacity), } } func NewMaxArrayHeapFromArr (arr []int ) *MaxArrayHeap { mah := &MaxArrayHeap{ data: arr } for i := mah.parent(len (mah.data)-1 ); i >= 0 ; i-- { mah.siftDown(i) } return mah }