前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送

初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送

作者头像
为少
发布2021-05-27 19:11:04
1.5K0
发布2021-05-27 19:11:04
举报
文章被收录于专栏:黑客下午茶黑客下午茶

使用 SSE(Server-Sent Events) 进行 HTTP 服务器推送

这个示例是一个类似 twitter 的 web 应用程序,使用 Server-Sent Events 来支持实时刷新。

运行

docker-compose up

然后, 浏览 http://localhost:8080

您可以添加自己的帖子或点击按钮获得随机生成的帖子。

无论哪种方式,feeds 列表和 feed 中的帖子都应该是最新的。尝试使用第二个浏览器窗口查看更新。

它是如何工作的

  • 可以创建和更新帖子。
  • 帖子可以包含标签。
  • 每个标签都有自己的 feed,其中包含来自该标签的所有帖子。
  • 所有的帖子都存储在 MySQL 中。这就是写模型。
  • 所有 feed 都异步更新并存储在 MongoDB 中。这是读模型。

为什么要使用单独的写和读模型?

对于这个示例应用程序,使用多语言持久性(两个数据库引擎)当然有些过头了。我们这样做是为了展示这个技术,以及如何很容易地将它应用到 Watermill。

专用的读模型对于具有高读/写比率的应用程序是一种有用的模式。所有写操作都被原子地应用到写模型(在我们的例子中是 MySQL)。事件处理程序异步更新读模型(我们使用 Mongo)。

读取模型中的数据可以按原样使用。也可以独立于写模型进行扩展。

请记住,要使用此模式,应用程序中必须接受最终的一致性。而且,在大多数用例中,您可能不需要使用它。务实!

SSE Router

SSERouter 来自 watermill-http。当创建一个新的路由器时,你需要传递一个上游订阅者。来自该订阅服务器的消息将触发通过 HTTP 推送更新。

在本例中,我们使用 NATS 作为 Pub/Sub,但这可以是 Watermill 支持的任何 Pub/Sub。

sseRouter, err := watermillHTTP.NewSSERouter(
    watermillHTTP.SSERouterConfig{
        UpstreamSubscriber: router.Subscriber,
        ErrorHandler:       watermillHTTP.DefaultErrorHandler,
    },
    router.Logger,
)

Stream Adapters(流适配器)

要使用 SSERouter,你需要准备一个带有两个方法的 StreamAdapter

GetResponse 类似于标准的 HTTP 处理程序。修改现有的处理程序来匹配这个签名应该非常容易。

Validate 是一个额外的方法,它告诉我们是否应该为特定的 Message 推送更新。

type StreamAdapter interface {
	// GetResponse returns the response to be sent back to client.
	// Any errors that occur should be handled and written to `w`, returning false as `ok`.
	GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
	// Validate validates if the incoming message should be handled by this handler.
	// Typically this involves checking some kind of model ID.
	Validate(r *http.Request, msg *message.Message) (ok bool)
}

Validate 示例如下所示。它检查消息是否来自与用户通过 HTTP 请求发送的相同的 post ID。

func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
	postUpdated := PostUpdated{}

	err := json.Unmarshal(msg.Payload, &postUpdated)
	if err != nil {
		return false
	}

	postID := chi.URLParam(r, "id")

	return postUpdated.OriginalPost.ID == postID
}

如果你想为每条消息触发一个更新,你可以简单地返回 true

func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
	return true
}

在开始 SSERouter 之前,您需要添加带有特定主题的处理程序。 AddHandler 返回一个可以在任何路由库中使用的标准 HTTP 处理程序。

postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)

// ...

r.Get("/posts/{id}", postHandler)

Event handlers(事件处理程序)

该示例使用 Watermill 进行所有异步通信,包括 SSE。

发布了以下事件:

  • PostCreated
    • 将 post 添加到贴子中包含标签的所有 feeds 中。
  • FeedUpdated
    • 将更新推送到当前访问 feed 页面的所有客户端。
  • PostUpdated
    • a) 对于现有标签,帖子内容将在标签中更新。
    • b) 如果添加了新的标签,文章将被添加到标签的 feed 中。
    • c) 如果标签已删除,则该帖子将从标签的 feed 中删除。
    • 将更新推送给所有当前访问 post 页面的客户端。
    • 使用帖子中存在的标签更新所有 feeds 中的帖子

前端 app

前端应用程序是使用 Vue.js 和 Bootstrap 构建的。

最有趣的部分是 EventSource 的使用。

this.es = new EventSource('/api/feeds/' + this.feed)

this.es.addEventListener('data', event => {
    let data = JSON.parse(event.data);
    this.posts_stream = data.posts;
}, false);

Refs

  • watermill.io
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 黑客下午茶 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用 SSE(Server-Sent Events) 进行 HTTP 服务器推送
    • 运行
      • 它是如何工作的
        • 为什么要使用单独的写和读模型?
        • SSE Router
        • Stream Adapters(流适配器)
      • Event handlers(事件处理程序)
        • 前端 app
          • Refs
          相关产品与服务
          云数据库 SQL Server
          腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档