专栏首页坚毅的PHPgolang mgo的mongo连接池设置:必须手动加上maxPoolSize

golang mgo的mongo连接池设置:必须手动加上maxPoolSize

本司礼物系统使用了golang的 mongo库 mgo,中间踩了一些坑,总结下避免大家再踩坑

golang的mgo库说明里是说明了开启连接复用的,但观察实验发现,这并没有根本实现连接的控制,连接复用仅在有空闲连接时生效,高并发时无可用连接会不断创建新连接,所以最终还是需要程序员自行去限制最大连接才行。

废话不多说,开始上代码

GlobalMgoSession, err := mgo.Dial(host)   func (m *MongoBaseDao) Get(tablename string, id string, result interface{}) interface{} {     session := GlobalMgoSession.Clone()     defer session.Close()       collection := session.DB(globalMgoDbName).C(tablename)     err := collection.FindId(bson.ObjectIdHex(id)).One(result)       if err != nil {         logkit.Logger.Error("mongo_base method:Get " + err.Error())     }     return result }

golang main入口启动时,我们会创建一个全局session,然后每次使用时clone session的信息和连接,用于本次请求,使用后调用session.Close() 释放连接。

// Clone works just like Copy, but also reuses the same socket as the original // session, in case it had already reserved one due to its consistency // guarantees.  This behavior ensures that writes performed in the old session // are necessarily observed when using the new session, as long as it was a // strong or monotonic session.  That said, it also means that long operations // may cause other goroutines using the original session to wait. func (s *Session) Clone() *Session {     s.m.Lock()     scopy := copySession(s, true)     s.m.Unlock()     return scopy }     // Close terminates the session.  It's a runtime error to use a session // after it has been closed. func (s *Session) Close() {     s.m.Lock()     if s.cluster_ != nil {         debugf("Closing session %p", s)         s.unsetSocket()  //释放当前线程占用的socket 置为nil         s.cluster_.Release()         s.cluster_ = nil     }     s.m.Unlock() }

Clone的方法注释里说明会重用原始session的socket连接,但是并发请求一大,其他协程来不及释放连接,当前协程会怎么办?

func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {     // Read-only lock to check for previously reserved socket.     s.m.RLock()     // If there is a slave socket reserved and its use is acceptable, take it as long     // as there isn't a master socket which would be preferred by the read preference mode.     if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {         socket := s.slaveSocket         socket.Acquire()         s.m.RUnlock()         logkit.Logger.Info("sgp_test 1 acquireSocket slave is ok!")         return socket, nil     }     if s.masterSocket != nil {         socket := s.masterSocket         socket.Acquire()         s.m.RUnlock()         logkit.Logger.Info("sgp_test 1  acquireSocket master is ok!")         return socket, nil     }       s.m.RUnlock()       // No go.  We may have to request a new socket and change the session,     // so try again but with an exclusive lock now.     s.m.Lock()     defer s.m.Unlock()     if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {         s.slaveSocket.Acquire()         logkit.Logger.Info("sgp_test 2  acquireSocket slave is ok!")         return s.slaveSocket, nil     }     if s.masterSocket != nil {         s.masterSocket.Acquire()         logkit.Logger.Info("sgp_test 2  acquireSocket master is ok!")         return s.masterSocket, nil     }       // Still not good.  We need a new socket.     sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)   ......     logkit.Logger.Info("sgp_test 3   acquireSocket cluster AcquireSocket is ok!")     return sock, nil   }

在源码中加debug,结果日志说明一切:

Mar 25 09:46:40 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1  acquireSocket master is ok! Mar 25 09:46:40 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1  acquireSocket master is ok! Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1 acquireSocket slave is ok! Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok! Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok! Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!

不断的创建连接  AcquireSocket

 $  netstat -nat|grep -i 27017|wc -l

400

如果每个session 不调用close,会达到恐怖的4096,并堵死其他请求,所以clone或copy session时一定要defer close掉

启用maxPoolLimit 参数则会限制总连接大小,连接到限制则当前协程会sleep等待  直到可以创建连接,高并发时锁有问题,会导致多创建几个连接

src/gopkg.in/mgo.v2/cluster.go      s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)         if err == errPoolLimit {             if !warnedLimit {                 warnedLimit = true                 logkit.Logger.Error("sgp_test WARNING: Per-server connection limit reached. " + err.Error())                 log("WARNING: Per-server connection limit reached.")             }             time.Sleep(100 * time.Millisecond)             continue         }   session.go: // SetPoolLimit sets the maximum number of sockets in use in a single server   // before this session will block waiting for a socket to be available.   // The default limit is 4096.   //   // This limit must be set to cover more than any expected workload of the   // application. It is a bad practice and an unsupported use case to use the   // database driver to define the concurrency limit of an application. Prevent   // such concurrency "at the door" instead, by properly restricting the amount   // of used resources and number of goroutines before they are created.   func (s *Session) SetPoolLimit(limit int) {       s.m.Lock()       s.poolLimit = limit       s.m.Unlock()   }

连接池设置方法:

1、配置中 增加 

[host]:[port]?maxPoolSize=10

2、代码中 :

dao.GlobalMgoSession.SetPoolLimit(10)

再做压测:

 $  netstat -nat|grep -i 27017|wc -l

15

结论:

每次clone session之后,操作结束时如果调用 session.Close 则会unset Socket  ,socket refer数减少,如果不设置上限,每个协程请求到来发现无空闲连接就会创建socket连接,直到达到最大值4096,而mongo的连接数上限一般也就是1万,也就是一个端口你只能启动一两个进程保证连接不被撑爆,过多的连接数客户端效率不高,server端更会耗费内存和CPU,所以需要启用自定义连接池 , 启用连接池也需要注意如果有pooMaxLimit个协程执行过长或者死循环不释放socket连接,也会悲剧。

mgo底层socket连接池只在maxPooMaxLimit 范围内实现复用,需要自行优化。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Hbase复杂操作的优化- Htable HtablePool

    Htable主要提供表内的操作,put,delete,get,scan等操作 HTablePool 可以建立池,存储HTableInterface接口的实现对象...

    财主刀刀
  • zookeeper学习系列:四、Paxos算法和zookeeper的关系

    一、问题起源 淘宝搜索的博客 http://www.searchtb.com/2011/01/zookeeper-research.html  提到Paxos是...

    财主刀刀
  • zookeeper学习系列:三、利用zookeeper做选举和锁

    之前只理解zk可以做命名,配置服务,现在学习下他怎么用作选举和锁,进一步还可构建master-slave模式的分布式系统。 为什么叫Zoo?“因为要协调的分布式...

    财主刀刀
  • 最简单的php trim函数并不简单

    字符串的处理在任何程序中应该是最最常见的了吧。php 的trim函数就是用来去除字符串的字符串。最常用的就是去除空格了。但是,这个简单的函数,是否真的像你认为的...

    写PHP的老王
  • 多比最新 | 鏖战群雄,UNEC逆市突破,上线涨幅高达600+

    继昨日币圈韭菜血泪控诉之后,原本喧嚣的币圈似乎即将要宣告沉寂一段日子了。在这风口浪尖之下,需要一盆冷水冰一下怒火中烧的小心脏,毕竟今年的熊市实在是持续的太久了。...

    dobitrade
  • python编写简单聊天程序

    渔父歌
  • 网络编程

    阅读目录 一.楔子 二.客户端/服务端架构 三.网络基础 四.套接字(socket)初使用 五.黏包 六.验证客户端链接的合法性 七.socketserver ...

    人生不如戏
  • socket.io实践干货

    sunsky
  • JS-字符串截取方法slice、substring、substr的区别

    slice() 方法可通过指定的开始和结束位置,提取字符串的某个部分,并以新的字符串返回被提取的部分。语法如下:

    TimothyJia
  • Python内置ssl模块编程(1)

    py3study

扫码关注云+社区

领取腾讯云代金券