当我试图将一条消息从Stream转换到我的实体时,我一直在与中的这个异常做斗争。我认为这是由于Redis的默认反序列化程序出现问题,但是不确定如何解决。
当我将这条消息发送到Redis流时
XADD我的流*从john到smith类型的请求
我从我的春季启动服务中得到了这个异常。
org.springframework.core.convert.ConversionFailedException: Failed to convert from type [org.springframework.data.redis.connection.stream.StreamRecords$ByteMapBackedRecord] to type [com.example.messaging.Dto.NotificationMessage] for value 'MapBackedRecord{recordId=1639826917707-0, kvMap={[B@55ab97aa=[B@510458ff, [B@5b7e99c1=[B@4141b49e, [B@12840469=[B@6aa14d0}}'; nested exception is java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:198) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:176) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.5.6.jar:2.5.6]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.12.jar:5.3.12]
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:577) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getDeserializer$2(DefaultStreamMessageListenerContainer.java:240) ~[spring-data-redis-2.5.6.jar:2.5.6]
at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:196) ~[spring-data-redis-2.5.6.jar:2.5.6]
... 4 common frames omitted
配置类:
@Configuration
public class RedisConfig {
@Autowired
private StreamListener<String, ObjectRecord<String, NotificationMessage>> streamListener;
@Bean
public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, NotificationMessage>> options
= StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(1000))
.targetType(NotificationMessage.class)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, NotificationMessage>> listenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
Subscription subscription = listenerContainer.receive(StreamOffset.latest("my-stream"), streamListener);
listenerContainer.start();
return subscription;
}
}
NotificationMessage Dto:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NotificationMessage {
private String id;
private String from;
private String to;
private String type;
}
发布于 2022-10-14 19:49:13
简单的代码示例(spring-data-redis:2.6.4):
溪流记录:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.TypeAlias;
@Data
@NoArgsConstructor
@AllArgsConstructor
@TypeAlias("com.pet.streams.User")
public class User {
private String name;
private Integer age;
}
流配置:
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
@Slf4j
@Configuration
public class StreamsConfig {
private final RedisTemplate<String, Object> redisTemplate;
public StreamsConfig(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Bean
public StreamListener<String, ObjectRecord<String, User>> userStreamListener() {
// handle message from stream
return message -> log.info(message.toString());
}
@Bean
public Subscription subscription(RedisConnectionFactory redisConnectionFactory, StreamListener<String, ObjectRecord<String, User>> userStreamListener) throws UnknownHostException {
String streamKey = "streams:users";
String groupName = "cg";
createConsumerGroupIfNotExists(streamKey, groupName);
StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, User>> options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(100))
.targetType(User.class)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, User>> listenerContainer = StreamMessageListenerContainer
.create(redisConnectionFactory, options);
StreamMessageListenerContainer.StreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset)
.consumer(Consumer.from(groupName, InetAddress.getLocalHost().getHostName()))
.cancelOnError((err) -> false) // do not stop consuming after error
.errorHandler((err) -> log.error(err.getMessage()))
.build();
Subscription subscription = listenerContainer.register(readRequest, userStreamListener);
listenerContainer.start();
return subscription;
}
private void createConsumerGroupIfNotExists(String streamKey, String groupName){
try {
redisTemplate.opsForStream().createGroup(streamKey, groupName);
}
catch (RedisSystemException ex){
log.error(ex.getMessage());
}
}
}
测试:
在XADD streams:users * _class com.pet.streams.User name Bob age 30
中输入redis-cli
以将一个记录添加到流中。应该通过正确运行应用程序来使用(日志)。
输入XADD streams:users * name Bob age 30
不带_class
字段。应该记录异常(上面提到过)。
结论:
Spring数据在发布新记录到流时自动设置_class
字段。但是,如果使用外部系统向redis流发布消息,请确保在记录中提供了_class
字段。或覆盖默认映射程序。
额外:
检查DefaultRedisTypeMapper
类。用自定义映射器替换它。我还没试过,但看起来很有可能。
https://stackoverflow.com/questions/70403080
复制相似问题