首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

在EventStore中重新连接到持久连接后,不要重复确认的事件

基础概念

EventStore 是一个开源的分布式事件存储系统,用于存储和检索不可变的事件流。它主要用于支持事件溯源(Event Sourcing)和CQRS(命令查询职责分离)架构。在EventStore中,事件是不可变的,一旦写入就不能更改或删除。

相关优势

  1. 不可变性:事件一旦写入,就不能更改或删除,这提供了数据的一致性和可靠性。
  2. 事件溯源:通过重放事件流,可以重建系统的状态,这对于调试和审计非常有用。
  3. 分布式架构:EventStore支持分布式部署,能够处理高并发和大数据量。
  4. 持久化:事件数据持久化存储,确保数据不会因为系统故障而丢失。

类型

  • 持久连接:客户端与EventStore之间保持长连接,用于实时接收事件。
  • 断线重连:当连接断开后,客户端会尝试重新连接到EventStore。

应用场景

  • 金融系统:用于记录交易历史,支持审计和合规性。
  • 电子商务平台:记录订单和支付事件,支持订单状态的追溯。
  • 游戏系统:记录玩家行为和游戏状态变化,支持游戏回放和数据分析。

问题描述

在EventStore中重新连接到持久连接后,不要重复确认的事件。

原因

当客户端重新连接到EventStore时,可能会重复接收之前已经确认的事件。这是因为EventStore在断线期间可能会缓存一些事件,当客户端重新连接时,这些事件会被重新发送。

解决方法

为了避免重复确认的事件,可以在客户端实现一个去重机制。具体步骤如下:

  1. 事件ID去重:每个事件都有一个唯一的ID,客户端可以维护一个已确认事件的ID列表。
  2. 检查事件ID:在接收到新事件时,先检查事件ID是否已经在已确认列表中。
  3. 更新已确认列表:如果事件ID不在已确认列表中,则处理该事件并将事件ID添加到已确认列表中。

示例代码

以下是一个简单的示例代码,展示如何在客户端实现事件去重:

代码语言:txt
复制
using System;
using System.Collections.Generic;
using EventStore.ClientAPI;

public class EventStoreClient
{
    private readonly IEventStoreConnection _connection;
    private readonly HashSet<string> _confirmedEventIds = new HashSet<string>();

    public EventStoreClient(string connectionString)
    {
        _connection = EventStoreConnection.Create(connectionString);
        _connection.Connected += OnConnected;
        _connection.Disconnected += OnDisconnected;
        _connection.ConnectAsync().Wait();
    }

    private void OnConnected(object sender, ClientConnectionEventArgs e)
    {
        var subscription = _connection.SubscribeToAllAsync(
            fromEvent: StreamPosition.Start,
            resolveLinkTos: true,
            maxSearchWindow: int.MaxValue,
            credentials: null,
            liveBufferSize: 500,
            readBatchSize: 500,
            checkpointAfterMs: 1000,
            checkpointMaxCount: 1000,
            checkpointToken: null,
            connectionClosed: (sender2, e2) => { },
            messagePump: null);

        subscription.Completed += (sender2, e2) =>
        {
            foreach (var resolvedEvent in subscription.Events)
            {
                if (!_confirmedEventIds.Contains(resolvedEvent.Event.Id))
                {
                    ProcessEvent(resolvedEvent.Event);
                    _confirmedEventIds.Add(resolvedEvent.Event.Id);
                }
            }
        };
    }

    private void OnDisconnected(object sender, ClientConnectionEventArgs e)
    {
        // Handle disconnection
    }

    private void ProcessEvent(EventData eventData)
    {
        // Process the event
        Console.WriteLine($"Processing event: {eventData.EventId}");
    }
}

参考链接

EventStore官方文档

通过上述方法,可以有效避免在重新连接到EventStore后重复确认事件的问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

基于Go语言使用NATS Streaming构建分布式系统和微服务

在各种事件驱动架构中,我强烈建议采用 Event Sourcing (事件源),这是一个以事件为中心的架构,通过组合各种事件来构建应用程序的状态。...NATS Streaming 目前不支持持久化日志的数据库系统,但我希望这个功能可以在近期到来, 这个功能也可以Event Sourcing 的实现中供你的分布式应用作为事件存储。...通过持久订阅,NATS Streaming 服务器即使在客户端连接关闭后也可以维护订阅者客户端的状态。持久订阅通过提供一个持久化的名称来创建。你还可以为队列组创建的订户客户端使用持久订阅。...在该演示示例中,当域事件发生时, 消息从eventstore应用程序发布,并且消息从以下三个用户的 “order-notificaton” 频道上订阅: restaurantservice orderquery-store1...因此,通过运行连接到NATS服务器集群的单个NATS Streaming 服务器来解决集群问题是一种解决方法。

12.3K51

什么是事件溯源模式?深度解析基本概念、实现和应用

作为软件开发领域的一种设计模式,事件溯源模式在构建可伸缩、灵活和可维护的系统中发挥着关键作用。...它的核心思想是将系统中的每次状态变化都表示为事件,并将这些事件持久化存储,以便可以重放它们来重新构建系统的状态。这种方法有助于跟踪系统中发生的所有变化,并提供了对历史数据的完整性和可追溯性。...事件存储(Event Store) 事件存储是一个持久化的存储系统,用于保存应用程序中发生的所有事件。这可以是数据库、日志文件或专门的事件存储系统。事件存储需要提供高性能、可扩展性和数据保真度。...聚合根(Aggregate Root) 聚合根是一个概念,代表了一组相关的领域对象。在事件溯源中,聚合根负责接收和应用事件,并确保它们按正确的顺序应用,以维护聚合根的一致性。...最佳实践 粒度控制: 选择适当的事件粒度,不要记录过于底层的状态变化,也不要将所有事务作为一个大事件记录。 版本管理: 对事件模型进行版本管理,以确保系统的演化不会影响旧有事件的处理。

55810
  • 译《领域驱动设计之PHP实现》架构风格(下)

    )); } public function byId(PostId $id) { return $this->em->find($id); }} Post实例和记录事件在同一个事务中触发和持久化...追加在列表里的事件使用一个 event 前缀:除此之外,在持久化这些事件之前,我们提取一些像类名或者创建时间之类的元数据,这些在之后会派上用场。...显然,就性能而言,聚合总是通过重现它的历史事件来达到最终状态是非常奢侈的。尤其是当事件流有成百上千个事件。克服这种局面最好的办法就是从聚合中拍摄一个快照,只重现快照拍摄后发生的事件。...从这种架构风格的用例中明显可知,仅仅使用 ORM 来持久/读取 使用未免太过度了。就算我们使用关系型数据库来存储它们,我们也仅仅只是从事件存储中持久/读取事件而已。...这些架构风格确实有用,在大量的 CQRS 仓储查找方法中,和事件源事件触发量上,你可以很快受到这些风格的启发。

    79020

    什么是事件溯源模式?深度解析基本概念、实现和应用

    作为软件开发领域的一种设计模式,事件溯源模式在构建可伸缩、灵活和可维护的系统中发挥着关键作用。...它的核心思想是将系统中的每次状态变化都表示为事件,并将这些事件持久化存储,以便可以重放它们来重新构建系统的状态。这种方法有助于跟踪系统中发生的所有变化,并提供了对历史数据的完整性和可追溯性。...事件存储(Event Store) 事件存储是一个持久化的存储系统,用于保存应用程序中发生的所有事件。这可以是数据库、日志文件或专门的事件存储系统。事件存储需要提供高性能、可扩展性和数据保真度。...聚合根(Aggregate Root) 聚合根是一个概念,代表了一组相关的领域对象。在事件溯源中,聚合根负责接收和应用事件,并确保它们按正确的顺序应用,以维护聚合根的一致性。...最佳实践 粒度控制: 选择适当的事件粒度,不要记录过于底层的状态变化,也不要将所有事务作为一个大事件记录。 版本管理: 对事件模型进行版本管理,以确保系统的演化不会影响旧有事件的处理。

    26610

    CQRS+ES项目解析-Equinox

    层的应用程序服务,应用程序服务将数据进行封装和转换,然后交给Domain层进行处理,Domain层则调用Infra相关的方法完成持久化、消息发布等功能。...感兴趣的朋友可以参照上篇文章进行了解。 EventStore EventStore也是ES的核心内容,负责对事件的存储、提取工作。...Equinox项目总结 通过分析Equinox项目的结构和代码,我们可以发现,这个项目并不是很完善,作者所说的不要用在生产环境是实话。...在这个项目中,对于ES的实现并不是很优雅,首先EventStore的操作,未提供查询事件的接口,从而导致了需要通过Repository来获取Event,破坏了EventStore的完整性;其次该项目没有完成事件重放功能...,我们只能通过事件查看到数据的变更,但是无法通过重放来获取项目的某个时段的状态的功能;最后,Equinox项目未实现读写分离,对于数据的查询和增加更新等操作都混合在一个Repository中,不利于我们进行读写分离

    63450

    大数据Canal(三):使用Canal同步MySQL数据

    2、Canal同步MySQL数据原理EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog...EventSink是连接EventParser和EventStore的桥梁。EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。...void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作。...Kafka中json格式如下:关于以上json字段解析如下:data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据。...es:事件时间,13位的时间戳。id:事件操作的序列号,1,2,3...isDdl:是否是DDL操作。mysqlType:字段类型。old:旧数据。pkNames:主键名称。sql:SQL语句。

    3K41

    RabbitMQ知识点整理总结

    1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)。 2.向Broker请求消费响应的队列中消息,可能会设置响应的回调函数。...一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。...接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。...这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。...下面罗列几种特殊情况: 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。

    65010

    当我们在讨论CQRS时,我们在讨论些神马?

    它把对象的创建、修改、删除等一系列的操作都当作事件(注意:事件和命令还有区别,后面会讲到),持久化的时候只存储事件,存储事件的介质叫做EventStore,当要获取一个对象的最新状态时,通过EventStore...检索该对象的所有Event并重新加载来获取对象的最新状态。...Command和Event 在CQRS+ES的方案中,我们要面对这两个概念,命令和事件。 Command:描述了用户的意图。 Event:描述了对象状态的改变。...当我们基于消息来实现CQRS中的命令和事件发布的时候,我们的系统将会更加的灵活可扩展。...数据审计 数据审计是CQRS带给我们的另一个便利。由于我们存储了所有事件,当我们要获取对象变更记录的时候,只需要将EventStore中的记录查询出来,便可以看到整个的生命周期。

    50930

    rabbitmq基本原理_计算尺使用的是什么原理

    rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。...设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失 设置为临时队列,queue中的数据在系统重启之后就会丢失 设置为自动删除的队列,当不存在用户连接到...另外,ProtoBuf具有速度和空间的优势,使得它现在应用非常广泛; rabbitmq组件断链重连机制 方案一: Rabbitmq在启动时,为rabbitmq设置一个status,在第一次建立连接的时候将其变为...关于消息的重复执行 首先我们可以确认的是,触发消息重复执行的条件会是很苛刻的! 也就说 在大多数场景下不会触发该条件!!! 一般出在任务超时,或者没有及时返回状态,引起任务重新入队列,重新消费!...在rabbtimq里连接的断开也会触发消息重新入队列。 消费任务类型最好要支持幂等性,这样的好处是 任务执行多少次都没关系,顶多消耗一些性能! 如果不支持幂等,比如发送信息?

    29720

    EMQX Enterprise 4.4.12&4.4.13 发布:集群负载重平衡、TDengine 3.0 适配以及子表批量插入

    一方面,在跨版本升级、垂直或水平扩展时要求关闭 EMQX 节点,这会导致节点上所有连接几乎同时断开并重连,增加了集群过载的风险,与此同时非持久会话也将在节点关闭时丢失。...启用节点疏散后,当前节点将停止接受 MQTT 新连接,并将所有连接及会话转移到指定节点,在此过程中客户端通过重连或 MQTT 5.0 Server redirection 机制,经历短暂的断开后会迅速连接到新节点...为修复 Kafka 集成的连接问题,为 Kafka 资源 SSL 连接配置增加 SNI 字段,能够方便的连接到诸如 Confluent Cloud 等启用了 TLS 且集群部署的 Kafka 资源中。...持久会话的 MQTT 客户端重新连接 EMQX 之后,未确认的 QoS1/QoS2 消息不再周期性重发,该行为符合协议规范。 在此之前由 znone.....retry_interval 配置指定该消息的重发间隔(默认为 30s),但当持久会话的 MQTT 客户端重新连接 EMQX 之后,EMQX 只会将队列中缓存的未被确认的消息重发一次而不是按配置的时间间隔重试

    1.4K20

    iOS开发之EventKit框架的应用

    对于系统的Reminders,其主要作用是提供事件列表,用户可以向事件列表中添加提醒事件,同样,提供默认创建了两个事件列表,用户也可以根据需要自行创建新的事件列表,如下图: ?...三、日历事件操作       第三方应用需要操作用户的日历事件,需要获取用户授权,首先需要在info.plist文件中添加如下权限请求字段: ?...:nil]; } } } 三、提醒事件       提醒事件的用法和日历事件的用法基本一致,首先在Reminder应用中,每一个列表就是一个日历,下面代码示例了向列表中插入提醒事件的方法...四、EKEventStore详解       EKEventStore类是EventKit中的核心类,用来对日历和提醒的事件进行操作。...EKEvent对应系统日历中的事件,EKReminder对应系统提醒应用中的事件。

    4.1K51

    面试之ActiveMQ

    这得从 ActiveMQ 的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的节点中配置。...虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除。 那如果文件增大到达了配置中的最大限制的时候会发生什么?...整个系统可连接,但是无法提供服务,就这样挂了。 具体原因不详,解决方案:尽量不要用非持久化消息,非要用的话,将临时文件限制尽可能的调大。 丢消息怎么办?...解决方案:用持久化消息,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit() 方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。 持久化消息非常慢。...在这种情况下,在 onMessage 方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。

    42100

    面试必备(背)-RabbitMQ八股文系列!

    Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)。 向Broker请求消费响应的队列中消息,可能会设置响应的回调函数。...,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。...先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除; 但是因为网络传输等等故障,确认信息没有传送到消息队列...在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重的依据,避免同一条消息被重复消费 保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响...消息列表丢失消息: 消息持久化。 处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。 这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。

    2K20

    RabbitMQ和Kafka到底怎么选(二)

    队列A的consumer在消费的时候,机器宕机,此时客户端和服务端分别做如下动作: 服务端:把mirror queue提升为master queue 客户端:连接到新的master queue 所在的节点进行消费或者生产...时,会把宕机前正在进行消费的的消息全部重新发送一遍,即客户端重连后,消息可能被重复消费,这个时候就必须依靠应用层逻辑来判断来避免重复消费。...在持久化方面,RabbitMQ的master queue每次收到新消息后,都会立刻写入磁盘,并把消息同步给mirror queue。...mirror queue被提升为master queue时,消费者连接到新的master queue上进行消费时就丢了一条消息。...从吞吐量上看,在不要求消息顺序情况下,Kafka完胜;在要求消息先后顺序的场景,性能应该稍逊RabbitMQ(此时Kafka的分片数只能为1)。

    51610

    精选RabbitMQ面试题

    Producer 先连接到 Broker,建立连接 Connection,开启一个信道(Channel)。 向 Broker 请求消费响应的队列中消息,可能会设置响应的回调函数。...下面罗列几种特殊情况: 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。...只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。 这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。...保证数据的最终一致性;下面罗列几种特殊情况: 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。...消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。 场景演示:需求:用户在系统中创建一个订单,如果超过时间用户没有进行支付,那么自动取消订单。

    1.6K21

    为什么最近每份 Android 简历都说 “熟悉 MQTT 协议”?

    如果连接到 broker 时已经有一个重名的 clientId,那么会先断开现有 client 的连接,这将可能导致断开和连接的死循环,因为大多数 MQTT client 有断线重连机制; CleanSession...持久会话: 当 client 连接到 broker 时,可以使用持久连接或非持久连接,CleanSession 标志决定是否使用持久连接(当 CleanSession = 0 时表示持久连接),对于持久会话...4.2 会话状态 当 client 连接到 broker 时,可以使用持久连接或非持久连接,这是通过 CONNECT 消息中的 CleanSession 标志来决定的(当 CleanSession =...4.5 消息重传 标记 DUP = 1 的消息是被重复发送的消息,MQTT 消息重传有 2 种场景: 1、PUBLISH / PUBREL 消息发送后,在规定时间内没有收到确认应答消息,则重传这个消息;...2、在使用持久会话时,client 重新连接后,broker 会自动重传未确认的消息。

    4.4K40

    MySQL Binlog同步HDFS的方案

    EventSink是连接EventParser和EventStore的桥梁。...(在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化) 流式api设计示意图如下: ?...一旦zookeeper发现canal server A创建的instance节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance...使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可. canal example 部署 在需要同步的MySQL数据库中创建一个用户...Redis或alluxio中; 数据同步的使用方可以将数据保存到自己的数据库中; 由于kafka的日志是可以重复消费的,并且缓存一段时间,各个使用方可以通过消费kafka的日志来达到既能保持与数据库的一致性

    2.4K30

    长连接(socket)可靠消息架构与海量消息架构浅析

    与短连接不同,长连接不需要每次交换数据时重新建立连接。 在WebSocket或TCP长连接中,数据可以实时双向传输,而在HTTP长连接中,通信仍遵循请求-响应模式,但多个请求可以复用同一个连接。...客户端确认逻辑: 当客户端发送消息后,它应该等待服务器的确认响应。如果客户端没有在预定时间内收到确认,它可能会选择重发消息或记录失败事件。...客户端应实现机制以区分新消息和重发消息,避免在服务器端造成重复处理。 服务器端确认逻辑: 服务器接收到客户端的消息后,应当进行处理,并发送一个确认响应回客户端。...持久化 消息持久化是确保消息不会因为系统故障而丢失的重要机制,在服务器端收到消息后,应该先将其持久化存储,然后再进行处理。...如果是存在大量实时数据的传输,我们去采用客户端消息持久化,这样如果断开连接后,我们不需要消耗服务度的CPU和带宽资源来推送之前的数据包,这样重连机制也更加顺畅。

    57420

    「从零单排canal 05」 server模块源码解析

    ,第一个线程池是Accept线程池,第二个线程池是woker线程池,Accept线程池接收到client连接请求后,会将代表client的对象转发给worker线程池处理。...主要区别在于,客户端获取batch后,自动ack,这样相对来说肯定更快,但是无法保证可靠性。 在项目中看起来暂时没有使用,我们就不展开了。 4.6 ack方法 进行 batch id 的确认。...确认之后,小于等于此 batchId 的 Message 都会被确认。...从metaManager中移除batchId对应的记录 记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始 已经ack的数据,在eventStore中清除 ?...回滚的本质,就是把所有还没ack的batchId都清空,流式api被get但是还没ack的消息会被重新get。

    65720

    2022 最新 RabbitMQ 面试题

    一旦消息被投递到目的队列后, 或者消息被写入磁盘后( 可持久化的消息), 信 道会发送一个确认给生产者( 包含消息唯一 ID)。...这里并没有用到超时机制, RabbitMQ 仅通过 Consumer 的连接中断来确认是否 需要重新发送消息。...保证数据的最终一致性; 下面罗列几种特殊情况 如果消费者接收到消息, 在确认之前断开了连接或取消订阅, RabbitMQ 会认为 消息没有被分发, 然后重新分发给下一个订阅的消费者。...在消息生产时, MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id, 作 为去重的依据( 消息投递失败并重传), 避免重复的消息进入队列; 在消息消费时 ,要求消息体中必须要有一个 bizId...如 果持久化消息在被消费之前 RabbitMQ 重启 , 那么 Rabbit 会自动重建交换器和队列( 以及绑定 ),并重新发布持久化日志文件 中的消息到合适的队列。

    16710
    领券