A Look at Cheddar's tx Module

Author: code4it
Published: 2021-04-16 13:14:51
Column: 码匠的流水账

This article takes a look at the tx module of Cheddar.

MessageAction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessageAction.java

public class MessageAction {

    private final TypedMessage typedMessage;
    private final int delaySeconds;

    public MessageAction(final TypedMessage typedMessage, final int delaySeconds) {
        this.typedMessage = typedMessage;
        this.delaySeconds = delaySeconds;
    }

    public TypedMessage message() {
        return typedMessage;
    }

    public int delay() {
        return delaySeconds;
    }

    public void apply(final MessageSender<TypedMessage> messageSender) {
        if (delay() > 0) {
            messageSender.sendDelayedMessage(typedMessage, delaySeconds);
        } else {
            messageSender.send(typedMessage);
        }
    }

    public void apply(final MessagePublisher<TypedMessage> messagePublisher) {
        messagePublisher.publish(typedMessage);
    }

}

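MessageAction holds two fields, typedMessage and delaySeconds, and provides two overloaded apply methods. The overload that takes a MessageSender calls messageSender.sendDelayedMessage when delay() is greater than 0, and messageSender.send(typedMessage) otherwise. The overload that takes a MessagePublisher calls messagePublisher.publish(typedMessage) and does not use the delay.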
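To make the dispatch rule concrete, here is a minimal, self-contained sketch. It does not use the Cheddar classes: `Sender` and `Action` are hypothetical stand-ins for MessageSender<TypedMessage> and MessageAction, and plain strings stand in for TypedMessage.

```java
import java.util.ArrayList;
import java.util.List;

class MessageActionSketch {

    // Hypothetical stand-in for Cheddar's MessageSender<TypedMessage>; messages are plain strings.
    interface Sender {
        void send(String message);

        void sendDelayedMessage(String message, int delaySeconds);
    }

    // Simplified copy of the rule in MessageAction.apply(MessageSender).
    static final class Action {
        private final String message;
        private final int delaySeconds;

        Action(final String message, final int delaySeconds) {
            this.message = message;
            this.delaySeconds = delaySeconds;
        }

        void apply(final Sender sender) {
            if (delaySeconds > 0) {
                sender.sendDelayedMessage(message, delaySeconds);
            } else {
                sender.send(message);
            }
        }
    }

    // Applies one immediate and one delayed action to a recording sender and returns the calls it saw.
    static List<String> demo() {
        final List<String> calls = new ArrayList<>();
        final Sender recorder = new Sender() {
            @Override
            public void send(final String message) {
                calls.add("send:" + message);
            }

            @Override
            public void sendDelayedMessage(final String message, final int delaySeconds) {
                calls.add("delayed:" + message + ":" + delaySeconds);
            }
        };
        new Action("a", 0).apply(recorder);
        new Action("b", 30).apply(recorder);
        return calls;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [send:a, delayed:b:30]
    }
}
```

A delay of 0 (or any non-positive value) takes the plain send path, so only strictly positive delays reach sendDelayedMessage.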

MessageSender

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

public interface MessageSender<T extends Message> {

    /**
     * Send a message
     * @param message Message to send
     * @throws MessageSendException
     */
    void send(T message) throws MessageSendException;

    /**
     * Send a message, where the message is not visible to receivers for the specified delay duration
     * @param message Message to send
     * @param delaySeconds Duration for which sent message is invisible to receivers
     * @throws MessageSendException
     */
    void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
}

The MessageSender interface defines two methods, send and sendDelayedMessage.

TransactionalResource

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/TransactionalResource.java

public interface TransactionalResource {

    void begin() throws TransactionException;

    void commit() throws TransactionException;

    void abort() throws TransactionException;
}

The TransactionalResource interface defines the begin, commit, and abort methods.
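The interface alone does not say how the three methods are meant to be combined. One plausible calling pattern is sketched below: begin, run the work, commit, and abort if the work throws. The `runInTransaction` helper and the local `Resource` interface are assumptions made for illustration, not part of Cheddar, and the checked TransactionException is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

class TransactionalResourceSketch {

    // Local stand-in mirroring the three methods of TransactionalResource (checked exceptions omitted).
    interface Resource {
        void begin();

        void commit();

        void abort();
    }

    // Hypothetical helper, not a Cheddar API: abort when the work fails, commit otherwise.
    static void runInTransaction(final Resource resource, final Runnable work) {
        resource.begin();
        try {
            work.run();
        } catch (final RuntimeException e) {
            resource.abort();
            throw e;
        }
        resource.commit();
    }

    // Runs one successful and one failing unit of work and returns the observed call order.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        final Resource resource = new Resource() {
            @Override
            public void begin() {
                log.add("begin");
            }

            @Override
            public void commit() {
                log.add("commit");
            }

            @Override
            public void abort() {
                log.add("abort");
            }
        };
        runInTransaction(resource, () -> log.add("work"));
        try {
            runInTransaction(resource, () -> {
                throw new IllegalStateException("boom");
            });
        } catch (final IllegalStateException expected) {
            log.add("failed");
        }
        return log;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [begin, work, commit, begin, abort, failed]
    }
}
```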

TransactionalMessageSender

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessageSender.java

public class TransactionalMessageSender implements MessageSender<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final MessageSender<TypedMessage> messageSender;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessageSender(final MessageSender<TypedMessage> messageSender) {
        this.messageSender = messageSender;
    }

    private MessagingTransaction getCurrentTransaction() {
        if (currentTransaction.get() == null) {
            throw new NonExistentTransactionException();
        }
        return currentTransaction.get();
    }

    @Override
    public void begin() throws TransactionException {
        if (currentTransaction.get() != null) {
            throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction: " + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction: " + transaction.transactionId());
        transaction.applyActions(messageSender);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed: " + transaction.transactionId());
    }

    @Override
    public void send(final TypedMessage typedMessage) throws MessageSendException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void sendDelayedMessage(final TypedMessage typedMessage, final int delay) throws MessageSendException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addDelayedMessage(typedMessage, delay);
    }

    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}

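TransactionalMessageSender implements both MessageSender and TransactionalResource, and keeps the current MessagingTransaction in a ThreadLocal. begin stores a new MessagingTransaction in currentTransaction and throws NestedTransactionException if one is already present. commit fetches the current MessagingTransaction, calls applyActions with the wrapped messageSender, and finally calls currentTransaction.remove(). abort only calls currentTransaction.remove(), which discards any queued messages. send calls transaction.addMessage and sendDelayedMessage calls transaction.addDelayedMessage, so messages are merely queued until commit. Every method except begin and abort goes through getCurrentTransaction, which throws NonExistentTransactionException when no transaction has been started.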
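The buffering behaviour can be sketched without the library. In the example below, `BufferedSender` is a hypothetical, simplified stand-in for TransactionalMessageSender: strings replace TypedMessage, a list replaces the wrapped sender, delays are left out, and IllegalStateException replaces Cheddar's transaction exceptions.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class BufferedSenderSketch {

    // Simplified stand-in: queues messages per thread and only delivers them on commit.
    static final class BufferedSender {
        private final List<String> delivered; // plays the role of the wrapped sender
        private final ThreadLocal<Queue<String>> current = new ThreadLocal<>();

        BufferedSender(final List<String> delivered) {
            this.delivered = delivered;
        }

        private Queue<String> pending() {
            final Queue<String> queue = current.get();
            if (queue == null) {
                throw new IllegalStateException("no transaction in progress");
            }
            return queue;
        }

        void begin() {
            if (current.get() != null) {
                throw new IllegalStateException("nested transaction");
            }
            current.set(new LinkedList<>());
        }

        void send(final String message) {
            pending().add(message); // queued only, nothing is delivered yet
        }

        void commit() {
            final Queue<String> queue = pending();
            while (!queue.isEmpty()) {
                delivered.add(queue.remove());
            }
            current.remove();
        }

        void abort() {
            current.remove(); // queued messages are dropped
        }
    }

    // One aborted and one committed transaction; the marker shows nothing is delivered before commit.
    static List<String> demo() {
        final List<String> delivered = new ArrayList<>();
        final BufferedSender sender = new BufferedSender(delivered);

        sender.begin();
        sender.send("dropped");
        sender.abort();

        sender.begin();
        sender.send("first");
        sender.send("second");
        delivered.add("-- commit --");
        sender.commit();
        return delivered;
    }

    // Sending outside a transaction is rejected.
    static boolean sendWithoutTransactionFails() {
        try {
            new BufferedSender(new ArrayList<>()).send("x");
            return false;
        } catch (final IllegalStateException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [-- commit --, first, second]
        System.out.println(sendWithoutTransactionFails()); // true
    }
}
```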

MessagePublisher

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessagePublisher.java

public interface MessagePublisher<T extends Message> {

    /**
     * Forward a message for publication
     * @param message
     * @throws MessagePublishException
     */
    void publish(T message) throws MessagePublishException;

}

The MessagePublisher interface defines a single publish method.

TransactionalMessagePublisher

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/TransactionalMessagePublisher.java

public class TransactionalMessagePublisher implements MessagePublisher<TypedMessage>, TransactionalResource {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final MessagePublisher<TypedMessage> messagePublisher;
    private final ThreadLocal<MessagingTransaction> currentTransaction = new ThreadLocal<MessagingTransaction>();

    public TransactionalMessagePublisher(final MessagePublisher<TypedMessage> messagePublisher) {
        this.messagePublisher = messagePublisher;
    }

    private MessagingTransaction getCurrentTransaction() {
        if (currentTransaction.get() == null) {
            throw new NonExistentTransactionException();
        }
        return currentTransaction.get();
    }

    @Override
    public void begin() throws TransactionException {
        if (currentTransaction.get() != null) {
            throw new NestedTransactionException(currentTransaction.get());
        }
        currentTransaction.set(new MessagingTransaction());
        logger.trace("Beginning transaction: " + currentTransaction.get().transactionId());
    }

    @Override
    public void commit() throws TransactionException {
        final MessagingTransaction transaction = getCurrentTransaction();
        logger.trace("Committing transaction: " + transaction.transactionId());
        transaction.applyActions(messagePublisher);
        currentTransaction.remove();
        logger.trace("Transaction successfully committed: " + transaction.transactionId());
    }

    @Override
    public void publish(final TypedMessage typedMessage) throws MessagePublishException {
        final MessagingTransaction transaction = getCurrentTransaction();
        transaction.addMessage(typedMessage);
    }

    @Override
    public void abort() throws TransactionException {
        currentTransaction.remove();
    }
}

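TransactionalMessagePublisher implements both MessagePublisher and TransactionalResource, and mirrors TransactionalMessageSender. begin stores a new MessagingTransaction in currentTransaction and throws NestedTransactionException if one is already present. commit fetches the current MessagingTransaction, calls applyActions with the wrapped messagePublisher, and finally calls currentTransaction.remove(). abort only calls currentTransaction.remove(). publish calls transaction.addMessage, so the message is queued and only published on commit.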
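Because currentTransaction is a plain ThreadLocal, a transaction begun on one thread is not visible on another. The sketch below illustrates only that property of ThreadLocal; the class and method names are hypothetical and a string stands in for MessagingTransaction.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ThreadBoundTransactionSketch {

    // Stand-in for the currentTransaction field; a string replaces MessagingTransaction.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // "Begins" a transaction on the calling thread and checks whether a worker thread can see it.
    static boolean workerSeesCallersTransaction() {
        CURRENT.set("tx-1");
        final AtomicBoolean seen = new AtomicBoolean(true);
        final Thread worker = new Thread(() -> seen.set(CURRENT.get() != null));
        worker.start();
        try {
            worker.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
        }
        return seen.get();
    }

    public static void main(final String[] args) {
        System.out.println(workerSeesCallersTransaction()); // false
    }
}
```

In terms of the class above, this suggests that calling publish from a thread other than the one that called begin would hit the NonExistentTransactionException branch of getCurrentTransaction.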

Transaction

Cheddar/cheddar/cheddar-tx-api/src/main/java/com/clicktravel/cheddar/infrastructure/tx/Transaction.java

public interface Transaction {

    String transactionId();

}

The Transaction interface defines a single method, transactionId.

MessagingTransaction

Cheddar/cheddar/cheddar-tx/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/tx/MessagingTransaction.java

public class MessagingTransaction implements Transaction {

    private final Queue<MessageAction> messageActions;

    private final String transactionId;

    public MessagingTransaction() {
        messageActions = new LinkedList<>();
        transactionId = UUID.randomUUID().toString();
    }

    @Override
    public String transactionId() {
        return transactionId;
    }

    public void applyActions(final MessagePublisher<TypedMessage> messagePublisher) {
        while (!messageActions.isEmpty()) {
            final MessageAction messageAction = messageActions.remove();
            messagePublisher.publish(messageAction.message());
        }
    }

    public void applyActions(final MessageSender<TypedMessage> messageSender) {
        while (!messageActions.isEmpty()) {
            final MessageAction messageAction = messageActions.remove();
            messageAction.apply(messageSender);
        }
    }

    public void addMessage(final TypedMessage typedMessage) {
        messageActions.add(new MessageAction(typedMessage, 0));
    }

    public void addDelayedMessage(final TypedMessage typedMessage, final int delay) {
        messageActions.add(new MessageAction(typedMessage, delay));
    }

}

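The MessagingTransaction class implements the Transaction interface. Its transactionId method returns a UUID generated in the constructor. addMessage queues a MessageAction with a delay of 0, and addDelayedMessage queues one with the given delay. The two applyActions overloads drain the messageActions queue in FIFO order: the MessagePublisher overload calls messagePublisher.publish(messageAction.message()) for each action, while the MessageSender overload calls messageAction.apply(messageSender).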
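A small sketch of the queue-draining behaviour follows. `Tx` is a hypothetical stand-in for MessagingTransaction, a `Consumer<String>` stands in for the publisher, and each queued entry is a two-element array holding the message and its delay.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class FifoDrainSketch {

    // Hypothetical stand-in for MessagingTransaction; each entry is {message, delaySeconds}.
    static final class Tx {
        private final Queue<String[]> actions = new LinkedList<>();

        void addMessage(final String message) {
            addDelayedMessage(message, 0);
        }

        void addDelayedMessage(final String message, final int delay) {
            actions.add(new String[] { message, Integer.toString(delay) });
        }

        // Mirrors applyActions(MessagePublisher): only the message is forwarded, the delay is unused.
        void applyActions(final Consumer<String> publisher) {
            while (!actions.isEmpty()) {
                publisher.accept(actions.remove()[0]);
            }
        }

        int pendingCount() {
            return actions.size();
        }
    }

    // Queues three messages, drains twice, and reports what was published and what is left.
    static List<String> demo() {
        final Tx tx = new Tx();
        tx.addMessage("one");
        tx.addDelayedMessage("two", 15);
        tx.addMessage("three");

        final List<String> published = new ArrayList<>();
        tx.applyActions(published::add);
        tx.applyActions(published::add); // queue is already empty, so this does nothing
        published.add("pending=" + tx.pendingCount());
        return published;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [one, two, three, pending=0]
    }
}
```

Because each action is removed from the queue before it is handed over, an exception thrown part-way through would leave the remaining actions queued while the already removed one is lost to the transaction.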

Summary

Cheddar's tx module provides TransactionalMessagePublisher and TransactionalMessageSender, both of which implement the TransactionalResource interface, and both of whose commit methods call transaction.applyActions. MessageAction provides two apply methods: the one that takes a MessageSender calls messageSender.sendDelayedMessage when the delay is greater than 0 and messageSender.send(typedMessage) otherwise, while the one that takes a MessagePublisher calls messagePublisher.publish.

doc

  • Cheddar

Originally published on 2021-03-26 on the WeChat official account 码匠的流水账.
