前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊kingbus的membership_handler.go

聊聊kingbus的membership_handler.go

原创
作者头像
code4it
修改2020-06-17 10:18:40
3180
修改2020-06-17 10:18:40
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下kingbus的membership_handler.go

GetMembers

kingbus/api/membership_handler.go

代码语言:go
复制
// GetMembers writes the cluster membership list to the response as JSON.
//
// The list comes from h.cluster.Members() and is wrapped in the standard
// response envelope with HTTP 200. Unlike GetCluster, this handler does not
// flag which member is the leader.
func (h *MembershipHandler) GetMembers(echoCtx echo.Context) error {
    list := h.cluster.Members()
    body := utils.NewResp().SetData(list)
    return echoCtx.JSON(http.StatusOK, body)
}
  • GetMembers方法通过h.cluster.Members()获取members,然后用utils.NewResp().SetData包装,以JSON格式返回(HTTP 200)

AddMember

kingbus/api/membership_handler.go

代码语言:go
复制
//AddMember implements add a member into raft cluster
func (h *MembershipHandler) AddMember(echoCtx echo.Context) error {
    args := struct {
        NodeName string `json:"name"`
        PeerURL  string `json:"peer_url"`
        AdminURL string `json:"admin_url"`
    }{}
    err := echoCtx.Bind(&args)
    if err != nil {
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
​
    isLeader := h.svr.IsLeader()
    if isLeader == false {
        req, err := json.Marshal(args)
        if err != nil {
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
        }
        resp, err := h.sendToLeader("POST", req)
        if err != nil {
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
        }
        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))
    }
​
    ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
    defer cancel()
​
    peerURLs, err := types.NewURLs([]string{args.PeerURL})
    if err != nil {
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
    adminURLs, err := types.NewURLs([]string{args.AdminURL})
    if err != nil {
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
    now := time.Now()
​
    member := membership.NewMember(args.NodeName, peerURLs, adminURLs, &(now))
    members, err := h.svr.AddMember(ctx, *member)
    switch {
    case err == membership.ErrIDExists || err == membership.ErrPeerURLexists:
        return echoCtx.JSON(http.StatusConflict, utils.NewResp().SetError(err.Error()))
    case err != nil:
        log.Log.Errorf("error adding member %s (%v)", member.ID, err)
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
​
    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))
}
  • AddMember方法先通过h.svr.IsLeader()判断isLeader,为false的话通过h.sendToLeader("POST", req)转发给leader,为true的话则通过membership.NewMember创建member,然后通过h.svr.AddMember来添加member

UpdateMember

kingbus/api/membership_handler.go

代码语言:go
复制
//UpdateMember implements update member information
func (h *MembershipHandler) UpdateMember(echoCtx echo.Context) error {
    args := struct {
        NodeName   string `json:"name"`
        NewPeerURL string `json:"new_peer_url"`
    }{}
    err := echoCtx.Bind(&args)
    if err != nil {
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
​
    isLeader := h.svr.IsLeader()
    if isLeader == false {
        req, err := json.Marshal(args)
        if err != nil {
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
        }
        resp, err := h.sendToLeader("PUT", req)
        if err != nil {
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
        }
        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))
    }
​
    ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
    defer cancel()
​
    m := h.cluster.MemberByName(args.NodeName)
    if m == nil {
        err = fmt.Errorf("no such member: %s", args.NodeName)
        return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(err.Error()))
    }
​
    newMember := membership.Member{
        ID:             m.ID,
        RaftAttributes: membership.RaftAttributes{PeerURLs: []string{args.NewPeerURL}},
    }
    members, err := h.svr.UpdateMember(ctx, newMember)
    switch {
    case err == membership.ErrPeerURLexists:
        return echoCtx.JSON(http.StatusConflict, utils.NewResp().SetError(err.Error()))
    case err == membership.ErrIDNotFound:
        err = fmt.Errorf("no such member: %s", args.NodeName)
        return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(err.Error()))
    case err != nil:
        log.Log.Errorf("error updating member %s (%v)", m.ID, err)
        echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    default:
        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))
    }
    return nil
}
  • UpdateMember方法先通过h.svr.IsLeader()判断isLeader,为false的话通过h.sendToLeader("PUT", req)转发给leader;为true的话先通过h.cluster.MemberByName按名称查找member(找不到则返回404),再通过h.svr.UpdateMember来更新member的PeerURLs

DeleteMember

kingbus/api/membership_handler.go

代码语言:go
复制
//DeleteMember implements remove a member from raft cluster
func (h *MembershipHandler) DeleteMember(echoCtx echo.Context) error {
    args := struct {
        NodeName string `json:"name"`
        PeerURL  string `json:"peer_url"`
    }{}
    err := echoCtx.Bind(&args)
    if err != nil {
        return echoCtx.JSON(http.StatusForbidden, err.Error())
    }
​
    isLeader := h.svr.IsLeader()
    if isLeader == false {
        req, err := json.Marshal(args)
        if err != nil {
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
        }
        resp, err := h.sendToLeader("DELETE", req)
        if err != nil {
            return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
        }
        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))
    }
​
    ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
    defer cancel()
​
    m := h.cluster.MemberByName(args.NodeName)
    if m == nil {
        msg := fmt.Sprintf("No such member: %s", args.NodeName)
        return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(msg))
    }
​
    log.Log.Debugf("DeleteMember:remove member id is %s", m.ID.String())
​
    members, err := h.svr.RemoveMember(ctx, uint64(m.ID))
    switch {
    case err == membership.ErrIDRemoved:
        msg := fmt.Sprintf("Member permanently removed: %s", args.NodeName)
        return echoCtx.JSON(http.StatusGone, utils.NewResp().SetError(msg))
    case err == membership.ErrIDNotFound:
        msg := fmt.Sprintf("No such member: %s", args.NodeName)
        return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(msg))
    case err != nil:
        log.Log.Errorf("error removing member %s (%v)", args.NodeName, err)
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))
}
  • DeleteMember方法先通过h.svr.IsLeader()判断isLeader,为false的话通过h.sendToLeader("DELETE", req)转发给leader;为true的话先通过h.cluster.MemberByName按名称查找member(找不到则返回404),再通过h.svr.RemoveMember来删除member

GetCluster

kingbus/api/membership_handler.go

代码语言:go
复制
// GetCluster writes one Role entry per cluster member to the response as
// JSON, wrapped in the standard response envelope with HTTP 200.
//
// Each Role carries the member ID formatted as lowercase hex, the member's
// RaftAttributes and Attributes, and an IsLeader flag that is true when the
// member ID equals the leader ID reported by h.svr.Leader().
func (h *MembershipHandler) GetCluster(echoCtx echo.Context) error {
    members := h.cluster.Members()

    // Read the leader once so every entry is compared against the same
    // value; re-reading it per member could mark zero or two members as
    // leader if leadership changes while the list is being built.
    leaderID := uint64(h.svr.Leader())

    roles := make([]*Role, 0, len(members))
    for _, m := range members {
        r := new(Role)
        r.ID = fmt.Sprintf("%x", uint64(m.ID))
        r.RaftAttributes = m.RaftAttributes
        r.Attributes = m.Attributes
        r.IsLeader = uint64(m.ID) == leaderID
        roles = append(roles, r)
    }
    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(roles))
}
  • GetCluster方法通过h.cluster.Members()获取members,然后遍历members构造roles信息,最后json化返回

UpdateAdminURL

kingbus/api/membership_handler.go

代码语言:go
复制
//UpdateAdminURL implements update raft node admin url in raft cluster
func (h *MembershipHandler) UpdateAdminURL(echoCtx echo.Context) error {
    var attributes config.Attributes
    if h.svr.IsLeader() == false {
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(ErrNotLeader.Error()))
    }
    err := echoCtx.Bind(&attributes)
    if err != nil {
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
    data, err := attributes.EncodeWithType()
    if err != nil {
        log.Log.Errorf("attributes EncodeWithType error,err:%v,attributes:%v", err, attributes)
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
​
    err = h.svr.Propose(data)
    if err != nil {
        log.Log.Errorf("Propose attributes error,err:%s,attributes:%v", err, attributes)
        return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
    }
    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}
  • UpdateAdminURL方法在h.svr.IsLeader()为false时直接返回ErrNotLeader错误(不会转发给leader);为true时将请求体绑定为config.Attributes,经EncodeWithType编码后执行h.svr.Propose(data)

小结

membership_handler.go提供了GetMembers、AddMember、UpdateMember、DeleteMember、GetCluster、UpdateAdminURL方法

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • GetMembers
  • AddMember
  • UpdateMember
  • DeleteMember
  • GetCluster
  • UpdateAdminURL
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档