
Failed to convert from type [org.springframework.data.redis.connection.stream.StreamRecords$ByteMapBackedRecord] to type

Stack Overflow user
Asked 2021-12-18 11:34:55
3 answers, 479 views, 0 followers, 0 votes

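I keep running into this exception when I try to convert a message from a Redis stream into my entity. I think the default deserializer for Redis is the cause, but I am not sure how to fix it.

When I send this message to the Redis stream

XADD my-stream * from john to smith type request

I get this exception from my Spring Boot service.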

org.springframework.core.convert.ConversionFailedException: Failed to convert from type [org.springframework.data.redis.connection.stream.StreamRecords$ByteMapBackedRecord] to type [com.example.messaging.Dto.NotificationMessage] for value 'MapBackedRecord{recordId=1639826917707-0, kvMap={[B@55ab97aa=[B@510458ff, [B@5b7e99c1=[B@4141b49e, [B@12840469=[B@6aa14d0}}'; nested exception is java.lang.IllegalArgumentException: Value must not be null!
    at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:198) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:176) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.lang.IllegalArgumentException: Value must not be null!
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.12.jar:5.3.12]
    at org.springframework.data.redis.connection.stream.Record.of(Record.java:81) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:577) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getDeserializer$2(DefaultStreamMessageListenerContainer.java:240) ~[spring-data-redis-2.5.6.jar:2.5.6]
    at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:196) ~[spring-data-redis-2.5.6.jar:2.5.6]
    ... 4 common frames omitted

Configuration class:

@Configuration
public class RedisConfig {

    @Autowired
    private StreamListener<String, ObjectRecord<String, NotificationMessage>> streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, NotificationMessage>> options
                = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofMillis(1000))
                .targetType(NotificationMessage.class)
                .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, NotificationMessage>> listenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        Subscription subscription = listenerContainer.receive(StreamOffset.latest("my-stream"), streamListener);

        listenerContainer.start();
        return subscription;
    }
}

NotificationMessage Dto:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NotificationMessage {
    private String id;
    private String from;
    private String to;
    private String type;
}

Stack Overflow user

Answered 2022-10-14 19:49:13

A simple code example (spring-data-redis:2.6.4):

Stream record:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.TypeAlias;

@Data
@NoArgsConstructor
@AllArgsConstructor
@TypeAlias("com.pet.streams.User")
public class User {
    private String name;
    private Integer age;
}

Stream configuration:

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;

@Slf4j
@Configuration
public class StreamsConfig {
    private final RedisTemplate<String, Object> redisTemplate;

    public StreamsConfig(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Bean
    public StreamListener<String, ObjectRecord<String, User>> userStreamListener() {
        // handle message from stream
        return message -> log.info(message.toString());
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory, StreamListener<String, ObjectRecord<String, User>> userStreamListener) throws UnknownHostException {
        String streamKey = "streams:users";
        String groupName = "cg";
        createConsumerGroupIfNotExists(streamKey, groupName);
        StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, User>> options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofMillis(100))
                .targetType(User.class)
                .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, User>>  listenerContainer = StreamMessageListenerContainer
                .create(redisConnectionFactory, options);
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset)
                .consumer(Consumer.from(groupName, InetAddress.getLocalHost().getHostName()))
                .cancelOnError((err) -> false)  // do not stop consuming after error
                .errorHandler((err) -> log.error(err.getMessage()))
                .build();
        Subscription subscription = listenerContainer.register(readRequest, userStreamListener);
        listenerContainer.start();
        return subscription;
    }

    private void createConsumerGroupIfNotExists(String streamKey, String groupName){
        try {
            redisTemplate.opsForStream().createGroup(streamKey, groupName);
        }
        catch (RedisSystemException ex){
            log.error(ex.getMessage());
        }
    }
}

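Test:

In redis-cli, enter XADD streams:users * _class com.pet.streams.User name Bob age 30 to add a record to the stream. The running application should consume it correctly (check the log).

Then enter XADD streams:users * name Bob age 30, without the _class field. The exception from the question should be logged.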
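The difference between the two test commands is only the _class pair. As a minimal sketch of what an external producer would have to send, the plain-Java helper below renders the command text with the type hint placed before the payload fields. XaddCommandSketch and xadd are hypothetical names for illustration, not part of Spring Data Redis or any Redis client, and the helper assumes field names and values contain no spaces.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration only; not a Spring Data Redis or Redis client API.
final class XaddCommandSketch {

    // Renders "XADD <stream> * _class <typeAlias> <field> <value> ...".
    // Assumes names and values contain no whitespace, so no quoting is applied.
    static String xadd(String streamKey, String typeAlias, Map<String, String> fields) {
        StringJoiner command = new StringJoiner(" ");
        command.add("XADD").add(streamKey).add("*");
        // The type hint that the answer says the consumer needs.
        command.add("_class").add(typeAlias);
        for (Map.Entry<String, String> field : fields.entrySet()) {
            command.add(field.getKey()).add(field.getValue());
        }
        return command.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>(); // keeps insertion order
        fields.put("name", "Bob");
        fields.put("age", "30");
        // prints: XADD streams:users * _class com.pet.streams.User name Bob age 30
        System.out.println(xadd("streams:users", "com.pet.streams.User", fields));
    }
}
```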

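Conclusion:

Spring Data sets the _class field automatically when it publishes a new record to a stream. If an external system publishes messages to the Redis stream, however, make sure each record includes the _class field, or override the default mapper.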
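If the producer cannot be changed, another option (not from the answer above, and offered only as a sketch) is to skip object mapping and copy the fields by name. The example below uses a plain Map so that it runs without Spring. In a Spring application the map would presumably come from a MapRecord via getValue() instead of an ObjectRecord. NotificationMessage here is a hand-written stand-in for the question's Lombok DTO, and fromFields is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: maps stream fields to the DTO by name, so no _class entry is needed.
final class ManualMappingSketch {

    // Plain-Java stand-in for the question's DTO (Lombok omitted so this compiles alone).
    static final class NotificationMessage {
        final String id;
        final String from;
        final String to;
        final String type;

        NotificationMessage(String id, String from, String to, String type) {
            this.id = id;
            this.from = from;
            this.to = to;
            this.type = type;
        }

        @Override
        public String toString() {
            return "NotificationMessage(id=" + id + ", from=" + from
                    + ", to=" + to + ", type=" + type + ")";
        }
    }

    // Hypothetical helper: fields missing from the record stay null.
    static NotificationMessage fromFields(Map<String, String> fields) {
        return new NotificationMessage(
                fields.get("id"),
                fields.get("from"),
                fields.get("to"),
                fields.get("type"));
    }

    public static void main(String[] args) {
        // Same fields as the XADD command in the question.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("from", "john");
        fields.put("to", "smith");
        fields.put("type", "request");
        System.out.println(fromFields(fields));
    }
}
```

The trade-off is that every new DTO field needs a matching line in the mapping code, whereas the _class approach lets the framework do the conversion.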

Extra:

Take a look at the DefaultRedisTypeMapper class and replace it with a custom mapper. I have not tried this, but it looks feasible.
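The answer leaves the custom mapper untested, so the following is only a plain-Java illustration of the decision such a mapper might make: use the _class hint when it is present and known, otherwise fall back to a configured default type. TypeFallbackSketch, its alias registry, and the nested User and Audit classes are all hypothetical and do not reflect the actual DefaultRedisTypeMapper API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of a fallback rule only; this is not Spring Data Redis code.
final class TypeFallbackSketch {

    // Hypothetical target types, standing in for real DTOs.
    static final class User {}
    static final class Audit {}

    // Hypothetical alias registry: maps a _class value to a Java type.
    private final Map<String, Class<?>> aliases = new HashMap<>();
    private final Class<?> defaultType;

    TypeFallbackSketch(Class<?> defaultType) {
        this.defaultType = defaultType;
    }

    void register(String alias, Class<?> type) {
        aliases.put(alias, type);
    }

    // Returns the hinted type when _class is present and registered,
    // otherwise the default type, so records without _class still resolve.
    Class<?> resolve(Map<String, String> fields) {
        String alias = fields.get("_class");
        Class<?> hinted = (alias == null) ? null : aliases.get(alias);
        return (hinted != null) ? hinted : defaultType;
    }

    public static void main(String[] args) {
        TypeFallbackSketch sketch = new TypeFallbackSketch(User.class);
        sketch.register("com.pet.streams.Audit", Audit.class);

        Map<String, String> withoutHint = new HashMap<>();
        withoutHint.put("name", "Bob");

        Map<String, String> withHint = new HashMap<>(withoutHint);
        withHint.put("_class", "com.pet.streams.Audit");

        System.out.println(sketch.resolve(withoutHint).getSimpleName());
        System.out.println(sketch.resolve(withHint).getSimpleName());
    }
}
```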

Votes: 1
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/70403080