前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go-protobuf, go-grpc-gateway和代码生成

go-protobuf, go-grpc-gateway和代码生成

原创
作者头像
王磊-字节跳动
修改2019-07-28 21:36:59
3.1K0
修改2019-07-28 21:36:59
举报
文章被收录于专栏:01ZOO01ZOO

代码生成

代码生成是一种常用的生产效率技术。广义上看,编译器通过高级语言生产出低级语言或者机器码,也可以理解为一种代码生成。这种技术在现代的工程实践里往往比较常见:IDE通常自带了一些常见的单元测试生成工具;根据特定的snippet可以生成比较常用的代码片段;在go语言中,由于目前缺乏对范型对支持,为了节约重复代码,通常实现了类似技术也是使用代码生成。

在protobuf生态中,代码生成更为常见,一般来说通过一个proto文件,protoc工具可以生成各个语言的代码,用于搭建一个基于protobuf或者grpc的工具。protoc同时支持以插件等方式,对proto文件进行拓展,生成丰富的代码格式。

代码生成通常第一步是分析生成模板或者DSL文件的语法结构,第二步采用字符串拼凑或者模板替换的方式生成代码。

golang/protobuf

问题: 1. 代码生成怎么做? 2. 如何实现一个类似的parser

golang/protobuf 是golang对protobuf对支持对官方实现,用于从proto文件生成对应对go版本代码文件.

入口在protoc-gen-go/main.go, 本质是显示protoc对一个插件,而protoc对于插件对实现比较直接,protoc会按照protobuf的相关定义解析protoc文件,然后把解析的结果传入插件的stdin, 然后从插件的stdout获取生成文件的所有信息,写文件,完成生成。

代码结构

两个部分:

  1. 实现 protoc的 protocol compiler plugin,作用是从proto文件生成go文件,通过这些go文件,可以读取、操作proto buffer里的内容
  2. A Library:实现了 encoding (marshaling), decoding (unmarshaling), and accessing protocol buffers.
代码语言:txt
复制
├── descriptor: 
├── jsonpb: 
├── proto: library部分
├── protoc-gen-go: 插件部分
├── ptypes: protobuf里面的各种类型

proto: library部分

重要的结构和函数

代码语言:txt
复制
// Message is implemented by generated protocol buffer messages.
// Message 代表生成的数据结构,
// lib.go
type Message interface {
	Reset()         // 重置为0值结构体
	String() string // string格式的pb内容,做了unmarshal
	ProtoMessage()  // 没有实现
}

// 内部使用了一个Buffer做数据存储,以marshal\unmarshal
// lib.go
type Buffer struct {
	buf   []byte // encode/decode byte stream
	index int    // read point

	deterministic bool // deterministic 模式(尽力)same message =》 same bytes
}

// varint-encoded integer =>  int32, int64, uint32, uint64, bool, and enum protocol buffer types.
// 类似的函数还有  DecodeFixed64, DecodeFixed32, DecodeZigzag64, DecodeRawBytes, DecodeStringBytes...

// 和数据格式设计有关:参考 https://blog.csdn.net/erlib/article/details/46345111
// 里面比较关键的设计有: 
// 1. 基于128bits的数值存储方式(Base 128 Varints):每块数据由接连的若干个字节表示(小的数据用1个字节就可以表示),每个字节最高位标识本块数据是否结束(1:未结束,0:结束),低7位表示数据内容。
// 2. 基于序号的协议字段映射 (类似key-value结构)序列号是key,很关键
// 3. 基于无符号数的带符号数表示(ZigZag 编码)
// 4. 协议数据结构 data1_head + data1 ... data表示一个数据如int  0yyyyxxx x: 数据类型 y: 字段序号
// decode.go
func (p *Buffer) DecodeVarint() (x uint64, err error) 
代码语言:txt
复制
// tag 指的是 字段序号
type tagMap struct {
	fastTags []int // 优化,默认为-1,如果对应index 值>=0,那么值就是一个字段序号
	slowTags map[int]int
}

// 代表结构体struct
// StructProperties represents properties for all the fields of a struct.
// decoderTags and decoderOrigNames should only be used by the decoder.
type StructProperties struct {
	Prop             []*Properties  // properties for each field
	reqCount         int            // required count
	decoderTags      tagMap         // map from proto tag to struct field number
	decoderOrigNames map[string]int // map from original name to struct field number
	order            []int          // list of struct field numbers in tag order

	// OneofTypes contains information about the oneof fields in this message.
	// It is keyed by the original name of a field.
	OneofTypes map[string]*OneofProperties
}


// 代表结构体的一个field
// Properties represents the protocol-specific behavior of a single struct field.
// 例如一个字段转化成go加上了 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` 这里面的就是Properties
// 有Parse函数能解析,String函数生成, 定义proto => 生成 => go结构体 => 解析 => 数据到二进制
type Properties struct {
	Name     string // name of the field, for error messages
	OrigName string // original name before protocol compiler (always set)
	JSONName string // name to use for JSON; determined by protoc
	Wire     string
	WireType int
	Tag      int
	Required bool
	Optional bool
	Repeated bool
	Packed   bool   // relevant for repeated primitives only
	Enum     string // set for enum types only
	proto3   bool   // whether this is known to be a proto3 field
	oneof    bool   // whether this is a oneof field

	Default    string // default value
	HasDefault bool   // whether an explicit default was provided

	stype reflect.Type      // set for struct types only
	sprop *StructProperties // set for struct types only

	mtype      reflect.Type // set for map types only
	MapKeyProp *Properties  // set for map types only
	MapValProp *Properties  // set for map types only
}
代码语言:txt
复制
// 生成的代码里面使用这个结构做如 Unmarshal/Marshal/Merge等等操作
// 一个例子: xxx_messageInfo_ServiceConfig 就是InternalMessageInfo类型 
//     func (m *S) XXX_Unmarshal(b []byte) error {
//     	return xxx_messageInfo_S.Unmarshal(m, b)
//     }
// InternalMessageInfo is a type used internally by generated .pb.go files.
// This type is not intended to be used by non-generated code.
// This type is not subject to any compatibility guarantee.
type InternalMessageInfo struct {
	marshal   *marshalInfo
	unmarshal *unmarshalInfo
	merge     *mergeInfo
	discard   *discardInfo
}


// table_unmarshal.go
// Unmarshal is the entry point from the generated .pb.go files.
// This function is not intended to be used by non-generated code.
// This function is not subject to any compatibility guarantee.
// msg contains a pointer to a protocol buffer struct.
// b is the data to be unmarshaled into the protocol buffer.
// a is a pointer to a place to store cached unmarshal information.
func (a *InternalMessageInfo) Unmarshal(msg Message, b []byte) error {
	// Load the unmarshal information for this message type.
	// The atomic load ensures memory consistency.
	u := atomicLoadUnmarshalInfo(&a.unmarshal)
	if u == nil {
		// Slow path: find unmarshal info for msg, update a with it.
		u = getUnmarshalInfo(reflect.TypeOf(msg).Elem())
		atomicStoreUnmarshalInfo(&a.unmarshal, u)
	}
	// Then do the unmarshaling.
	err := u.unmarshal(toPointer(&msg), b)
	return err
}


// 一个go结构 UnMarshal => InternalMessageInfo.UnMarshal => unmarshalInfo.UnMarshal (类型specific, 有cache)
// => 根据field,tag,对不同类型有 unmarshalFieldInfo.unmarshaler 用对应的 unmarshaler做unmarshal
// => decodeVarint (主要就用这个函数,因为各种数字都是这个函数处理,string类型直接读buf)
// 这里的decode相关函数和deocode.go里面的不一样。这里会写到 go结构体的field里面去(pointer)
// decode.go里面的函数可以作为lib 用来做debug
代码语言:txt
复制
// text_parser.go
// text.go 是解析文本格式的protobuf的方法,并不是parse .proto文件

proto-gen-go 插件部分

grpc

代码语言:txt
复制
// grpc/grpc.go
// proto-gen-go 的plugin,生成go结构体的同时,生成grpc的代码,server/client, 以及需要用户实现的interface
// 生成的方式是文本拼接,可读性很差,没有template
// 比如下面的例子
func (g *grpc) generateClientMethod(servName, fullServName, serviceDescVar string, method *pb.MethodDescriptorProto, descExpr string) {
	// ....
	g.P("func (c *", unexport(servName), "Client) ", g.generateClientSignature(servName, method), "{")
	if !method.GetServerStreaming() && !method.GetClientStreaming() {
		g.P("out := new(", outType, ")")
		// TODO: Pass descExpr to Invoke.
		g.P(`err := c.cc.Invoke(ctx, "`, sname, `", in, out, opts...)`)
		g.P("if err != nil { return nil, err }")
		g.P("return out, nil")
		g.P("}")
		g.P()
		return
	}
    // ....
}

plugin

protoc所有插件拿到的就是一个CodeGeneratorRequest的protobuf二进制结构, 插件返回 CodeGeneratorResponse结构

代码语言:txt
复制
// The version number of protocol compiler.
message Version {
  optional int32 major = 1;
  optional int32 minor = 2;
  optional int32 patch = 3;
  // A suffix for alpha, beta or rc release, e.g., "alpha-1", "rc2". It should
  // be empty for mainline stable releases.
  optional string suffix = 4;
}

// An encoded CodeGeneratorRequest is written to the plugin's stdin.
message CodeGeneratorRequest {
  // 所有的proto文件
  repeated string file_to_generate = 1;

  // 插件运行参数
  optional string parameter = 2;

  // proto文件解析后的描述结构,带了所有proto文件的信息,如有多少结构、函数定义,结构有多少字段,每个字段定义等
  repeated FileDescriptorProto proto_file = 15;

  // The version number of protocol compiler.
  optional Version compiler_version = 3;

}

// The plugin writes an encoded CodeGeneratorResponse to stdout.
message CodeGeneratorResponse {
  // Error message.  If non-empty, code generation failed.  T
  optional string error = 1;

  // Represents a single generated file.
  message File {
    // 生成的文件名
    optional string name = 1;

    // 表示文件已经存在,内容插入到文件的一个位置去
    optional string insertion_point = 2;

    // 文件内容
    optional string content = 15;
  }
  repeated File file = 15;
}

FileDescriptorProto文件由descriptor.proto定义,这个是各个语言生成工具的基础。

代码语言:txt
复制
// Describes a complete .proto file.
message FileDescriptorProto {
  optional string name = 1;       // file name, relative to root of source tree
  optional string package = 2;    // e.g. "foo", "foo.bar", etc.
  repeated string dependency = 3;
  repeated int32 public_dependency = 10;.
  repeated int32 weak_dependency = 11;

  // All top-level definitions in this file.
  repeated DescriptorProto message_type = 4;
  repeated EnumDescriptorProto enum_type = 5;
  repeated ServiceDescriptorProto service = 6;
  repeated FieldDescriptorProto extension = 7;

  optional FileOptions options = 8;
  optional SourceCodeInfo source_code_info = 9;
  optional string syntax = 12;
}


...
...

生成的主要函数在protoc-gen-go/generator/generator.go, 生成方式采用字符串拼凑的方式,字符串拼凑的方式较为灵活,但是可读性比较差。

代码语言:txt
复制
// Fill the response protocol buffer with the generated output for all the files we're
// supposed to generate.
func (g *Generator) generate(file *FileDescriptor) {
	//...

	g.P("// This is a compile-time assertion to ensure that this generated file")
	g.P("// is compatible with the proto package it is being compiled against.")
	g.P("// A compilation error at this line likely means your copy of the")
	g.P("// proto package needs to be updated.")
	g.P("const _ = ", g.Pkg["proto"], ".ProtoPackageIsVersion", generatedCodeVersion, " // please upgrade the proto package")
	g.P()

	for _, td := range g.file.imp {
		g.generateImported(td)
	}
	for _, enum := range g.file.enum {
		g.generateEnum(enum)
	}
	for _, desc := range g.file.desc {
		// Don't generate virtual messages for maps.
		if desc.GetOptions().GetMapEntry() {
			continue
		}
		g.generateMessage(desc)
	}
	for _, ext := range g.file.ext {
		g.generateExtension(ext)
	}
	g.generateInitFunction()
	g.generateFileDescriptor(file)

	// Run the plugins before the imports so we know which imports are necessary.
	g.runPlugins(file)

	// Generate header and imports last, though they appear first in the output.
	rem := g.Buffer
	remAnno := g.annotations
	g.Buffer = new(bytes.Buffer)
	g.annotations = nil
	g.generateHeader()
	g.generateImports()
	// ...
	g.Write(rem.Bytes())

	// Reformat generated code and patch annotation locations.
	fset := token.NewFileSet()
	// ...
	fileAST, err := parser.ParseFile(fset, "", original, parser.ParseComments)
    // ...
	ast.SortImports(fset, fileAST)
    // ...
	if g.annotateCode {
		...
	}
}

generateMessage 用于生成类型,已经对应的函数等内容

代码语言:txt
复制
// Generate the type, methods and default constant definitions for this Descriptor.
func (g *Generator) generateMessage(message *Descriptor) {
}

// 对于一个
message Book {
  string title = 1;
  bytes raw_data = 2;
}

// 会生成
type Book struct {
	Title                string   `protobuf:"bytes,1,opt,name=title,proto3" json:"title,omitempty"`
	RawData              []byte   `protobuf:"bytes,2,opt,name=raw_data,json=rawData,proto3" json:"raw_data,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (m *Book) Reset()         { *m = Book{} }
func (m *Book) String() string { return proto.CompactTextString(m) }
func (*Book) ProtoMessage()    {}
func (*Book) Descriptor() ([]byte, []int) {
	return fileDescriptor_ab04eb4084a521db, []int{1}
}

func (m *Book) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_Book.Unmarshal(m, b)
}
func (m *Book) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_Book.Marshal(b, m, deterministic)
}
func (m *Book) XXX_Merge(src proto.Message) {
	xxx_messageInfo_Book.Merge(m, src)
}
func (m *Book) XXX_Size() int {
	return xxx_messageInfo_Book.Size(m)
}
func (m *Book) XXX_DiscardUnknown() {
	xxx_messageInfo_Book.DiscardUnknown(m)
}

var xxx_messageInfo_Book proto.InternalMessageInfo

func (m *Book) GetTitle() string {
	if m != nil {
		return m.Title
	}
	return ""
}

func (m *Book) GetRawData() []byte {
	if m != nil {
		return m.RawData
	}
	return nil
}

对于一个proto.go文件,不足以完成所有Unmarshal, Marshal功能,生成的proto.go文件里面会引用 github.com/golang/protobuf/proto, 作为一个go语言的proto功能库,配合完成如Unmarshal, Marshal功的功能,对于grpc的支持更是如此

代码语言:txt
复制
proto.plugin -> go文件 引用 -> proto/lib

grpc-ecosystem/grpc-gateway

grpc-gateway是protoc的另一个插件,同时他对protobuf对描述也做了拓展,用于生成rest风格对http函数和server,代码中建http请求打包转发给grpc server,再将返回解包成json格式,完成对http对支持。通过这个插件,可以只需要添加很少量对代码,给一个gprc服务添加http支持。

和golang/protobuf类似,代码主要分成两个部分,plugin部分,用于生成代码;library部分,用于生成代码使用,完成较为复杂的功能

plugin部分:protoc-gen-grpc-gateway

和golang/protobuf不同,protoc-gen-grpc-gateway使用了模版来生成代码,这样的好处是可读性,可修改性会高很多,通过一种或者多种模版,对应解析出来的语法结构定义中的变量,渲染出生成代码。

protoc-gen-grpc-gateway/gengateway

比较重要的逻辑在generator.go

代码语言:txt
复制
// 这是核心的生成输入文件的语法结构定义,可以看出和golang/protobuf是类似的,无非是protoc解析完成的proto语法结构
// File wraps descriptor.FileDescriptorProto for richer features.
type File struct {
	*descriptor.FileDescriptorProto
	// GoPkg is the go package of the go file generated from this file..
	GoPkg GoPackage
	// Messages is the list of messages defined in this file.
	Messages []*Message
	// Enums is the list of enums defined in this file.
	Enums []*Enum
	// Services is the list of services defined in this file.
	Services []*Service
}


// 核心处理函数,可以看出主要由三个template完成 headerTemplate; handlerTemplate; trailerTemplate
func applyTemplate(p param, reg *descriptor.Registry) (string, error) {
	w := bytes.NewBuffer(nil)
	if err := headerTemplate.Execute(w, p); err != nil {
		return "", err
	}
	//...
	for _, svc := range p.Services {
		for _, meth := range svc.Methods {

			for _, b := range meth.Bindings {
				methodWithBindingsSeen = true
				if err := handlerTemplate.Execute(w, binding{
					//...
			}
		}
    ...
	if err := trailerTemplate.Execute(w, tp); err != nil {
		return "", err
	}
	return w.String(), nil
}

// headerTemplate 和 handlerTemplate的部分内容
	headerTemplate = template.Must(template.New("header").Parse(`
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: {{.GetName}}

/*
Package {{.GoPkg.Name}} is a reverse proxy.

It translates gRPC into RESTful JSON APIs.
*/
package {{.GoPkg.Name}}
import (
	{{range $i := .Imports}}{{if $i.Standard}}{{$i | printf "%s\n"}}{{end}}{{end}}

	{{range $i := .Imports}}{{if not $i.Standard}}{{$i | printf "%s\n"}}{{end}}{{end}}
)

var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
`))

	handlerTemplate = template.Must(template.New("handler").Parse(`
{{if and .Method.GetClientStreaming .Method.GetServerStreaming}}
{{template "bidi-streaming-request-func" .}}
{{else if .Method.GetClientStreaming}}
{{template "client-streaming-request-func" .}}
{{else}}
{{template "client-rpc-request-func" .}}
{{end}}
`))

实践

实战代码:https://github.com/u2takey/mq-gateway

假设现在有一个类似go-grpc-gateway的需求,不同的是,输入是一个消息队列,要求生成的代码完成这样的功能:从消息队列取数据 -> 根据不同的topic/service发送给不同的grpc server -> 处理完成之后返回给消息队列

为了简单,沿用go-grpc-gateway对http handler的定义,如'post /hello' 只是不把他作为一个http path,而是当成一个mq topic, 比如post /hello 对于的topic 即post_/hello, 返回数据的topic为 post_/hello_out

library部分:runtime

和grpc-gateway不同,runtime的定义变成处理mq相关的逻辑,比如定义、注册handler处理不同的topic,转发给对应的grpc client

代码语言:txt
复制
func NewServeMux(amqpURI, exchangeName, exchangeType string) *ServeMux {
	serveMux := &ServeMux{
		exchangeType: exchangeType,
		exchangeName: exchangeName,
		handlers:     make(map[string]handler),
		mqClient:     newMqclient(amqpURI, exchangeName, exchangeType),
	}

	return serveMux
}

// handler注册函数,注册一个h, 接收一个queue的mq消息,经过hanlterfunc处理后publish给queue
func (s *ServeMux) Handle(queueName string, h HandlerFunc) {
	handle := handler{queue: queueName, h: h}
	s.handlers[queueName] = handle

	log.Printf("declaring Queue %q", queueName)
	queue, err := s.mqClient.channel.QueueDeclare(
	//...

	log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange",
		queue.Name, queue.Messages, queue.Consumers)

	queue_out, err := s.mqClient.channel.QueueDeclare(
		//...

	if err = s.mqClient.channel.QueueBind(
		//..

	if err = s.mqClient.channel.QueueBind(
		//...

	deliveries, err := s.mqClient.channel.Consume(
		//...
	)
	if err != nil {
		log.Panic(err)
	}

	go s.consume(deliveries, handle)

	return
}


func (s *ServeMux) consume(deliveries <-chan amqp.Delivery, handle handler) {
	for d := range deliveries {
		out := handle.h(d.Body)
		s.publish(handle.queue, out)
		_ = d.Ack(false)
	}
	log.Printf("handle: deliveries channel closed")
}

plugin部分:protoc-gen-mq-gateway

可以使用大部分protoc-gen-grpc-gateway的的生成逻辑,只要修改生成的template即可, 修改调其中耦合http的逻辑,输入输出都使用[]byte,为了简单,我这里省略的大量元数据、以及错误处理的逻辑.

使用

proto定义, 定义了一个echo service,希望实现的逻辑是根据echo请求,返回一个结果,输入返回都通过mq完成

代码语言:txt
复制
message EchoRequest {
    // common
    string Hello = 1;
    repeated string Names = 2;
}

message EchoResponse {
    string Hello = 1;
}

service EchoService {
    rpc echo(EchoRequest) returns (EchoResponse) {
        option (google.api.http) = {
          post : "/v1/echo"
          body : "*"
        };
    }
}

protoc -I/usr/local/include -I. \
		-I$(GOPATH)/src \
		-I$(GOPATH)/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
		--mq-gateway_out=logtostderr=true:. \
		./simple.proto%

生成代码:

代码语言:txt
复制
package main

import (
	"bytes"
	"context"
	"io"

	"github.com/golang/protobuf/proto"
	"github.com/u2takey/mq-gateway/runtime"
	"github.com/u2takey/mq-gateway/utilities"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/status"
)

var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray

func request_EchoService_Echo_0(ctx context.Context, marshaler runtime.Marshaler, client EchoServiceClient, req []byte, pathParams map[string]string) (proto.Message, error) {
	var protoReq EchoRequest

	if err := marshaler.NewDecoder(bytes.NewReader(req)).Decode(&protoReq); err != nil && err != io.EOF {
		return nil, status.Errorf(codes.InvalidArgument, "%v", err)
	}

	msg, err := client.Echo(ctx, &protoReq)
	return msg, err

}


func RegisterEchoServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
	conn, err := grpc.Dial(endpoint, opts...)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			if cerr := conn.Close(); cerr != nil {
				grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
			}
			return
		}
		go func() {
			<-ctx.Done()
			if cerr := conn.Close(); cerr != nil {
				grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
			}
		}()
	}()

	return RegisterEchoServiceHandler(ctx, mux, conn)
}


func RegisterEchoServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
	return RegisterEchoServiceHandlerClient(ctx, mux, NewEchoServiceClient(conn))
}

func RegisterEchoServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client EchoServiceClient) error {

	mux.Handle("POST"+"_/v1/echo", func(r []byte) (out []byte) {
		inboundMarshaler, outboundMarshaler := &runtime.JSONPb{OrigName: true}, &runtime.JSONPb{OrigName: true}
		resp, _ := request_EchoService_Echo_0(ctx, inboundMarshaler, client, r, nil)
		out, _ = outboundMarshaler.Marshal(resp)
		return
	})

	return nil
}

组合成一个server

代码语言:txt
复制
package main

import (
	"context"

	"flag"
	"log"
	"net"

	"github.com/u2takey/mq-gateway/runtime"
	"google.golang.org/grpc"
)

var (
	uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
	exchangeName = flag.String("exchange", "test-exchange", "Durable AMQP exchange name")
	exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
)

func main() {
	flag.Parse()

	rpcAddress := "localhost:9008"
	// Establish gateways for incoming HTTP requests.
	mux := runtime.NewServeMux(*uri, *exchangeName, *exchangeType)
	ctx := context.Background()
	dialOpts := []grpc.DialOption{grpc.WithInsecure()}
	err := RegisterEchoServiceHandlerFromEndpoint(ctx, mux, rpcAddress, dialOpts)
	if err != nil {
		log.Panic(err)
	}

	go mux.Start()

	runRPCServer(rpcAddress)
}

// RunRPCServer ...
func runRPCServer(rpcAddress string) {
	listen, err := net.Listen("tcp", rpcAddress)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	rpcs := grpc.NewServer()
	RegisterEchoServiceServer(rpcs, NewEchoService())
	err = rpcs.Serve(listen)
	if err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

// EchoService ...
type EchoService struct {
}

// NewEchoService ...
func NewEchoService() *EchoService {
	return &EchoService{}
}

// DescribeEvent ...
func (s *EchoService) Echo(ctx context.Context, req *EchoRequest) (res *EchoResponse, err error) {

	return &EchoResponse{
		Hello: req.GetHello() + " " + strings.Join(req.GetNames(), ","),
	}, nil
}

mq-http-gateway, 为了便于测试,创建一个http server这个server比较检查,接收请求,把请求放入mq,同时从mq中取结果,作为http请求结果返回

mq-http-gateway代码

代码语言:txt
复制
#!/usr/bin/python

import pika
from flask import Flask, request
import _thread
import queue
import uuid

app = Flask(__name__)

class Client:

    def __init__(self):
        // ...

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        self.response = body

    def call(self, body):
        self.response = None
        self.channel.basic_publish(...)
        while self.response is None:
            self.connection.process_data_events()
        self.connection.close()
        return self.response


@app.route('/v1/echo', methods=['POST'])
def echo():
    c = Client()
    return c.call(request.data)


if __name__ == '__main__':
    credentials = pika.PlainCredentials('root', 'root')
    //...
    channel.queue_declare(queue='POST_/v1/echo', durable=True)

    print("start app")
    app.run(debug=True)

测试

代码语言:txt
复制
# 启动server
➜  simple git:(master) ✗ ./simple -uri amqp://root:root@xxxx
2019/07/28 16:46:48 dialing "amqp://root:root@xxx
2019/07/28 16:46:48 got Connection, getting Channel
2019/07/28 16:46:48 got Channel, declaring Exchange ("test-exchange")
2019/07/28 16:46:48 declaring Queue "POST_/v1/echo"
2019/07/28 16:46:48 declared Queue ("POST_/v1/echo" 0 messages, 0 consumers), binding to Exchange
2019/07/28 16:46:48 declared Queue ("POST_/v1/echo_out" 0 messages, 0 consumers), binding to Exchange
2019/07/28 16:46:48 Starting Consume


# 在另一个console启动mq-gateway.py
➜  simple git:(master) ✗ python mq-gateway.py
start app

# 在另一个console测试,一切正常!
➜  simple git:(master) ✗ curl -H "Content-Type: application/json"  localhost:5000/v1/echo -d '{"Hello": "x", "Names":["a", "b"] }'
{"Hello":"x a,b"}%

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 代码生成
  • golang/protobuf
    • 代码结构
      • proto: library部分
        • proto-gen-go 插件部分
          • grpc
          • plugin
      • grpc-ecosystem/grpc-gateway
        • plugin部分:protoc-gen-grpc-gateway
          • protoc-gen-grpc-gateway/gengateway
      • 实践
        • library部分:runtime
          • plugin部分:protoc-gen-mq-gateway
            • 使用
            相关产品与服务
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档