A Look at go.cqrs's DomainRepository

This article takes a look at the DomainRepository in go.cqrs.

DomainRepository

// DomainRepository is the interface that all domain repositories should implement.
type DomainRepository interface {
    //Loads an aggregate of the given type and ID
    Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)

    //Saves the aggregate.
    Save(aggregate AggregateRoot, expectedVersion *int) error
}

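DomainRepository defines two methods: Load, which loads an aggregate given its type name and ID, and Save, which persists an aggregate against an optional expected version.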
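To make the shape of this contract concrete, here is a minimal sketch of a map-backed repository with the same two methods. It is not part of go.cqrs: `Aggregate`, `Repository`, `MemoryRepo`, `ErrNotFound`, and `ErrConflict` are hypothetical names, the aggregate is reduced to a plain struct instead of `AggregateRoot`, and a nil `expectedVersion` is assumed to mean "skip the version check".

```go
package main

import (
	"errors"
	"fmt"
)

// Aggregate is a simplified, hypothetical stand-in for go.cqrs's AggregateRoot.
type Aggregate struct {
	Type    string
	ID      string
	Version int
}

// Repository mirrors the shape of DomainRepository (Load and Save),
// but works on the simplified Aggregate type above.
type Repository interface {
	Load(aggregateTypeName string, aggregateID string) (*Aggregate, error)
	Save(aggregate *Aggregate, expectedVersion *int) error
}

// Illustrative errors; these are not go.cqrs types.
var (
	ErrNotFound = errors.New("aggregate not found")
	ErrConflict = errors.New("expected version does not match stored version")
)

// MemoryRepo is a hypothetical map-backed implementation of Repository.
type MemoryRepo struct {
	items map[string]Aggregate
}

func NewMemoryRepo() *MemoryRepo {
	return &MemoryRepo{items: make(map[string]Aggregate)}
}

// Load returns a copy of the stored aggregate, or ErrNotFound.
func (r *MemoryRepo) Load(aggregateTypeName, aggregateID string) (*Aggregate, error) {
	stored, ok := r.items[aggregateTypeName+"/"+aggregateID]
	if !ok {
		return nil, ErrNotFound
	}
	return &stored, nil
}

// Save stores the aggregate. When expectedVersion is non-nil, the stored
// version must match it; nil is assumed to mean "skip the check".
func (r *MemoryRepo) Save(aggregate *Aggregate, expectedVersion *int) error {
	key := aggregate.Type + "/" + aggregate.ID
	if expectedVersion != nil {
		stored, ok := r.items[key]
		if !ok || stored.Version != *expectedVersion {
			return ErrConflict
		}
	}
	r.items[key] = *aggregate
	return nil
}

func main() {
	var repo Repository = NewMemoryRepo()
	if err := repo.Save(&Aggregate{Type: "Order", ID: "42", Version: 1}, nil); err != nil {
		fmt.Println("save failed:", err)
		return
	}
	loaded, err := repo.Load("Order", "42")
	fmt.Println(loaded, err)

	// A stale expected version is rejected.
	stale := 0
	fmt.Println(repo.Save(&Aggregate{Type: "Order", ID: "42", Version: 2}, &stale))
}
```

Callers depend only on the two-method interface, so an event-store-backed implementation and an in-memory one for tests can be swapped freely.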

GetEventStoreCommonDomainRepo

// GetEventStoreCommonDomainRepo is an implementation of the DomainRepository
// that uses GetEventStore for persistence
type GetEventStoreCommonDomainRepo struct {
    eventStore         *goes.Client
    eventBus           EventBus
    streamNameDelegate StreamNamer
    aggregateFactory   AggregateFactory
    eventFactory       EventFactory
}

// Load will load all events from a stream and apply those events to an aggregate
// of the type specified.
//
// The aggregate type and id will be passed to the configured StreamNamer to
// get the stream name.
func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error) {

    if r.aggregateFactory == nil {
        return nil, fmt.Errorf("The common domain repository has no Aggregate Factory.")
    }

    if r.streamNameDelegate == nil {
        return nil, fmt.Errorf("The common domain repository has no stream name delegate.")
    }

    if r.eventFactory == nil {
        return nil, fmt.Errorf("The common domain has no Event Factory.")
    }

    aggregate := r.aggregateFactory.GetAggregate(aggregateType, id)
    if aggregate == nil {
        return nil, fmt.Errorf("The repository has no aggregate factory registered for aggregate type: %s", aggregateType)
    }

    streamName, err := r.streamNameDelegate.GetStreamName(aggregateType, id)
    if err != nil {
        return nil, err
    }

    stream := r.eventStore.NewStreamReader(streamName)
    for stream.Next() {
        switch err := stream.Err().(type) {
        case nil:
            break
        case *url.Error, *goes.ErrTemporarilyUnavailable:
            return nil, &ErrRepositoryUnavailable{}
        case *goes.ErrNoMoreEvents:
            return aggregate, nil
        case *goes.ErrUnauthorized:
            return nil, &ErrUnauthorized{}
        case *goes.ErrNotFound:
            return nil, &ErrAggregateNotFound{AggregateType: aggregateType, AggregateID: id}
        default:
            return nil, &ErrUnexpected{Err: err}
        }

        event := r.eventFactory.GetEvent(stream.EventResponse().Event.EventType)

        //TODO: No test for meta
        meta := make(map[string]string)
        stream.Scan(event, &meta)
        if stream.Err() != nil {
            return nil, stream.Err()
        }
        em := NewEventMessage(id, event, Int(stream.EventResponse().Event.EventNumber))
        for k, v := range meta {
            em.SetHeader(k, v)
        }
        aggregate.Apply(em, false)
        aggregate.IncrementVersion()
    }

    return aggregate, nil

}

// Save persists an aggregate
func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error {

    if r.streamNameDelegate == nil {
        return fmt.Errorf("The common domain repository has no stream name delagate.")
    }

    resultEvents := aggregate.GetChanges()

    streamName, err := r.streamNameDelegate.GetStreamName(typeOf(aggregate), aggregate.AggregateID())
    if err != nil {
        return err
    }

    if len(resultEvents) > 0 {

        evs := make([]*goes.Event, len(resultEvents))

        for k, v := range resultEvents {
            //TODO: There is no test for this code
            v.SetHeader("AggregateID", aggregate.AggregateID())
            evs[k] = goes.NewEvent("", v.EventType(), v.Event(), v.GetHeaders())
        }

        streamWriter := r.eventStore.NewStreamWriter(streamName)
        err := streamWriter.Append(expectedVersion, evs...)
        switch e := err.(type) {
        case nil:
            break
        case *goes.ErrConcurrencyViolation:
            return &ErrConcurrencyViolation{Aggregate: aggregate, ExpectedVersion: expectedVersion, StreamName: streamName}
        case *goes.ErrUnauthorized:
            return &ErrUnauthorized{}
        case *goes.ErrTemporarilyUnavailable:
            return &ErrRepositoryUnavailable{}
        default:
            return &ErrUnexpected{Err: e}
        }
    }

    aggregate.ClearChanges()

    for k, v := range resultEvents {
        if expectedVersion == nil {
            r.eventBus.PublishEvent(v)
        } else {
            em := NewEventMessage(v.AggregateID(), v.Event(), Int(*expectedVersion+k+1))
            r.eventBus.PublishEvent(em)
        }
    }

    return nil
}

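GetEventStoreCommonDomainRepo has five fields: eventStore, eventBus, streamNameDelegate, aggregateFactory, and eventFactory. Its Load method first obtains the aggregate via r.aggregateFactory.GetAggregate, then resolves the stream name via r.streamNameDelegate.GetStreamName(aggregateType, id), and then walks the events with the reader returned by r.eventStore.NewStreamReader, calling aggregate.Apply(em, false) and aggregate.IncrementVersion() for each one.

Its Save method first collects the pending events via aggregate.GetChanges(), converts each of them into a goes.Event, and writes them with streamWriter.Append. It then calls aggregate.ClearChanges() and finally publishes every event through r.eventBus.PublishEvent.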
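The replay loop in Load can be sketched without an event store. The example below is a minimal illustration under stated assumptions: `Account`, `Deposited`, `Withdrawn`, and `Replay` are hypothetical names, the second argument of `Apply` is assumed to mean "this is a new event that should be tracked as a pending change", and the aggregate is assumed to start at version -1 so that the first applied event brings it to version 0.

```go
package main

import "fmt"

// Hypothetical event payloads; go.cqrs wraps payloads in an EventMessage.
type Deposited struct{ Amount int }
type Withdrawn struct{ Amount int }

// Account is a hypothetical aggregate rebuilt from its event history.
type Account struct {
	ID      string
	Balance int
	version int
	changes []interface{} // events raised but not yet saved
}

// Apply mutates state from an event. isNew is assumed to mirror the second
// argument of aggregate.Apply(em, false): replayed events pass false and are
// not recorded as pending changes.
func (a *Account) Apply(event interface{}, isNew bool) {
	switch e := event.(type) {
	case Deposited:
		a.Balance += e.Amount
	case Withdrawn:
		a.Balance -= e.Amount
	}
	if isNew {
		a.changes = append(a.changes, event)
	}
}

func (a *Account) IncrementVersion() { a.version++ }
func (a *Account) Version() int      { return a.version }

// Replay rebuilds an Account by applying every historical event in order,
// bumping the version once per event, as the loop in Load does.
func Replay(id string, history []interface{}) *Account {
	a := &Account{ID: id, version: -1} // assumption: -1 means "no events yet"
	for _, e := range history {
		a.Apply(e, false)
		a.IncrementVersion()
	}
	return a
}

func main() {
	history := []interface{}{Deposited{100}, Withdrawn{30}, Deposited{5}}
	acc := Replay("acc-1", history)
	fmt.Println(acc.Balance, acc.Version(), len(acc.changes))
}
```

Because replayed events pass false, the rebuilt aggregate has no pending changes, so a Save issued right after Load has nothing to append.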

Summary

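go.cqrs's DomainRepository defines the Load and Save methods, and GetEventStoreCommonDomainRepo implements that interface. Load reads the events of a stream and applies them to the aggregate one by one via aggregate.Apply. Save converts the result of aggregate.GetChanges() into events, writes them with streamWriter.Append, calls aggregate.ClearChanges(), and finally publishes them with r.eventBus.PublishEvent.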
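The Save flow summarized above can also be sketched in isolation. The example below assumes a hypothetical in-memory `Stream` in place of the goes stream writer, and `Message`, `ErrConcurrency`, and the free function `Save` are illustrative names, not go.cqrs APIs. It reproduces two details of the listing: events are published only after a successful append, and when an expected version is supplied, the k-th published message is stamped with version `*expectedVersion + k + 1`.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical, reduced event message.
type Message struct {
	EventType string
	Version   int
}

// Stream is a hypothetical in-memory event stream.
type Stream struct {
	events []Message
}

var ErrConcurrency = errors.New("expected version does not match stream")

// Append adds events to the stream. A non-nil expectedVersion must equal the
// index of the last stored event; nil is assumed to skip the check.
func (s *Stream) Append(expectedVersion *int, evs ...Message) error {
	if expectedVersion != nil && *expectedVersion != len(s.events)-1 {
		return ErrConcurrency
	}
	s.events = append(s.events, evs...)
	return nil
}

// Save appends the pending messages and then publishes them. With an
// expected version, message k is re-stamped as *expectedVersion + k + 1;
// without one, messages are published unchanged.
func Save(s *Stream, pending []Message, expectedVersion *int, publish func(Message)) error {
	if len(pending) > 0 {
		if err := s.Append(expectedVersion, pending...); err != nil {
			return err // nothing is published when the append fails
		}
	}
	for k, m := range pending {
		if expectedVersion != nil {
			m.Version = *expectedVersion + k + 1
		}
		publish(m)
	}
	return nil
}

func main() {
	s := &Stream{events: []Message{{EventType: "Created", Version: 0}, {EventType: "Renamed", Version: 1}}}
	expected := 1
	pending := []Message{{EventType: "Shipped"}, {EventType: "Closed"}}

	err := Save(s, pending, &expected, func(m Message) {
		fmt.Println("published", m.EventType, "at version", m.Version)
	})
	fmt.Println("first save:", err)

	// Reusing the same expected version now conflicts with the longer stream.
	err = Save(s, pending, &expected, func(Message) {})
	fmt.Println("second save:", err)
}
```

Publishing after the append means subscribers never see an event that failed to persist, although a crash between the two steps would leave stored events unpublished.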

doc

  • go.cqrs

This article is shared from the WeChat official account 码匠的流水账 (geek_luandun); author: 码匠乱炖

Originally published: 2021-04-08
