前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何在Springboot中使用Redis5的Stream

如何在Springboot中使用Redis5的Stream

作者头像
烂猪皮
发布2020-11-10 14:03:43
3.1K0
发布2020-11-10 14:03:43
举报
文章被收录于专栏:JAVA烂猪皮JAVA烂猪皮

一句话概括:Redis5的新数据类型,功能就是MQ。可以生产消息,消费消息。支持群组消费,以及消息确认。

在理解了Stream后,就可以继续往下看

SpringBoot整合

只需要整合进Redis就行。

POM.xml

springboot2默认使用lettuce作为客户端

代码语言:javascript
复制
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
</dependency>

配置

代码语言:javascript
复制
spring:
  redis:
    database: 0
    host: 192.168.1.103
    port: 6379
    password: "123456"
    timeout: 2000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

消息和消息ID的对象

我觉得要先说一下,这两个对象。因为以下的内容,都需要跟这两个对象打交道

消息对象的创建

使用StreamRecords的静态方法来创建消息实例。

一个stream消息有两个内容。可以理解为:一个是key, -个是value。

key和value都可以使用自定义的对象,字节,字符串来定义

代码语言:javascript
复制
ByteRecord rawBytes(Map<byte[], byte[]> raw) 

ByteBufferRecord rawBuffer(Map<ByteBuffer, ByteBuffer> raw) 

StringRecord string(Map<String, String> raw)

<S, K, V> MapRecord<S, K, V> mapBacked(Map<K, V> map)

<S, V> ObjectRecord<S, V> objectBacked(V value)

RecordBuilder<?> newRecord()  // 通过builder方式来创建消息

Recordld表示消息ID

你读过上面的帖子,就会知道。-条消息的ID是唯一的。并且有2部分组成

代码语言:javascript
复制
// ----------- 读取ID属性的实例方法
// 是否是系统自动生成的
boolean shouldBeAutoGenerated();
// 获取原始的id字符串
String getValue();
// 获取序列号部分
long getSequence();
// 获取时间戳部分
long getTimestamp();

// ----------- 创建ID的静态方法
RecordId of(@Nullable String value)
RecordId of(long millisecondsTime, long sequenceNumber)
RecordId autoGenerate()

往Stream推送消息

使用Redis Template

代码语言:javascript
复制
@Autowired
private StringRedisTemplate stringRedisTemplate;

public void test () {
  // 创建消息记录, 以及指定stream
  StringRecord stringRecord = StreamRecords.string(Collections.singletonMap("name", "KevinBlandy")).withStreamKey("mystream");
  RecordId recordId = this.stringRedisTemplate.opsForStream().add(stringRecord);
  // 是否是自动生成的
  boolean autoGenerated = recordId.shouldBeAutoGenerated();
  // id值
  String value = recordId.getValue();
  // 序列号部分
  long sequence = recordId.getSequence();
  // 时间戳部分
  long timestamp = recordId.getTimestamp();
}

使用RedisConnection

代码语言:javascript
复制
@Autowired
private RedisConnectionFactory redisConnectionFactory;

public void test () {
  // 创建消息记录, 以及指定stream
  ByteRecord byteRecord = StreamRecords.rawBytes(Collections.singletonMap("name".getBytes(), "KevinBlandy".getBytes())).withStreamKey("mystream".getBytes());
  // 获取连接
  RedisConnection redisConnection = this.redisConnectionFactory.getConnection();
  RecordId recordId = redisConnection.xAdd(byteRecord);
  // 是否是自动生成的
  boolean autoGenerated = recordId.shouldBeAutoGenerated();
  // id值
  String value = recordId.getValue();
  // 序列号部分
  long sequence = recordId.getSequence();
  // 时间戳部分
  long timestamp = recordId.getTimestamp();
}

从Stream消费消息

阻塞消费

StreamConsumerRunner

使用ApplicationRnner,在系统启动以后,初始化监听器。开始监听消费。

代码语言:javascript
复制
import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;


@Component
public class StreamConsumerRunner implements ApplicationRunner, DisposableBean {

  static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);

  @Value("${redis.stream.consumer}")
  private String consumer;

  @Autowired
  RedisConnectionFactory redisConnectionFactory;

  @Autowired
  ThreadPoolTaskExecutor threadPoolTaskExecutor;

  @Autowired
  StreamMessageListener streamMessageListener;

  @Autowired
  StringRedisTemplate stringRedisTemplate;

  private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;

  @Override
  public void run(ApplicationArguments args) throws Exception {

    // 创建配置对象
    StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
        .builder()
        // 一次性最多拉取多少条消息
        .batchSize(10)
        // 执行消息轮询的执行器
        .executor(this.threadPoolTaskExecutor)
        // 消息消费异常的handler
        .errorHandler(new ErrorHandler() {
          @Override
          public void handleError(Throwable t) {
            // throw new RuntimeException(t);
            t.printStackTrace();
          }
        })
        // 超时时间,设置为0,表示不超时(超时后会抛出异常)
        .pollTimeout(Duration.ZERO)
        // 序列化器
        .serializer(new StringRedisSerializer())
        .build();

    // 根据配置对象创建监听容器对象
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
        .create(this.redisConnectionFactory, streamMessageListenerContainerOptions);

    // 使用监听容器对象开始监听消费(使用的是手动确认方式)
    streamMessageListenerContainer.receive(Consumer.from("group-1", "consumer-1"), 
        StreamOffset.create("mystream", ReadOffset.lastConsumed()), this.streamMessageListener);

    this.streamMessageListenerContainer = streamMessageListenerContainer;
    // 启动监听
    this.streamMessageListenerContainer.start();

  }

  @Override
  public void destroy() throws Exception {
    this.streamMessageListenerContainer.stop();
  }
}

StreamMessageListener

实现函数接口 StreamListener<K, V extends Record<K, ?>> ,来自定义消息的消费实现

代码语言:javascript
复制
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;


@Component
public class StreamMessageListener implements StreamListener<String, MapRecord<String, String, String>>{

  static final Logger LOGGER = LoggerFactory.getLogger(StreamMessageListener.class);

  @Autowired
  StringRedisTemplate stringRedisTemplate;

  @Override
  public void onMessage(MapRecord<String, String, String> message) {

    // 消息ID
    RecordId messageId = message.getId();

    // 消息的key和value
    Map<String, String> body = message.getValue();

    LOGGER.info("stream message。messageId={}, stream={}, body={}", messageId, message.getStream(), body);

    // 通过RedisTemplate手动确认消息
    this.stringRedisTemplate.opsForStream().acknowledge("mystream", message);
  }
}

非阻塞消费

主要是通过StreamOperations或者是RedicConnection的消费API来进行消息的随机消费

StreamOperations中,关于读取操作的API

从RedisTemplate中获取到StreamOperations

代码语言:javascript
复制
StreamOperations<String, String, String> s = this.stringRedisTemplate.opsForStream();

StreamOperations 的读取 API

代码语言:javascript
复制
// 随机范围读取
<V> List<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<String> range)
<V> List<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<String> range, Limit limit)


// 根据消息ID或者偏移量读取
List<MapRecord<K, HK, HV>> read(StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(StreamReadOptions readOptions, StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, StreamReadOptions readOptions, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(Consumer consumer, StreamOffset<K>... streams)
<V> List<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer, StreamOffset<K>... streams)
List<MapRecord<K, HK, HV>> read(Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams)
List<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams)

// 随机逆向范围读取
List<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range)
List<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range, Limit limit)
<V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Range<String> range)
<V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Range<String> range, Limit limit)

// 消费者信息
XInfoConsumers consumers(K key, String group);
// 消费者信息
XInfoGroups groups(K key);
// stream信息
XInfoStream info(K key);

// 获取消费组,消费者中未确认的消息
PendingMessagesSummary pending(K key, String group);
PendingMessages pending(K key, Consumer consumer)
PendingMessages pending(K key, String group, Range<?> range, long count)
PendingMessages pending(K key, String group, Range<?> range, long count)

测试

先通过Redis控制台创建stream以及group。

代码语言:javascript
复制
127.0.0.1:6379> XADD mystream * hello world
"1583208428680-0"
127.0.0.1:6379> XGROUP CREATE mystream group-1 $
OK

启动程序后,通过控制台往stream生产消息

代码语言:javascript
复制
127.0.0.1:6379> XADD mystream * name KevinBlandy
"1583208571017-0"

程序成功的消费了这条消息

代码语言:javascript
复制
2020-03-03 12:09:34.159  INFO 9344 --- [lTaskExecutor-1] i.s.c.r.stream.StreamMessageListener     : stream message。messageId=1583208571017-0, stream=mystream, body={name=KevinBlandy

最后

对于Streram还有一些其他的操作。例如:通过RedisTemplate来发送消息,以及查看未ACK的消息,重新消费等等。在这里没有一一列举。其实你如果学懂了Stream,那么我觉得这些API连蒙带猜也都知道是怎么用的。水到渠成的事儿,不难。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JAVA烂猪皮 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 配置
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档