前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从围绕API到围绕数据-使用流式编程构建更简洁的架构

从围绕API到围绕数据-使用流式编程构建更简洁的架构

作者头像
超级大猪
发布2022-11-29 17:30:16
7730
发布2022-11-29 17:30:16
举报
文章被收录于专栏:大猪的笔记大猪的笔记

背景

在服务刚刚搭建时,通常的思维就是根据API编写业务逻辑:

// SendStream ... 
func (d *Svc) SendStream(stream MyApi_data.ProxyDialOut_SendStreamServer) error { 
 for { 
 ... 
        data, err := stream.Recv() 
 if err != nil { 
            logrus.Errorf("recv error:%v", err) 
 return err 
 } 
 ... 
 // 对data做相关的操作 
 } 
} 

在服务暴露出越来越多的API后,相似的操作会越来越多。此时会进行抽象和封装,提取公共操作,例如提取函数、建立工厂等。

比如,在已有的API中添加监控统计。虽然对统计器做了抽象(对象或者函数),但可能仍然需要侵入到所有不同的API实现中。

// SendStream ... 
func (d *MyApiSvc) SendStream(stream MyApi_data.ProxyDialOut_SendStreamServer) error { 
 for { 
 ... 
        data, err := stream.Recv() 
 if err != nil { 
            logrus.Errorf("recv error:%v", err) 
 return err 
 } 
 ... 
 // 对data做相关的操作 
 ... 
 // 添加一个共享的监控统计器,调用上报业务,每个api都需要改动 
        counter.Add("MyApi", 1) 
 } 
} 

这在简单项目中无可厚非,但长此以往,随着各种功能的加入,API的业务代码会迅速臃肿起来。

后续,会发现每个API都各不相同,却又有公共部分。所以不得不写出大量形容相似的代码。这在部门大部分项目中都屡见不鲜

究其原因,这是因为抽象层次不够造成的。

摒除以API为中心的编程模式

在网络编程中,一般会引入中间件(比如trpc的filter)来处理共有逻辑,比如鉴权,日志,panic处理等。

但中间件一般太过于抽象并不直观,使得编写调试不易。但它的思路值得借鉴。

在对业务进行思考后,突发奇想。虽然对客户端(用户)而言,每个API都是服务(消费者)。但对于具体处理而言,每个API同时也是生产者

每个API看成data source,生产数据(data),就是对api最底层的抽象。

在这里,引入一个简单的流式编程包go-streams(github.com/reugn/go-streams),方便快速建立流式编程的架构。

建立抽象:每个API都是datasource

每个api,都实现Source的接口,将自己收到的数据,无脑封装往下一跳怼

import "github.com/reugn/go-streams/extension" 
type Source interface{ 
 GetSource() *extension.ChanSource 
} 

实现抽象:为每个API服务都创建chan,这是数据源的本质

type MyApiSvc struct { 
    name     string 
    ctx      context.Context 
    ch       chan any // 就是它 
    protocol string 
} 
// GetSource 实现Source接口 
func (t *MyApiSvc) GetSource() *extension.ChanSource { 
 return extension.NewChanSource(t.ch) 
} 
type DataItem struct { 
    data    any 
    session map[string]any 
} 
// SendStream ... 
func (d *MyApiSvc) SendStream(stream MyApi_data.ProxyDialOut_SendStreamServer) error { 
 for { 
 ... 
        data, err := stream.Recv() 
 if err != nil { 
            logrus.Errorf("recv error:%v", err) 
 return err 
 } 
 ... 
 // 这里不对数据做任何处理,封装之后,直接丢到chan里 
        td := new(DataItem) 
        td.session = make(map[string]any) 
        td.session["ip"] = ip 
        td.session["trace_id"] = grand.S(8) 
        td.data = data 
        d.ch <- td 
 } 
} 

每个api的chango-streams封装为一个数据源ChanSource类型。

将各种API的原始数据封装为DataItem在流中统一处理,内置session是神来之笔。这个session会包含每条数据的个性化信息。可以由每个步骤增添并提供给下一步骤使用。

这样,在编写业务逻辑时就能站在更上层、数据的角度思考问题。

流式处理

在上面,每个数据源都已经被封装为一个ChanSource(本质是chan),现在来统一规划业务逻辑。

使用go-streams,将整个业务逻辑抽象成数据流的多个步骤:

image-20220926132816218
image-20220926132816218

此编程模式的特色之处在于:

  1. 每个步骤接收上一个节点的数据,处理之后,将数据发往下一跳。编写单一步骤的时候,只需要考虑本步骤处理的事情,思维量大大减少。
  2. 在单个步骤,处理是并发的,但在不同的步骤,处理是顺序的。
  3. 围绕数据编程,方便抽象施加统一的处理过程,比如getParser,getSender两个工厂函数。
  4. 可以任意的在节点间统一的新增其它的处理,不侵入已经编写好的业务逻辑。每个节点都有前驱和后继,拥有无限可能。没错,这就是面向切面编程
    source := getDataSource(ctx, cfg.Name) // cfg.Name == "MyApi",通过工厂函数载入配置,获得interface `Source` 
 // 调用接口 
    source.GetSource().Via(flow.NewMap(func(i interface{}) interface{} { // 步骤1,创建日志 
 // 从用户发来的每条消息都被打散成为了数据源的一条数据 
        msg := i.(model.*DataItem) 
        traceID := msg.GetSession()["trace_id"].(string) 
 // 从数据的session中获取数据的附加信息 
        tags := map[string]interface{}{ 
 "trace_id": traceID, 
 "ip":       msg.GetSession()["ip"], 
 "name":     c.Name, 
 } 
        log := logrus.WithFields(tags) 
 // 这个步骤只是为了添加一个日志对象 
 return []any{msg, log} 
 // 使用8个协程来执行这个步骤 
 }, 8)).Via(flow.NewMap(func(i interface{}) interface{} { // 步骤2,解析数据 
        arr := i.([]any) // 这里的i是上一步骤return的数据 
        msg := arr[0].(*DataItem) 
        log := arr[1].(*logrus.Entry) 
        parser := getParser(cfg.Name) // 这个工厂函数是每种数据源的个性化处理。根据配置获取一个解析器 
 // 解析数据 
        data, err := parser(ctx, msg, c.Name, msg.GetSession()["ip"]) 
 if err != nil { 
            log.Error(err) 
 return err 
 } 
 return []any{data, log} 
 }, 8)).Via(flow.NewMap(func(i interface{}) interface{} { // 步骤3,发送数据到下个服务 
        arr,ok := i.([]any) // 这里的i,就是上一步骤return的数据 
 if !ok{ 
 return i // 如果上一步骤return的是error,则直接跳过不再解析 
 } 
        data := arr[0].(*MyApiData) // 这里的data,已经是上一步骤解析出来的数据 
        log := arr[1].(*logrus.Entry) 
 // 发数数据 
        sender := getSender(cfg.Name) // 这个工厂函数为不同的数据源分配一个发送器 
        sender.Send(qdata) 
 return i 
 }, 8)).Via(flow.NewMap(func(i interface{}) interface{} { // 步骤4,统计发送成功的数据量 
        arr, ok := i.([]any) // 这里的i,就是上一步骤return的数据 
 if ok{ 
            msg := arr[0].(*DataItem) 
            log := arr[1].(*logrus.Entry) 
 // 内部统计 
            log.Info("send success") 
            controller.TraceAfter(msg.GetSession()["ip"]) 
 } 
 return i 
 }, 8)).To(extension.NewIgnoreSink()) 

为什么要使用go-streams

  1. 库非常的简单,实际就是对go chan的封装。简单是一种美,简单的东西一般不容易出错。
  2. 隐含了流式编程的主要思想,它并没有什么黑科技,但使用它会强制我们使用面向数据的,抽象的方式来思考问题。最终写出低耦合可调测的代码。这才是难能可贵的。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-09-26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 摒除以API为中心的编程模式
    • 建立抽象:每个API都是datasource
      • 实现抽象:为每个API服务都创建chan,这是数据源的本质
      • 流式处理
      • 为什么要使用go-streams
      相关产品与服务
      消息队列 TDMQ
      消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档