写模型是领域行为的真实持有者,继续我们的例子,仓储接口将被简化如下:
interface PostRepository{
public function save(Post $post);
public function byId(PostId $id);}
现在PostRepository
已经从所有读关注点中分离出来,除了一个:byId
方法,负责通过 ID 来加载聚合以便我们对其进行操作。那么只要这一步完成,所有的查询方法都将从Post
模型中剥离出来,只留下命令方法。这意味着我们可以有效地摆脱所有getter方法和任何其它暴露Post
聚合信息的方法。取而代之的是,通过订阅聚合模型来发布领域事件,以触发写模型投影:
class AggregateRoot{
private $recordedEvents = []; protected function recordApplyAndPublishThat(
DomainEvent $domainEvent
)
{
$this->recordThat($domainEvent);
$this->applyThat($domainEvent);
$this->publishThat($domainEvent);
} protected function recordThat(DomainEvent $domainEvent)
{
$this->recordedEvents[] = $domainEvent;
} protected function applyThat(DomainEvent $domainEvent)
{
$modifier = 'apply' . get_class($domainEvent);
$this->$modifier($domainEvent);
} protected function publishThat(DomainEvent $domainEvent)
{
DomainEventPublisher::getInstance()->publish($domainEvent);
} public function recordedEvents()
{
return $this->recordedEvents;
} public function clearEvents()
{
$this->recordedEvents = [];
}}class Post extends AggregateRoot{
private $id;
private $title;
private $content;
private $published = false;
private $categories; private function __construct(PostId $id)
{
$this->id = $id;
$this->categories = new Collection();
} public static function writeNewFrom($title, $content)
{
$postId = PostId::create();
$post = new static($postId);
$post->recordApplyAndPublishThat(
new PostWasCreated($postId, $title, $content)
);
} public function publish()
{
$this->recordApplyAndPublishThat(
new PostWasPublished($this->id)
);
} public function categorizeIn(CategoryId $categoryId)
{
$this->recordApplyAndPublishThat(
new PostWasCategorized($this->id, $categoryId)
);
} public function changeContentFor($newContent)
{
$this->recordApplyAndPublishThat(
new PostContentWasChanged($this->id, $newContent)
);
} public function changeTitleFor($newTitle)
{
$this->recordApplyAndPublishThat(
new PostTitleWasChanged($this->id, $newTitle)
);
}}
所有触发状态改变的动作都通过领域事件来实现。对于每一个已发布的领域事件,都有一个对应的apply
方法负责状态的改变:
class Post extends AggregateRoot{// ...
protected function applyPostWasCreated(
PostWasCreated $event
)
{
$this->id = $event->id();
$this->title = $event->title();
$this->content = $event->content();
} protected function applyPostWasPublished(
PostWasPublished $event
)
{
$this->published = true;
} protected function applyPostWasCategorized(
PostWasCategorized $event
)
{
$this->categories->add($event->categoryId());
} protected function applyPostContentWasChanged(
PostContentWasChanged $event
)
{
$this->content = $event->content();
} protected function applyPostTitleWasChanged(
PostTitleWasChanged $event
)
{
$this->title = $event->title();
}}
读模型,同时也称为查询模型,是一个纯粹的从领域中提取的非规范化的数据模型。事实上,使用CQRS
,所有的读取侧都被视为基础设施关注的表述过程。一般来说,当使用CQRS
时,读模型与 UI 所需有关,与组合视图的 UI 复杂性有关。在一个关系型数据库中定义读模型的情况下,最简单的方法就是建立数据表与 UI 视图一对一的关系。这些数据表和 UI 视图将用写模型投影更新,由写一侧发布的领域事件来触发:
-- Definition of a UI view of a single post with its commentsCREATE TABLE single_post_with_comments (id INTEGER NOT NULL,post_id INTEGER NOT NULL,post_title VARCHAR(100) NOT NULL,post_content TEXT NOT NULL,post_created_at DATETIME NOT NULL,comment_content TEXT NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;-- Set up some dataINSERT INTO single_post_with_comments VALUES(1, 1, "Layered" , "Some content", NOW(), "A comment"),(2, 1, "Layered" , "Some content", NOW(), "The comment"),(3, 2, "Hexagonal" , "Some content", NOW(), "No comment"),(4, 2, "Hexagonal", "Some content", NOW(), "All comments"),(5, 3, "CQRS", "Some content", NOW(), "This comment"),(6, 3, "CQRS", "Some content", NOW(), "That comment");-- Query itSELECT * FROM single_post_with_comments WHERE post_id = 1;
这种架构风格的一个重要特征就是,读模型应该完全是一次性的,因为应用的真实状态是由写模型来处理。这意味着读模型在需要时,可以用写模型投影来移除和重建。
这里我们可以看到一个博客应用里的一些可能存在的视图的例子:
SELECT * FROM posts_grouped_by_month_and_year ORDER BY month DESC,year ASC;SELECT * FROM posts_by_tags WHERE tag = "ddd";SELECT * FROM posts_by_author WHERE author_id = 1;
需要特别指出的是,CQRS
并不约束读模型的定义和实现要用关系型数据库,它取决于被构建的应用实际所需。它可以是关系型数据库,面向文档的数据库,键-值型存储,或任意适合应用所需的存储引擎。在博客帖子应用里,我们使用Elasticsearch
– 一个面向文档的数据库 – 来实现一个读模型:
class PostsController{
public function listAction()
{
$client = new ElasticsearchClientBuilder::create()->build();
$response = $client->search([
'index' => 'blog-engine',
'type' => 'posts',
'body' => [
'sort' => [
'created_at' => ['order' => 'desc']
]
]
]);
return [
'posts' => $response
];
}}
读模型被彻底地简化为针对Elasticsearch
的单个查询索引。
这表明读模型并不真正需要一个对象关系映射器,因为这是多余的。然而,写模型可能会得益于对象关系映射的使用,因为这允许你根据应用程序所需要来组织和构建读模型。
接下来便是棘手的部分。如何用写模型同步读模型?我们之前已经说过,通过使用写模型事务中捕获的领域事件来完成它。对于捕获的每种类型的领域事件,将执行一个特定的投影。因此,将设置领域事件和投影间的一个一对一的关系。
让我们看看配置投影的一个例子,以便我们得到一个更好的方法。首先,我们需要定义一个投影接口:
interface Projection{
public function listensTo();
public function project($event);}
所以为PostWasCreated
事件定义一个Elasticsearch
投影如下述一般简单:
namespace Infrastructure\Projection\Elasticsearch;use Elasticsearch\Client;use PostWasCreated;class PostWasCreatedProjection implements Projection{
private $client; public function __construct(Client $client)
{
$this->client = $client;
} public function listensTo()
{
return PostWasCreated::class;
} public function project($event)
{
$this->client->index([
'index' => 'posts',
'type' => 'post',
'id' => $event->getPostId(),
'body' => [
'content' => $event->getPostContent(),// ...
]
]);
}}
Projector
的实现就是一种特殊的领域事件监听器。它与默认的领域事件监听器的主要区别在于 Projector
触发了一组领域事件而不是仅仅一个:
namespace Infrastructure\Projection;class Projector{
private $projections = []; public function register(array $projections)
{
foreach ($projections as $projection) {
$this->projections[$projection->eventType()] = $projection;
}
} public function project(array $events)
{
foreach ($events as $event) {
if (isset($this->projections[get_class($event)])) {
$this->projections[get_class($event)]
->project($event);
}
}
}}
下面的代码展示了 projector
和事件间的流向:
$client = new ElasticsearchClientBuilder::create()->build();$projector = new Projector();$projector->register([new Infrastructure\Projection\Elasticsearch\PostWasCreatedProjection($client),new Infrastructure\Projection\Elasticsearch\PostWasPublishedProjection($client),new Infrastructure\Projection\Elasticsearch\PostWasCategorizedProjection($client),new Infrastructure\Projection\Elasticsearch\PostContentWasChangedProjection($client),new Infrastructure\Projection\Elasticsearch\PostTitleWasChangedProjection($client),]);$events = [new PostWasCreated(/* ... */),new PostWasPublished(/* ... */),new PostWasCategorized(/* ... */),new PostContentWasChanged(/* ... */),new PostTitleWasChanged(/* ... */),];$projector->project($event);
这里的代码是一种同步技术,但如果需要的话也可以是异步的。你也通过在视图层放置一些警告通知来让客户知道这些不同步的数据。
对于接下来的例子,我们将结合使用 amqplib PHP 扩展和 ReactPHP:
// Connect to an AMQP broker$cnn = new AMQPConnection();$cnn->connect();// Create a channel$ch = new AMQPChannel($cnn);// Declare a new exchange$ex = new AMQPExchange($ch);$ex->setName('events');$ex->declare();// Create an event loop$loop = ReactEventLoopFactory::create();// Create a producer that will send any waiting messages every half asecond$producer = new Gos\Component\React\AMQPProducer($ex, $loop, 0.5);$serializer = JMS\Serializer\SerializerBuilder::create()->build();$projector = new AsyncProjector($producer, $serializer);$events = [
new PostWasCreated(/* ... */),
new PostWasPublished(/* ... */),
new PostWasCategorized(/* ... */),
new PostContentWasChanged(/* ... */),
new PostTitleWasChanged(/* ... */),];$projector->project($event);
为了能让它工作,我们需要一个异步的 projector
。这有一个原生的实现如下:
namespace Infrastructure\Projection;use Gos\Component\React\AMQPProducer;use JMS\Serializer\Serializer;class AsyncProjector{
private $producer;
private $serializer; public function __construct(
Producer $producer,
Serializer $serializer
)
{
$this->producer = $producer;
$this->serializer = $serializer;
} public function project(array $events)
{
foreach ($events as $event) {
$this->producer->publish(
$this->serializer->serialize(
$event, 'json'
)
);
}
}}
在 RabbitMQ 交换机上的事件消费者如下:
// Connect to an AMQP broker$cnn = new AMQPConnection();$cnn->connect();// Create a channel$ch = new AMQPChannel($cnn);// Create a new queue$queue = new AMQPQueue($ch);$queue->setName('events');$queue->declare();// Create an event loop$loop = React\EventLoop\Factory::create();$serializer = JMS\Serializer\SerializerBuilder::create()->build();$client = new Elasticsearch\ClientBuilder::create()->build();$projector = new Projector();$projector->register([
new Infrastructure\Projection\Elasticsearch\
PostWasCreatedProjection($client),
new Infrastructure\Projection\Elasticsearch\
PostWasPublishedProjection($client),
new Infrastructure\Projection\Elasticsearch\
PostWasCategorizedProjection($client),
new Infrastructure\Projection\Elasticsearch\
PostContentWasChangedProjection($client),
new Infrastructure\Projection\Elasticsearch\
PostTitleWasChangedProjection($client),]);// Create a consumer$consumer = new Gos\Component\ReactAMQP\Consumer($queue, $loop, 0.5, 10);// Check for messages every half a second and consume up to 10 at a time.$consumer->on(
'consume',
function ($envelope, $queue) use ($projector, $serializer) {
$event = $serializer->unserialize($envelope->getBody(), 'json');
$projector->project($event);
});$loop->run();
从现在开始,只需让所有所需的仓储使用 projector
实例,然后让它们调用投影过程就可以了:
class DoctrinePostRepository implements PostRepository{
private $em;
private $projector; public function __construct(EntityManager $em, Projector $projector)
{
$this->em = $em;
$this->projector = $projector;
} public function save(Post $post)
{
$this->em->transactional(
function (EntityManager $em) use ($post) {
$em->persist($post);
foreach ($post->recordedEvents() as $event) {
$em->persist($event);
}
}
);
$this->projector->project($post->recordedEvents());
} public function byId(PostId $id)
{
return $this->em->find($id);
}}
Post
实例和记录事件在同一个事务中触发和持久化。这就确保没有事件丢失,只要事务成功了,我们就会把它们投影到读模型中。因此,在写模型和读模型之间不存在不一致的情况。
用 ORM 还是不用 ORM 一个非常普遍的问题就是当实现 CQRS 时,是否真正需要一个对象关系映射(
ORM
)。我们真的认为,写模型使用ORM
是极好的,同时有使用工具的所有优点,这将帮助我们节省大量的工作,只要我们使用了关系型数据库。但我们不应该忘了我们仍然需要在关系型数据库中持久化和检索写模型状态。
CQRS
是一个非常强大和灵活的架构。在收集和保存领域事件(在聚合操作期间发生)这方面,它有一个额外的好处,就是给你领域中发生的事件一个高度的细节。因为领域事件描述了过去发生的事情,它对于领域的意义,使它成为战术模式的一个关键点。
小心记录太多事件 越来越多的事件是一种坏味道。在领域中记录事件也许是一种成瘾,这也最有可能被企业激励。作为一条经验法则,记住要保持简单。
通过使用 CQRS
,我们可以在领域层记录所有发生的相关性事件。领域的状态可以通过重现之前记录的领域事件来呈现。我们只需要一个工具,用一致的方法来存储所有这些事件。所以我们需要储存事件。
事件源背后的基本原理是用一个线性的事件集来表现聚合的状态。
用 CQRS
,我们基本上可以实现如下:Post
实体用领域事件输出他的状态,但它的持久化,可以将对象映射至数据表。
事件源则更进一步。按照之前的做法,如果我们使用数据表存储所有博客帖子的状态,那么另外一个表存储所有博客帖子评论的状态,依次类推。而使用事件源我们则只需要一张表:一个数据库中附加的单独的一张表,来存储所有领域模型中的所有聚合发布的所有的领域事件。是的,你得看清了,是单独的一张表。
按照这种模型思路,像对象关系映射的工具就不再需要了。唯一需要的工具就是一个简单的数据抽象层,通过它来附加事件:
interface EventSourcedAggregateRoot{
public static function reconstitute(EventStream $events);}class Post extends AggregateRoot implements EventSourcedAggregateRoot{
public static function reconstitute(EventStream $history)
{
$post = new static($history->getAggregateId());
foreach ($events as $event) {
$post->applyThat($event);
}
return $post;
}}
现在 Post
聚合有一个方法,当给定一组事件集(或者说事件流)时,可以一步步重现状态直到当前状态,这些都在保存之前。下一步将构建一个 PostRepository
适配器端口从 Post
聚合中获取所有已发布的事件,并将它们添加到数据存储区,所有的事件都存储在这里。这就是我们所说的事件存储:
class EventStorePostRepository implements PostRepository{
private $eventStore;
private $projector; public function __construct($eventStore, $projector)
{
$this->eventStore = $eventStore;
$this->projector = $projector;
} public function save(Post $post)
{
$events = $post->recordedEvents();
$this->eventStore->append(new EventStream(
$post->id(),
$events)
);
$post->clearEvents();
$this->projector->project($events);
}}
这就是为什么 PostRepository
的实现看起来像我们使用一个事件存储来保存所有 Post
聚合发布的事件。现在我们需要一个方法,通过历史事件来重新存储一个聚合。Post
聚合实现的 reconsititute
方法,它通过事件触发来重建博客帖子状态,此刻派上用场:
class EventStorePostRepository implements PostRepository{
public function byId(PostId $id)
{
return Post::reconstitute(
$this->eventStore->getEventsFor($id)
);
}}
事件存储就像是负责关于保存和存储事件流的驮马。它的公共 API 由两个简单方法组成:它们是 append
和 getEventsFrom
. 前者追加一个事件流到事件存储,后者加载所有事件流来重建聚合。
我们可以通过一个键-值实现来存储所有事件:
class EventStore{
private $redis;
private $serializer; public function __construct($redis, $serializer)
{
$this->redis = $redis;
$this->serializer = $serializer;
} public function append(EventStream $eventstream)
{
foreach ($eventstream as $event) {
$data = $this->serializer->serialize(
$event, 'json'
);
$date = (new DateTimeImmutable())->format('YmdHis');
$this->redis->rpush(
'events:' . $event->getAggregateId(),
$this->serializer->serialize([
'type' => get_class($event),
'created_on' => $date,
'data' => $data
], 'json')
);
}
} public function getEventsFor($id)
{
$serializedEvents = $this->redis->lrange('events:' . $id, 0, -1);
$eventStream = [];
foreach ($serializedEvents as $serializedEvent) {
$eventData = $this->serializerdeserialize(
$serializedEvent,
'array',
'json'
);
$eventStream[] = $this->serializer->deserialize(
$eventData['data'],
$eventData['type'],
'json'
);
}
return new EventStream($id, $eventStream);
}}
这里的事件存储的实现是基于 Redis,一个广泛使用的键-值存储器。追加在列表里的事件使用一个 event
前缀:除此之外,在持久化这些事件之前,我们提取一些像类名或者创建时间之类的元数据,这些在之后会派上用场。
显然,就性能而言,聚合总是通过重现它的历史事件来达到最终状态是非常奢侈的。尤其是当事件流有成百上千个事件。克服这种局面最好的办法就是从聚合中拍摄一个快照,只重现快照拍摄后发生的事件。快照就是聚合状态在给定时刻的一个简单的序列化版本。它可以基于聚合的事件流的事件序号,或者基于时间。第一种方法,每 N 次事件触发时就要拍摄一次快照(例如每20,50,或者200次)。第二种方法,每 N 秒就要拍摄一次。
在下面的例子中,我们使用第一种方法。在事件的元数据中,我们添加一个附加字段,版本,即从我们开始重现聚合历史状态之处:
class SnapshotRepository{
public function byId($id)
{
$key = 'snapshots:' . $id;
$metadata = $this->serializer->unserialize(
$this->redis->get($key)
);
if (null === $metadata) {
return;
}
return new Snapshot(
$metadata['version'],
$this->serializer->unserialize(
$metadata['snapshot']['data'],
$metadata['snapshot']['type'],
'json'
)
);
} public function save($id, Snapshot $snapshot)
{
$key = 'snapshots:' . $id;
$aggregate = $snapshot->aggregate();
$snapshot = [
'version' => $snapshot->version(),
'snapshot' => [
'type' => get_class($aggregate),
'data' => $this->serializer->serialize(
$aggregate, 'json'
)
]
];
$this->redis->set($key, $snapshot);
}}
现在我们需要重构 EventStore
类,来让它使用 SnapshotRepository
在可接受的次数内加载聚合:
class EventStorePostRepository implements PostRepository{
public function byId(PostId $id)
{
$snapshot = $this->snapshotRepository->byId($id);
if (null === $snapshot) {
return Post::reconstitute(
$this->eventStore->getEventsFrom($id)
);
}
$post = $snapshot->aggregate();
$post->replay(
$this->eventStore->fromVersion($id, $snapshot->version())
);
return $post;
}}
我们只需要定期拍摄聚合快照。我们可以同步或者异步地通过监视事件存储进程来实现。下面的代码例子简单地演示了聚合快照的实现:
class EventStorePostRepository implements PostRepository{
public function save(Post $post)
{
$id = $post->id();
$events = $post->recordedEvents();
$post->clearEvents();
$this->eventStore->append(new EventStream($id, $events));
$countOfEvents = $this->eventStore->countEventsFor($id);
$version = $countOfEvents / 100;
if (!$this->snapshotRepository->has($post->id(), $version)) {
$this->snapshotRepository->save(
$id,
new Snapshot(
$post, $version
)
);
}
$this->projector->project($events);
}}
是否需要 ORM? 从这种架构风格的用例中明显可知,仅仅使用 ORM 来持久/读取 使用未免太过度了。就算我们使用关系型数据库来存储它们,我们也仅仅只是从事件存储中持久/读取事件而已。
在这一章,因为有大量可选的架构风格,你可能会感到一点困惑。为了做出明显的选择,你不得不在它们中考虑和权衡。不过一件事是明确的:大泥球是不可取的,因为代码很快就会变质。分层架构是一个更好的选择,但它也带来一些缺点,例如层与层之间的紧耦合。可以说,最合适的选择就是六边形架构,因为它可以作为一个基础的架构来使用,它能促进高层次的解耦并且带来内外应用间的对称性,这就是为什么我们在大多数场景下推荐使用它。
我们还可以看到 CQRS
和事件源这些相对灵活的架构,可以帮助你应对严重的复杂性。CQRS
和事件源都有它们的场景,但不要让它的魅力因素分散你判断它们本身提供的价值。由于它们都存在一些开销,你应该有技术原因来证明你必须得使用它。这些架构风格确实有用,在大量的 CQRS
仓储查找方法中,和事件源事件触发量上,你可以很快受到这些风格的启发。如果查找方法的数量开始增长,仓储层开始变得难以维护,那么是时候开始考虑使用 CQRS
来分离读写关注了。之后,如果每个聚合操作的事件量趋向于增长,业务也对更细粒度的信息感兴趣,那么一个选项就该考虑,转向事件源是否能够获得回报。
摘自Brian Foote和Joseph Yoder的一篇论文: 大泥球就是杂乱无章的,散乱泥泞的,牵连交织的意大利式面条代码。