我正在尝试创建一个命令处理程序,它将命令视为多个子命令。每个子命令都会生成一个事件(然后应该会更新聚合的状态)。每个子命令的处理依赖于聚合的状态是最新的(来自上一个子命令)。
例如,考虑以下聚合:
package axon.poc
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventsourcing.EventSourcingHandler
import org.axonframework.modelling.command.AggregateIdentifier
import org.axonframework.modelling.command.AggregateLifecycle
import org.axonframework.spring.stereotype.Aggregate
import org.slf4j.LoggerFactory
import java.util.UUID
@Aggregate
class Aggregate() {
companion object {
private val logger = LoggerFactory.getLogger(Aggregate::class.java)
}
@AggregateIdentifier
internal var aggregateId: UUID? = null
private var value: Int = 0
@CommandHandler
constructor(command: Command): this() {
logger.info("generating create event")
var applyMore = AggregateLifecycle.apply(CreatedEvent(command.aggregateId))
for (i in 0 until command.value) {
applyMore = applyMore.andThenApply {
logger.info("generating update event: ${value+1}")
UpdatedEvent(command.aggregateId, value+1)
}
}
logger.info("completed command handler")
}
@EventSourcingHandler
fun on(event: CreatedEvent) {
logger.info("event sourcing handler: $event")
this.aggregateId = event.aggregateId
this.value = 0
}
@EventSourcingHandler
fun on(event: UpdatedEvent) {
logger.info("event sourcing handler: $event")
this.value = event.value
}
}当此代码处理Command(value = 2)时,它会生成
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating create event
[main] INFO org.axonframework.spring.stereotype.Aggregate - completed command handler
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: CreatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)在执行applyMore之前处理第一个事件(CreatedEvent)。然而,即使applyMore是链式的,UpdatedEvent也不会由事件源处理程序处理,直到两者都生成。
我一直在期待(并且希望):
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating create event
[main] INFO org.axonframework.spring.stereotype.Aggregate - completed command handler
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: CreatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 2
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=2)这是Axon的bug吗?还是我误解了应该如何使用它?如何以原子方式处理大量的“命令”?即。全部通过或全部失败。
发布于 2019-10-17 15:30:43
TLDR;这是个bug。
你遇到了一个只在构造函数中出现的非常特殊的情况。从Axon的角度来看,挑战在于您需要一个实例来应用事件。但是,该实例仅在构造函数完成后才可用。
andThenApply函数就是为此目的而提供的(并且您使用它是正确的)。然而,在您的情况下,代码被(错误地)评估得有点太快了。我不得不在本地运行您的代码并进行调试,以找出到底发生了什么。
根本原因是AnnotatedAggregate的andThenApply实现调用了apply,而不是publish。前者将看到它当前正在执行延迟的任务,并在这些任务结束时安排实际的发布。下一个任务也是这样做的。因此,这两个事件最终都是先创建的,然后在它们都创建之后发布。
您是否有兴趣将此作为问题提交到Axon issue tracker中?如此一来,学分就会出现在它们所属的地方。
https://stackoverflow.com/questions/58425444
复制相似问题