前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang redis 客户端源码阅读(2)连接池初始化

golang redis 客户端源码阅读(2)连接池初始化

作者头像
golangLeetcode
发布2022-08-02 16:08:05
5990
发布2022-08-02 16:08:05
举报
文章被收录于专栏:golang算法架构leetcode技术php

初始化连接池的核心代码如下

代码语言:javascript
复制
  client.pool = &redis.Pool{
    MaxIdle:     client.MaxIdle,
    IdleTimeout: time.Duration(client.IdleTimeoutS) * time.Second,
    MaxActive:   client.MaxActive,
    Dial: func() (redis.Conn, error) {
      var c redis.Conn
      var err error
      for i := 0; i < len(client.Servers)+1; i++ {
        //随机挑选一个IP
        index := common.RandIntn(len(client.Servers))
        client.current_index = index
        c, err = redis.DialTimeout("tcp", client.Servers[index],
          time.Duration(client.ConnTimeoutMs)*time.Millisecond,
          time.Duration(client.ReadTimeoutMs)*time.Millisecond,
          time.Duration(client.WriteTimeoutMs)*time.Millisecond)
          }
        }
        }

除了超时和最大活跃连接数最大空闲连接数外,最重要的就是指定连接函数。

连接函数只是定义,在用的时候才连接,连接的调用链路如下:

conn.go

redis.DialTimeout(){}

代码语言:javascript
复制
func Dial(network, address string, options ...DialOption) (Conn, error) {

然后调用

代码语言:javascript
复制
 net.Dial的dial函数进行tcp连接,接着
代码语言:javascript
复制
"AUTH"验证和db选择
代码语言:javascript
复制
"SELECT"

返回一个连接

连接池的使用:

代码语言:javascript
复制
conn := client.pool.Get()
  defer conn.Close()
  _, err := conn.Do("SET", key, value)

1,从池中捞一个链接

2,发送

3,放回池子

连接池的定义

代码语言:javascript
复制
type Pool struct {

  // Dial is an application supplied function for creating and configuring a
  // connection.
  //
  // The connection returned from Dial must not be in a special state
  // (subscribed to pubsub channel, transaction started, ...).
  Dial func() (Conn, error) //连接函数

  // TestOnBorrow is an optional application supplied function for checking
  // the health of an idle connection before the connection is used again by
  // the application. Argument t is the time that the connection was returned
  // to the pool. If the function returns an error, then the connection is
  // closed.
  TestOnBorrow func(c Conn, t time.Time) error 
  //每次从连接池取出连接的时候,检查连接的健康度,如果放回错误,则释放这个连接

  // Maximum number of idle connections in the pool.
  MaxIdle int  //最大空闲连接

  // Maximum number of connections allocated by the pool at a given time.
  // When zero, there is no limit on the number of connections in the pool.
  MaxActive int  //如果是0 无限制,非0 一定时间端内,池子则最大的连接数

  // Close connections after remaining idle for this duration. If the value
  // is zero, then idle connections are not closed. Applications should set
  // the timeout to a value less than the server's timeout.
  IdleTimeout time.Duration //如果是0 连接不关闭,非0 ,剩余关闭时间

  // If Wait is true and the pool is at the MaxActive limit, then Get() waits
  // for a connection to be returned to the pool before returning.
  Wait bool  //当Wait 为true 时,并且池子则最大活跃连接数达到最大限制,获取连接的方法需要等待,有连接被放回池子,才能使用

  // mu protects fields defined below.
  mu     sync.Mutex
  cond   *sync.Cond
  closed bool
  active int

  // Stack of idleConn with most recently used at the front.
  idle list.List  //存放空闲连接的链表
}

获取可用连接函数(放回的连接用完后,需要用户自己释放)

其实这里返回的连接不是最原始的连接,而是池化连接

代码语言:javascript
复制
type pooledConnection struct {
  p     *Pool
  c     Conn
  state int
}

即对原始连接进行了包装

代码语言:javascript
复制
// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get() Conn {
  c, err := p.get()
  if err != nil {
    return errorConnection{err}
  }
  return &pooledConnection{p: p, c: c}
}

//释放
func (pc *pooledConnection) Close() error {
  c := pc.c
  if _, ok := c.(errorConnection); ok {
    return nil
  }
  pc.c = errorConnection{errConnClosed}

  if pc.state&internal.MultiState != 0 {
    c.Send("DISCARD")
    pc.state &^= (internal.MultiState | internal.WatchState)
  } else if pc.state&internal.WatchState != 0 {
    c.Send("UNWATCH")
    pc.state &^= internal.WatchState
  }
  if pc.state&internal.SubscribeState != 0 {
    c.Send("UNSUBSCRIBE")
    c.Send("PUNSUBSCRIBE")
    // To detect the end of the message stream, ask the server to echo
    // a sentinel value and read until we see that value.
    sentinelOnce.Do(initSentinel)
    c.Send("ECHO", sentinel)
    c.Flush()
    for {
      p, err := c.Receive()
      if err != nil {
        break
      }
      if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
        pc.state &^= internal.SubscribeState
        break
      }
    }
  }
  c.Do("")
  pc.p.put(c, pc.state != 0)
  return nil
}
代码语言:javascript
复制

// get prunes stale connections and returns a connection from the idle list or
// creates a new connection.
func (p *Pool) get() (Conn, error) {
  p.mu.Lock()

  // Prune stale connections.

  if timeout := p.IdleTimeout; timeout > 0 {
    for i, n := 0, p.idle.Len(); i < n; i++ {
      e := p.idle.Back()
      if e == nil {//没有空闲连接了
        break
      }
      ic := e.Value.(idleConn)
      if ic.t.Add(timeout).After(nowFunc()) {//连接已经超时
        break
      }
      p.idle.Remove(e) //从空闲连接中移除
      p.release()//1,空闲连接数减一  2,给所有等待获取连接的协程发信号
      p.mu.Unlock()
      ic.c.Close()//以下几种,状态特殊处理,
      //最后将连接   重新放回连接池头部   ,如果达到最大连接数,则挤掉尾部连接,并放回
        //const (
        //WatchState = 1 << iota
        //MultiState
        //SubscribeState
        //MonitorState
        //)
      p.mu.Lock()
    }
  }

  for {

    // Get idle connection.

    for i, n := 0, p.idle.Len(); i < n; i++ {
      e := p.idle.Front()
      if e == nil {
        break
      }
      ic := e.Value.(idleConn)
      p.idle.Remove(e)//从空闲连接中取出
      test := p.TestOnBorrow //检查连接是否可用
      p.mu.Unlock()
      if test == nil || test(ic.c, ic.t) == nil {
        return ic.c, nil  //可用就直接放回
      }
      ic.c.Close()  //关闭不可用连接,(放回链表头部)
      p.mu.Lock()
      p.release()
    }

    // Check for pool closed before dialing a new connection.

    if p.closed {
      p.mu.Unlock()
      return nil, errors.New("redigo: get on closed pool")
    }

    // Dial new connection if under limit.

    if p.MaxActive == 0 || p.active < p.MaxActive {
      dial := p.Dial // 没有达到最大活跃连接数,重新生成一个连接,并返回
      p.active += 1
      p.mu.Unlock()
      c, err := dial()
      if err != nil {
        p.mu.Lock()
        p.release()
        p.mu.Unlock()
        c = nil
      }
      return c, err
    }

    if !p.Wait {
      p.mu.Unlock()
      return nil, ErrPoolExhausted
    }

    if p.cond == nil {
      p.cond = sync.NewCond(&p.mu)
    }
    p.cond.Wait() //循环中等待事件发生
  }
}

func (p *Pool) put(c Conn, forceClose bool) error {
  err := c.Err()
  p.mu.Lock()
  if !p.closed && err == nil && !forceClose {
    p.idle.PushFront(idleConn{t: nowFunc(), c: c})
    if p.idle.Len() > p.MaxIdle {//达到最大空闲连接数,将队列尾部的连接,弹出
      c = p.idle.Remove(p.idle.Back()).(idleConn).c
    } else {
      c = nil
    }
  }

  if c == nil {
    if p.cond != nil {
      p.cond.Signal()   //没有达到最大空闲连接数,发信号,重新生成一个连接
    }
    p.mu.Unlock()
    return nil
  }

  p.release()
  p.mu.Unlock()  
  return c.Close() //达到最大连接数,关闭连接
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-04-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档