ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求, 与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。
servicebus的功能比较简单
func (sb *servicebus) Start(c *beehiveContext.Context) { // no need to call TopicInit now, we have fixed topic var ctx context.Context sb.context = c ctx, sb.cancel = context.WithCancel(context.Background()) var htc = new(http.Client) htc.Timeout = time.Second * 10
var uc = new(util.URLClient) uc.Client = htc
//Get message from channel for { select { case <-ctx.Done(): klog.Warning("ServiceBus stop") return default:
} msg, err := sb.context.Receive("servicebus") if err != nil { klog.Warningf("servicebus receive msg error %v", err) continue } go func() { klog.Infof("ServiceBus receive msg") source := msg.GetSource() if source != sourceType { return } resource := msg.GetResource() r := strings.Split(resource, ":") if len(r) != 2 { m := "the format of resource " + resource + " is incorrect" klog.Warningf(m) code := http.StatusBadRequest if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil { sb.context.SendToGroup(modules.HubGroup, response) } return } content, err := json.Marshal(msg.GetContent()) if err != nil { klog.Errorf("marshall message content failed %v", err) m := "error to marshal request msg content" code := http.StatusBadRequest if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil { sb.context.SendToGroup(modules.HubGroup, response) } return } var httpRequest util.HTTPRequest if err := json.Unmarshal(content, &httpRequest); err != nil { m := "error to parse http request" code := http.StatusBadRequest klog.Errorf(m, err) if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil { sb.context.SendToGroup(modules.HubGroup, response) } return } operation := msg.GetOperation() targetURL := "http://127.0.0.1:" + r[0] + "/" + r[1] resp, err := uc.HTTPDo(operation, targetURL, httpRequest.Header, httpRequest.Body) if err != nil { m := "error to call service" code := http.StatusNotFound klog.Errorf(m, err) if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil { sb.context.SendToGroup(modules.HubGroup, response) } return } resp.Body = http.MaxBytesReader(nil, resp.Body, maxBodySize) resBody, err := ioutil.ReadAll(resp.Body) if err != nil { if err.Error() == "http: request body too large" { err = fmt.Errorf("response body too large") } m := "error to receive response, err: " + err.Error() code := http.StatusInternalServerError klog.Errorf(m, err) if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil { sb.context.SendToGroup(modules.HubGroup, response) } return }
response := util.HTTPResponse{Header: resp.Header, StatusCode: resp.StatusCode, Body: resBody} responseMsg := model.NewMessage(msg.GetID()) responseMsg.Content = response responseMsg.SetRoute("servicebus", modules.UserGroup) sb.context.SendToGroup(modules.HubGroup, *responseMsg) }() }}
根据代码分为以下步骤:
type HTTPRequest struct { Header http.Header `json:"header"` Body []byte `json:"body"`}