深入理解go map

哈希函数

哈希查找表一般会存在“碰撞”的问题,就是说不同的 key 被哈希到了同一个 bucket。一般有两种应对方法:链表法开放地址法链表法将一个 bucket 实现成一个链表,落在同一个 bucket 中的 key 都会插入这个链表。开放地址法则是碰撞发生后,通过一定的规律,在数组的后面挑选“空位”,用来放置新的 key。

1
装载因子 := 元素数量 / 桶数量

与开放地址法一样,拉链法的装载因子越大,哈希的读写性能就越差,在一般情况下使用拉链法的哈希表装载因子都不会超过 1,当哈希表的装载因子较大时就会触发哈希的扩容,创建更多的桶来存储哈希中的元素,保证性能不会出现严重的下降。如果有 1000 个桶的哈希表存储了 10000 个键值对,它的性能是保存 1000 个键值对的 1/10,但是仍然比在链表中直接读写好 1000 倍。

Golang 使用的哈希算法

Golang 选择哈希算法时,根据 CPU 是否支持 AES 指令集进行判断 ,如果 CPU 支持 AES 指令集,则使用 Aes Hash,否则使用 memhash。

AES 指令集全称是高级加密标准指令集(或称英特尔高级加密标准新指令,简称AES-NI),是一个 x86指令集架构的扩展,用于 Intel 和 AMD 处理器。 利用 AES 指令集实现哈希算法性能很优秀,因为它能提供硬件加速。 查看 CPU 是否支持 AES 指令集:

1
2
3
cat /proc/cpuinfo | grep aes

flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl eagerfpu pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch fsgsbase bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsaveopt

处理哈希冲突

Golang 及多数编程语言都使用链地址法处理哈希冲突。

链地址法

链地址法链地址法是处理哈希冲突最常见的方法,它的实现比开放地址法稍微复杂一些,但平均查找长度较短,用于存储节点的内存是动态申请的,可以节省较多内存。

要将一个键值对 (Key6, Value6) 写入哈希表,需要经过两个步骤:

键值对中的键 Key6 先经过 Hash 算法计算,返回的哈希值定位到一个桶,选择桶的方式是对哈希值取模:

1
index := hash("Key6") % array.len

遍历当前桶中的链表,在遍历链表的过程中会遇到以下两种情况:

1
2
1. 找到键相同的键值对,则更新键对应的值;
2. 没有找到键相同的键值对,则在链表的末尾追加新键值对。

只有可比较的类型才能够作为Map中的key

数据结构

Go中Map是一个KV对集合。底层使用hash table,用链表来解决冲突,出现冲突时,不是每一个Key都申请一个结构通过链表串起来,而是以bmap为最小粒度挂载,一个bmap可以放8个kv。

在哈希函数的选择上,会在程序启动时,检测 cpu 是否支持 aes,如果支持,则使用aes hash,否则使用memhash

每个map的底层结构是hmap,是有若干个结构为bmap的bucket组成的数组。每个bucket底层都采用链表结构。

Go 语言运行时同时使用了多个数据结构组合表示哈希表,其中使用 hmap 结构体来表示哈希

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type hmap struct {
count int // 元素个数
flags uint8 // 用来标记状态
B uint8 // 扩容常量相关字段B是buckets数组的长度的对数 2^B
noverflow uint16 // noverflow是溢出桶的数量,当B<16时,为精确值,当B>=16时,为估计值
hash0 uint32 // 是哈希的种子,它能为哈希函数的结果引入随机性,这个值在创建哈希表时确定,并在调用哈希函数时作为参数传入

buckets unsafe.Pointer // 桶的地址
oldbuckets unsafe.Pointer // 旧桶的地址,用于扩容
nevacuate uintptr // 搬迁进度,扩容需要将旧数据搬迁至新数据,这里是利用指针来比较判断有没有迁移

extra *mapextra // 用于扩容的指针
}

type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow holds a pointer to a free overflow bucket.
nextOverflow *bmap
}

// A bucket for a Go map.
type bmap struct {
tophash [bucketCnt]uint8 // tophash用于记录8个key哈希值的高8位,这样在寻找对应key的时候可以更快,不必每次都对key做全等判断
}

//实际上编辑期间会动态生成一个新的结构体
type bmap struct {
topbits [8]uint8
keys [8]keytype
values [8]valuetype
pad uintptr
overflow uintptr
}

bmap 就是常说的“桶”,桶里面会最多装 8 个 key,这些 key之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果是“一类”的,在桶内,又会根据key计算出来的hash值的高8位来决定 key到底落入桶内的哪个位置(一个桶内最多有8个位置)。

image.png

当map的key和value都不是指针,并且 size都小于128字节的情况下,会把bmap标记为不含指针,这样可以避免gc时扫描整个hmap。

但是,bmap其实有一个overflow的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把overflow移动到 hmap的extra 字段来。

  • 如果初始化时生成了溢出桶,会放置到map的extra字段里去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func makemap(t *maptype, hint int, h *hmap) *hmap {
...
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B

if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}

return h
}
1
2
3
4
5
6
type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow holds a pointer to a free overflow bucket.
nextOverflow *bmap
}
  • 当 make 的 hint <= 8 时,会直接在栈上分配一个 bucket,一个 bucket 可以存储8对 KV
  • 当 make 的 hint > 8 && hint <= 52 时,会在堆上分配 bucket,此时不会分配 overflow bucket
  • 当 make 的 hint > 52 时,会在堆上分配 bucket 和 overflow bucket

bmap 是存放 k-v 的地方
key 和 value 是各自放在一起的,并不是 key/value/key/value/... 这样的形式。源码里说明这样的好处是在某些情况下可以省略掉 padding 字段,节省内存空间。减少内存对齐的内存消耗

如果按照 key/value/key/value/... 这样的模式存储,那在每一个 key/value 对之后都要额外 padding 7 个字节;而将所有的 key,value 分别绑定到一起,这种形式 key/key/.../value/value/...,则只需要在最后添加 padding。

每个 bucket 设计成最多只能放 8 个 key-value 对,如果有第 9 个 key-value 落入当前的 bucket,那就需要再构建一个 bucket ,通过 overflow 指针连接起来。

这样随着哈希表存储的数据逐渐增多,会扩容哈希表或者使用额外的桶存储溢出的数据,不会让单个桶中的数据超过 8 个,不过溢出桶只是临时的解决方案,创建过多的溢出桶最终也会导致哈希的扩容。

插入过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func maplit(n *Node, m *Node, init *Nodes) {
a := nod(OMAKE, nil, nil)
a.Esc = n.Esc
a.List.Set2(typenod(n.Type), nodintconst(int64(n.List.Len())))
litas(m, a, init)

entries := n.List.Slice()
if len(entries) > 25 {
...
return
}

// Build list of var[c] = expr.
// Use temporaries so that mapassign1 can have addressable key, elem.
...
}

当哈希表中的元素数量少于或者等于 25 个时,编译器会将字面量初始化的结构体转换成以下的代码,将所有的键值对一次加入到哈希表中:

1
2
3
4
hash := make(map[string]int, 3)
hash["1"] = 2
hash["3"] = 4
hash["5"] = 6

一旦哈希表中元素的数量超过了 25 个,编译器会创建两个数组分别存储键和值,这些键值对会通过如下所示的 for 循环加入哈希:

1
2
3
4
5
6
hash := make(map[string]int, 26)
vstatk := []string{"1", "2", "3", ... , "26"}
vstatv := []int{1, 2, 3, ... , 26}
for i := 0; i < len(vstak); i++ {
hash[vstatk[i]] = vstatv[i]
}

对 key 计算 hash 值,找到要赋值的位置(可能是插入新 key,也可能是更新老 key),对相应位置进行赋值。

函数首先会检查 map 的标志位 flags。如果 flags 的写标志位此时被置 1 了,说明有其他协程在执行“写”操作,进而导致程序 panic。这也说明了 map 对协程是不安全的。

不过因为 Go 语言哈希的扩容不是一个原子的过程,所以**mapassign**** 还需要判断当前哈希是否已经处于扩容状态,避免二次扩容造成混乱。**
image.png

在 B 不为 0 的情况下,会调用 makeBucketArray 函数初始化桶。

当 B < 4 的时候,初始化 hmap 只会生成 8 个桶,不生成溢出桶,因为数据少几乎不可能用到溢出桶;

当 B >= 4 的时候,会额外创建 2^(B−4) 个溢出桶。

Map有多种初始化的方式,如果指定了长度N,在初始化时会生成桶。桶的数量为log2(N).如果map的长度大于了2^4,则会在初始化的时候生成溢出桶。溢出桶的大小为2^(b-4),b为桶的大小。

遍历过程

map 遍历的核心在于理解 2 倍扩容时,老 bucket 会分裂到 2 个新 bucket 中去。而遍历操作,会按照新 bucket 的序号顺序进行,碰到老 bucket 未搬迁的情况时,要在老 bucket 中找到将来要搬迁到新 bucket 来的 key。

查找key

key 经过 Hash 计算后得到 64 位哈希值(64位机器);

用哈希值最后 B 个 bit 位计算它落在哪个桶;

用哈希值高 8 位计算它在桶中的索引位置。

删除过程

mapdelete 函数。它首先会检查 h.flags 标志,如果发现写标位是 1,直接 panic,因为这表明有其他协程同时在进行写操作。

计算 key 的哈希,找到落入的 bucket。检查此 map 如果正在扩容的过程中,直接触发一次搬迁操作。

将 count 值减 1,将对应位置的 tophash 值置成 Empty

同样需要计算出hash的前8位、指定的桶等。

如果在删除期间遇到了哈希表的扩容,就会分流桶中的元素,分流结束之后会找到桶中的目标元素完成键值对的删除工作。

如果查找到了指定的key,则会清空数据,hash位设置为emptyOne. 如果发现后面没有元素,则会设置为emptyRest,并循环向上检查前一个元素是否为空。

扩容过程

1
2
3
4
5
6
7
// 一个桶里最多可以装载的键值对数量:8对
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits

// 触发扩容操作的装载因子临界值是:13 / 2 = 6.5
loadFactorNum = 13
loadFactorDen = 2
1
loadFactor := count / (2^B)  # count 就是 map 的元素个数,2^B 表示 bucket 数量。

触发 map 扩容的时机:在向 map 插入新 key 的时候,会进行条件检测,符合下面这 2 个条件,就会触发扩容:

  1. 装载因子超过阈值,源码里定义的阈值是 6.5。
    1. 翻倍扩容
  2. 哈希使用了太多溢出桶(造成这种现象的原因是不停地插入、删除元素) , overflow 的 bucket 数量过多:当 B 小于 15,也就是 bucket 总数 2^B 小于 2^15 时,如果 overflow 的 bucket 数量超过 2^B;当 B >= 15,也就是 bucket 总数 2^B 大于等于 2^15,如果 overflow 的 bucket 数量超过 2^15。map的扩容不是一个原子的扩容,所以他虽然具备扩容条件,而先要判断该map是否处在扩容状态。
    1. 等量扩容 sameSizeGrow,等量扩容创建的新桶数量只是和旧桶一样,该函数中只是创建了新的桶,并没有对数据进行拷贝和转移
  • 当溢出桶的数量过多,则会进行等量重建。新桶会会存储到buckets字段,旧桶会存储到oldbuckets字段。
  • map中extra字段的溢出桶也同理的进行了转移。

因为 Go 语言哈希的扩容不是一个原子的过程,所以 [runtime.mapassign](https://draveness.me/golang/tree/runtime.mapassign) 还需要判断当前哈希是否已经处于扩容状态,避免二次扩容造成混乱。

扩容的入口是 [runtime.hashGrow](https://draveness.me/golang/tree/runtime.hashGrow)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func hashGrow(t *maptype, h *hmap) {
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0

h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
h.extra.nextOverflow = nextOverflow
}

哈希在扩容的过程中会通过 [runtime.makeBucketArray](https://draveness.me/golang/tree/runtime.makeBucketArray) 创建一组新桶和预创建的溢出桶,随后将原有的桶数组设置到 oldbuckets 上并将新的空桶设置到 buckets 上,溢出桶也使用了相同的逻辑更新

哈希在存储元素过多时会触发扩容操作,每次都会将桶的数量翻倍,扩容过程不是原子的,而是通过 [runtime.growWork](https://draveness.me/golang/tree/runtime.growWork) 增量触发的,在扩容期间访问哈希表时会使用旧桶,向哈希表写入数据时会触发旧桶元素的分流。除了这种正常的扩容之外,为了解决大量写入、删除造成的内存泄漏问题,哈希引入了 sameSizeGrow 这一机制,在出现较多溢出桶时会整理哈希的内存减少空间的占用。

扩容是渐进式的,如果 map 处在扩容的过程中,那么当 key 定位到了某个 bucket 后,需要确保这个 bucket 对应的老 bucket 完成了迁移过程。即老 bucket 里的 key 都要迁移到新的 bucket 中来(分裂到 2 个新 bucket),才能在新的 bucket 中进行插入或者更新的操作。

现在到了定位 key 应该放置的位置了,所谓找准自己的位置很重要。准备两个指针,一个(inserti)指向 key 的 hash 值在 tophash 数组所处的位置,另一个(insertk)指向 cell 的位置(也就是 key 最终放置的地址),当然,对应 value 的位置就很容易定位出来了。这三者实际上都是关联的,在 tophash 数组中的索引位置决定了 key 在整个 bucket 中的位置(共 8 个 key),而 value 的位置需要“跨过” 8 个 key 的长度。

在循环的过程中,inserti 和 insertk 分别指向第一个找到的空闲的 cell。如果之后在 map 没有找到 key 的存在,也就是说原来 map 中没有此 key,这意味着插入新 key。那最终 key 的安置地址就是第一次发现的“空位”(tophash 是 empty)。

如果这个 bucket 的 8 个 key 都已经放置满了,那在跳出循环后,发现 inserti 和 insertk 都是空,这时候需要在 bucket 后面挂上 overflow bucket。当然,也有可能是在 overflow bucket 后面再挂上一个 overflow bucket。这就说明,太多 key hash 到了此 bucket。

在正式安置 key 之前,还要检查 map 的状态,看它是否需要进行扩容。如果满足扩容的条件,就主动触发一次扩容操作。

这之后,整个之前的查找定位 key 的过程,还得再重新走一次。因为扩容之后,key 的分布都发生了变化。
最后,会更新 map 相关的值,如果是插入新 key,map 的元素数量字段 count 值会加 1;在函数之初设置的 hashWriting 写标志出会清零。

再来看一下扩容具体是怎么做的。由于 map 扩容需要将原有的 key/value 重新搬迁到新的内存地址,如果有大量的 key/value 需要搬迁,会非常影响性能。因此 Go map 的扩容采取了一种称为“渐进式”地方式,原有的 key 并不会一次性搬迁完毕,每次最多只会搬迁 2 个 bucket。

hashGrow() 函数实际上并没有真正地“搬迁”,它只是分配好了新的 buckets,并将老的 buckets 挂到了 oldbuckets 字段上。真正搬迁 buckets 的动作在 growWork() 函数中,而调用 growWork() 函数的动作是在 mapassign 和 mapdelete 函数中。也就是插入或修改、删除 key 的时候,都会尝试进行搬迁 buckets 的工作。先检查 oldbuckets 是否搬迁完毕,具体来说就是检查 oldbuckets 是否为 nil。

hashGrow() 函数所做的工作,再来看具体的搬迁 buckets 是如何进行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func hashGrow(t *maptype, h *hmap) {
// B+1 相当于是原来 2 倍的空间
bigger := uint8(1)

// 对应条件 2
if !overLoadFactor(int64(h.count), h.B) {
// 进行等量的内存扩容,所以 B 不变
bigger = 0
h.flags |= sameSizeGrow
}
// 将老 buckets 挂到 buckets 上
oldbuckets := h.buckets
// 申请新的 buckets 空间
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger)

flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 提交 grow 的动作
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
// 搬迁进度为 0
h.nevacuate = 0
// overflow buckets 数为 0
h.noverflow = 0

// ……
}

主要是申请到了新的 buckets 空间,把相关的标志位都进行了处理:例如标志 nevacuate 被置为 0, 表示当前搬迁进度为 0。

真正执行搬迁工作的 growWork() 函数。

1
2
3
4
5
6
7
8
9
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 确认搬迁老的 bucket 对应正在使用的 bucket
evacuate(t, h, bucket&h.oldbucketmask())

// 再搬迁一个 bucket,以加快搬迁进程
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}

map为什么是无序的

map 在扩容后,会发生 key 的搬迁,原来落在同一个 bucket 中的 key,搬迁后,有些 key 就要远走高飞了(bucket 序号加上了 2^B)。而遍历的过程,就是按顺序遍历 bucket,同时按顺序遍历 bucket 中的 key。搬迁后,key 的位置发生了重大的变化,有些 key 飞上高枝,有些 key 则原地不动。这样,遍历 map 的结果就不可能按原来的顺序了。

在遍历 map 时,并不是固定地从 0 号 bucket 开始遍历,每次都是从一个随机值序号的 bucket 开始遍历,并且是从这个 bucket 的一个随机序号的 cell 开始遍历。这样,即使你是一个写死的 map,仅仅只是遍历它,也不太可能会返回一个固定序列的 key/value 对了。

“迭代 map 的结果是无序的”这个特性是从 go 1.0 开始加入的。

1
2
3
4
5
6
7
8
9
10
11
...
// decide where to start
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
it.startBucket = r & bucketMask(h.B)
it.offset = uint8(r >> h.B & (bucketCnt - 1))

// iterator state
it.bucket = it.startBucket

可以边遍历边删除吗

map 并不是一个线程安全的数据结构。同时读写一个 map 是未定义的行为,如果被检测到,会直接 panic。

上面说的是发生在多个协程同时读写同一个 map 的情况下。 如果在同一个协程内边遍历边删除,并不会检测到同时读写,理论上是可以这样做的。但是,遍历的结果就可能不会是相同的了,有可能结果遍历结果集中包含了删除的 key,也有可能不包含,这取决于删除 key 的时间:是在遍历到 key 所在的 bucket 时刻前或者后。

一般而言,这可以通过读写锁来解决:sync.RWMutex

读之前调用 RLock() 函数,读完之后调用 RUnlock() 函数解锁;写之前调用 Lock() 函数,写完之后,调用 Unlock() 解锁。

另外,sync.Map 是线程安全的 map,也可以使用。

可以对map元素取地址么

不可以,因为一旦发生扩容,key 和 value 的位置就会改变,之前保存的地址也就失效了。

map是线程安全的吗

在查找、赋值、遍历、删除的过程中都会检测写标志,一旦发现写标志置位(等于1),则直接 panic。赋值和删除函数在检测完写标志是复位之后,先将写标志位置位,才会进行之后的操作。

删除掉map中的元素是否会释放内存?

不会,删除操作仅仅将对应的tophash[i]设置为empty,并非释放内存。若要释放内存只能等待指针无引用后被系统gc

参考

面向信仰编程
深入Go的Map使用和实现原理
Go中的map的实现
map的实现原理

分享到 评论