golang服务开发平滑升级之优雅重启

摘要:
转载不错的文档经典平滑升级方案服务器开发运维中,平滑升级是一个老生常谈的话题。当服务升级时,修改status文件内容为off,等待lvs健康检查确认服务为异常状态时主动切断流量,此时进行服务器的升级操作,服务重启完毕后,将status文件内容修改回ok,等待lvs健康检查确认服务正常后导入流量,以此步骤逐步完成剩余的机器的发布操作。优雅重启golang语言http服务的优雅重启开源库也有一些,我们选择Facebook开源的库进行研究。goroutine执行signalHandler,等待SIGTERM和SIGUSR2信号。

转载不错的文档

经典平滑升级方案

服务器开发运维中,平滑升级是一个老生常谈的话题。拿一个http server来说,最常见的方案就是在http server前面加挂一个lvs负载,通过健康检查接口决定负载的导入与摘除。具体来说就是http server 提供一个/status 接口,服务器返回一个status文件,内容为ok,lvs负载定时访问这个接口,判断服务健康状况决定导入流量和切断流量。一般都会定一些策略,比如:访问间隔5秒,健康阈值2,异常阈值2之类的。意思就是每隔5秒访问一次/status接口,2次成功后,确认服务正常,开始导入流量,2次失败确认服务异常切断流量。当服务升级时,修改status文件内容为off,等待lvs健康检查确认服务为异常状态时主动切断流量,此时进行服务器的升级操作,服务重启完毕后,将status文件内容修改回ok,等待lvs健康检查确认服务正常后导入流量,以此步骤逐步完成剩余的机器的发布操作。将以上步骤完善成脚本,拆分为pre(预升级,ok修改为off)、post(发布代码,重启服务)、check(服务检查)、online(上线,off修改为ok)几个动作,与代码发布平台结合基本就实现了一般服务的自动化发版管理。360内部的代码发布平台Furion就是基于此原理工作的。

经典平滑升级方案的问题

一般的web服务使用上述平滑升级方案,基本上已经够用了。那这个方案还有什么问题吗?吹毛求疵的讲,还是有的。

  • 发布过程中,正在发布的机器被摘除,其他机器承压增大。

  • 发布过程仍然花费一些时间,按照上述策略指定的参数,发布一次至少需要20秒,当然我们可以调整参数,但是要面临浪费资源或者网络抖动误判导致切断流量的问题。

  • 切断流量瞬间会导致未完成请求返回不完整。

这些问题一般来说都不算大问题,服务器资源做好冗余就够了,但是当服务器数量很大,服务器QPS很高的情况,小问题也会变大问题。所有寻求完美无缝重启的方案就是解决问题的关键了。

优雅重启

golang语言http服务的优雅重启开源库也有一些,我们选择Facebook开源的库进行研究。代码地址https://github.com/facebookarchive/grace.git。网上的开源库的实现或简单或复杂,其实原理都差不多,执行优雅重启的过程基本如下:

  1. 发布新的bin文件去覆盖老的bin文件

  2. 发送一个信号量,告诉正在运行的进程,进行重启

  3. 正在运行的进程收到信号后,会以子进程的方式启动新的bin文件

  4. 新进程接受新请求,并处理

  5. 老进程不再接受请求,但是要等正在处理的请求处理完成,所有在处理的请求处理完之后,便自动退出 其实我总结了一下,就两个关键点,一个是子进程继承端口监听启动,接受新请求处理;另一个是父进程优雅关闭。通过以上两个步骤基本上就实现了服务的无缝重启,发布过程中流量无损,发布消耗时间理论上最大也就是一个请求的超时时间,回滚服务也很简单,将旧版本服务重发一次就好了。

源码分析

1

使用方法

示例使用了流行的http库 gin,我们一般用法如下

  1. func main(){

  2. engine:=gin.New()

  3. engine.Use(httpserver.NewAccessLogger(),gin.Recovery())

  4. controller.Regist(engine)

  5. srv:=&http.Server{

  6. Addr:":80",

  7. Handler:engine,

  8. ReadTimeout:30*time.Second,

  9. WriteTimeout:30*time.Second,

  10. }

  11. monitor.Init()

  12. srvMonitor:=&http.Server{

  13. Addr:":9900",

  14. Handler:nil,

  15. ReadTimeout:30*time.Second,

  16. WriteTimeout:30*time.Second,

  17. }

  18. grace.Serve(srv,srvMonitor)

  19. }

grace.Serve函数参数是一个切片,可以处理多个server的端口监听继承与优雅关闭。此外还提供了关闭前的hook,使用方法如下:

  1. gracehttp.ServeWithOptions([]*http.Server{srv,srvMonitor},gracehttp.PreStartProcess(func()error{

  2. logger.Info("do PreStartProcess ")

  3. returnnil

  4. }))

在调研中我发现项目上有错误的使用方法,如下:

  1. func startHttp(){

  2. engine:=gin.New()

  3. engine.Use(httpserver.NewAccessLogger(),gin.Recovery())

  4. controller.Regist(engine)

  5. srv:=&http.Server{

  6. Addr:":80",

  7. Handler:engine,

  8. ReadTimeout:30*time.Second,

  9. WriteTimeout:30*time.Second,

  10. }

  11. monitor.Init()

  12. srvMonitor:=&http.Server{

  13. Addr:":9900",

  14. Handler:nil,

  15. ReadTimeout:30*time.Second,

  16. WriteTimeout:30*time.Second,

  17. }

  18. grace.Serve(srv,srvMonitor)

  19. }

  20. func main(){

  21. go startHttp()

  22. //注册信号

  23. go signalHandler()

  24. <-quiet

  25. logger.Info("Close Server")

  26. }

  27. func signalHandler(){

  28. c:=make(chan os.Signal)

  29. signal.Notify(c,syscall.SIGHUP,syscall.SIGINT,syscall.SIGTERM,syscall.SIGKILL,syscall.SIGQUIT)

  30. s:=<-c

  31. logger.Info("get siginal siginal=%v",s)

  32. quiet<-1

  33. }

这里为什么出错了呢,是因为他将grace.Serve(srv,srvMonitor) 放在goroutine里面了,并且自己又监听了一遍信号,这样会导致旧进程优雅关闭前,父进程已经已经退出了,优雅关闭就失效了。

2

关键代码

我们按照程序启动的顺序逻辑来讲,大体如下:

  1. 执行启动端口监听,挂载server,判断当前进程如果是子进程就向父进程发送SIGTERM信号。

  2. goroutine 执行wg.Add 和wg.Wait() ,等待所有挂载的server停止工作后执行退出进程。

  3. goroutine 执行 signalHandler,等待SIGTERM和SIGUSR2信号。收到SIGTERM信号执行每个server的优雅关闭,关闭完后执行wg.Done(),wg全部Done之后在2中执行了退出进程操作;收到SIGUSR2信号时,执行启动子进程操作。

  4. 子进程启动执行1,会向父进程发送SIGTERM信号,父进程收到SIGTERM信号执行3,进行优雅关闭操作。

总结起来就是执行启动重启时,执行shell命令:

  1. pgrep(你的项目名)|xargs kill-SIGUSR2

  2. #(注意:要使用bash)。

你的项目会启动子进程,并继承父进程监听的端口,启动成功后再向父进程发送SIGTERM信号, 旧进程执行优雅关闭。我们看关键的struct

  1. // gracehttp/http.go

  2. type appstruct{

  3. servers[]*http.Server

  4. http*httpdown.HTTP

  5. net*gracenet.Net

  6. listeners[]net.Listener

  7. sds[]httpdown.Server

  8. preStartProcess func()error

  9. errors chan error

  10. }

  11. // httpdown/httpdown.go

  12. type HTTPstruct{

  13. // StopTimeout is the duration before we begin force closing connections.

  14. // Defaults to 1 minute.

  15. StopTimeouttime.Duration

  16. // KillTimeout is the duration before which we completely give up and abort

  17. // even though we still have connected clients. This is useful when a large

  18. // number of client connections exist and closing them can take a long time.

  19. // Note, this is in addition to the StopTimeout. Defaults to 1 minute.

  20. KillTimeouttime.Duration

  21. // Stats is optional. If provided, it will be used to record various metrics.

  22. Statsstats.Client

  23. // Clock allows for testing timing related functionality. Do not specify this

  24. // in production code.

  25. Clockclock.Clock

  26. }

  27. // gracenet/net.go

  28. typeNetstruct{

  29. inherited[]net.Listener

  30. active[]net.Listener

  31. mutex sync.Mutex

  32. inheritOnce sync.Once

  33. // used in tests to override the default behavior of starting from fd 3.

  34. fdStartint

  35. }

我们知道函数调用是从grace.Serve(srv, srvMonitor)开始的,Serve函数会new一个app,一路执行下去关键函数如下:a.run()、a.listen()、a.serve()、 a.wait()、a.signalHandler()、 a.term()、a.net.StartProcess()。

a.run() 大体逻辑如下:

  1. var(

  2. didInherit=os.Getenv("LISTEN_FDS")!=""

  3. ppid=os.Getppid()

  4. )

  5. func(a*app)run()error{

  6. a.listen()

  7. a.serve()

  8. ifdidInherit&&ppid!=1{

  9. syscall.Kill(ppid,syscall.SIGTERM)

  10. }

  11. waitdone:=make(chanstruct{})

  12. go func(){

  13. defer close(waitdone)

  14. a.wait()

  15. }()

  16. select{

  17. caseerr:=<-a.errors:

  18. ...

  19. case<-waitdone:

  20. logger.Printf("Exiting pid %d.",os.Getpid())

  21. returnnil

  22. }

  23. }

启动监听、挂载server,通过环境变量LISTEN_FDS判断当前进程是否为子进程,如果是就发送信号杀父进程。goroutine中执行wait()函数等待优雅关闭或者平滑启动子进程。

a.listen() 关键逻辑如下:

  1. func(a*app)listen()error{

  2. for_,s:=range a.servers{

  3. l,err:=a.net.Listen("tcp",s.Addr)

  4. ......

  5. a.listeners=append(a.listeners,l)

  6. }

  7. returnnil

  8. }

这里看出app struct 中listeners用来存储监听的net.Listener的数组 ,net就是Net,封装了net.ListenTCP等逻辑(这里我只关注了TCP逻辑),inherited 和 active 两个数组分别用来存储继承自父进程的net.Listener 和 启动的net.Listener,这块父进程启动,即首次启动时逻辑很简单,略过,子进程启动,即非首次启动在介绍a.net.StartProccess时细讲。

a.serve() 关键逻辑如下:

  1. func(a*app)serve(){

  2. fori,s:=range a.servers{

  3. a.sds=append(a.sds,a.http.Serve(s,a.listeners[i]))

  4. }

  5. }

这里涉及了app struct里面的两个字段,http和sds。http即 HTTP struct, 这里面封装了http server优雅关闭相关的逻辑,具体的细节很繁琐,我用一个简单的模型来说明一下吧。a.http.Serve(srv,l) 函数封装执行了srv.Serve(l),即挂载srv, 并返回了一个httpdown.server的实例, 这个实例实现了httpdown.Server 接口,如下:

  1. // httpdown/httpdown.go

  2. typeServerinterface{

  3. // Wait waits for the serving loop to finish. This will happen when Stop is

  4. // called, at which point it returns no error, or if there is an error in the

  5. // serving loop. You must call Wait after calling Serve or ListenAndServe.

  6. Wait()error

  7. // Stop stops the listener. It will block until all connections have been

  8. // closed.

  9. Stop()error

  10. }

精简后实现的模型如下:

  1. func(s*server)serve(){

  2. // 即前面提到的 srv.Serve(l),被封装的挂载srv的代码

  3. s.serveErr<-s.server.Serve(s.listener)

  4. close(s.serveDone)

  5. close(s.serveErr)

  6. }

  7. func(s*server)Wait()error{

  8. iferr:=<-s.serveErr;!isUseOfClosedError(err){

  9. returnerr

  10. }

  11. returnnil

  12. }

  13. func(s*server)Stop()error{

  14. s.stopOnce.Do(func(){

  15. closeErr:=s.listener.Close()

  16. <-s.serveDone

  17. ......

  18. // 等待连接关闭或者超时后强杀连接等复杂逻辑

  19. ......

  20. ifcloseErr!=nil&&!isUseOfClosedError(closeErr){

  21. s.stopErr=closeErr

  22. }

  23. })

  24. returns.stopErr

  25. }

s.serveErr <- s.server.Serve(s.listener) 启动成功后会在这里挂住,失败直接返回错误,Wait() 函数提供给a.wait()调用,正常情况也是挂住,等Stop() 里面 closeErr := s.listener.Close() 执行后返回。这块的逻辑要结合 a.wait()、 a.signalHandler()、 a.term() 一起来分析

a.wait() 和 a.term() 的代码

  1. func(a*app)wait(){

  2. varwg sync.WaitGroup

  3. wg.Add(len(a.sds)*2)// Wait & Stop

  4. go a.signalHandler(&wg)

  5. for_,s:=range a.sds{

  6. go func(s httpdown.Server){

  7. defer wg.Done()

  8. iferr:=s.Wait();err!=nil{

  9. a.errors<-err

  10. }

  11. }(s)

  12. }

  13. wg.Wait()

  14. }

  15. func(a*app)term(wg*sync.WaitGroup){

  16. for_,s:=range a.sds{

  17. go func(s httpdown.Server){

  18. defer wg.Done()

  19. iferr:=s.Stop();err!=nil{

  20. a.errors<-err

  21. }

  22. }(s)

  23. }

  24. }

a.run() 函数里面会goroutine 执行 a.wait(),它会goroutine执行信号处理 a.signalHandler() 函数,创建一个WaitGroup 等待所有的httpdown.server执行s.Wait()函数返回。a.signalHandler() 函数基本上逻辑就是监听signal.Notify信号,收到SIGTERM信号执行a.term() ,收到SIGUSR2信号执行a.net.StartProcess()。a.term() 函数就是遍历执行所有httpdown.server的s.Stop(),进行优雅关闭,结合上面的代码来看,每一个s.Stop() 会导致s.Wait() 返回,即执行了两次wg.Done(), 所有httpdown.server 优雅关闭后导致a.wait()返回,进而waitdone关闭, 进程最后退出。下面是a.signalHandler()函数的代码

  1. func(a*app)signalHandler(wg*sync.WaitGroup){

  2. ch:=make(chan os.Signal,10)

  3. signal.Notify(ch,syscall.SIGINT,syscall.SIGTERM,syscall.SIGUSR2)

  4. for{

  5. sig:=<-ch

  6. switchsig{

  7. casesyscall.SIGINT,syscall.SIGTERM:

  8. // this ensures a subsequent INT/TERM will trigger standard go behaviour of

  9. // terminating.

  10. signal.Stop(ch)

  11. a.term(wg)

  12. return

  13. casesyscall.SIGUSR2:

  14. err:=a.preStartProcess()

  15. iferr!=nil{

  16. a.errors<-err

  17. }

  18. // we only return here if there's an error, otherwise the new process

  19. // will send us a TERM when it's ready to trigger the actual shutdown.

  20. if_,err:=a.net.StartProcess();err!=nil{

  21. a.errors<-err

  22. }

  23. }

  24. }

  25. }

a.net.StartProcess() 函数是启动子进程的逻辑,这里需要详细介绍一下

  1. const(

  2. // Used to indicate a graceful restart in the new process.

  3. envCountKey="LISTEN_FDS"

  4. envCountKeyPrefix=envCountKey+"="

  5. )

  6. type filerinterface{

  7. File()(*os.File,error)

  8. }

  9. func(n*Net)StartProcess()(int,error){

  10. listeners,err:=n.activeListeners()

  11. iferr!=nil{

  12. return0,err

  13. }

  14. // Extract the fds from the listeners.

  15. files:=make([]*os.File,len(listeners))

  16. fori,l:=range listeners{

  17. files[i],err=l.(filer).File()

  18. iferr!=nil{

  19. return0,err

  20. }

  21. defer files[i].Close()

  22. }

  23. // Use the original binary location. This works with symlinks such that if

  24. // the file it points to has been changed we will use the updated symlink.

  25. argv0,err:=exec.LookPath(os.Args[0])

  26. iferr!=nil{

  27. return0,err

  28. }

  29. // Pass on the environment and replace the old count key with the new one.

  30. varenv[]string

  31. for_,v:=range os.Environ(){

  32. if!strings.HasPrefix(v,envCountKeyPrefix){

  33. env=append(env,v)

  34. }

  35. }

  36. env=append(env,fmt.Sprintf("%s%d",envCountKeyPrefix,len(listeners)))

  37. allFiles:=append([]*os.File{os.Stdin,os.Stdout,os.Stderr},files...)

  38. process,err:=os.StartProcess(argv0,os.Args,&os.ProcAttr{

  39. Dir:originalWD,

  40. Env:env,

  41. Files:allFiles,

  42. })

  43. iferr!=nil{

  44. return0,err

  45. }

  46. returnprocess.Pid,nil

  47. }

n.activeListeners()返回 n.active中的net.Listener 数组的副本,files是从中提取出的fd列表。注意allFiles在files前面拼接了3个标准输入输出,记住这个数字。env 中修改了环境变量LISTEN_FDS等于listener的数量。这里的启动子进程的方法是os.StartProcess(),我看了其他的开源库都用syscall.ForkExec

  1. fork,err:=syscall.ForkExec(os.Args[0],os.Args,&os.ProcAttr{

  2. Dir:originalWD,

  3. Env:env,

  4. Files:allFiles,

  5. })

两种的区别后续还有待研究。还记得前面没有展开的Net中的inherited 和 active么,这里我们细讲一下。

  1. func(n*Net)Listen(nett,laddrstring)(net.Listener,error){

  2. ......

  3. // 仅关注tcp逻辑

  4. returnn.ListenTCP(nett,addr)

  5. }

  6. func(n*Net)ListenTCP(nettstring,laddr*net.TCPAddr)(*net.TCPListener,error){

  7. iferr:=n.inherit();err!=nil{

  8. returnnil,err

  9. }

  10. n.mutex.Lock()

  11. defer n.mutex.Unlock()

  12. // look for an inherited listener

  13. fori,l:=range n.inherited{

  14. ifl==nil{// we nil used inherited listeners

  15. continue

  16. }

  17. ifisSameAddr(l.Addr(),laddr){

  18. n.inherited[i]=nil

  19. n.active=append(n.active,l)

  20. returnl.(*net.TCPListener),nil

  21. }

  22. }

  23. // make a fresh listener

  24. l,err:=net.ListenTCP(nett,laddr)

  25. iferr!=nil{

  26. returnnil,err

  27. }

  28. n.active=append(n.active,l)

  29. returnl,nil

  30. }

  31. func(n*Net)inherit()error{

  32. varretErr error

  33. n.inheritOnce.Do(func(){

  34. n.mutex.Lock()

  35. defer n.mutex.Unlock()

  36. countStr:=os.Getenv(envCountKey)

  37. ifcountStr==""{

  38. return

  39. }

  40. count,err:=strconv.Atoi(countStr)

  41. // In tests this may be overridden.

  42. fdStart:=n.fdStart

  43. iffdStart==0{

  44. fdStart=3

  45. }

  46. fori:=fdStart;i<fdStart+count;i++{

  47. file:=os.NewFile(uintptr(i),"listener")

  48. l,err:=net.FileListener(file)

  49. iferr!=nil{

  50. file.Close()

  51. retErr=fmt.Errorf("error inheriting socket fd %d: %s",i,err)

  52. return

  53. }

  54. iferr:=file.Close();err!=nil{

  55. retErr=fmt.Errorf("error closing inherited socket fd %d: %s",i,err)

  56. return

  57. }

  58. n.inherited=append(n.inherited,l)

  59. }

  60. })

  61. returnretErr

  62. }

这里ListenTCP 先执行inherit() 将继承来的net.Listener 保存在n.inherited里面,启动时判断是否是继承的listener,没有才 make a fresh listener呢,这里的fdStart 初始值设置为3,就是前面提到的那个数字3 (三个标准输入输出占了3位)。

总结起来启动子进程流程如下:

1、提取listener的fd,修改LISTENFDS环境变量为listener的数量,os.StartProcess启动子进程.

  1. files[i],err=l.(filer).File()

2、子进程启动执行a.net.Listen()时,根据环境变量LISTENFDS和fdStart 变量取出listener

  1. file:=os.NewFile(uintptr(i),"listener")

  2. l,err:=net.FileListener(file)

  3. file.Close()

根据fd创建一个文件,通过文件拿到listener的副本,然后关闭文件。最终a.net.Listen()的逻辑是如果是继承端口就返回一个listener副本,如果不是就启动一个新的listener。3、后续执行a.serve() 挂载server,然后通知父进程优雅关闭等逻辑。

小结

好了,以上就是对gracehttp的源码阅读分析。至此我们对http服务平滑重启是如何实现的已经有一个大致的了解了

免责声明:文章转载自《golang服务开发平滑升级之优雅重启》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇eclipse安装lombokAptana Studio 3 汉化简体中文版下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

使用golang开发mqtt服务压力测试工具 清明

package main import ( "flag" "fmt" "sync" "time" //导入mqtt包 MQTT "github.com/eclipse/paho.mqtt.golang" ) var f MQTT.MessageHandler = func(client MQTT.Clie...

Golang的高级数据类型-切片(slice)实战篇

          Golang的高级数据类型-切片(slice)实战篇                              作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任。        切片(slice)是Go中一种比较特殊的数据结构,这种数据结构更便于使用和管理数据集合,切片是围绕动态数组的概念构建的,可以按需自动增长。   ...

golang读写锁与互斥锁的性能比较

长时间来一直以为在读多写少的场景下,读写锁性能必然优于互斥锁,然而情况恰恰相反 不废话了,先上一段测试代码 func main() { var w = &sync.WaitGroup{} var num = 50000000 var c = make(chan int, 3000) var rwmutexTmp = newRwmut...

(八)golang--复杂类型之指针

首先我们要明确:(1)基本数据类型:变量存的就是值,也叫值类型; (2)获取变量的地址,用&,例如var num int,获取num的地址:&num; (3)指针类型:变量存的是一个地址,这个地址指向的空间存的才是值,例如var ptr *int = &num; (4)获取指针类型所指向的值,使用*,例如var *ptr int,使...

golang json解析

前言 Go 语言自带的 encode/json 包提供了对 JSON 数据格式的编码和解码能力。 解析 JSON 的关键,其实在于如何声明存放解析后数据的变量的类型。 此外使用 json 编码还会有几个需要注意的地方,谨防踩坑。 解析简单JSON 先观察下这段 JSON 数据的组成,name,created 是字符串。id 是整型,fruit 是一个字符串...

golang查找端口号占用的进程号

golang官方包: https://studygolang.com/pkgdoc os 支持获取当前进程pid并kill、但是仅仅限于获取当前进程pid FindProcess().Kill() os.Getpid() 基础用法;但是却没有提供依据端口号获取对应的pid,所以还是执行shell指令对结果集进行过滤获取pid // 获取8299端口对应进程...