Package singleflight provides a duplicate function call suppression mechanism.
singleflight包提供了一种抑制重复函数调用的机制
在处理多个goroutine同时调用同一函数时,SingleFlight可以只让一个goroutine去实际调用该函数,等到这个goroutine返回结果时,再将结果返回给其他几个同时调用该函数的goroutine.
这样可以减少并发调用的数量,减少对下游服务的并发重复请求,比较常见的使用场景是用来防止缓存击穿
达到归并回源的效果
缓存击穿
如在双11时,维护有一个全局的活动是否结束的key,由运营配置,5分钟过期,重新从数据库里取.
当这个 Key 正好过期失效时, 大量请求会打到数据库上(即缓存击穿).
而用 SingleFlight 来解决缓存击穿问题再合适不过. 只需要只允许这些对同一个 Key 的并发请求中的一个能到数据库中查询,而后这些并发的请求可以共享该结果.
package main import ( "errors" "fmt" "golang.org/x/sync/singleflight" "log" "sync" ) var errorNotExist = errors.New("not exist") func main() { var wg sync.WaitGroup wg.Add(10) //模拟10个并发 for i := 0; i < 10; i++ { go func() { defer wg.Done() data, err := getData("key") if err != nil { fmt.Print(err) return } fmt.Println(data) fmt.Println("---------") }() } wg.Wait() } var g singleflight.Group //获取数据 func getData(key string) (string, error) { data, err := getDataFromCache(key) if err == errorNotExist { //模拟从db中获取数据 data, err = getDataFromDB(key) if err != nil { log.Println(err) return "", err } //TOOD: set cache } else if err != nil { return "", err } return data, nil } //模拟从cache中获取值,cache中无该值 func getDataFromCache(key string) (string, error) { return "", errorNotExist } //模拟从数据库中获取值 func getDataFromDB(key string) (string, error) { fmt.Printf("get %s from database\n", key) return "数据库中的数据", nil }
执行结果为:
get key from database 数据库中的数据 get key from database get key from database 数据库中的数据 --------- get key from database get key from database 数据库中的数据 --------- --------- 数据库中的数据 get key from database 数据库中的数据 --------- get key from database get key from database 数据库中的数据 --------- --------- 数据库中的数据 --------- get key from database 数据库中的数据 --------- 数据库中的数据 --------- get key from database 数据库中的数据 ---------
可以看得到10个请求都走了db. 用singlefligth 包优化一下 getData:
//获取数据 func getData(key string) (string, error) { data, err := getDataFromCache(key) if err == errorNotExist { //模拟从db中获取数据 v, err, _ := g.Do(key, func() (interface{}, error) { return getDataFromDB(key) //set cache }) if err != nil { log.Println(err) return "", err } //TOOD: set cache data = v.(string) } else if err != nil { return "", err } return data, nil }
执行结果为:
get key from database 数据库中的数据 --------- 数据库中的数据 --------- 数据库中的数据 --------- 数据库中的数据 --------- 数据库中的数据 --------- 数据库中的数据 --------- 数据库中的数据 --------- 数据库中的数据 --------- 数据库中的数据 --------- 数据库中的数据 ---------
可以看得到只有一个请求走到了db,且其他请求也返回了正确的值. 从而可以大大降低DB的压力
源码行数不多,加上注释一共212行.
点击查看 golang.org/x/sync/semaphore/semaphore.go源码:
go
复制代码
// Copyright 2013 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package singleflight provides a duplicate function call suppression // mechanism. package singleflight // import "golang.org/x/sync/singleflight" import ( "bytes" "errors" "fmt" "runtime" "runtime/debug" "sync" ) // errGoexit indicates the runtime.Goexit was called in // the user given function. var errGoexit = errors.New("runtime.Goexit was called") // A panicError is an arbitrary value recovered from a panic // with the stack trace during the execution of given function. type panicError struct { value interface{} stack []byte } // Error implements error interface. func (p *panicError) Error() string { return fmt.Sprintf("%v\n\n%s", p.value, p.stack) } func newPanicError(v interface{}) error { stack := debug.Stack() // The first line of the stack trace is of the form "goroutine N [status]:" // but by the time the panic reaches Do the goroutine may no longer exist // and its status will have changed. Trim out the misleading line. if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { stack = stack[line+1:] } return &panicError{value: v, stack: stack} } // call is an in-flight or completed singleflight.Do call type call struct { wg sync.WaitGroup // These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. // 函数的返回值,在 wg 返回前只会写入一次 val interface{} err error // forgotten indicates whether Forget was called with this call's key // while the call was still in flight. // 使用调用了 Forgot 方法 forgotten bool // These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. // 统计调用次数以及返回的 channel dups int chans []chan 0 } // DoChan is like Do but returns a channel that will receive the // results when they are ready. // // The returned channel will not be closed. // Do chan 和 Do 类似,其实就是一个是同步等待,一个是异步返回,主要实现上: // 如果调用 DoChan 会给 call.chans 添加一个 channel 这样等第一次调用执行完毕之后就会循环向这些 channel 写入数据 func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan 0 { go panic(e) select {} // Keep this goroutine around so that it will appear in the crash dump. } else { panic(e) } } else if c.err == errGoexit { // Already in the process of goexit, no need to call again // 已经准备退出了,也就不用做其他操作了 } else { // Normal return // 正常情况下向 channel 写入数据 for _, ch := range c.chans { ch 0} } } }() // 使用一个匿名函数来执行 func() { defer func() { if !normalReturn { // Ideally, we would wait to take a stack trace until we've determined // whether this is a panic or a runtime.Goexit. // // Unfortunately, the only way we can distinguish the two is to see // whether the recover stopped the goroutine from terminating, and by // the time we know that, the part of the stack trace relevant to the // panic has been discarded. // 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误 // 后面在上层重新 panic if r := recover(); r != nil { c.err = newPanicError(r) } } }() // 如果 fn 没有 panic 就会执行到这一步,如果 panic 了就不会执行到这一步 // 所以可以通过这个变量来判断是否 panic 了 c.val, c.err = fn() normalReturn = true }() if !normalReturn { recovered = true } } // Forget tells the singleflight to forget about a key. Future calls // to Do for this key will call the function rather than waiting for // an earlier call to complete. // 用于手动释放某个 key 下次调用就不会阻塞等待了 func (g *Group) Forget(key string) { g.mu.Lock() if c, ok := g.m[key]; ok { c.forgotten = true } delete(g.m, key) g.mu.Unlock() }
接收一个字符串Key和一个待调用的函数,会返回调用函数的结果和错误. 使用Do方法时,会根据提供的Key判断是否去真正调用fn函数.同一个 key,在同一时间只有第一次调用Do方法时才会去执行fn函数,其他并发的请求会等待调用的执行结果.
Do方法的执行逻辑是每次调用Do方法都会先去获取互斥锁,随后判断在映射表里是否已经有Key对应的fn函数调用信息的call结构体。
当不存在时,证明是这个Key的第一次请求,那么会初始化一个call结构体指针,增加SingleFlight内部持有的sync.WaitGroup计数器到1。释放互斥锁,然后阻塞的等待doCall方法执行fn函数的返回结果 当存在时,增加call结构体内代表fn重复调用次数的计数器dups,释放互斥锁,然后使用WaitGroup等待fn函数执行完成。
call结构体的val 和 err 两个字段只会在 doCall方法中执行fn有返回结果后才赋值,所以当 doCall方法 和 WaitGroup.Wait返回时,函数调用的结果和错误会返回给Do方法的所有调用者。
doCall方法会去实际调用fn函数,因为call结构体初始化后forgotten字段的默认值是false,fn调用有返回后,会把对应的Key删掉。这样这轮请求都返回后,下一轮使用同一的Key的请求会重新调用执行一次fn函数。
类似Do方法,只不过是异步调用.它会返回一个通道,等fn函数执行完,产生了结果后,就能从这个 chan 中接收这个结果.
它的执行逻辑和Do方法类似,唯一不同的是调用者不用阻塞等待调用的返回, DoChan方法会创建一个chan Result通道返回给调用者,调用者通过这个通道就能接受到fn函数的结果。这个chan Result通道,在返回给调用者前会先放到call结构体的维护的通知队列里,待fn函数返回结果后DoChan方法会把结果发送给通知队列中的每个通道。
在SingleFlight中删除一个Key. 这样一来,之后这个Key的Do方法调用会执行fn函数,而不是等待前一个未完成的fn 函数的结果.
即 "一荣俱荣,一损俱损"
Go并发编程(十二) Singleflight
Go Singleflight导致死锁问题分析
项目中有大量使用,场景基本都是用于防止缓存击穿.
另外,
net标准库里使用的lookupGroup结构,将对相同域名的DNS记录查询合并成一个查询.
net库提供的DNS记录查询方法LookupIp, 使用lookupGroup这个SingleFlight进行合并查询的相关操作(使用的是异步查询的方法DoChan)
net库的 h2_hundle.go ,以及[golang.org/x/net/http2/client_conn_pool.go],都试图用SingleFlight来优化现有代码
Docker之前的某个版本,/docker/builder/fscache/fscache.go中有使用到SingleFlight
Copyright © 2023 leiyu.cn. All Rights Reserved. 磊宇云计算 版权所有 许可证编号:B1-20233142/B2-20230630 山东磊宇云计算有限公司 鲁ICP备2020045424号
磊宇云计算致力于以最 “绿色节能” 的方式,让每一位上云的客户成为全球绿色节能和降低碳排放的贡献者