前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊openmessaging的MessagingAccessPoint

聊聊openmessaging的MessagingAccessPoint

作者头像
code4it
发布2018-09-17 17:06:12
6290
发布2018-09-17 17:06:12
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下openmessaging的MessagingAccessPoint

MessagingAccessPoint

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java

代码语言:javascript
复制
/**
 * An instance of {@code MessagingAccessPoint} may be obtained from {@link OMS}, which is capable of creating {@code
 * Producer}, {@code Consumer}, {@code ResourceManager}, and other facility entities.
 * <p>
 * For example:
 * <pre>
 * MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east:default_space");
 * messagingAccessPoint.startup();
 * Producer producer = messagingAccessPoint.createProducer();
 * producer.startup();
 * producer.send(producer.createBytesMessage("HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
 * </pre>
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface MessagingAccessPoint extends ServiceLifecycle {

    /**
     * Returns the target OMS specification version of the specified vendor implementation.
     *
     * @return the OMS version of implementation
     * @see OMS#specVersion
     */
    String implVersion();

    /**
     * Returns the attributes of this {@code MessagingAccessPoint} instance.
     * <p>
     * There are some standard attributes defined by OMS for {@code MessagingAccessPoint}:
     * <ul>
     * <li> {@link OMSBuiltinKeys#ACCESS_POINTS}, the specified access points.
     * <li> {@link OMSBuiltinKeys#DRIVER_IMPL}, the fully qualified class name of the specified MessagingAccessPoint's
     * implementation, the default value is {@literal io.openmessaging.<driver_type>.MessagingAccessPointImpl}.
     * <li> {@link OMSBuiltinKeys#REGION}, the region the resources reside in.
     * <li> {@link OMSBuiltinKeys#ACCOUNT_ID}, the ID of the specific account system that owns the resource.
     * </ul>
     *
     * @return the attributes
     */
    KeyValue attributes();

    /**
     * Creates a new {@code Producer} for the specified {@code MessagingAccessPoint}.
     *
     * @return the created {@code Producer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    Producer createProducer();

    /**
     * Creates a new {@code Producer} for the specified {@code MessagingAccessPoint}
     * with some preset attributes.
     *
     * @param attributes the preset attributes
     * @return the created {@code Producer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    Producer createProducer(KeyValue attributes);

    /**
     * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}.
     * The returned {@code PushConsumer} isn't attached to any queue,
     * uses {@link PushConsumer#attachQueue(String, MessageListener)} to attach queues.
     *
     * @return the created {@code PushConsumer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    PushConsumer createPushConsumer();

    /**
     * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint} with some preset attributes.
     *
     * @param attributes the preset attributes
     * @return the created {@code PushConsumer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    PushConsumer createPushConsumer(KeyValue attributes);

    /**
     * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}.
     *
     * @return the created {@code PullConsumer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    PullConsumer createPullConsumer();

    /**
     * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint} with some preset attributes.
     *
     * @param attributes the preset attributes
     * @return the created {@code PullConsumer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    PullConsumer createPullConsumer(KeyValue attributes);

    /**
     * Creates a new {@code StreamingConsumer} for the specified {@code MessagingAccessPoint}.
     *
     * @return the created {@code Stream}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    StreamingConsumer createStreamingConsumer();

    /**
     * Creates a new {@code StreamingConsumer} for the specified {@code MessagingAccessPoint} with some preset
     * attributes.
     *
     * @param attributes the preset attributes
     * @return the created consumer
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    StreamingConsumer createStreamingConsumer(KeyValue attributes);

    /**
     * Gets a lightweight {@code ResourceManager} instance from the specified {@code MessagingAccessPoint}.
     *
     * @return the resource manger
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    ResourceManager resourceManager();
}
  • MessagingAccessPoint就类似OMS的工厂方法,聚合了创建各类对象的方法,比如createProducer,createPushConsumer,createPullConsumer,resourceManager

OMS

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/OMS.java

代码语言:javascript
复制
/**
 * The oms class provides some static methods to create a {@code MessagingAccessPoint}
 * from the specified OMS driver url and some useful util methods.
 * <p>
 * The complete OMS driver URL syntax is:
 * <p>
 * {@literal oms:<driver_type>://[account_id@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/<region>}
 * <p>
 * The first part of the URL specifies which OMS implementation is to be used, rocketmq is a
 * optional driver type.
 * <p>
 * The brackets indicate that the extra access points are optional, and a correct OMS driver url
 * needs at least one access point, which consists of hostname and port, like localhost:8081.
 *
 * @version OMS 1.0.0
 * @see ResourceManager
 * @since OMS 1.0.0
 */
public final class OMS {
    /**
     * Returns a {@code MessagingAccessPoint} instance from the specified OMS driver url.
     *
     * @param url the specified OMS driver url
     * @return a {@code MessagingAccessPoint} instance
     * @throws OMSRuntimeException if the factory fails to create a {@code MessagingAccessPoint} due to some driver url
     * some syntax error or internal error.
     */
    public static MessagingAccessPoint getMessagingAccessPoint(String url) {
        return getMessagingAccessPoint(url, OMS.newKeyValue());
    }

    /**
     * Returns a {@code MessagingAccessPoint} instance from the specified OMS driver url
     * with some preset attributes, which will be passed to MessagingAccessPoint's implementation
     * class as a unique constructor parameter.
     *
     * There are some standard attributes defined by OMS for this method,
     * the same as {@link MessagingAccessPoint#attributes()}
     *
     * @param url the specified OMS driver url
     * @return a {@code MessagingAccessPoint} instance
     * @throws OMSRuntimeException if the factory fails to create a {@code MessagingAccessPoint} due to some driver url
     * some syntax error or internal error.
     */
    public static MessagingAccessPoint getMessagingAccessPoint(String url, KeyValue attributes) {
        return MessagingAccessPointAdapter.getMessagingAccessPoint(url, attributes);
    }

    /**
     * Returns a default and internal {@code KeyValue} implementation instance.
     *
     * @return a {@code KeyValue} instance
     */
    public static KeyValue newKeyValue() {
        return new DefaultKeyValue();
    }

    /**
     * The version format is X.Y.Z (Major.Minor.Patch), a pre-release version may be denoted by appending a hyphen and a
     * series of dot-separated identifiers immediately following the patch version, like X.Y.Z-alpha.
     *
     * <p>
     * OMS version follows semver scheme partially.
     *
     * @see <a href="http://semver.org">http://semver.org</a>
     */
    public static String specVersion = "UnKnown";

    static {
        InputStream stream = OMS.class.getClassLoader().getResourceAsStream("oms.spec.properties");
        try {
            if (stream != null) {
                Properties properties = new Properties();
                properties.load(stream);
                specVersion = String.valueOf(properties.get("version"));
            }
        } catch (IOException ignore) {
        }
    }

    private OMS() {
    }
}
  • OMS可以理解为总入口,是MessagingAccessPoint的静态工厂方法
  • 它根据url来解析访问地址等信息,比如oms:rocketmq://alice@rocketmq.apache.org/us-east:default_space

ResourceManager

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/ResourceManager.java

代码语言:javascript
复制
/**
 * The {@code ResourceManager} is to provide a unified interface of resource management,
 * allowing developers to manage the namespace, queue and routing resources.
 * <p>
 * Create, set, get and delete are the four basic operations of {@code ResourceManager}.
 * <p>
 * {@code ResourceManager} also supports dynamic fetch and update of resource attributes.
 * <p>
 * {@link MessagingAccessPoint#resourceManager()} ()} is the unique method to obtain a {@code ResourceManager}
 * instance. Changes made through this instance will immediately apply to the message-oriented middleware (MOM) behind
 * {@code MessagingAccessPoint}.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface ResourceManager {
    /**
     * Creates a {@code Namespace} resource with some preset attributes.
     * <p>
     * A namespace wraps the OMS resources in an abstract concept that makes it appear to the users
     * within the namespace that they have their own isolated instance of the global OMS resources.
     *
     * @param nsName the name of the new namespace
     * @param attributes the preset attributes
     */
    void createNamespace(String nsName, KeyValue attributes);

    /**
     * Sets the attributes of the specific namespace, the old attributes will be replaced
     * by the provided attributes, only the provided key will be updated.
     *
     * @param nsName the specific namespace
     * @param attributes the new attributes
     * @throws OMSResourceNotExistException if the specific namespace does not exist
     */
    void setNamespaceAttributes(String nsName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Gets the attributes of the specific namespace.
     *
     * @param nsName the specific namespace
     * @return the attributes of namespace
     * @throws OMSResourceNotExistException if the specific namespace does not exist
     */
    KeyValue getNamespaceAttributes(String nsName) throws OMSResourceNotExistException;

    /**
     * Deletes an existing namespace resource.
     *
     * @param nsName the namespace needs to be deleted
     * @throws OMSResourceNotExistException if the specified namespace does not exist
     */
    void deleteNamespace(String nsName) throws OMSResourceNotExistException;

    /**
     * Gets the namespace list in the current {@code MessagingAccessPoint}.
     *
     * @return the list of all namespaces
     */
    List<String> listNamespaces();

    /**
     * Creates a {@code Queue} resource in the configured namespace with some preset attributes.
     * <p>
     * The standard OMS {@code Queue} schema must start with the {@code Namespace} prefix:
     * <p>
     * {@literal <namespace_name>://<queue_name>}
     *
     * @param queueName the name of the new queue
     * @param attributes the preset attributes
     * @throws OMSResourceNotExistException if the configured namespace does not exist or specified queue name is
     * not available
     */
    void createQueue(String queueName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Sets the attributes of the specified queue, the old attributes will be replaced
     * by the provided attributes, only the provided key will be updated.
     *
     * @param queueName the queue name
     * @param attributes the new attributes
     * @throws OMSResourceNotExistException if the specified queue or namespace does not exist
     */
    void setQueueAttributes(String queueName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Gets the attributes of the specified queue.
     *
     * @param queueName the queue name
     * @return the attributes of namespace
     * @throws OMSResourceNotExistException if the specified queue or namespace does not exist
     */
    KeyValue getQueueAttributes(String queueName) throws OMSResourceNotExistException;

    /**
     * Deletes an existing queue resource.
     *
     * @param queueName the queue needs to be deleted
     * @throws OMSResourceNotExistException if the specified queue or namespace does not exist
     */
    void deleteQueue(String queueName) throws OMSResourceNotExistException;

    /**
     * Gets the queue list in the specific namespace.
     *
     * @param nsName the specific namespace
     * @return the list of all queues
     * @throws OMSResourceNotExistException if the specific namespace does not exist
     */
    List<String> listQueues(String nsName) throws OMSResourceNotExistException;

    /**
     * Creates a {@code Routing} resource in the configured namespace with some preset attributes.
     * <p>
     * The standard OMS {@code Routing} schema must start with the {@code Namespace} prefix:
     * <p>
     * {@literal <namespace_name>://<routing_name>}
     *
     * @param routingName the name of the new routing
     * @param attributes the preset attributes
     * @throws OMSResourceNotExistException if the configured namespace does not exist or specified routing name is not
     * available
     */
    void createRouting(String routingName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Sets the attributes of the specified routing, the old attributes will be replaced
     * by the provided attributes, only the provided key will be updated.
     *
     * @param routingName the routing name
     * @param attributes the new attributes
     * @throws OMSResourceNotExistException if the specified routing or namespace does not exist
     */
    void setRoutingAttributes(String routingName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Gets the attributes of the specified routing.
     *
     * @param routingName the routing name
     * @return the attributes of routing
     * @throws OMSResourceNotExistException if the specified routing or namespace does not exist
     */
    KeyValue getRoutingAttributes(String routingName) throws OMSResourceNotExistException;

    /**
     * Deletes an existing routing resource.
     *
     * @param routingName the routing needs to be deleted
     * @throws OMSResourceNotExistException if the specified routing or namespace does not exist
     */
    void deleteRouting(String routingName) throws OMSResourceNotExistException;

    /**
     * Gets the routing list in the specific namespace.
     *
     * @param nsName the specific namespace
     * @return the list of all routings
     * @throws OMSResourceNotExistException if the specific namespace does not exist
     */
    List<String> listRoutings(String nsName) throws OMSResourceNotExistException;

    /**
     * Gets the stream list behind the specified queue.
     *
     * @param queueName the queue name
     * @return the list of all streams
     * @throws OMSResourceNotExistException if the specified queue or namespace does not exist
     */
    List<String> listStreams(String queueName) throws OMSResourceNotExistException;

    /**
     * Updates some system headers of a message in the configured namespace.
     * <p>
     * Below system headers are allowed to be changed dynamically:
     * <ul>
     * <li>{@link Message.BuiltinKeys#START_TIME}</li>
     * <li>{@link Message.BuiltinKeys#STOP_TIME}</li>
     * <li>{@link Message.BuiltinKeys#TIMEOUT}</li>
     * <li>{@link Message.BuiltinKeys#PRIORITY}</li>
     * <li>{@link Message.BuiltinKeys#SCHEDULE_EXPRESSION}</li>
     * </ul>
     *
     * @param queueName the specific queue the message resides in
     * @param messageId the id of message
     * @param headers the new headers
     * @throws OMSResourceNotExistException if the specified queue, namespace or message does not exist
     */
    void updateMessage(String queueName, String messageId, KeyValue headers) throws OMSResourceNotExistException;
}
  • 提供了namespace、queue、routing等的操作方法
  • namespace相关的操作有createNamespace、setNamespaceAttributes、deleteNamespace、listNamespaces、getNamespaceAttributes
  • queue相关的操作有createQueue、setQueueAttributes、getQueueAttributes、deleteQueue、listQueues
  • routing相关的操作有createRouting、setRoutingAttributes、getRoutingAttributes、deleteRouting、listRoutings
  • 通过也提供了listStreams以及updateMessage方法

小结

openmessaging的MessagingAccessPoint聚合了相关访问操作入口,可以理解为一个facade,另外OMS是MessagingAccessPoint的静态工厂,而ResourceManager则类似kafkaAdmin,用于操作管理namespace、queue、routing等。

doc

  • openmessaging.cloud
  • OpenMessaging Runtime Interface for Java
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MessagingAccessPoint
  • OMS
  • ResourceManager
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档