A Look at openmessaging-java

This article takes a look at openmessaging-java.

maven

<dependency>
    <groupId>io.openmessaging</groupId>
    <artifactId>openmessaging-api</artifactId>
    <version>0.3.1-alpha</version>
</dependency>

The latest version published to Maven is 0.3.1-alpha; this article reads the source directly, at 0.3.2-alpha-SNAPSHOT.

producer

Producer

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java

/**
 * A {@code Producer} is a simple object used to send messages on behalf
 * of a {@code MessagingAccessPoint}. An instance of {@code Producer} is
 * created by calling the {@link MessagingAccessPoint#createProducer()} method.
 * <p>
 * It provides various {@code send} methods to send a message to a specified destination,
 * which is a {@code Queue} in OMS.
 * <p>
 * {@link Producer#send(Message)} means send a message to the destination synchronously,
 * the calling thread will block until the send request complete.
 * <p>
 * {@link Producer#sendAsync(Message)} means send a message to the destination asynchronously,
 * the calling thread won't block and will return immediately. Since the send call is asynchronous
 * it returns a {@link Future} for the send result.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface Producer extends MessageFactory, ServiceLifecycle {
    /**
     * Returns the attributes of this {@code Producer} instance.
     * Changes to the return {@code KeyValue} are not reflected in physical {@code Producer}.
     * <p>
     * There are some standard attributes defined by OMS for {@code Producer}:
     * <ul>
     * <li> {@link OMSBuiltinKeys#PRODUCER_ID}, the unique producer id for a producer instance.
     * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code Producer}.
     * </ul>
     *
     * @return the attributes
     */
    KeyValue attributes();

    /**
     * Sends a message to the specified destination synchronously, the destination should be preset to
     * {@link Message#sysHeaders()}, other header fields as well.
     *
     * @param message a message will be sent
     * @return the successful {@code SendResult}
     * @throws OMSMessageFormatException if an invalid message is specified.
     * @throws OMSTimeOutException if the given timeout elapses before the send operation completes
     * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.
     */
    SendResult send(Message message);

    /**
     * Sends a message to the specified destination synchronously, using the specified attributes, the destination
     * should be preset to {@link Message#sysHeaders()}, other header fields as well.
     *
     * @param message a message will be sent
     * @param attributes the specified attributes
     * @return the successful {@code SendResult}
     * @throws OMSMessageFormatException if an invalid message is specified.
     * @throws OMSTimeOutException if the given timeout elapses before the send operation completes
     * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.
     */
    SendResult send(Message message, KeyValue attributes);

    /**
     * Sends a transactional message to the specified destination synchronously, using the specified attributes,
     * the destination should be preset to {@link Message#sysHeaders()}, other header fields as well.
     * <p>
     * A transactional message will be exposed to consumer if and only if the local transaction
     * branch has been committed, or be discarded if local transaction has been rolled back.
     *
     * @param message a transactional message will be sent
     * @param branchExecutor local transaction executor associated with the message
     * @param attributes the specified attributes
     * @return the successful {@code SendResult}
     * @throws OMSMessageFormatException if an invalid message is specified.
     * @throws OMSTimeOutException if the given timeout elapses before the send operation completes
     * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.
     */
    SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes);

    /**
     * Sends a message to the specified destination asynchronously, the destination should be preset to
     * {@link Message#sysHeaders()}, other header fields as well.
     * <p>
     * The returned {@code Promise} will have the result once the operation completes, and the registered
     * {@code FutureListener} will be notified, either because the operation was successful or because of an error.
     *
     * @param message a message will be sent
     * @return the {@code Promise} of an asynchronous message send operation.
     * @see Future
     * @see FutureListener
     */
    Future<SendResult> sendAsync(Message message);

    /**
     * Sends a message to the specified destination asynchronously, using the specified attributes, the destination
     * should be preset to {@link Message#sysHeaders()}, other header fields as well.
     * <p>
     * The returned {@code Promise} will have the result once the operation completes, and the registered
     * {@code FutureListener} will be notified, either because the operation was successful or because of an error.
     *
     * @param message a message will be sent
     * @param attributes the specified attributes
     * @return the {@code Promise} of an asynchronous message send operation.
     * @see Future
     * @see FutureListener
     */
    Future<SendResult> sendAsync(Message message, KeyValue attributes);

    /**
     * Sends a message to the specified destination in one way, the destination should be preset to
     * {@link Message.BuiltinKeys}, other header fields as well.
     * <p>
     * There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't
     * care about the send result and also have no context to get the result.
     *
     * @param message a message will be sent
     */
    void sendOneway(Message message);

    /**
     * Sends a message to the specified destination in one way, using the specified attributes, the destination
     * should be preset to {@link Message.BuiltinKeys}, other header fields as well.
     * <p>
     * There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't
     * care about the send result and also have no context to get the result.
     *
     * @param message a message will be sent
     * @param properties the specified userHeaders
     */
    void sendOneway(Message message, KeyValue properties);

    /**
     * Creates a {@code BatchMessageSender} to send message in batch manner.
     *
     * @return a {@code BatchMessageSender} instance
     */
    BatchMessageSender createBatchMessageSender();

    /**
     * Adds a {@code ProducerInterceptor} to intercept send operations of producer.
     *
     * @param interceptor a producer interceptor
     */
    void addInterceptor(ProducerInterceptor interceptor);

    /**
     * Removes a {@code ProducerInterceptor}
     *
     * @param interceptor a producer interceptor will be removed
     */
    void removeInterceptor(ProducerInterceptor interceptor);
}
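  • Producer provides a number of methods for sending messages, plus a method for creating a BatchMessageSender
  • A message is wrapped as Message, the attributes to set are wrapped as KeyValue, and the send result is wrapped as SendResult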
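
The three send styles described in the Javadoc (synchronous, asynchronous, one-way) can be contrasted with a small sketch. Everything in it is a hypothetical stand-in rather than the OMS API: messages are plain strings, the returned id plays the role of SendResult, and java.util.concurrent.CompletableFuture is used in place of the OMS Future type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, NOT the OMS Producer: it only mimics the three send styles.
class SendModesSketch {
    // Messages the fake "broker" has accepted, in arrival order.
    private final List<String> stored = new ArrayList<>();
    private int nextId = 0;

    // Synchronous: the caller blocks until an id (standing in for SendResult) is available.
    synchronized String send(String body) {
        stored.add(body);
        return "msg-" + (nextId++);
    }

    // Asynchronous: returns at once; the result arrives later through the future.
    CompletableFuture<String> sendAsync(String body) {
        return CompletableFuture.supplyAsync(() -> send(body));
    }

    // One-way: no result is returned and no exception reaches the caller.
    void sendOneway(String body) {
        try {
            send(body);
        } catch (RuntimeException ignored) {
            // deliberately swallowed: the caller does not care about the outcome
        }
    }

    synchronized int storedCount() {
        return stored.size();
    }

    public static void main(String[] args) {
        SendModesSketch producer = new SendModesSketch();
        String syncId = producer.send("sync");
        String asyncId = producer.sendAsync("async").join(); // join() waits for the async result
        producer.sendOneway("oneway");
        System.out.println(syncId + " " + asyncId + " stored=" + producer.storedCount());
    }
}
```

With the real API the destination would be set in the message's system headers before calling send, as the Javadoc above requires.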

Message

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/Message.java

/**
 * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is
 * {@link BytesMessage}.
 * <p>
 * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of
 * header and body and is used by separate applications to exchange a piece of information,
 * like <a href="http://rocketmq.apache.org/">Apache RocketMQ</a>.
 * <p>
 * The header contains fields used by the messaging system that describes the message's meta information,
 * like QoS level, origin, destination, and so on, while the body contains the application data being transmitted.
 * <p>
 * As for the header, OMS defines two kinds types:  System Header and User Header,
 * with respect to flexibility in vendor implementation and user usage.
 * <ul>
 * <li>
 * System Header, OMS defines some standard attributes that represent the characteristics of the message.
 * </li>
 * <li>
 * User Header, some OMS vendors may require enhanced extra attributes of the message
 * or some users may want to clarify some customized attributes to draw the body.
 * OMS provides the improved scalability for these scenarios.
 * </li>
 * </ul>
 * The body contains the application data being transmitted,
 * which is generally ignored by the messaging system and simply transmitted to its destination.
 * <p>
 * In BytesMessage, the body is just a byte array, may be compressed and uncompressed
 * in the transmitting process by the messaging system.
 * The application is responsible for explaining the concrete content and format of the message body,
 * OMS is never aware of that.
 *
 * The body part is placed in the implementation classes of {@code Message}.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface Message {
    /**
     * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}.
     *
     * @return the system headers of a {@code Message}
     * @see BuiltinKeys
     */
    KeyValue sysHeaders();

    /**
     * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}.
     *
     * @return the user headers of a {@code Message}
     */
    KeyValue userHeaders();

    /**
     * Puts a {@code String}-{@code int} {@code KeyValue} entry to the system headers of a {@code Message}.
     *
     * @param key the key to be placed into the system headers
     * @param value the value corresponding to <tt>key</tt>
     */
    Message putSysHeaders(String key, int value);

    /**
     * Puts a {@code String}-{@code long} {@code KeyValue} entry to the system headers of a {@code Message}.
     *
     * @param key the key to be placed into the system headers
     * @param value the value corresponding to <tt>key</tt>
     */
    Message putSysHeaders(String key, long value);

    /**
     * Puts a {@code String}-{@code double} {@code KeyValue} entry to the system headers of a {@code Message}.
     *
     * @param key the key to be placed into the system headers
     * @param value the value corresponding to <tt>key</tt>
     */
    Message putSysHeaders(String key, double value);

    /**
     * Puts a {@code String}-{@code String} {@code KeyValue} entry to the system headers of a {@code Message}.
     *
     * @param key the key to be placed into the system headers
     * @param value the value corresponding to <tt>key</tt>
     */
    Message putSysHeaders(String key, String value);

    /**
     * Puts a {@code String}-{@code int} {@code KeyValue} entry to the user headers of a {@code Message}.
     *
     * @param key the key to be placed into the user headers
     * @param value the value corresponding to <tt>key</tt>
     */
    Message putUserHeaders(String key, int value);

    /**
     * Puts a {@code String}-{@code long} {@code KeyValue} entry to the user headers of a {@code Message}.
     *
     * @param key the key to be placed into the user headers
     * @param value the value corresponding to <tt>key</tt>
     */
    Message putUserHeaders(String key, long value);

    /**
     * Puts a {@code String}-{@code double} {@code KeyValue} entry to the user headers of a {@code Message}.
     *
     * @param key the key to be placed into the user headers
     * @param value the value corresponding to <tt>key</tt>
     */
    Message putUserHeaders(String key, double value);

    /**
     * Puts a {@code String}-{@code String} {@code KeyValue} entry to the user headers of a {@code Message}.
     *
     * @param key the key to be placed into the user headers
     * @param value the value corresponding to <tt>key</tt>
     */
    Message putUserHeaders(String key, String value);

    /**
     * Get message body
     *
     * @param type Message body type
     * @param <T> Generic type
     * @return message body
     * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type
     */
    <T> T getBody(Class<T> type) throws OMSMessageFormatException;

    interface BuiltinKeys {
        /**
         * The {@code MESSAGE_ID} header field contains a value that uniquely identifies
         * each message sent by a {@code Producer}.
         * <p>
         * When a message is sent, MESSAGE_ID is assigned by the producer.
         */
        String MESSAGE_ID = "MESSAGE_ID";

        /**
         * The {@code DESTINATION} header field contains the destination to which the message is being sent.
         * <p>
         * When a message is sent this value is set to the right {@code Queue}, then the message will be sent to
         * the specified destination.
         * <p>
         * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in.
         */
        String DESTINATION = "DESTINATION";

        //......
    }
}
  • The Message interface mainly defines methods for setting headers and for getting the body
  • It also ships built-in header keys in BuiltinKeys, such as MESSAGE_ID and DESTINATION
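
To make the header/body split concrete, here is a minimal sketch of a message with chained header setters and a typed getBody. The class MessageSketch and the "tenant" header are made up for illustration, headers are simplified to String values only, and IllegalArgumentException stands in for OMSMessageFormatException.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an OMS message; not part of openmessaging-api.
class MessageSketch {
    private final Map<String, String> sysHeaders = new HashMap<>();
    private final Map<String, String> userHeaders = new HashMap<>();
    private final byte[] body;

    MessageSketch(byte[] body) {
        this.body = body;
    }

    // Returning "this" allows chained header setup, as in the Message interface.
    MessageSketch putSysHeaders(String key, String value) {
        sysHeaders.put(key, value);
        return this;
    }

    MessageSketch putUserHeaders(String key, String value) {
        userHeaders.put(key, value);
        return this;
    }

    String sysHeader(String key) {
        return sysHeaders.get(key);
    }

    String userHeader(String key) {
        return userHeaders.get(key);
    }

    // Mirrors getBody(Class<T>): fail when the body cannot be viewed as the requested type.
    <T> T getBody(Class<T> type) {
        if (!type.isInstance(body)) {
            throw new IllegalArgumentException("body is not a " + type.getName());
        }
        return type.cast(body);
    }

    public static void main(String[] args) {
        MessageSketch message = new MessageSketch("hello".getBytes(StandardCharsets.UTF_8))
            .putSysHeaders("DESTINATION", "order_queue") // same key string as BuiltinKeys.DESTINATION
            .putUserHeaders("tenant", "demo");           // made-up user header
        System.out.println(message.sysHeader("DESTINATION") + " " + message.getBody(byte[].class).length);
    }
}
```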

KeyValue

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java

/**
 * The {@code KeyValue} class represents a persistent set of attributes,
 * which supports method chaining.
 * <p>
 * A {@code KeyValue} object only allows {@code String} keys and can contain four primitive type
 * as values: {@code int}, {@code long}, {@code double}, {@code String}.
 * <p>
 * The {@code KeyValue} is a replacement of {@code Properties}, with simpler
 * interfaces and reasonable entry limits.
 * <p>
 * A {@code KeyValue} object may be used in concurrent scenarios, so the implementation
 * of {@code KeyValue} should consider concurrent related issues.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface KeyValue {
    /**
     * Inserts or replaces {@code int} value for the specified key.
     *
     * @param key the key to be placed into this {@code KeyValue} object
     * @param value the value corresponding to <tt>key</tt>
     */
    KeyValue put(String key, int value);

    /**
     * Inserts or replaces {@code long} value for the specified key.
     *
     * @param key the key to be placed into this {@code KeyValue} object
     * @param value the value corresponding to <tt>key</tt>
     */
    KeyValue put(String key, long value);

    /**
     * Inserts or replaces {@code double} value for the specified key.
     *
     * @param key the key to be placed into this {@code KeyValue} object
     * @param value the value corresponding to <tt>key</tt>
     */
    KeyValue put(String key, double value);

    /**
     * Inserts or replaces {@code String} value for the specified key.
     *
     * @param key the key to be placed into this {@code KeyValue} object
     * @param value the value corresponding to <tt>key</tt>
     */
    KeyValue put(String key, String value);

    /**
     * Searches for the {@code int} property with the specified key in this {@code KeyValue} object.
     * If the key is not found in this property list, zero is returned.
     *
     * @param key the property key
     * @return the value in this {@code KeyValue} object with the specified key value
     * @see #put(String, int)
     */
    int getInt(String key);

    /**
     * Searches for the {@code int} property with the specified key in this {@code KeyValue} object.
     * If the key is not found in this property list, the default value argument is returned.
     *
     * @param key the property key
     * @param defaultValue a default value
     * @return the value in this {@code KeyValue} object with the specified key value
     * @see #put(String, int)
     */
    int getInt(String key, int defaultValue);

    /**
     * Searches for the {@code long} property with the specified key in this {@code KeyValue} object.
     * If the key is not found in this property list, zero is returned.
     *
     * @param key the property key
     * @return the value in this {@code KeyValue} object with the specified key value
     * @see #put(String, long)
     */
    long getLong(String key);

    /**
     * Searches for the {@code long} property with the specified key in this {@code KeyValue} object.
     * If the key is not found in this property list, the default value argument is returned.
     *
     * @param key the property key
     * @param defaultValue a default value
     * @return the value in this {@code KeyValue} object with the specified key value
     * @see #put(String, long)
     */
    long getLong(String key, long defaultValue);

    /**
     * Searches for the {@code double} property with the specified key in this {@code KeyValue} object.
     * If the key is not found in this property list, zero is returned.
     *
     * @param key the property key
     * @return the value in this {@code KeyValue} object with the specified key value
     * @see #put(String, double)
     */
    double getDouble(String key);

    /**
     * Searches for the {@code double} property with the specified key in this {@code KeyValue} object.
     * If the key is not found in this property list, the default value argument is returned.
     *
     * @param key the property key
     * @param defaultValue a default value
     * @return the value in this {@code KeyValue} object with the specified key value
     * @see #put(String, double)
     */
    double getDouble(String key, double defaultValue);

    /**
     * Searches for the {@code String} property with the specified key in this {@code KeyValue} object.
     * If the key is not found in this property list, {@code null} is returned.
     *
     * @param key the property key
     * @return the value in this {@code KeyValue} object with the specified key value
     * @see #put(String, String)
     */
    String getString(String key);

    /**
     * Searches for the {@code String} property with the specified key in this {@code KeyValue} object.
     * If the key is not found in this property list, the default value argument is returned.
     *
     * @param key the property key
     * @param defaultValue a default value
     * @return the value in this {@code KeyValue} object with the specified key value
     * @see #put(String, String)
     */
    String getString(String key, String defaultValue);

    /**
     * Returns a {@link Set} view of the keys contained in this {@code KeyValue} object.
     * <p>
     * The set is backed by the {@code KeyValue}, so changes to the set are
     * reflected in the {@code KeyValue}, and vice-versa.
     *
     * @return the key set view of this {@code KeyValue} object.
     */
    Set<String> keySet();

    /**
     * Tests if the specified {@code String} is a key in this {@code KeyValue}.
     *
     * @param key possible key
     * @return <code>true</code> if and only if the specified key is in this {@code KeyValue}, <code>false</code>
     * otherwise.
     */
    boolean containsKey(String key);
}
  • It mainly wraps values of type int, long, double, and String; keys can only be of type String
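
The interface leaves the implementation to vendors. One possible shape is sketched below, assuming a ConcurrentHashMap for the concurrency requirement mentioned in the Javadoc and storing every value as a String (both are choices of this sketch, not of OMS). SimpleKeyValue is a hypothetical name; in a real project it would declare that it implements io.openmessaging.KeyValue.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; a real version would declare "implements io.openmessaging.KeyValue".
class SimpleKeyValue {
    // Values are stored as strings and parsed on read (an assumption of this sketch).
    // Note: ConcurrentHashMap rejects null keys and null values.
    private final Map<String, String> entries = new ConcurrentHashMap<>();

    SimpleKeyValue put(String key, int value) { entries.put(key, String.valueOf(value)); return this; }
    SimpleKeyValue put(String key, long value) { entries.put(key, String.valueOf(value)); return this; }
    SimpleKeyValue put(String key, double value) { entries.put(key, String.valueOf(value)); return this; }
    SimpleKeyValue put(String key, String value) { entries.put(key, value); return this; }

    // The single-argument getters fall back to zero (or null for String), as the Javadoc states.
    int getInt(String key) { return getInt(key, 0); }
    int getInt(String key, int defaultValue) {
        String v = entries.get(key);
        return v == null ? defaultValue : Integer.parseInt(v);
    }

    long getLong(String key) { return getLong(key, 0L); }
    long getLong(String key, long defaultValue) {
        String v = entries.get(key);
        return v == null ? defaultValue : Long.parseLong(v);
    }

    double getDouble(String key) { return getDouble(key, 0.0d); }
    double getDouble(String key, double defaultValue) {
        String v = entries.get(key);
        return v == null ? defaultValue : Double.parseDouble(v);
    }

    String getString(String key) { return entries.get(key); }
    String getString(String key, String defaultValue) {
        String v = entries.get(key);
        return v == null ? defaultValue : v;
    }

    // The returned set is a live view backed by the map.
    Set<String> keySet() { return entries.keySet(); }

    boolean containsKey(String key) { return entries.containsKey(key); }

    public static void main(String[] args) {
        SimpleKeyValue attributes = new SimpleKeyValue()
            .put("timeout", 3000)   // illustrative keys, not OMSBuiltinKeys constants
            .put("id", "producer-1");
        System.out.println(attributes.getInt("timeout") + " " + attributes.getString("id"));
    }
}
```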

SendResult

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/SendResult.java

/**
 * The result of sending a OMS message to server
 * with the message id and some attributes.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface SendResult {
    /**
     * The unique message id related to the {@code SendResult} instance.
     *
     * @return the message id
     */
    String messageId();
}
  • For now it defines only a single method, which returns the messageId

BatchMessageSender

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/BatchMessageSender.java

/**
 * A message sender created through {@link Producer#createBatchMessageSender()}, to send
 * messages in batch manner, and commit or roll back at the appropriate time.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface BatchMessageSender {
    /**
     * Submits a message to this sender
     *
     * @param message a message to be sent
     * @return this batch sender
     */
    BatchMessageSender send(Message message);

    /**
     * Commits all the uncommitted messages in this sender.
     *
     * @throws OMSRuntimeException if the sender fails to commit the messages due to some internal error.
     */
    void commit();

    /**
     * Discards all the uncommitted messages in this sender.
     */
    void rollback();

    /**
     * Close this sender.
     */
    void close();
}
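  • It defines a send method that adds messages to the batch; the name send is not ideal here, and something like pending would fit better
  • It then defines commit to submit the batched messages, rollback to discard them, and close to close the sender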
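
The buffering semantics behind that naming remark can be sketched as follows. BatchSenderSketch is a hypothetical in-memory stand-in with string messages; that close discards uncommitted messages is an assumption of the sketch, since the interface does not say.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BatchMessageSender; messages are plain strings here.
class BatchSenderSketch {
    private final List<String> pending = new ArrayList<>();   // submitted, not yet committed
    private final List<String> delivered = new ArrayList<>(); // what the fake broker received
    private boolean closed;

    // Despite the name, send only buffers the message and returns this for chaining.
    BatchSenderSketch send(String message) {
        ensureOpen();
        pending.add(message);
        return this;
    }

    // Hands every buffered message to the broker in one step.
    void commit() {
        ensureOpen();
        delivered.addAll(pending);
        pending.clear();
    }

    // Drops every buffered message; already committed ones are unaffected.
    void rollback() {
        ensureOpen();
        pending.clear();
    }

    // Assumption: closing discards whatever is still uncommitted.
    void close() {
        pending.clear();
        closed = true;
    }

    List<String> delivered() {
        return delivered;
    }

    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("sender is closed");
        }
    }

    public static void main(String[] args) {
        BatchSenderSketch sender = new BatchSenderSketch();
        sender.send("a").send("b").commit(); // both messages are delivered together
        sender.send("c").rollback();         // "c" never reaches the broker
        sender.close();
        System.out.println(sender.delivered());
    }
}
```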

consumer

PullConsumer

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PullConsumer.java

/**
 * A {@code PullConsumer} pulls messages from the specified queue,
 * and supports submit the consume result by acknowledgement.
 *
 * @version OMS 1.0.0
 * @see MessagingAccessPoint#createPullConsumer()
 * @since OMS 1.0.0
 */
public interface PullConsumer extends ServiceLifecycle {
    /**
     * Returns the attributes of this {@code PullConsumer} instance.
     * Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer}.
     * <p>
     * There are some standard attributes defined by OMS for {@code PullConsumer}:
     * <ul>
     * <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.
     * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PullConsumer}.
     * </ul>
     *
     * @return the attributes
     */
    KeyValue attributes();

    /**
     * Attaches the {@code PullConsumer} to a specified queue.
     *
     * @param queueName a specified queue
     * @return this {@code PullConsumer} instance
     */
    PullConsumer attachQueue(String queueName);

    /**
     * Attaches the {@code PullConsumer} to a specified queue with some specified attributes..
     *
     * @param queueName a specified queue
     * @param attributes some specified attributes
     * @return this {@code PullConsumer} instance
     */
    PullConsumer attachQueue(String queueName, KeyValue attributes);

    /**
     * Detaches the {@code PullConsumer} from a specified queue.
     * <p>
     * After the success call, this consumer won't receive new message
     * from the specified queue any more.
     *
     * @param queueName a specified queue
     * @return this {@code PullConsumer} instance
     */
    PullConsumer detachQueue(String queueName);

    /**
     * Receives the next message from the attached queues of this consumer.
     * <p>
     * This call blocks indefinitely until a message arrives, the timeout expires,
     * or until this {@code PullConsumer} is shut down.
     *
     * @return the next message received from the attached queues, or null if the consumer is
     * concurrently shut down or the timeout expires
     * @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error.
     */
    Message receive();

    /**
     * Receives the next message from the attached queues of this consumer, using the specified attributes.
     * <p>
     * This call blocks indefinitely until a message arrives, the timeout expires,
     * or until this {@code PullConsumer} is shut down.
     *
     * @param attributes the specified attributes
     * @return the next message received from the attached queues, or null if the consumer is
     * concurrently shut down or the timeout expires
     * @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error.
     */
    Message receive(KeyValue attributes);

    /**
     * Acknowledges the specified and consumed message with the unique message receipt handle.
     * <p>
     * Messages that have been received but not acknowledged may be redelivered.
     *
     * @param receiptHandle the receipt handle associated with the consumed message
     * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
     */
    void ack(String receiptHandle);

    /**
     * Acknowledges the specified and consumed message with the specified attributes.
     * <p>
     * Messages that have been received but not acknowledged may be redelivered.
     *
     * @param receiptHandle the receipt handle associated with the consumed message
     * @param attributes the specified attributes
     * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
     */
    void ack(String receiptHandle, KeyValue attributes);
}
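  • It defines the attachQueue and detachQueue methods for binding to and unbinding from a queue
  • The receive methods fetch messages; arguably receive is not the best name here, and pull might be better
  • The ack methods acknowledge messages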
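
The receive/ack contract ("messages that have been received but not acknowledged may be redelivered") is illustrated by the sketch below. PullAckSketch is a hypothetical in-memory stand-in: its receive is non-blocking and returns the receipt handle next to the body, because the excerpt above does not show how the handle is obtained from a Message.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a pull consumer plus its queue.
class PullAckSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    // Received but not yet acknowledged messages, keyed by receipt handle.
    private final Map<String, String> inFlight = new LinkedHashMap<>();
    private int nextHandle = 0;

    void enqueue(String body) {
        queue.addLast(body);
    }

    // Non-blocking stand-in for receive(): returns {receiptHandle, body}, or null when empty.
    String[] receive() {
        String body = queue.pollFirst();
        if (body == null) {
            return null;
        }
        String handle = "h-" + (nextHandle++);
        inFlight.put(handle, body);
        return new String[] {handle, body};
    }

    // Acknowledging removes the message from the in-flight set for good.
    void ack(String receiptHandle) {
        inFlight.remove(receiptHandle);
    }

    // Simulates the broker putting every unacknowledged message back on the queue.
    void redeliverUnacked() {
        queue.addAll(inFlight.values());
        inFlight.clear();
    }

    int backlog() {
        return queue.size();
    }

    public static void main(String[] args) {
        PullAckSketch consumer = new PullAckSketch();
        consumer.enqueue("a");
        consumer.enqueue("b");
        String[] first = consumer.receive();
        consumer.ack(first[0]);        // "a" is done
        consumer.receive();            // "b" is received but never acknowledged
        consumer.redeliverUnacked();   // so "b" comes back
        System.out.println(consumer.receive()[1]);
    }
}
```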

PushConsumer

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PushConsumer.java

/**
 * A {@code PushConsumer} receives messages from multiple queues, these messages are pushed from
 * MOM server to {@code PushConsumer} client.
 *
 * @version OMS 1.0.0
 * @see MessagingAccessPoint#createPushConsumer()
 * @since OMS 1.0.0
 */
public interface PushConsumer extends ServiceLifecycle {
    /**
     * Returns the attributes of this {@code PushConsumer} instance.
     * Changes to the return {@code KeyValue} are not reflected in physical {@code PushConsumer}.
     * <p>
     * There are some standard attributes defined by OMS for {@code PushConsumer}:
     * <ul>
     * <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.
     * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PushConsumer}.
     * </ul>
     *
     * @return the attributes
     */
    KeyValue attributes();

    /**
     * Resumes the {@code PushConsumer} after a suspend.
     * <p>
     * This method resumes the {@code PushConsumer} instance after it was suspended.
     * The instance will not receive new messages between the suspend and resume calls.
     *
     * @throws OMSRuntimeException if the instance has not been suspended.
     * @see PushConsumer#suspend()
     */
    void resume();

    /**
     * Suspends the {@code PushConsumer} for later resumption.
     * <p>
     * This method suspends the consumer until it is resumed.
     * The consumer will not receive new messages between the suspend and resume calls.
     * <p>
     * This method behaves exactly as if it simply performs the call {@code suspend(0)}.
     *
     * @throws OMSRuntimeException if the instance is not currently running.
     * @see PushConsumer#resume()
     */
    void suspend();

    /**
     * Suspends the {@code PushConsumer} for later resumption.
     * <p>
     * This method suspends the consumer until it is resumed or a
     * specified amount of time has elapsed.
     * The consumer will not receive new messages during the suspended state.
     * <p>
     * This method is similar to the {@link #suspend()} method, but it allows finer control
     * over the amount of time to suspend, and the consumer will be suspended until it is resumed
     * if the timeout is zero.
     *
     * @param timeout the maximum time to suspend in milliseconds.
     * @throws OMSRuntimeException if the instance is not currently running.
     */
    void suspend(long timeout);

    /**
     * This method is used to find out whether the {@code PushConsumer} is suspended.
     *
     * @return true if this {@code PushConsumer} is suspended, false otherwise
     */
    boolean isSuspended();

    /**
     * Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener}.
     * <p>
     * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new
     * delivered message is coming.
     *
     * @param queueName a specified queue
     * @param listener a specified listener to receive new message
     * @return this {@code PushConsumer} instance
     */
    PushConsumer attachQueue(String queueName, MessageListener listener);

    /**
     * Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener} and some
     * specified attributes.
     * <p>
     * {@link MessageListener#onReceived(Message, MessageListener.Context)}  will be called when new
     * delivered message is coming.
     *
     * @param queueName a specified queue
     * @param listener a specified listener to receive new message
     * @param attributes some specified attributes
     * @return this {@code PushConsumer} instance
     */
    PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes);

    /**
     * Detaches the {@code PushConsumer} from a specified queue.
     * <p>
     * After the success call, this consumer won't receive new message
     * from the specified queue any more.
     *
     * @param queueName a specified queue
     * @return this {@code PushConsumer} instance
     */
    PushConsumer detachQueue(String queueName);

    /**
     * Adds a {@code ConsumerInterceptor} instance to this consumer.
     *
     * @param interceptor an interceptor instance
     */
    void addInterceptor(ConsumerInterceptor interceptor);

    /**
     * Removes an interceptor from this consumer.
     *
     * @param interceptor an interceptor to be removed
     */
    void removeInterceptor(ConsumerInterceptor interceptor);
}
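  • It defines the attachQueue and detachQueue methods for binding to and unbinding from a queue
  • attachQueue takes a MessageListener, the callback that handles messages when they are pushed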

MessageListener

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java

/**
 * A message listener must implement this {@code MessageListener} interface and register
 * itself to a consumer instance to asynchronously receive messages.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface MessageListener {
    /**
     * Callback method to receive incoming messages.
     * <p>
     * A message listener should handle different types of {@code Message}.
     *
     * @param message the received message object
     * @param context the context delivered to the consume thread
     */
    void onReceived(Message message, Context context);

    interface Context {
        /**
         * Returns the attributes of this {@code MessageContext} instance.
         *
         * @return the attributes
         */
        KeyValue attributes();

        /**
         * Acknowledges the specified and consumed message, which is related to this {@code MessageContext}.
         * <p>
         * Messages that have been received but not acknowledged may be redelivered.
         *
         * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
         */
        void ack();
    }
}
  • MessageListener defines the onReceived callback, which is also passed a Context object
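
The callback-plus-context pattern looks roughly like this in use. The nested Listener and Context interfaces are simplified local copies (string messages, no attributes), not the OMS types, and the push method is a hypothetical stand-in for the broker delivering a message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of push-style delivery with an ack-capable context.
class PushListenerSketch {
    // Simplified local copies of MessageListener.Context and MessageListener.
    interface Context {
        void ack();
    }

    interface Listener {
        void onReceived(String message, Context context);
    }

    private final List<String> acked = new ArrayList<>();

    // Stand-in for the broker pushing one message: the context is bound to that message.
    void push(String message, Listener listener) {
        listener.onReceived(message, () -> acked.add(message));
    }

    List<String> acked() {
        return acked;
    }

    public static void main(String[] args) {
        PushListenerSketch consumer = new PushListenerSketch();
        // The listener acknowledges only messages it could process (non-empty ones here).
        Listener listener = (message, context) -> {
            if (!message.isEmpty()) {
                context.ack();
            }
        };
        consumer.push("order-1", listener);
        consumer.push("", listener); // not acknowledged, so a real broker may redeliver it
        System.out.println(consumer.acked());
    }
}
```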

StreamingConsumer

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingConsumer.java

/**
 * A {@code StreamingConsumer} provides low level APIs to open multiple streams
 * from a specified queue and then retrieve messages from them through {@code StreamingIterator}.
 *
 * A {@code Queue} consists of multiple streams, the {@code Stream} is an abstract concept and
 * can be associated with partition in most messaging systems.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface StreamingConsumer extends ServiceLifecycle {
    /**
     * Returns the attributes of this {@code StreamingConsumer} instance.
     * Changes to the return {@code KeyValue} are not reflected in physical {@code StreamingConsumer}.
     * <p>
     * There are some standard attributes defined by OMS for {@code StreamingConsumer}:
     * <ul>
     * <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.
     * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code
     * StreamingConsumer}.
     * </ul>
     *
     * @return the attributes
     */
    KeyValue attributes();

    /**
     * Creates a {@code StreamingIterator} from the end position of the specified stream.
     *
     * @param streamName the specified stream
     * @return a message iterator at the end position.
     */
    StreamingIterator seekToEnd(String streamName);

    /**
     * Creates a {@code StreamingIterator} from the begin position of the specified stream.
     *
     * @param streamName the specified stream
     * @return a message iterator at the begin position.
     */
    StreamingIterator seekToBeginning(String streamName);

    /**
     * Creates a {@code StreamingIterator} from the fixed position of the specified stream.
     * <p>
     * Creates a {@code StreamingIterator} from the begin position if the given position
     * is earlier than the first message's store position in this stream.
     * <p>
     * Creates a {@code StreamingIterator} from the end position, if the given position
     * is later than the last message's store position in this stream.
     * <p>
     * The position is a {@code String} value, may be represented by timestamp, offset, cursor,
     * even a casual key.
     *
     * @param streamName the specified stream
     * @param position the specified position
     * @return a message iterator at the specified position
     */
    StreamingIterator seek(String streamName, String position);
}
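  • StreamingConsumer is mainly about seeking: seekToBeginning, seekToEnd and seek each return a StreamingIterator positioned in the specified stream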
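
The clamping rule in the seek Javadoc (too early goes to the begin position, too late goes to the end position) can be sketched against an in-memory list. This is only an illustration under stated assumptions: the position is taken to be a decimal offset, although the Javadoc also allows timestamps, cursors or arbitrary keys, and java.util.ListIterator stands in for StreamingIterator. SeekSketch and clampOffset are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

class SeekSketch {
    /** Clamps the requested offset into [0, size]; assumes position is a decimal offset. */
    static int clampOffset(int size, String position) {
        long requested = Long.parseLong(position);
        return (int) Math.max(0L, Math.min((long) size, requested));
    }

    /** Iterator starting at the given position, clamped to the stream bounds. */
    static ListIterator<String> seek(List<String> stream, String position) {
        return stream.listIterator(clampOffset(stream.size(), position));
    }

    /** Iterator positioned before the first message. */
    static ListIterator<String> seekToBeginning(List<String> stream) {
        return stream.listIterator(0);
    }

    /** Iterator positioned after the last message, so hasNext() is false. */
    static ListIterator<String> seekToEnd(List<String> stream) {
        return stream.listIterator(stream.size());
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("m0", "m1", "m2");
        System.out.println(seek(stream, "1").next());
        System.out.println(seek(stream, "-5").next());
        System.out.println(seek(stream, "99").hasNext());
    }
}
```

A real implementation would resolve the position against broker-side storage; the list here only makes the boundary behaviour visible.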

StreamingIterator

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingIterator.java

/**
 * A {@code StreamingIterator} is provided by {@code Stream} and is used to
 * retrieve messages from a specified stream like a read-only iterator.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface StreamingIterator {
    /**
     * Returns the attributes of this {@code StreamingIterator} instance.
     * <p>
     * There are some standard attributes defined by OMS for {@code Stream}:
     * <ul>
     * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code
     * Stream}.
     * </ul>
     *
     * @return the attributes
     */
    KeyValue attributes();

    /**
     * Returns {@code true} if this iterator has more messages when
     * traversing the iterator in the forward direction.
     *
     * @return {@code true} if the iterator has more messages
     */
    boolean hasNext();

    /**
     * Returns the next message in the iteration and advances the offset position.
     * <p>
     * This method may be called repeatedly to iterate through the iteration,
     * or intermixed with calls to {@link #previous} to go back and forth.
     *
     * @return the next message in the list
     * @throws OMSRuntimeException if the iteration has no more message, or
     * the consumer fails to receive the next message
     */
    Message next();

    /**
     * Returns {@code true} if this partition iterator has more messages when
     * traversing the iterator in the reverse direction.
     *
     * @return {@code true} if the partition iterator has more messages when
     * traversing the iterator in the reverse direction
     */
    boolean hasPrevious();

    /**
     * Returns the previous message in the iteration and moves the offset
     * position backwards.
     * <p>
     * This method may be called repeatedly to iterate through the iteration backwards,
     * or intermixed with calls to {@link #next} to go back and forth.
     *
     * @return the previous message in the list
     * @throws OMSRuntimeException if the iteration has no previous message, or
     * the consumer fails to receive the previous message
     */
    Message previous();

    /**
     * Returns the position of the message that would be returned by a
     * subsequent call to {@link #next}.
     *
     * @return the position of the next message
     * @throws OMSRuntimeException if the iteration has no next message
     */
    String nextPosition();

    /**
     * Returns the position of the message that would be returned by a
     * subsequent call to {@link #previous}.
     *
     * @return the position of the previous message
     * @throws OMSRuntimeException if the iteration has no previous message
     */
    String previousPosition();
}
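  • StreamingIterator is a read-only iterator similar to Java's Iterator: it provides hasNext and next, plus hasPrevious, previous, nextPosition and previousPosition for moving backwards and reading positions (closer in shape to java.util.ListIterator)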
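
As a rough sketch of how the six iterator methods relate to a single cursor, the class below walks an in-memory list in both directions. It is not the library's implementation: messages are plain strings, positions are list offsets rendered as strings, and IllegalStateException stands in for OMSRuntimeException. IteratorSketch and InMemoryIterator are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

class IteratorSketch {
    /** Hypothetical in-memory iterator; positions are list offsets rendered as strings. */
    static class InMemoryIterator {
        private final List<String> messages;
        private int cursor; // index of the message that next() would return

        InMemoryIterator(List<String> messages, int start) {
            this.messages = messages;
            this.cursor = start;
        }

        boolean hasNext() {
            return cursor < messages.size();
        }

        boolean hasPrevious() {
            return cursor > 0;
        }

        String next() {
            requireNext();
            return messages.get(cursor++); // return, then advance the cursor
        }

        String previous() {
            requirePrevious();
            return messages.get(--cursor); // step back, then return
        }

        String nextPosition() {
            requireNext();
            return String.valueOf(cursor);
        }

        String previousPosition() {
            requirePrevious();
            return String.valueOf(cursor - 1);
        }

        // IllegalStateException is a stand-in for OMSRuntimeException.
        private void requireNext() {
            if (!hasNext()) throw new IllegalStateException("no next message");
        }

        private void requirePrevious() {
            if (!hasPrevious()) throw new IllegalStateException("no previous message");
        }
    }

    public static void main(String[] args) {
        InMemoryIterator it = new InMemoryIterator(Arrays.asList("a", "b", "c"), 0);
        while (it.hasNext()) {
            System.out.println(it.nextPosition() + " -> " + it.next());
        }
        System.out.println("back to " + it.previous());
    }
}
```

Because next() and previous() share one cursor, calling previous() right after next() returns the same message again, the same behaviour java.util.ListIterator documents.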

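Summary

openmessaging-java defines the Java API for the OpenMessaging specification. The producer offers both single-message and batch send methods, while the consumer side offers three consumption styles: pull, push and stream.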

doc

  • openmessaging.cloud
  • OpenMessaging Runtime Interface for Java

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-07-24
