基于Go语言使用NATS Streaming构建分布式系统和微服务

基于 Go 语言使用 NATS Streaming构建分布式系统和微服务

一段时间前,我写了一篇名为向 Go 语言开发者介绍 NATS 的博客文章以便使用Apcera NATS 作为基于GO语言的构建分布式系统和微服务的消息系统。在本文中,我将介绍NATS Streaming 服务器,它建立在 NATS 服务器顶端,提供你发布在 NATS 上的消息的永久日志。

NATS 是一个开源的,轻量级和高性能的云端本地消息系统。我喜欢使用 NATS 基于Go语言构建分布式系统因为它的性能和简洁。构建分布式系统总是带来很多复杂性,使用一个简单而高性能的消息系统就成了一个重要的决策了。NATS 有两个可互操作的模块:核心 NATS 平台 —— 简称为NATS的NATS 服务器(可执行文件名是 gnatsd)和 NATS Streaming (可执行文件名是nats-streaming-server

NATS Streaming 简介

基本的NATS服务器专为高性能和简洁而设计,它不会为通过NATS发布的消息提供持久性存储。缺乏对消息的持久存储对于许多分布式系统来说都是一个问题。例如,假设你的一个用户系统在你发布消息时发生故障,则该用户系统不会收到该消息,因此你必须提供处理此类情况的架构方法。再举一个例子,假设你想要在现有的分布式系统环境中添加一个新系统,你希望从现有的分布式系统环境中收到所有的消息以获取数据历史记录,但由于缺乏永久性存储,你将无法从基本的 NATS 服务器中获取数据。而另一方面,NATS Streaming附带一个持久性存储,用于为通过 NATS 服务器发布的消息提供日志。如果你需要持久性消息传输和交付保证,则可以使用 NATS Streaming 而不是核心 NATS 平台。

NATS Streaming是一个非常高性能,轻量级和可靠的流式平台,它构建在核心NATS平台的顶层,提供持久性日志。NATS Streaming 是用 Go 语言编写的。它可以用来添加事件流,交付保证和将历史数据重放到NATS。请记住,NATS Streaming 不是一个独立的服务器,但它使用 NATS 服务器(gnatsd)。简而言之,NATS Streaming 内嵌 NATS 服务器作为消息传输服务器,并提供了一个额外的功能,可以为事件流系统提供持久性日志使用。

NATS 流媒体提供了以下高级功能集:

  • 基于日志的持久性
  • 至少一次的交付模式,提供可靠的消息传输
  • 按照每次订购的价格进行匹配
  • 重播/重新启动
  • 最终值语义

NATS Streaming的高级功能类似于 Apache Kafka 的功能,但当你考虑简单性而非复杂性时前者更优。由于 NATS Streaming 相对来说是一项新技术,与 Apache Kafka 相比,它在某些领域需要改进,尤其是为负载均衡场景提供更好的解决方案。

频道 ( Channel )

频道是 NATS Streaming 服务器中最重要的概念。频道是发送数据并从中消耗的客户端。与基本的 NATS 服务器不同,NATS Streaming 服务器不支持频道的通配符。你可以使用配置来控制频道数量。发布到频道的消息存储在频道内的消息日志中,如下图所示。

持久化日志的文件存储

上图描述了一个持久性日志的文件存储,其中名为order-notification的目录用于存储同名频道的消息。

消息日志

发布到频道的消息会附加到持久存储中日志的末尾。消息的限度(limit)可以配置。如果为全部频道或特定频道配置限度,当达到限度时,旧的消息将被删去来限制永久性日志的大小,以便追加新的消息。默认情况下,NATS Streaming 使用内存来存储消息,但可以通过配置来修改。

设置 NATS Streaming

为了下载和安装NATS Streaming,请使用GitHub 发布页面中预建发布的二进制文件或使用名为 nats-streaming 的正式 Docker 镜像。你还可以使用 go get 命令获取 NATS 流:

go get github.com/nats-io/nats-streaming-server

为了用 Go 语言创建 NATS 客户端应用程序,请使用go get命令下载并安装 Go 语言包:

go get github.com/nats-io/go-nats-streaming

要运行NATS流,请运行名为 nats-streaming-server 的二进制文件:

nats-streaming-server

默认情况下,NATS Streaming使用内存存储来存储消息,因此如果NATS服务器关闭,你将丢失消息。所以更好的选择是在运行 NATS Streaming 服务器时通过提供store(存储)标志来使用文件存储,如下所示:

nats-streaming-server \
--store file \
--dir ./data \
--max_msgs 0 \
--max_bytes 0

这里是用来运行NATS Streaming服务器的标志:

--store <string>           Store type: MEMORY|FILE (default: MEMORY)
--dir <string>             For FILE store type, this is the root directory
 --max_msgs <int>           Max number of messages per channel (0 for unlimited)
--max_bytes <size>         Max messages total size per channel (0 for unlimited)

在用于运行 NATS Streaming 服务器的上述配置中,指定了配置选项,用于在根目录数据使用文件存储器来存储消息日志,并指定每个频道无限数量的消息和无限制消息可存储到消息日志中。

下图描述了 NATS Streaming 服务器正在运行一个名为“test-cluster”的集群:

NATS Streaming 服务器使用嵌入式 NATS 服务器运行

当你运行NATS Streaming 服务器时,嵌入式 NATS 服务器将自动启动并监听默认端口4222上的客户端连接。因此,你不需要用 NATS Streaming 服务器手动运行 NATS 服务器。

使用NATS Streaming 构建分布式系统

在构建分布式系统时,你可以使用NATS Streaming 作为神经系统(Nervous System)供你的应用程序将事件发布到数据流,并以异步方式在不同系统之间交换消息。请记住,NATS Streaming 不是一个典型的消息传输系统,但它不仅仅是一个提供事件流传输平台的消息传输系统。最近,很多人在不了解其核心功能的情况下将 Apache Kafka 用作简单的消息传输系统。

在微服务架构中使用NATS流

在构建分布式系统时,微服务模式是一个不错的选择。在微服务体系结构中,你可以通过构建一套可独立可部署的小型模块化服务来构建应用程序。当你从单一应用转向微服务架构时,你需要解决许多实际挑战。例如,一个商业交易可能跨越几个微服务,因为我们把一个单一的系统分解成几个自治的服务。一个事务可能需要在多个微服务中执行一致性操作,在多个微服务中你还需要管理数据一致性。为了解决微服务在管理分布式事务和数据一致性方面的实际挑战,事件驱动的体系结构是强烈推荐的方法。在各种事件驱动架构中,我强烈建议采用 Event Sourcing (事件源),这是一个以事件为中心的架构,通过组合各种事件来构建应用程序的状态。Event Sourcing 处理事件存储库中的不可变日志事件,其中每个日志(对域对象进行的状态更改)表示一个应用程序状态。因为应用程序中的每个状态更改都被视为不可变的日志,所以你可以轻松排除应用程序故障,并且还可以随时返回到特定版本的应用程序状态。事件存储与 NATS Streaming 的消息日志完全相同,发布到频道的消息将附加到日志中。NATS Streaming 目前不支持持久化日志的数据库系统,但我希望这个功能可以在近期到来, 这个功能也可以Event Sourcing 的实现中供你的分布式应用作为事件存储。

当你使用事件驱动架构构建微服务时,可以使用 NATS Streaming 作为事件流式传输平台,在域事件发生在聚合状态更改(DDD聚合)或简单域实体上时通过频道发布事件时,以便其他微服务可以订阅这些来自频道的消息和执行自己的操作,并发布其他一些事件以供其他微服务商了解到某些状态发生变化。并且为频道上发布的消息提供持续的日志记录,NATS Streaming 以一种高效的方式为你构建现代分布式系统提供了消息传输功能。

NATS Streaming 示例

本文的主要目标不是讨论与微服务相关的模式,而是通过使用示例演示向 Go 语言开发人员介绍NATS Streaming,以便你可以在微服务相关模式中找到一些流畅的实现。该示例的源代码可以在GitHub这里获取到。该示例由以下 Go 语言的包组成:

  • pb:协议缓冲区 (Protocol Buffers) 定义用来描述消息类型和RPC端点。
  • orderservice:供客户创建订单 (Order) 的 HTTP API 服务器。当出现一个新的订单,则触发 “OrderCreated” 事件,因此,它会调用由 eventstore 提供的 gRPC 方法“CreateEvent” 将事件发布到Event Store。
  • eventstore:一个gRPC服务器和一个NATS Streaming客户端,它将域事件保存到 Event Store 中并在 NATS Streaming 频道上发布事件。此示例假定应用程序的状态由各种事件组成(Event Sourcing 模式的流式实现)。所有命令操作都作为事件永久保存到 Event Store 中。这里的 CockroachDB 用于保存事件。
  • restuarantservice:一种NATS Streaming 客户端,当新订单通过 orderservice 创建且信息从evenstore上通过频道的 “order-notification” 发布时,可以从 NATS Streaming 频道的 “order-notification” 上订阅信息以获取消息。
  • orderquery-store1:一种NATS流客户端,它在 NATS Streaming 频道 “order-notification” 中用QueueGroup(一种NATS消息传输模式)订阅消息,以在事件发生在聚合订单上时获取消息。此软件包的目标是基于 Event Store中保存的域事件来保存用于查询数据的数据模型。示例演示假定正在将单独的数据模型用于命令操作和查询操作(CQRS)。由于你要为命令和查询保留单独的数据模型,因此可以在数据模型上使用非规范化数据集以进行查询。这里的 CockroachDB 用于保存查询模型的数据集。在实际场景中,单独的数据库将被用于命令和查询模型。
  • orderquery-store2:一个NATS Streaming 客户端,它通过NATS Streaming“order-notification” 频道订阅带有 QueueGroup 的消息。orderquery-store1orderquery-store2都在做同样的事情 -- 执行数据复制的逻辑用于制作数据查询的存储,这些数据是从Event Store中构建的。为了分配数据复制逻辑,它用作QueueGroup 订户客户端(orderquery-store1orderquery-store2)
  • store:这是一个共享库包,提供持久化逻辑用来处理 CockroachDB 数据库。请注意,CockroachDB 是一个用 Go 语言编写的出色分布式数据库系统。稍后我会撰写关于 CockroachDB 的博客文章。

发布事件

以下是来自 eventstore 的代码块,当它调用RPC方法 CreateEvent 时将在NATS Streaming上发布事件:

清单1. NATS Streaming Publisher客户端

package main

import (
	"context"
	"log"
	"net"

	stan "github.com/nats-io/go-nats-streaming"
	"google.golang.org/grpc"

	"github.com/shijuvar/gokit/examples/nats-streaming/pb"
	"github.com/shijuvar/gokit/examples/nats-streaming/store"
)

const (
	port      = ":50051"
	clusterID = "test-cluster"
	clientID  = "event-store"
)

type server struct{}

// CreateOrder RPC creates a new Event into EventStore
// CreateOrder RPC 创建了一个新的事件到 EventStore
func (s *server) CreateEvent(ctx context.Context, in *pb.Event) (*pb.Response, error) {
	// Persist data into EventStore database
    // 保存数据到 EventStore 数据库
	command := store.EventStore{}
	err := command.CreateEvent(in)
	if err != nil {
		return nil, err
	}
	// Publish event on NATS Streaming Server
    // 在NATS Streaming 服务器上发布消息
	go publishEvent(in)
	return &pb.Response{IsSuccess: true}, nil
}

// publishEvent publish an event via NATS Streaming server
// publishEvent 通过 NATS Streaming 服务器发布消息
func publishEvent(event *pb.Event) {
	// Connect to NATS Streaming server
    // 连接 NATS Streaming 服务器
	sc, err := stan.Connect(
		clusterID,
		clientID,
		stan.NatsURL(stan.DefaultNatsURL),
	)
	if err != nil {
		log.Print(err)
		return
	}
	defer sc.Close()
	channel := event.Channel
	eventMsg := []byte(event.EventData)
	// Publish message on subject (channel)
    // 在对象(频道) 上发布消息
	sc.Publish(channel, eventMsg)
	log.Println("Published message on channel: " + channel)
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// Creates a new gRPC server
    // 创建一个新的 gRPC 服务器
	s := grpc.NewServer()
	pb.RegisterEventStoreServer(s, &server{})
	s.Serve(lis)
}

NATS Streaming 客户端的连接功能,可以建立到NATS Streaming服务器的连接。你必须提供 集群ID( Cluster ID )客户端ID (Client ID) 才能连接到 NATS Streaming 服务器。Client ID非常重要, 它是由服务器用来唯一标识NATS流媒体客户端的。因此具有相同客户端ID的两个连接是不可能的。

方法发布(Publish) NATS Streaming 连接是用来通过提供一个对象(频道)和消息来发布消息的。在示例演示中,频道和消息数据均来自 gRPC 客户端应用程序(orderservice)。在这里,我们提供名称为“订单通知”的频道,并且消息orderservice中传递过来用于创建一个订单,其中每个JSON字符串表示一个订单。

API方法Publish将消息同步发布到集群,并等待ACK(确认)。API方法PublishAsync 异步则发布消息。这将为正发送到集群的消息返回一个 GUID。

清单2.异步发布消息至 NATS Streaming Server

ackHandler := func(ackedNuid string, err error) {
        if err != nil {
            log.Printf("Error publishing message id %s: %v\n", ackedNuid, err.Error())
        } else {
            log.Printf("Received ACK for message id %s\n", ackedNuid)
        }
    }
	
 channel := event.Channel
 eventMsg := []byte(event.EventData)
 // Publish message on subject (channel)
 // 在对象(频道) 上发布消息
 nuid, err := sc.PublishAsync(channel, eventMs, ackHandler) 
 if err != nil {
	log.Printf("Error publishing message %s: %v\n", nuid, err.Error())
 }

创建订户客户端

基本的NATS服务器没有提供持久性日志,订阅消息的能力非常有限。当你发布消息时,如果订阅者客户端关闭,它将无法接收来自服务器的消息。由于NATS Streaming 服务器有持久化日志功能,它提供了很多从 NATS 服务器订阅消息的功能。

客户端在给定的频道上创建一个 NATS Streaming 订阅,并且该频道上的消息将从消息日志发送到订阅者客户端。在创建订阅时,服务器将发送由订阅客户端提供的最大数量订阅消息(你可以指定订阅消息的最大数量)。当从订户客户端收到消息时,ACK(确认)将被发送到服务器。ACK将自动发送,但你也可以将其配置为手动向服务器发送ACK。

有几种类型的 NATS Streaming 订阅:

  • 定期的
  • 持久化的
  • 队列组 (Queue Group)
  • 重传 (Redelivery)

在示例演示中,我们使用的是持久订阅,它允许订户客户端从先前停止的位置恢复消息使用。通过持久订阅,NATS Streaming 服务器即使在客户端连接关闭后也可以维护订阅者客户端的状态。持久订阅通过提供一个持久化的名称来创建。你还可以为队列组创建的订户客户端使用持久订阅。

这里是代码块,它创建一个用于restaurantservice持久化订阅的订阅者客户端,以接收在频道 “order-notification”上发布的消息:

清单3.订阅来自频道“order-notification”消息的NATS Streaming 客户端

package main

import (
	"encoding/json"
	"log"
	"runtime"
	"time"

	stan "github.com/nats-io/go-nats-streaming"

	"github.com/shijuvar/gokit/examples/nats-streaming/pb"
)

const (
	clusterID = "test-cluster"
	clientID  = "restaurant-service"
	channel   = "order-notification"
	durableID = "restaurant-service-durable"
)

func main() {
	sc, err := stan.Connect(
		clusterID,
		clientID,
		stan.NatsURL(stan.DefaultNatsURL),
	)

	if err != nil {
		log.Fatal(err)
	}
	// Subscribe with manual ack mode, and set AckWait to 60 seconds
    // 以手动ack模式订阅, 且设置 AckWait 时间至60s
	aw, _ := time.ParseDuration("60s")
	sc.Subscribe(channel, func(msg *stan.Msg) {
		msg.Ack() // Manual ACK
		order := pb.Order{}
		// Unmarshal JSON that represents the Order data
        // 代替 Order数据的编排化JSON
		err := json.Unmarshal(msg.Data, &order)
		if err != nil {
			log.Print(err)
			return
		}
		// Handle the message
        // 处理消息
		log.Printf("Subscribed message from clientID - %s for Order: %+v\n", clientID, order)

	}, stan.DurableName(durableID),
		stan.MaxInflight(25),
		stan.SetManualAckMode(),
		stan.AckWait(aw),
	)
	runtime.Goexit()

通过使用 go-nats-streaming 包的 DurableName 函数提供一个持久化名称,订阅者客户端将通过持久化订阅在频道“订单通知”上创建。订阅者客户端通过使用函数SetManualAckMode配置为手动发送ACK 。在某些情况下,你可能更愿意手动发送ACK。一旦你配置为手动发送ACK,你必须显式调用 NATS Streaming 消息的函数Ack

msg.Ack() // Manual ACK 手动ACK

如果你尚未指定SetManualAckMode,则会在调用订户的消息处理程序后自动发送ACK。

NATS Streaming 为给定频道上的订户客户提供至少一次的消息传输。如果在配置的超市间隔(默认值为30秒)内没有收到ACK,NATS Streaming 将尝试重新传送消息。函数AckWait用于配置超时间隔。前面的代码块设置60秒的超时间隔。你还可以通过使用函数MaxInflight来限制NATS Streaming 服务器发送的没有ACK的最大消息数。

使用队列组创建订户客户端

订阅者客户端可以通过指定一个队列组来创建。具有相同队列名称的同一频道的多个订户客户端形成队列组。队列订阅者可让你分发多个订户的消息处理。当你在频道上发布消息时,该消息将被发送到同一队列组其中一个用户。当你在短时间内发布数百万条消息时,如果消息处理的顺序并不重要,则排队订户可以高效地并行分发消息处理,并且提供高性能。

在该演示示例中,当域事件发生时, 消息从eventstore应用程序发布,并且消息从以下三个用户的 “order-notificaton” 频道上订阅:

  1. restaurantservice
  2. orderquery-store1
  3. orderquery-store2

在三个订阅者中,restaurantservice 是一个没有队列组的持久订阅者,但其余两个订阅者形成一个具有相同队列名称的队列组。当消息在“order-notificaton”频道上发布时,restaurantservice 将始终收到该消息,但队列组的一个订阅者orderquery-store1orderquery-store2)将收到该消息。我们还可以通过为同一队列组中的所有订阅者提供相同的持久化名称来提供持久选项来创建队列订阅者。在示例演示中,队列订户用于实现查询模型的数据复制,因为所有命令操作都作为一系列事件持续存在,即域实体状态更改的不可变日志DDD Aggregates(聚合)。拥有单独的命令和查询模型,对于构建具有 Event Sourcing 和 CQRS 的微服务来说是一个很好的选择。

以下是创建队列订阅者的代码块:

package main

import (
	"encoding/json"
	"log"
	"runtime"

	stan "github.com/nats-io/go-nats-streaming"

	"github.com/shijuvar/gokit/examples/nats-streaming/pb"
	"github.com/shijuvar/gokit/examples/nats-streaming/store"
)

const (
	clusterID  = "test-cluster"
	clientID   = "order-query-store1"
	channel    = "order-notification"
	durableID  = "store-durable"
	queueGroup = "order-query-store-group"
)

func main() {
	// Connect to NATS Streaming server
    // 连接 NATS Streaming 服务器
	sc, err := stan.Connect(
		clusterID,
		clientID,
		stan.NatsURL(stan.DefaultNatsURL),
	)

	if err != nil {
		log.Fatal(err)
	}
	sc.QueueSubscribe(channel, queueGroup, func(msg *stan.Msg) {
		order := pb.Order{}
		err := json.Unmarshal(msg.Data, &order)
		if err == nil {
			// Handle the message
            // 处理消息
			log.Printf("Subscribed message from clientID - %s: %+v\n", clientID, order)
			queryStore := store.QueryStore{}
			// Perform data replication for query model into CockroachDB
            // 执行查询模型的数据复制至 CockroachDB
			err := queryStore.SyncOrderQueryModel(order)
			if err != nil {
				log.Printf("Error while replicating the query model %+v", err)
			}
		}
	}, stan.DurableName(durableID),
	)
	runtime.Goexit()
}

源代码

为演示实例的源代码可以在GitHub这里获取。

集群NATS Streaming服务器

NATS Streaming 相对来说是一项新技术,肯定需要一些改进。NATS Streaming 服务器 目前不支持集群。但热心的NATS团队正在计划为集群提供更好的解决方案,我希望NATS团队很快会提供解决方案。尽管此时NATS Streaming 服务器不支持集群,但你可以对NATS 服务器进行集群,因为基本NATS 服务器支持将集群嵌入到NATS Streaming中。因此,通过运行连接到NATS服务器集群的单个NATS Streaming 服务器来解决集群问题是一种解决方法。

容错

值得注意的是,NATS服务器可以以 Fault Tolerance (FT, 容错)模式运行,通过以其中一个作为 活动服务器(Active Server)其他所有服务器作为备用服务器(Standby Server)的方式形成一组服务器。这将帮助你最大限度地减少单点故障。Fault Tolerance 组(FT组)中的活动服务器访问持久性存储并处理与客户端以及所有备用服务器所有通信,且所有的备用服务器将处于运行状态以检测活动服务器的故障。你可以在同一个 Fault Tolerance 组中拥有多个备用服务器。当FT组中的活动服务器出现故障时,所有备用服务器都将尝试激活,然后一台服务器将成为活动服务器并恢复持久存储, 为所有客户端提供服务。为了供FT组使用永久存储的共享状态,数据存储需要由FT组中的所有服务器装载。

本文的版权归 FesonX 所有,如需转载请联系作者。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

scribe、chukwa、kafka、flume日志系统对比

1. 背景介绍许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特...

52150
来自专栏PPV课数据科学社区

关键七步,用Apache Spark构建实时分析Dashboard

作者 | Abhinav 译者:王庆 摘要:本文我们将学习如何使用Apache Spark streaming,Kafka,Node.js,Socket.I...

428110
来自专栏架构说

开源日志系统比较:scribe、chukwa、kafka、flume

1. 背景介绍 许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下...

407120
来自专栏人云亦云

Storm参数配置及代码优化

44150
来自专栏吴伟祥

Socket套接字简介 转

Socket套接字由远景研究规划局(Advanced Research Projects Agency, ARPA)资助加里福尼亚大学伯克利分校的一个研究组研发...

18810
来自专栏数据派THU

独家 | 一文读懂Hadoop(四):YARN

随着全球经济的不断发展,大数据时代早已悄悄到来,而Hadoop又是大数据环境的基础,想入门大数据行业首先需要了解Hadoop的知识。2017年年初apache发...

408110
来自专栏Spark学习技巧

spark源码单步跟踪阅读-从毛片说起

34850
来自专栏YG小书屋

kudu简介与操作方式

56450
来自专栏张善友的专栏

基于Redis的开源分布式服务Codis

Redis在豌豆荚的使用历程——单实例==》多实例,业务代码中做sharding==》单个Twemproxy==》多个Twemproxy==》Codis,豌豆荚...

26760
来自专栏Albert陈凯

Hadoop数据分析平台实战——010hadoop介绍安装

本课程目标 本课程有以下几个目标: 第一:对hadoop没有了解的学员来说,可以帮助其了解在一般工作中hadoop的基本用法,以及对如何用hadoop有一定的了...

37080

扫码关注云+社区

领取腾讯云代金券