
Why does the state store fail because of a serialization issue?

Stack Overflow user
Asked on 2018-12-27 16:45:56
1 answer · 714 views · 0 followers · 1 vote

I am using Kafka Streams 1.1.0.

I created the following topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000001 (topics: [configurationTopicName])
      --> KTABLE-SOURCE-0000000002
    Processor: KTABLE-SOURCE-0000000002 (stores: [configurationTopicName-STATE-STORE-0000000000])
      --> KTABLE-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000001
    Processor: KTABLE-MAPVALUES-0000000003 (stores: [configuration_store_application1])
      --> none
      <-- KTABLE-SOURCE-0000000002

The code is as follows:

case class Test(name: String, age: Int)
val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
builder.table(configurationTopicName, Consumed.`with`(Serdes.String(), Serdes.String()))
  .someAdditionalTransformation
  .mapValues[Test](
      new ValueMapperWithKey[String, String, Test] {
         override def apply(readOnlyKey: String, value: String): Test = Test("aaa", 432)
      }, mal)

I want to build a queryable store that I can use later to query it (to retrieve the filtered/transformed values).
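As an aside, once this topology runs inside a real `KafkaStreams` instance (rather than the `TopologyTestDriver`), querying such a store would look roughly like the sketch below. The `streams` variable and the surrounding setup are assumptions not shown in the question; the interactive-query API (`KafkaStreams#store` with `QueryableStoreTypes`) is the standard mechanism in Kafka Streams 1.1:

```scala
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreTypes

// Assumes `streams` is a started KafkaStreams instance built from the
// topology above, and that configurationStoreName(applicationId) resolves
// to "configuration_store_application1" as in the topology dump.
val store = streams.store(
  configurationStoreName(applicationId),
  QueryableStoreTypes.keyValueStore[String, Test]())

// Point lookup and full scan; the values are the mapped Test objects.
val one: Test = store.get("some-key")
val it = store.all()
while (it.hasNext) println(it.next())
it.close()
```

This only works once the store is materialized with serdes that match the value type, which is exactly where the question runs into trouble.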

I ran a simple test using TopologyTestDriver, and the following exception was thrown:

Caused by: java.lang.ClassCastException: com.example.kafka.streams.topology.Test cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:103)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:89)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:63)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:244)
    ... 58 more

Any idea why this happens and how to fix it?


1 Answer

Stack Overflow user

Accepted answer

Posted on 2018-12-28 14:43:00

After some investigation, I found the cause of the exception above.

I had created the Materialized for storing the data, but I did not pass any Serdes for the key or the value.

If none are passed, the defaults are used. In my case that was the StringSerializer, so I was trying to serialize an object of the Test class with a StringSerializer.

To pass the serde, just add .withValueSerde(GenericSerde[Test]), where GenericSerde is an implementation of org.apache.kafka.common.serialization.Serde.
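The answer does not show what GenericSerde looks like. The essential part of any such serde is a pair of functions mapping Test to and from bytes; the sketch below uses a deliberately simple delimited-string encoding as an illustration (a real GenericSerde would more likely use JSON or Avro, and these two functions would be wrapped into a Serde via org.apache.kafka.common.serialization.Serdes.serdeFrom(serializer, deserializer)):

```scala
import java.nio.charset.StandardCharsets

case class Test(name: String, age: Int)

// Hypothetical encoding: "name|age" as UTF-8 bytes. These two functions
// are what a Serializer[Test]/Deserializer[Test] pair would delegate to.
def serialize(t: Test): Array[Byte] =
  s"${t.name}|${t.age}".getBytes(StandardCharsets.UTF_8)

def deserialize(bytes: Array[Byte]): Test = {
  val Array(name, age) = new String(bytes, StandardCharsets.UTF_8).split('|')
  Test(name, age.toInt)
}
```

A round trip such as `deserialize(serialize(Test("aaa", 432)))` returns the original value, which is the contract any Serde implementation must satisfy.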

val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
    .withValueSerde(GenericSerde[Test])
0 votes
The original content of this page was provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/53948254
