前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go微服务系列之三

go微服务系列之三

作者头像
李海彬
发布2019-05-08 10:41:37
6960
发布2019-05-08 10:41:37
举报
文章被收录于专栏:Golang语言社区Golang语言社区

作者:稀饭下雪 链接:https://www.jianshu.com/p/9cb474dd451d 来源:简书

在前两篇系列博文中,我已经实现了user-srv、web-srv、api-srv,在新的一篇博文中,我要讲解的是如何在项目中如何使用redis存储session。如果想直接查阅源码或者通过demo学习的,可以访问ricoder_demo。

如何编写一个微服务?这里用的是go的微服务框架go micro,具体的情况可以查阅:http://btfak.com/%E5%BE%AE%E6%9C%8D%E5%8A%A1/2016/03/28/go-micro/

一、构建user-status.srv

1.1 构建UserStatus.proto

定义User的状态操作函数,部分源码如下:

代码语言:javascript
复制
 1syntax = "proto3";
 2
 3package pb;
 4
 5service UserStatus {
 6//通过uid获取session
 7rpc GetSessionByUID(GetSessionByUIDReq) returns (GetSessionByUIDRep) {}
 8//通过token获取session
 9rpc GetSessionByToken(GetSessionByTokenReq) returns (GetSessionByTokenRep) {}
10//获取用户的长连接地址
11rpc GetConnectorAddr(GetConnectorAddrReq) returns (GetConnectorAddrRep) {}
12//更新用户长连接地址(用户建立长连接时调用)
13rpc UpdateConnectorAddr(UpdateConnectorAddrReq) returns (UpdateConnectorAddrRep) {}
14//构建session用户登录时调用,此接口会清除旧session
15rpc NewSession(NewSessionReq) returns (NewSessionRep) {}
16//移除session登出时会调用
17rpc RemoveSession(RemoveSessionReq) returns (RemoveSessionRep) {}
18//token续期
19rpc RefreshSession(RefreshSessionReq) returns (RefreshSessionRep) {}
20//更新用户长连接地址(用户建立长连接时调用)
21rpc UserConnected(UserConnectedReq) returns (UserConnectedRep) {}
22//删除用户的长连接地址(用户长连接断开时调用)
23rpc UserDisonnected(UserDisonnectedReq) returns (UserDisonnectedRep) {}
24//通过uid来移除session
25rpc RemoveSessionByUID(RemoveSessionByUIDReq) returns (RemoveSessionByUIDRep) {}
26//通过token找uid
27rpc GetUserIDByToken(GetUserIDByTokenReq) returns (GetUserIDByTokenRep) {}
28}
29/*
30还有一些定义,完整示例可以查看源码~
31*/

1.2 运行脚本build_proto.sh自动构建userStatus.pb.go

代码语言:javascript
复制
1$ bash ./build_proto.sh

这个build_proto.sh是我自己构建的一个脚本文件,运行之后会在/src/share/pb/文件夹下面生成一个userStatus.pb.go文件

1.3 构建handler,实现userStatus中的函数

我在src文件夹下面添加一个user-status-srv文件夹,并在里边添加一个handler文件夹和utils文件夹,一个存放handler文件,一个存放工具类函数,然后实现handler函数,源码如下:

代码语言:javascript
复制
 1package handler
 2
 3import (
 4    //多个导入包,具体请查看源码
 5)
 6type UserStatusHandler struct {
 7    pool               *redis.Pool
 8    logger             *zap.Logger
 9    namespace          string
10    sessionExpire int
11    tokenExpire int
12}
13
14func NewUserStatusHandler(pool *redis.Pool) *UserStatusHandler {
15    return &UserStatusHandler{
16        pool: pool,
17        sessionExpire: 15 * 86400,
18        tokenExpire:   15 * 86400,
19    }
20}
21
22//GetUserIDByToken GetUIDByToken
23func (s *UserStatusHandler) GetUserIDByToken(ctx context.Context, req *pb.GetUserIDByTokenReq, rsp *pb.GetUserIDByTokenRep) error {
24    return nil  
25}
26/*
27还有其他函数的实现,完整示例可以查看源码~
28*/

这里实现的函数全部先采用空实现,在后面会慢慢添加

1.4 实现main函数,启动service

源码如下:

代码语言:javascript
复制
 1package main
 2
 3import (
 4    //多个导入包,具体查看完整源码
 5)
 6
 7func main() {
 8
 9    // 创建Service,并定义一些参数
10    service := micro.NewService(
11        micro.Name(config.Namespace+"userStatus"),
12        micro.Version("latest"),
13    )
14    // 定义Service动作操作
15    service.Init(
16        micro.Action(func(c *cli.Context) {
17            log.Println("micro.Action test ...")
18            // 注册redis
19            redisPool := share.NewRedisPool(3, 3, 1,300*time.Second,":6379","redis")
20            // 先注册db
21            db.Init(config.MysqlDSN)
22            pb.RegisterUserStatusHandler(service.Server(), handler.NewUserStatusHandler(redisPool), server.InternalHandler(true))
23        }),
24        micro.AfterStop(func() error {
25            log.Println("micro.AfterStop test ...")
26            return nil
27        }),
28        micro.AfterStart(func() error {
29            log.Println("micro.AfterStart test ...")
30            return nil
31        }),
32    )
33
34    log.Println("启动user-status-srv服务 ...")
35
36    //启动service
37    if err := service.Run(); err != nil {
38        log.Panic("user-status-srv服务启动失败 ...")
39    }
40}

由源码可以看出,我在启动service之前先注册了redis、db以及绑定handler,再通过Run启动service。

1.5 查看consul

在浏览器打开 http://127.0.0.1:8500/ ,如果可以在页面中看到对应的srv,则说明service启动成功。如:

二、使用redis

在这一章节中,我将采用redis实现数据的存取。

2.1 新建一个redis.Pool

在main.go函数中,我使用 *share.NewRedisPool(3, 3, 1,300time.Second,":6379","redis") 得到了一个redisPool,NewRedisPool源码如下:

代码语言:javascript
复制
 1func NewRedisPool(maxIdle, maxActive , DBNum int, timeout time.Duration, addr , password string) *redis.Pool {
 2
 3    return &redis.Pool{
 4        MaxActive:   maxActive,
 5        MaxIdle:     maxIdle,
 6        IdleTimeout: timeout,
 7        Wait:        true,
 8        Dial: func() (redis.Conn, error) {
 9            // return redis.DialURL(rawurl)
10            // return redis.Dial("tcp", addr, redis.DialPassword(password), redis.DialDatabase(dbNum))
11            return redis.Dial("tcp", addr, redis.DialPassword(password), redis.DialDatabase(DBNum))
12        },
13        TestOnBorrow: func(c redis.Conn, t time.Time) error {
14            _, err := c.Do("PING")
15            return err
16        },
17    }
18}

在这里我使用的是第三方开源框架,有兴趣的可以查看 https://github.com/garyburd/redigo 了解情况。

2.2 使用redis存取数据

在这里我以NewSession为例,源码如下:

代码语言:javascript
复制
 1func (s *UserStatusHandler) NewSession(ctx context.Context, req *pb.NewSessionReq, rsp *pb.NewSessionRep) error {
 2    var oldSession *pb.Session
 3    defer func() {
 4        utils.SessionFree(oldSession)
 5    }()
 6    fieldMap := make(map[string]interface{}, 0)
 7    fieldMap["Uid"] = req.Id
 8    fieldMap["Address"] = req.Address
 9    fieldMap["Phone"] = req.Phone
10    fieldMap["Name"] = req.Name
11    //生成Token
12    token, err := utils.NewToken(req.Id)
13    if err != nil {
14        log.Println("生成token失败", zap.Error(err), zap.Int32("uid", req.Id))
15        return err
16    }
17
18    //删除所有旧token
19    if err = utils.RemoveUserSessions(req.Id, s.pool); err != nil {
20        log.Println("删除所有旧token失败", zap.Error(err), zap.Int32("uid", req.Id))
21        return err
22    }
23    conn := s.pool.Get()
24    //会话数据写入redis,格式:t:id => map的哈希值
25    if _, err := conn.Do("HMSET", redis.Args{}.Add(utils.KeyOfSession(req.Id)).AddFlat(fieldMap)...); err != nil {
26        conn.Close()
27        log.Println("会话数据写入redis失败", zap.Error(err), zap.String("key", utils.KeyOfSession(req.Id)), zap.Any("参数", fieldMap))
28        return err
29    }
30    //设置t:id的过期时间
31    if _, err := conn.Do("EXPIRE", utils.KeyOfSession(req.Id), s.sessionExpire); err != nil {
32        conn.Close()
33        s.logger.Error("设置session过期时间失败", zap.Error(err), zap.String("key", utils.KeyOfSession(req.Id)))
34        return err
35    }
36
37    //用户token写入set里边,格式:t:uid:set:id => token
38    keyOfSet := utils.KeyOfSet(req.Id)
39    if _, err = conn.Do("SADD", keyOfSet, token); err != nil {
40        conn.Close()
41        log.Println("token写入用户集合失败", zap.Error(err), zap.String("key", keyOfSet), zap.String("参数", token))
42        return err
43    }
44    //设置t:uid:set:id的过期时间
45    if _, err = conn.Do("EXPIRE", keyOfSet, s.sessionExpire); err != nil {
46        conn.Close()
47        log.Println("设置用户token集合过期时间失败", zap.Error(err), zap.String("key", keyOfSet))
48        return err
49    }
50
51    //将token和id对应,格式:token => id
52    if _, err = conn.Do("SETEX", utils.KeyOfToken(token), s.tokenExpire, req.Id); err != nil {
53        conn.Close()
54        log.Println("token写入redis失败", zap.Error(err), zap.String("key", utils.KeyOfToken(token)), zap.Int32("参数", req.Id))
55        return err
56    }
57
58    rsp.Token = token
59    return nil
60}

如代码所示,操作redis的步骤是 conn := s.pool.Get() 先开启一个连接,再通过conn.Do("EXPIRE", keyOfSet, s.sessionExpire) 的一种方式操作redis中的数据,具体的可以查看redis的api,这里有个函数 utils.SessionFree(oldSession) ,这是我在utils包下自定义的一个函数,这个知识点再接下来的知识点中会有涉及。

三、额外讲解sync.Pool

我在项目中使用了sync.pool存储session对象,目的是为了保存和复用session这个临时对象,以减少内存分配,减低gc压力,那么sync.Pool是什么呢?以下是官方给出的解释(自己翻译的):

1. Pool是一个可以存取临时对象的集合。 2. Pool中保存的item都可能在没有任何通知的情况下被自动释放掉,即如果Pool持有该对象的唯一引用,这个item就可能被回收。 3. Pool在被多个线程使用的情况下是安全的。 4. Pool的目的是缓存分配了但是未使用的item用于之后的重用,以减轻GC的压力。也就是说,pool让创建高效的并且线程安全的空闲列表更加容易,不过Pool并不适用于所有空闲列表。 5. Pool的合理用法是用于管理一组被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。 6. Pool比较经典的一个例子在fmt包里,该Pool维护一个动态大小的临时输出缓存仓库,该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。 7. 另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。这时应该由这些对象自己实现空闲列表。

以下是Pool的数据类型:

代码语言:javascript
复制
 1type Pool struct {
 2    noCopy noCopy
 3
 4    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
 5    localSize uintptr        // size of the local array
 6
 7    // New optionally specifies a function to generate
 8    // a value when Get would otherwise return nil.
 9    // It may not be changed concurrently with calls to Get.
10    New func() interface{}
11}
12
13// Local per-P Pool appendix.
14type poolLocalInternal struct {
15    private interface{}   // Can be used only by the respective P.
16    shared  []interface{} // Can be used by any P.
17    Mutex                 // Protects shared.
18}
19
20type poolLocal struct {
21    poolLocalInternal
22
23    // Prevents false sharing on widespread platforms with
24    // 128 mod (cache line size) = 0 .
25    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
26}

由注释我们也可以看出,其中的local成员的真实类型是一个poolLocal数组,而localSize是数组长度,poolLocal是真正保存数据的地方。private保存了一个临时对象,shared是保存临时对象的数组,而从private和shared的注释我们也可以看出,一个是属于特定的P私有的,一个是属于所有的P,至于这个P是什么,可以自行参考golang的调度模型,后期我也会专门写一篇相关的博客。其次,Pool是给每个线程分配了一个poolLocal对象,就是说local数组的长度,就是工作线程的数量(size := runtime.GOMAXPROCS(0))。当多线程在并发读写的时候,通常情况下都是在自己线程的poolLocal中存取数据,而只有当自己线程的poolLocal中没有数据时,才会尝试加锁去其他线程的poolLocal中“偷”数据。

我们可以看看Get函数,源码如下:

代码语言:javascript
复制
 1func (p *Pool) Get() interface{} {
 2    if race.Enabled {
 3        race.Disable()
 4    }
 5    l := p.pin()
 6    x := l.private
 7    l.private = nil
 8    runtime_procUnpin()
 9    if x == nil {
10        l.Lock()
11        last := len(l.shared) - 1
12        if last >= 0 {
13            x = l.shared[last]
14            l.shared = l.shared[:last]
15        }
16        l.Unlock()
17        if x == nil {
18            x = p.getSlow()
19        }
20    }
21    if race.Enabled {
22        race.Enable()
23        if x != nil {
24            race.Acquire(poolRaceAddr(x))
25        }
26    }
27    if x == nil && p.New != nil {
28        x = p.New()
29    }
30    return x
31}

这个函数的源码并不难读,在调用Get的时候首先会先在local数组中获取当前线程对应的poolLocal对象,然后再从poolLocal对象中获取private中的数据,如果private中有数据,则取出来直接返回。如果没有则先锁住shared,然后从shared中取出数据后直接返回,如果还是没有则调用getSlow函数。那么为什么这里要锁住shared呢?答案我们可以在getSlow中找到,因为当shared中没有数据的时候,会尝试去其他的poolLocal的shared中偷数据。

代码语言:javascript
复制
 1    // See the comment in pin regarding ordering of the loads.
 2    size := atomic.LoadUintptr(&p.localSize) // load-acquire
 3    local := p.local                         // load-consume
 4    // Try to steal one element from other procs.
 5    pid := runtime_procPin()
 6    runtime_procUnpin()
 7    for i := 0; i < int(size); i++ {
 8        l := indexLocal(local, (pid+i+1)%int(size))
 9        l.Lock()
10        last := len(l.shared) - 1
11        if last >= 0 {
12            x = l.shared[last]
13            l.shared = l.shared[:last]
14            l.Unlock()
15            break
16        }
17        l.Unlock()
18    }
19    return x

tip:该项目的源码(包含数据库的增删查改的demo)可以查看 源代码

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-04-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Golang语言社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档