golang mgo MongoDB connection pool settings: you must set maxPoolSize manually

Abstract:
Our company's gift system uses the Go MongoDB library mgo, and we hit a few pitfalls along the way. In short, mgo's documentation says connection reuse is enabled, but observation and experiment show this does not actually bound the number of connections: reuse only kicks in when an idle connection exists, and when none is available a new connection is created every time. In the end the programmer still has to cap the maximum number of connections explicitly.

Our company's gift system uses Go's MongoDB library mgo. We stepped into a few traps along the way; this post summarizes them so you can avoid doing the same.

mgo's documentation does say connection reuse is enabled, but observation and experiment show this does not give you real control over connections: reuse only takes effect when there is an idle connection. Under high concurrency, when no connection is free, new connections are created continuously, so ultimately the programmer has to cap the maximum number of connections.

Enough talk; here is the code:

GlobalMgoSession, err := mgo.Dial(host)
 
func (m *MongoBaseDao) Get(tablename string, id string, result interface{}) interface{} {
    session := GlobalMgoSession.Clone()
    defer session.Close()
 
    collection := session.DB(globalMgoDbName).C(tablename)
    err := collection.FindId(bson.ObjectIdHex(id)).One(result)
 
    if err != nil {
        logkit.Logger.Error("mongo_base method:Get " + err.Error())
    }
    return result
}

 

When the program starts in main, we create one global session. Each request then clones that session, reusing its settings and, when possible, its connection; after use we call session.Close() to release the connection.

// Clone works just like Copy, but also reuses the same socket as the original
// session, in case it had already reserved one due to its consistency
// guarantees.  This behavior ensures that writes performed in the old session
// are necessarily observed when using the new session, as long as it was a
// strong or monotonic session.  That said, it also means that long operations
// may cause other goroutines using the original session to wait.
func (s *Session) Clone() *Session {
    s.m.Lock()
    scopy := copySession(s, true)
    s.m.Unlock()
    return scopy
}
 
 
// Close terminates the session.  It's a runtime error to use a session
// after it has been closed.
func (s *Session) Close() {
    s.m.Lock()
    if s.cluster_ != nil {
        debugf("Closing session %p", s)
        s.unsetSocket()  // release the socket held by this session and set it to nil
        s.cluster_.Release()
        s.cluster_ = nil
    }
    s.m.Unlock()
}

 

The comment on Clone says it reuses the original session's socket. But when concurrent requests spike and the other goroutines have not yet released their connections, what does the current goroutine do?

 

func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
    // Read-only lock to check for previously reserved socket.
    s.m.RLock()
    // If there is a slave socket reserved and its use is acceptable, take it as long
    // as there isn't a master socket which would be preferred by the read preference mode.
    if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
        socket := s.slaveSocket
        socket.Acquire()
        s.m.RUnlock()
        logkit.Logger.Info("sgp_test 1 acquireSocket slave is ok!")
        return socket, nil
    }
    if s.masterSocket != nil {
        socket := s.masterSocket
        socket.Acquire()
        s.m.RUnlock()
        logkit.Logger.Info("sgp_test 1  acquireSocket master is ok!")
        return socket, nil
    }
 
    s.m.RUnlock()
 
    // No go.  We may have to request a new socket and change the session,
    // so try again but with an exclusive lock now.
    s.m.Lock()
    defer s.m.Unlock()
    if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
        s.slaveSocket.Acquire()
        logkit.Logger.Info("sgp_test 2  acquireSocket slave is ok!")
        return s.slaveSocket, nil
    }
    if s.masterSocket != nil {
        s.masterSocket.Acquire()
        logkit.Logger.Info("sgp_test 2  acquireSocket master is ok!")
        return s.masterSocket, nil
    }
 
    // Still not good.  We need a new socket.
    sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
 
......
    logkit.Logger.Info("sgp_test 3   acquireSocket cluster AcquireSocket is ok!")
    return sock, nil
 
}

Adding debug statements to the source, the logs tell the whole story:

Mar 25 09:46:40 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1  acquireSocket master is ok!
Mar 25 09:46:40 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1  acquireSocket master is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1 acquireSocket slave is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!

It just keeps creating new connections via AcquireSocket:

 $  netstat -nat|grep -i 27017|wc -l

400

If sessions are never closed, the count climbs to a terrifying 4096 and blocks all other requests. So whenever you Clone or Copy a session, always defer its Close.

Setting the poolLimit parameter caps the total number of connections. Once the limit is reached, the current goroutine sleeps and retries until a connection can be acquired. Under high concurrency the locking here is imperfect, so a few extra connections may still be created.

src/gopkg.in/mgo.v2/cluster.go 
    s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
        if err == errPoolLimit {
            if !warnedLimit {
                warnedLimit = true
                logkit.Logger.Error("sgp_test WARNING: Per-server connection limit reached. " + err.Error())
                log("WARNING: Per-server connection limit reached.")
            }
            time.Sleep(100 * time.Millisecond)
            continue
        }
 
session.go:
// SetPoolLimit sets the maximum number of sockets in use in a single server
// before this session will block waiting for a socket to be available.
// The default limit is 4096.
//
// This limit must be set to cover more than any expected workload of the
// application. It is a bad practice and an unsupported use case to use the
// database driver to define the concurrency limit of an application. Prevent
// such concurrency "at the door" instead, by properly restricting the amount
// of used resources and number of goroutines before they are created.
func (s *Session) SetPoolLimit(limit int) {
    s.m.Lock()
    s.poolLimit = limit
    s.m.Unlock()
}

How to set the connection pool limit:

1. In the connection string, add:

[host]:[port]?maxPoolSize=10

2. Or in code:

dao.GlobalMgoSession.SetPoolLimit(10)
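For reference, a full connection string with the option spelled out might look like this (the user, hosts, and database name here are placeholders, not from our actual setup):

```
mongodb://appuser:secret@mongo1:27017,mongo2:27017/giftdb?maxPoolSize=10
```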

Run the load test again:

 $  netstat -nat|grep -i 27017|wc -l

15

 

 

Conclusion:

Every time you Clone a session, calling session.Close at the end of the operation unsets the socket and drops its reference count. If you set no upper limit, any goroutine that finds no idle connection creates a new socket, up to the default cap of 4096. Since MongoDB's server-side connection limit is usually only around 10,000, a single port can support only one or two such processes before connections blow up. Excess connections hurt on both ends: the client becomes less efficient, and the server burns memory and CPU. So you need to configure your own pool limit. Even with the limit enabled, stay careful: if poolLimit goroutines run for too long, or loop forever without releasing their sockets, you are in trouble just the same.

mgo's underlying socket pool only reuses connections within the poolLimit; beyond that, you have to optimize yourself.

 

 
