前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KubeEdge - edgecore servicebus模块源码分析

KubeEdge - edgecore servicebus模块源码分析

作者头像
有点技术
发布2020-07-14 14:57:51
9900
发布2020-07-14 14:57:51
举报
文章被收录于专栏:有点技术有点技术

servicebus

ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求, 与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。

代码逻辑

servicebus的功能比较简单

代码语言:javascript
复制
func (sb *servicebus) Start(c *beehiveContext.Context) {    // no need to call TopicInit now, we have fixed topic    var ctx context.Context    sb.context = c    ctx, sb.cancel = context.WithCancel(context.Background())    var htc = new(http.Client)    htc.Timeout = time.Second * 10
    var uc = new(util.URLClient)    uc.Client = htc
    //Get message from channel    for {        select {        case <-ctx.Done():            klog.Warning("ServiceBus stop")            return        default:
        }        msg, err := sb.context.Receive("servicebus")        if err != nil {            klog.Warningf("servicebus receive msg error %v", err)            continue        }        go func() {            klog.Infof("ServiceBus receive msg")            source := msg.GetSource()            if source != sourceType {                return            }            resource := msg.GetResource()            r := strings.Split(resource, ":")            if len(r) != 2 {                m := "the format of resource " + resource + " is incorrect"                klog.Warningf(m)                code := http.StatusBadRequest                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {                    sb.context.SendToGroup(modules.HubGroup, response)                }                return            }            content, err := json.Marshal(msg.GetContent())            if err != nil {                klog.Errorf("marshall message content failed %v", err)                m := "error to marshal request msg content"                code := http.StatusBadRequest                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {                    sb.context.SendToGroup(modules.HubGroup, response)                }                return            }            var httpRequest util.HTTPRequest            if err := json.Unmarshal(content, &httpRequest); err != nil {                m := "error to parse http request"                code := http.StatusBadRequest                klog.Errorf(m, err)                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {                    sb.context.SendToGroup(modules.HubGroup, response)                }                return            }            operation := msg.GetOperation()            targetURL := "http://127.0.0.1:" + r[0] + "/" + r[1]            resp, err := uc.HTTPDo(operation, targetURL, httpRequest.Header, httpRequest.Body)            if err != nil {                m := "error to call service"                code := http.StatusNotFound                klog.Errorf(m, err)                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {                    sb.context.SendToGroup(modules.HubGroup, response)                }                return            }            resp.Body = http.MaxBytesReader(nil, resp.Body, maxBodySize)            resBody, err := ioutil.ReadAll(resp.Body)            if err != nil {                if err.Error() == "http: request body too large" {                    err = fmt.Errorf("response body too large")                }                m := "error to receive response, err: " + err.Error()                code := http.StatusInternalServerError                klog.Errorf(m, err)                if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {                    sb.context.SendToGroup(modules.HubGroup, response)                }                return            }
            response := util.HTTPResponse{Header: resp.Header, StatusCode: resp.StatusCode, Body: resBody}            responseMsg := model.NewMessage(msg.GetID())            responseMsg.Content = response            responseMsg.SetRoute("servicebus", modules.UserGroup)            sb.context.SendToGroup(modules.HubGroup, *responseMsg)        }()    }}

根据代码分为以下步骤:

  • 拿到回传的beehiveContext,初始化http连接,设置超时十秒
  • 初始化URLClient
  • 从beehiveContext 里面接收接收servicebus的消息,获取消息来源,来源必须是router_rest
  • 然后获取消息包含的resource,根据代码可以看出resource是一个以冒号分割的字符串,根据后续代码可以知道,实际上就是 port:url
  • 获取对应的消息内容,该内容是一个json,最终被反序列化为
代码语言:javascript
复制
type HTTPRequest struct {    Header http.Header `json:"header"`    Body   []byte      `json:"body"`}
  • 获取动作 - 即http的 get/post/put...
  • 发出请求接收返回,判断返回数据量大小,最大为 5*1e6个字节,超过就报错
  • 以接收到的消息ID作为父ID通过返回数据给edgehub
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有点技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • servicebus
  • 代码逻辑
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档