怎么做貨物收發(fā)的網(wǎng)站網(wǎng)站銷售怎么推廣
1. 執(zhí)行命令的入口方法
redis也是通過hook執(zhí)行命令,initHooks時,會將redis的hook放在第一個
通過hook調(diào)用到process方法,process方法內(nèi)部再調(diào)用_process
2. 線程池初始化
redis在新建單客戶端、sentinel客戶端、cluster客戶端等,都會newConnPool初始化線程池
2.1.1. NewClient方式初始化連接池
// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {opt.init()c := Client{baseClient: &baseClient{opt: opt,},}c.init()// 初始化線程池c.connPool = newConnPool(opt, c.dialHook)return &c
}
2.1.2. NewFailoverClient方式初始化連接池
// NewFailoverClient returns a Redis client that uses Redis Sentinel
// for automatic failover. It's safe for concurrent use by multiple
// goroutines.
// zhmark 2024/6/13 NewFailoverClient
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {if failoverOpt.RouteByLatency {panic("to route commands by latency, use NewFailoverClusterClient")}if failoverOpt.RouteRandomly {panic("to route commands randomly, use NewFailoverClusterClient")}sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))copy(sentinelAddrs, failoverOpt.SentinelAddrs)// todo:2024/6/26 有問題,每次都是換成1、3、2// 將 sentinelAddrs 切片中的元素順序隨機打亂,實現(xiàn)隨機化效果rand.Shuffle(len(sentinelAddrs), func(i, j int) {//交換 sentinelAddrs 中第 i 個和第 j 個元素sentinelAddrs[i], sentinelAddrs[j] = sentinelAddrs[j], sentinelAddrs[i]})failover := &sentinelFailover{opt: failoverOpt,sentinelAddrs: sentinelAddrs,}opt := failoverOpt.clientOptions()// 初始化賦值連接建立函數(shù)opt.Dialer = masterReplicaDialer(failover)opt.init()var connPool *pool.ConnPoolrdb := &Client{baseClient: &baseClient{opt: opt,},}rdb.init()
// 初始化線程池connPool = newConnPool(opt, rdb.dialHook)rdb.connPool = connPoolrdb.onClose = failover.Closefailover.mu.Lock()// 關(guān)閉老的有問題的地址連接//如:發(fā)現(xiàn)新讀取的主節(jié)點地址和本地保存的不一樣,將之前和老的主節(jié)點連接斷開// addr是新的master地址failover.onFailover = func(ctx context.Context, addr string) {_ = connPool.Filter(func(cn *pool.Conn) bool {// 如果連接的遠程地址與 addr 不同,則返回 true,表示要關(guān)閉此連接;否則返回 false,表示保留該連接return cn.RemoteAddr().String() != addr})}failover.mu.Unlock()return rdb
}
2.1. NewClusterClient方式初始化線程池
cluster模式和上面的NewClient、NewFailoverClient不一樣。cluster模式new的時候不會初始化連接池,而是等執(zhí)行命令時,獲取所有節(jié)點,每個節(jié)點新建一個redisClient,每個client單獨一個連接池
2.1.1. 初始化NewClusterClient時不會新建連接池
// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {// 初始化opt,其中會初始化NewClient方法opt.init()c := &ClusterClient{opt: opt,nodes: newClusterNodes(opt),}// 獲取所有主從節(jié)點信息,并保存在本地c.state = newClusterStateHolder(c.loadState)// 保存命令詳情c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)c.cmdable = c.Processc.initHooks(hooks{dial: nil,process: c._process,pipeline: c.processPipeline,txPipeline: c.processTxPipeline,})return c
}
2.1.2. 執(zhí)行命令時,通過cmdNode執(zhí)行到NewClient,初始化線程池
通過clOpt的NewClient方法,初始化client,進而初始化線程池
2.1.3. 然而clOpt的NewClient方法什么時候初始化賦值的呢
在NewClusterClient方法的opt.init()中
3. 如何新建連接
總覽圖
3.1.1. 第一次執(zhí)行命令時,go-redis會先通過cmdNode方法,獲取所有的節(jié)點信息
3.1.2. 底層調(diào)用到ClusterSlots方法,觸發(fā)redis.go中_process方法,內(nèi)部調(diào)用_withConn方法,通過getConn方法獲取可用連接
3.1.3. getConn方法內(nèi)部發(fā)現(xiàn)無可用連接,則會調(diào)用newConn
3.1.4. newConn內(nèi)部,調(diào)用連接池的dialConn方法觸發(fā)調(diào)用
3.1.5. dialConn調(diào)用配置項的Dialer方法
3.1.6. p.cfg.Dialer在newConnPool時候初始化的,通過Dialer方法,觸發(fā)dialer
3.1.7. 而dialer是newClient時傳入的dialhook,至此直接觸發(fā)了dialhook
3.1.8. sentinel模式也是在NewFailoverClient時傳入的dialhook
3.1.9. redis自己的dialHook內(nèi)部,執(zhí)行的是opt的Dialer方法
3.1.10. 此Dialer方法是在NewClient中opt.init()初始化方法中賦值的,如果沒有自定義,就用默認的建連方法
3.1.11. 默認的建連方法很簡單,調(diào)用go底層的net建立連接
3.1.12. sentinel模式不一樣,NewFailoverClient方法有自定義建連方法
3.1.13. 里面實現(xiàn)了讀寫分離
4. 閑置連接如何關(guān)閉
看是否有配置MinIdleConns
和MaxIdleConns
。如果有配置了MinIdleConns
,那么在NewConnPool、popIdle、removeConn時,都會調(diào)用checkMinIdleConns
補充創(chuàng)建最低閑置連接數(shù)
// Minimum number of idle connections which is useful when establishing
// new connection is slow.
// Default is 0. the idle connections are not closed by default.
MinIdleConns int
// Maximum number of idle connections.
// Default is 0. the idle connections are not closed by default.
MaxIdleConns int
每次執(zhí)行完方法,會釋放連接
5. 如何控制閑置連接數(shù)大小
6. 如何控制總連接數(shù)
poolSize:控制最大并發(fā)量
turn可能為0,閑置連接數(shù)為最大poolSize
7. 如何保持連接池內(nèi)的連接健康
每次Get連接時,會檢查連接是否健康
func (p *ConnPool) Get(ctx context.Context, isReadCmd bool) (*Conn, error) {if p.closed() {return nil, ErrClosed}// 排隊if err := p.waitTurn(ctx); err != nil {return nil, err}for {p.connsMu.Lock()// 獲取一個可用的連接cn, err := p.popIdle(isReadCmd)p.connsMu.Unlock()if err != nil {p.freeTurn()return nil, err}if cn == nil {break}// 讀請求走replica,只是多一層保護if p.cfg.ReadMode == _const.READ_MODE_REPLICA {if isReadCmd && cn.remoteType != REMOTE_TYPE_REPLICA {continue}// 寫請求不走replicaif !isReadCmd && cn.remoteType == REMOTE_TYPE_REPLICA {continue}}if !p.isHealthyConn(cn) {_ = p.CloseConn(cn)continue}atomic.AddUint32(&p.stats.Hits, 1)return cn, nil}atomic.AddUint32(&p.stats.Misses, 1)// zhmark 2024/6/18 如果連接池里沒有可用的連接,那么新建連接newcn, err := p.newConn(ctx, true, isReadCmd)if err != nil {p.freeTurn()return nil, err}return newcn, nil
}
7.1. isHealthyConn內(nèi)方法解析
// zhmark 2024/7/8 連接關(guān)鍵檢查,維護連接池連接健康
func (p *ConnPool) isHealthyConn(cn *Conn) bool {now := time.Now()// ConnMaxLifetime 默認為0if p.cfg.ConnMaxLifetime > 0 && now.Sub(cn.createdAt) >= p.cfg.ConnMaxLifetime {return false}// ConnMaxIdleTime Default is 30 minutes. -1 disables idle timeout checkif p.cfg.ConnMaxIdleTime > 0 && now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime {return false}if connCheck(cn.netConn) != nil {return false}cn.SetUsedAt(now)return true
}
7.1.1. 連接使用時長檢驗
-
- ConnMaxLifetime默認為0,如果配置了ConnMaxLifetime,那么如果當前時間離連接創(chuàng)建時間超過ConnMaxLifetime,則會判定連接為不健康,進而關(guān)閉連接
7.1.2. 連接空閑時長檢驗
-
- ConnMaxIdleTime,默認為30分鐘,如果連接超過ConnMaxIdleTime時間未使用,則會判定連接為不健康
7.1.3. 檢查底層網(wǎng)絡(luò)連接狀態(tài)
func connCheck(conn net.Conn) error {// Reset previous timeout._ = conn.SetDeadline(time.Time{})sysConn, ok := conn.(syscall.Conn)if !ok {return nil}rawConn, err := sysConn.SyscallConn()if err != nil {return err}var sysErr errorif err := rawConn.Read(func(fd uintptr) bool {var buf [1]byten, err := syscall.Read(int(fd), buf[:])switch {case n == 0 && err == nil:sysErr = io.EOFcase n > 0:sysErr = errUnexpectedReadcase err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:sysErr = nildefault:sysErr = err}return true}); err != nil {return err}return sysErr
}
8. 如何實時監(jiān)控連接池狀態(tài)
PoolStats