Akka-CQRS(6)- read-side

前面我们全面介绍了在akka-cluster环境下实现的CQRS写端write-side。简单来说就是把发生事件描述作为对象严格按发生时间顺序写入数据库。这些事件对象一般是按照二进制binary方式如blob存入数据库的。cassandra-plugin的表结构如下:

CREATE KEYSPACE IF NOT EXISTS akka
WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };

CREATE TABLE IF NOT EXISTS akka.messages (
  used boolean static,
  persistence_id text,
  partition_nr bigint,
  sequence_nr bigint,
  timestamp timeuuid,
  timebucket text,
  writer_uuid text,
  ser_id int,
  ser_manifest text,
  event_manifest text,
  event blob,
  meta_ser_id int,
  meta_ser_manifest text,
  meta blob,
  message blob,
  tags set<text>,
  PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp, timebucket))
  WITH gc_grace_seconds =864000
  AND compaction = {
    'class' : 'SizeTieredCompactionStrategy',
    'enabled' : true,
    'tombstone_compaction_interval' : 86400,
    'tombstone_threshold' : 0.2,
    'unchecked_tombstone_compaction' : false,
    'bucket_high' : 1.5,
    'bucket_low' : 0.5,
    'max_threshold' : 32,
    'min_threshold' : 4,
    'min_sstable_size' : 50
    };

事件对象是存放在event里的,是个blob类型字段。下面是个典型的写动作示范:

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(ActionGo) { event =>
        updateState(event)
      }
  }

这些事件描述的存写即写这个ActionGo时不会影响到实际业务数据状态。真正发生作用,改变当前业务数据状态的是在读端read-side。也就是说在另一个线程里有个程序也按时间顺序把这些二进制格式的对象读出来、恢复成某种结构如ActionGo类型、然后按照结构内的操作指令对业务数据进行实际操作处理,这时才会产生对业务数据的影响。做个假设:如果这些事件不会依赖时间顺序的话是不是可以偷偷懒直接用一种pub/sub模式把reader放在订阅subscriber端,如下:

//写端
  import DistributedPubSubMediator.Publish
  val mediator = DistributedPubSub(context.system).mediator

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(DataUpdated) { event =>
        updateState(event)
        mediator ! Publish(persistentId, event,sendOneMessageToEachGroup = true)
      }
  }

//读端
  val mediator = DistributedPubSub(context.system).mediator
  mediator ! Subscribe(persistentId, self)
  def receive = {
    case DataUpdated: Event ⇒
        updateDataTables()
  }

这种pub/sub模式的特点是消息收发双方耦合度非常松散,但同时也存在订阅方sub即reader十分难以控制的问题,而且可以肯定的是订阅到达消息无法保证是按发出时间顺序接收的,我们无法控制akka传递消息的过程。因为业务逻辑中一个动作的发生时间顺序往往会对周围业务数据产生不同的影响,所以现在只能考虑事件源event-sourcing这种模式了。es方式的CQRS是通过数据库表作为读写间隔实现写端程序和读端程序的分离。写端只管往数据库写数据操作指令,读端从同一数据库位置读出指令进行实质的数据处理操作,所以读写过程中会产生一定的延迟,读端需要不断从数据库抽取pull事件。而具体pull的时段间隔如何设定也是一个比较棘手的问题。无论如何,akka提供了Persistence-Query作为一种CQRS读端工具。我们先从一个简单的cassandra-persistence-query用例开始:

// obtain read journal by plugin id
val readJournal =
  PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.runForeach { pack =>
  updateDatabase(pack.event)
}

eventsByPersistenceId(...)构建了一个akka-stream的Source[EventEnvelope,_]。这个EventEnvelope类定义如下:

/**
 * Event wrapper adding meta data for the events in the result stream of
 * [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries.
 */
final case class EventEnvelope(
  offset:        Offset,
  persistenceId: String,
  sequenceNr:    Long,
  event:         Any)

上面这个event字段就是从数据库读取的事件对象。EventEnvelope是以流元素的形式从数据库中提出。eventsByPersistenceId(...)启动了一个数据流,然后akka-persistence-query会按refresh-interval时间间隔重复运算这个流stream。refresh-interval可以在配置文件中设置,如下面的cassandra-plugin配置:

cassandra-query-journal {
  # Implementation class of the Cassandra ReadJournalProvider
  class = "akka.persistence.cassandra.query.CassandraReadJournalProvider"

  # Absolute path to the write journal plugin configuration section
  write-plugin = "cassandra-journal"

  # New events are retrieved (polled) with this interval.
  refresh-interval = 3s

...
}

以上描述的是一种接近实时的读取模式。一般来讲,为了实现高效、安全的事件存写,我们会尽量简化事件结构,这样就会高概率出现一个业务操作单位需要多个事件来描述,那么如果在完成一项业务操作单元的所有事件存写后才开始读端的动作不就简单多了吗?而且还比较容易控制。虽然这样会造成某种延迟,但如果以业务操作为衡量单位,这种延迟应该是很正常的,可以接受的。现在每当完成一项业务的所有事件存写后在读端一次性成批把事件读出来然后进行实质的数据操作,应当可取。akka-persistence-query提供了下面这个函数:

  /**
   * Same type of query as `eventsByPersistenceId` but the event stream
   * is completed immediately when it reaches the end of the "result set". Events that are
   * stored after the query is completed are not included in the event stream.
   */
  override def currentEventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[EventEnvelope, NotUsed] = ...

我们可以run这个stream把数据读入一个集合里,然后可以在任何一个线程里用这个集合演算业务逻辑(如我们前面提到的写端状态转变维护过程),可以用router/routee模式来实现一种在集群节点中负载均衡式的分配reader-actor作业节点。

下一篇准备对应前面的CQRS Writer Actor 示范里的akka-cluster-pos进行rCQRS-Reader-Actor示范。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏用户画像

Mac搭建Spark环境

版权声明:本文为博主-姜兴琪原创文章,未经博主允许不得转载。 https://blog.csdn.net/jxq0816/article/details...

43920
来自专栏黑泽君的专栏

大数据技术之_19_Spark学习_06_Spark 源码解析小结

1、spark 一开始使用 akka 作为网络通信框架,spark 2.X 版本以后完全抛弃 akka,而使用 netty 作为新的网络通信框架。 最主要原因...

10830
来自专栏Frank909

c++ 开发中利用yaml-cpp读写yaml配置文件

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/briblue/article/details/895...

1.1K40
来自专栏数据分析1480

大数据之脚踏实地学16--Scala列表、元组与映射

在上一期的《大数据之脚踏实地学15--Scala的数组操作》分享中,介绍了Scala的数组相关知识,借助于Array函数可以构造定长数组(即数组一旦定义好长度,...

7210
来自专栏Leetcode名企之路

看了很多技术书,为啥仍然写不出项目?

这大概是还在读书的同学最大的困惑了。自己明明看了很多书,感觉不到自己的进步,很有挫败感。计算机科学是一门实践的科学,你发现你看了《现代操作系统》,《CSAPP》...

12630
来自专栏银河系资讯

使用Akka实现并发

所以我从一个简单的Java程序开始,运行一个while循环直到EOF,然后进行JDBC调用来存储值。这是需要花一个小时才完成了,但后来我意识到程序的运行时比创建...

20020
来自专栏数据分析1480

大数据之脚踏实地学15--Scala的数组操作

Scala中的数组是一种非常重要的数据结构,它是用来存储同类型元素的容器,除此Scala还有其他存储数据的容器,例如元组、列表、映射等。在本期的内容分享中,我们...

8610
来自专栏数据分析1480

大数据之脚踏实地学14--Scala自定义函数

函数,其最大的好处在于避免了代码的重复编写,可以使编程过程更加地高效。尽管在《大数据之脚踏实地学12--Scala数据类型与运算符》和《大数据之脚踏实地学13-...

13410
来自专栏有困难要上,没有困难创造困难也要上!

从零开始构建Flink开发项目-Scala版

今天要做一个Flink的测试,因此需要创建一个简单的Flink项目,于是找到了下面这种方式来创建一个Flink启动项目。

39530
来自专栏码匠的流水账

聊聊hazelcast的PhiAccrualFailureDetector

本文主要研究一下hazelcast的PhiAccrualFailureDetector

11820

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励