前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊cheddar的MessageSender

聊聊cheddar的MessageSender

作者头像
code4it
发布2021-04-16 14:12:03
2340
发布2021-04-16 14:12:03
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下cheddar的MessageSender

MessageSender

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

代码语言:javascript
复制
public interface MessageSender<T extends Message> {

    /**
     * Send a message
     * @param message Message to send
     * @throws MessageSendException
     */
    void send(T message) throws MessageSendException;

    /**
     * Send a message, where the message is not visible to receivers for the specified delay duration
     * @param message Message to send
     * @param delaySeconds Duration for which sent message is invisible to receivers
     * @throws MessageSendException
     */
    void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
}

MessageSender接口定义了send、sendDelayedMessage方法

MessageSenderImpl

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSenderImpl.java

代码语言:javascript
复制
public class MessageSenderImpl<T extends Message> implements MessageSender<T> {

    private final MessageQueue<T> messageQueue;

    public MessageSenderImpl(final MessageQueue<T> messageQueue) {
        this.messageQueue = messageQueue;
    }

    @Override
    public void send(final T message) throws MessageSendException {
        messageQueue.send(message);
    }

    @Override
    public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException {
        messageQueue.sendDelayedMessage(message, delaySeconds);
    }

}

MessageSenderImpl实现了MessageSender接口,其send方法委托给了messageQueue.send;其sendDelayedMessage方法委托给了messageQueue.sendDelayedMessage

MessageQueue

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageQueue.java

代码语言:javascript
复制
public interface MessageQueue<T extends Message> {

    /**
     * @return The queue name
     */
    String getName();

    /**
     * Send a message to this message queue
     * @param message Message to send
     * @throws MessageSendException
     */
    void send(T message) throws MessageSendException;

    /**
     * Send a message to this message queue; the message is not visible to receivers for the specified delay duration
     * @param message Message to send
     * @param delaySeconds Duration for which sent message is invisible to receivers
     * @throws MessageSendException
     */
    void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;

    /**
     * Receives any number of messages on this queue, but does not delete them. No order or priority of messages is
     * guaranteed.
     * @return List of received {@code Message}s
     * @throws MessageReceiveException
     */
    List<T> receive() throws MessageReceiveException;

    /**
     * Receives any number of messages on this queue up to the maximum specified, but does not delete them. No order or
     * priority of messages is guaranteed. This call will spend up to the wait time given for a message to arrive in the
     * queue before returning.
     * @param waitTimeSeconds The duration (in seconds) for which the call will wait for a message to arrive in the
     *            queue before returning. If a message is available, the call will return sooner.
     * @param maxMessages The maximum number of messages to return. Will never return more messages than this value but
     *            may return fewer. Values can be from 1 to 10.
     * @return List of received {@code Message}s
     * @throws MessageReceiveException
     */
    List<T> receive(int waitTimeSeconds, int maxMessages) throws MessageReceiveException;

    /**
     * Deletes a message previously received from this queue.
     * @param typedMessage {@code Message} to delete
     * @throws MessageDeleteException
     */
    void delete(T message) throws MessageDeleteException;

}

MessageQueue接口定义了getName、send、sendDelayedMessage、receive、delete方法

InMemoryMessageQueue

Cheddar/cheddar/cheddar-integration-mocks/src/main/java/com/clicktravel/infrastructure/messaging/inmemory/InMemoryMessageQueue.java

代码语言:javascript
复制
public class InMemoryMessageQueue<T extends Message> implements MessageQueue<T>, Resettable {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final String name;
    private final InMemoryMessageQueuePoller inMemoryMessageQueuePoller;

    @SuppressWarnings("unchecked")
    public InMemoryMessageQueue(final String name, final InMemoryMessageQueuePoller inMemoryMessageQueuePoller,
            final InMemoryExchange<T>... inMemoryExchanges) {
        this.name = name;
        this.inMemoryMessageQueuePoller = inMemoryMessageQueuePoller;

        final List<String> exchangeNames = new ArrayList<>();
        for (final InMemoryExchange<T> inMemoryExchange : inMemoryExchanges) {
            inMemoryExchange.addSubscriber(this);
            exchangeNames.add(inMemoryExchange.getName());
        }
        logger.info("Using in memory message queue: " + name + " with subscriptions to these exchanges: ["
                + StringUtils.join(exchangeNames) + "]");
    }

    @Override
    public void send(final T message) {
        queue.add(message);
        inMemoryMessageQueuePoller.poll();
    }

    @Override
    public void sendDelayedMessage(final T message, final int delaySeconds) {
        send(message); // delay not supported
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public List<T> receive(final int waitTimeSeconds, final int maxMessages) {
        return receive();
    }

    @Override
    public List<T> receive() {
        final T message = queue.peek();
        final List<T> messages = new ArrayList<T>(1);
        if (message != null) {
            messages.add(message);
        }
        return messages;
    }

    @Override
    public void delete(final T message) {
        queue.remove(message);
    }

    @Override
    public String toString() {
        return "InMemoryMessageQueue [name=" + name + ", queue=" + queue + "]";
    }

    @Override
    public void reset() {
        queue.clear();
    }
}

InMemoryMessageQueue实现了MessageQueue、Resettable接口,它定义了ConcurrentLinkedQueue及InMemoryMessageQueuePoller两个属性;send方法会往queue添加message,然后执行inMemoryMessageQueuePoller.poll();sendDelayedMessage方法目前不支持

SqsMessageQueue

Cheddar/cheddar/cheddar-integration-aws/src/main/java/com/clicktravel/infrastructure/messaging/aws/sqs/SqsMessageQueue.java

代码语言:javascript
复制
public abstract class SqsMessageQueue<T extends Message> implements MessageQueue<T> {

    private final SqsQueueResource sqsQueueResource;

    public SqsMessageQueue(final SqsQueueResource sqsQueueResource) {
        this.sqsQueueResource = sqsQueueResource;
    }

    protected abstract String toSqsMessageBody(final T message);

    protected abstract T toMessage(final com.amazonaws.services.sqs.model.Message sqsMessage);

    @Override
    public String getName() {
        return sqsQueueResource.getQueueName();
    }

    @Override
    public void send(final T message) throws MessageSendException {
        try {
            sqsQueueResource.sendMessage(toSqsMessageBody(message));
        } catch (final AmazonClientException e) {
            throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName()
                    + "]", e);
        }
    }

    @Override
    public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException {
        try {
            sqsQueueResource.sendDelayedMessage(toSqsMessageBody(message), delaySeconds);
        } catch (final AmazonClientException e) {
            throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName()
                    + "]", e);
        }
    }

    @Override
    public List<T> receive() throws MessageReceiveException {
        try {
            return toMessages(sqsQueueResource.receiveMessages());
        } catch (final AmazonClientException e) {
            throw new MessageReceiveException("Unable to receive messages on SQS queue:["
                    + sqsQueueResource.getQueueName() + "]", e);
        }
    }

    @Override
    public List<T> receive(final int waitTimeSeconds, final int maxMessages) throws MessageReceiveException {
        try {
            return toMessages(sqsQueueResource.receiveMessages(waitTimeSeconds, maxMessages));
        } catch (final AmazonClientException e) {
            throw new MessageReceiveException("Unable to receive messages on SQS queue:["
                    + sqsQueueResource.getQueueName() + "]", e);
        }
    }

    private List<T> toMessages(final List<com.amazonaws.services.sqs.model.Message> sqsMessages) {
        final ArrayList<T> messages = new ArrayList<>();
        for (final com.amazonaws.services.sqs.model.Message sqsMessage : sqsMessages) {
            messages.add(toMessage(sqsMessage));
        }
        return messages;
    }

    @Override
    public void delete(final T message) throws MessageDeleteException {
        try {
            sqsQueueResource.deleteMessage(message.getReceiptHandle());
        } catch (final AmazonClientException e) {
            throw new MessageDeleteException("Unable to delete message on SQS queue:["
                    + sqsQueueResource.getQueueName() + "]", e);
        }
    }

    public SqsQueueResource getSqsQueue() {
        return sqsQueueResource;
    }
}

SqsMessageQueue是个抽象类,声明实现MessageQueue接口,其send方法委托给了sqsQueueResource.sendMessage;其sendDelayedMessage方法委托给了sqsQueueResource.sendDelayedMessage

小结

cheddar的MessageSender接口定义了send、sendDelayedMessage方法;MessageSenderImpl实现了MessageSender接口,其send方法委托给了messageQueue.send;其sendDelayedMessage方法委托给了messageQueue.sendDelayedMessage;InMemoryMessageQueue和SqsMessageQueue提供了两种实现,其中inMemory的实现不支持sendDelayedMessage方法。

doc

  • Cheddar
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MessageSender
  • MessageSenderImpl
  • MessageQueue
    • InMemoryMessageQueue
      • SqsMessageQueue
      • 小结
      • doc
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档