golang实现一致性hash环及优化方法

Posted by jintang on 2018-08-20

一致性Hash原理

一致性哈希算法在1997年由麻省理工学院的Karger等人在解决分布式Cache中提出的,设计目标是为了解决因特网中的热点(Hot spot)问题,初衷和CARP十分类似。一致性哈希修正了CARP使用的简单哈希算法带来的问题,使得DHT可以在P2P环境中真正得到应用。

但现在一致性hash算法在分布式系统中也得到了广泛应用,研究过memcached缓存数据库的人都知道,memcached服务器端本身不提供分布式cache的一致性,而是由客户端来提供,具体在计算一致性hash时采用如下步骤:

  1. 首先求出memcached服务器(节点)的哈希值,并将其配置到0~232的圆(continuum)上。
  2. 然后采用同样的方法求出存储数据的键的哈希值,并映射到相同的圆上。
  3. 然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上。如果超过232仍然找不到服务器,就会保存到第一台memcached服务器上。

从上图的状态中添加一台memcached服务器。余数分布式算法由于保存键的服务器会发生巨大变化而影响缓存的命中率,但Consistent Hashing中,只有在园(continuum)上增加服务器的地点逆时针方向的第一台服务器上的键会受到影响,如下图所示:

值得注意的是,一致性哈希算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜问题。

优化方法

通过增加虚拟节点来解决数据倾斜问题

一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点。具体做法可以在服务器ip或主机名的后面增加编号来实现。例如上面的情况,可以为每台服务器计算三个虚拟节点,于是可以分别计算 “Node A#1”、“Node A#2”、“Node A#3”、“Node B#1”、“Node B#2”、“Node B#3”的哈希值,于是形成六个虚拟节点:

当我们添加的虚拟节点越多,虚拟节点分布就越平均,数据倾斜的程度就越小。我们一般建议虚拟节点大概在200个左右。但是问题来了,如果大量的虚拟节点,节点的查找性能就成为必须考虑的因数。

使用红黑树来加快查找速度

红黑树已经广泛应用,其原理就不再叙述了。

实现一致性哈希

go语言版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
package hashring

// 提供一个一致性哈希环功能
// 加入了两个优化点
// 1. 增加虚拟节点,用于删除节点后,可以把负载相对均匀地分布分布给其他节点
// 2. 增加红黑树结构,优化查询节点的效率
//
// @author jintang

import (
"fmt"
"strconv"
"strings"
"sync"
"hash/crc32"
"io"

"github.com/HuKeping/rbtree"
)

// Node 节点结构
type Node struct {
ID string
Host string
Data *sync.Map
}

// nodeKey 生成node key
func (n *Node) nodeKey() string {
return n.ID + "#" + n.Host
}

// Less 实现rbtree接口方法Less
// 实现改接口,可以使Node可以存入rbtree节点
func (n *Node) Less(than rbtree.Item) bool {
return CalcCRC32([]byte(n.ID)) < CalcCRC32([]byte(than.(*Node).ID))
}

// VNode 虚拟节点
type VNode struct {
Index uint32 // 索引值, 通过计算hashKey转crc32得来
NodeID string // Node节点ID
}

// Less 实现rbtree接口方法Less
// 实现改接口,可以使Node可以存入rbtree节点
func (vn *VNode) Less(than rbtree.Item) bool {
return vn.Index < than.(*VNode).Index
}

// HashRing 哈希环
type HashRing struct {
VRing *rbtree.Rbtree
NRing *rbtree.Rbtree
Nodes *sync.Map // map(NodeID => *Node)
NodeStatus map[string]bool // map(NodeID => status)
NodeCount uint32
NodeOkCount uint32
NumberOfVirtual uint32
sync.Mutex
}

// virtualKey 生成虚拟节点的key
func (r *HashRing) virtualKey(nodeID string, index uint32) string {
return "node#" + nodeID + "#" + strconv.FormatUint(uint64(index), 10)
}

// hashKey 获取哈希
func (r *HashRing) hashKey(key string) string {
return CalcHash([]byte(key))
}

// hashTOCRC32 hash转换RCR32
func (r *HashRing) hashToCRC32(hashInString string) uint32 {
return CalcCRC32([]byte(hashInString))
}

// CalcIndex 计算key的索引值
func (r *HashRing) CalcIndex(key string) uint32 {
return r.hashToCRC32(r.hashKey(key))
}

// AddNode 添加节点
func (r *HashRing) AddNode(node *Node) {

r.Lock()

defer r.Unlock()

var numberOfNode uint32 = 1
if r.NumberOfVirtual > 0 {
numberOfNode = r.NumberOfVirtual
}

var i uint32
for i = 0; i < numberOfNode; i++ {
index := r.hashToCRC32(r.hashKey(r.virtualKey(node.ID, i)))
r.VRing.Insert(&VNode{Index: index, NodeID: node.ID})
}

r.Nodes.Store(node.ID, node)
r.NodeStatus[node.ID] = false

r.NRing.Insert(node)

r.NodeCount++
}

// RemoveNode 删除节点
func (r *HashRing) RemoveNode(nodeID string) bool {
r.Lock()
defer r.Unlock()

if val, ok := r.Nodes.Load(nodeID); ok {

node := val.(*Node)

var numberOfNode uint32 = 1
if r.NumberOfVirtual > 0 {
numberOfNode = r.NumberOfVirtual
}

var i uint32
for i = 0; i < numberOfNode; i++ {
index := r.hashToCRC32(r.hashKey(r.virtualKey(node.ID, i)))
r.VRing.Delete(&VNode{Index: index, NodeID: node.ID})
}

r.Nodes.Delete(node.ID)
delete(r.NodeStatus, node.ID)

r.NRing.Delete(node)

r.NodeCount--
}

return true
}

func (r *HashRing) Node(ID string) *Node {
if node, ok := r.Nodes.Load(ID); ok {
return node.(*Node)
}
return nil
}

func (r *HashRing) IsOnline(ID string) bool {
ok, online := r.NodeStatus[ID]
return ok && online
}

func (r *HashRing) SetOffline(ID string) {
r.Lock()
defer r.Unlock()

if b, ok := r.NodeStatus[ID]; ok && b {
r.NodeStatus[ID] = false
r.NodeOkCount--
}
}

func (r *HashRing) SetOnline(ID string) {
r.Lock()
defer r.Unlock()

if b, ok := r.NodeStatus[ID]; ok && !b {
r.NodeOkCount++
r.NodeStatus[ID] = true
}
}

func (r *HashRing) RandomGetNodes(num int) []*Node {

if r.NodeOkCount <= 0 {
return nil
}

if r.NodeOkCount < uint32(num) {
num = int(r.NodeOkCount)
}

nodes := make([]*Node, num)

ids := make([]string, 0)
for id, ok := range r.NodeStatus {
if ok {
ids = append(ids, id)
}
}

indexes := utils.GenerateRandomNumber(0, len(ids), num)

for i, idx := range indexes {
if node, ok := r.Nodes.Load(ids[idx]); ok {
nodes[i] = node.(*Node)
}
}

return nodes
}

// GetNode 定位节点
// @params key 直接通过key值获取
func (r *HashRing) GetNode(key string) (uint32, string) {

keyIndex := r.hashToCRC32(r.hashKey(key))

return r.GetNodeByIndex(keyIndex)
}

// GetNodeMissNodeID 获取节点,排除不要的的节点
// @params key 直接通过key获取,NodeID需要排除的节点ID
func (r *HashRing) GetNodeMissNodeIDs(key string, NodeIDs []string) (uint32, string) {

if len(NodeIDs) <= 0 {
return r.GetNode(key)
}

if uint32(len(NodeIDs)) >= r.NodeCount || r.NodeCount <= 0 {
return 0, ""
}

for _, id := range NodeIDs {
r.SetOffline(id)
}

index, id := r.GetNode(key)

for _, id := range NodeIDs {
r.SetOffline(id)
}

return index, id
}

// GetNodeUpDownNodes 获取节点上下游节点
// @params NodeID节点ID
func (r *HashRing) GetNodeUpDownNodes(NodeID string) (string, string) {

if NodeID == "" || !r.NodeStatus[NodeID] || r.NodeCount <= 0 {
return "", ""
}

if r.NRing.Len() > 1 {

up := r.NRing.Max().(*Node).ID
down := r.NRing.Min().(*Node).ID

r.NRing.Descend(&Node{ID: NodeID}, func(item rbtree.Item) bool {
if CalcCRC32([]byte(NodeID)) == CalcCRC32([]byte(item.(*Node).ID)) {
return true
}
up = item.(*Node).ID
return false
})

r.NRing.Ascend(&Node{ID: NodeID}, func(item rbtree.Item) bool {
if CalcCRC32([]byte(NodeID)) == CalcCRC32([]byte(item.(*Node).ID)) {
return true
}
down = item.(*Node).ID
return false
})

return up, down
}

return "", ""
}

// GetNodeByIndex 定位节点
// @params keyIndex 通过索引值获取
func (r *HashRing) GetNodeByIndex(keyIndex uint32) (uint32, string) {

if r.VRing.Len() > 0 {

minVNodeOfRing := r.VRing.Min().(*VNode)

vNode := minVNodeOfRing

r.VRing.Ascend(&VNode{Index: keyIndex}, func(item rbtree.Item) bool {
vNode = item.(*VNode)
if r.NodeStatus[vNode.NodeID] == false {
return true
}
return false
})

if !r.NodeStatus[vNode.NodeID] {
r.VRing.Ascend(minVNodeOfRing, func(item rbtree.Item) bool {
vNode = item.(*VNode)
if r.NodeStatus[vNode.NodeID] == false {
return true
}
return false
})
}

return vNode.Index, vNode.NodeID
}

return 0, ""
}

// PrintNodes 打印所有真实节点
func (r *HashRing) PrintNodes() {

if r.NodeCount <= 0 {
fmt.Println("nodes is empty")
return
}

r.Nodes.Range(func(key, value interface{}) bool {
node := value.(*Node)
fmt.Println(strings.Repeat("=", 30))
fmt.Println("NodeID:", node.ID)
fmt.Println("NodeHost:", node.Host)
fmt.Println("NodeKey :", node.nodeKey())
fmt.Println()
return true
})
}

// TraversalVRing 遍历虚拟节点二叉树
func (r *HashRing) TraversalVRing() {
r.VRing.Ascend(r.VRing.Min(), func(item rbtree.Item) bool {
fmt.Printf("vNode %d => %s\n", item.(*VNode).Index, item.(*VNode).NodeID)
return true
})
}

// TraversalNRing 遍历真实节点二叉树
func (r *HashRing) TraversalNRing() {
r.NRing.Ascend(r.NRing.Min(), func(item rbtree.Item) bool {
fmt.Printf("Node %d => %s\n", CalcCRC32([]byte(item.(*Node).ID)), item.(*Node).ID)
return true
})
}

// NewHashRing 新建一个hash环
func New(numOfVNode uint32) *HashRing {
r := new(HashRing)
r.Nodes = new(sync.Map)
r.NodeStatus = make(map[string]bool)
r.NodeCount = 0
r.NumberOfVirtual = numOfVNode

r.VRing = rbtree.New()
r.NRing = rbtree.New()
return r
}

// CalcCRC32 计算crc32
func CalcCRC32(data []byte) uint32 {
iEEE := crc32.NewIEEE()
io.WriteString(iEEE, string(data))
return iEEE.Sum32()
}