首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何使用kafka流放置avro消息,但它是作为二进制数据类型-java?

如何使用kafka流放置avro消息,但它是作为二进制数据类型-java?
EN

Stack Overflow用户
提问于 2018-10-11 01:22:19
回答 1查看 0关注 0票数 0

我正在尝试将jsonSerde作为主题的输入,并且应该处理记录并且需要使用kafka stream将其作为Avro消息放在不同的主题中。输出看起来是二进制的并且数据不是实际的JSON格式。看起来像它一样使用默认的bytearrayserde作为值和key.I不知道为什么,但我提供序列化器作为SpecificAvroSerde。

        private final static JsonSerde<JsonNode> jsonSerde = new JsonSerde<JsonNode>(JsonNode.class);

        private static Map<String, Object> props;
        //Serde of specific record
        private static  SpecificAvroSerde<SpecificRecord> productValueSerde;



   @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public StreamsConfig kafkaStreamsConfig()
                throws UnknownHostException {
            props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG,
                    "*****processor-3");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:19092,localhost:39092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                    "http://localhost:18081");
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients"
                    + ".consumer.RoundRobinAssignor");
            productValueSerde = new SpecificAvroSerde<SpecificRecord>();
            productValueSerde.configure((Collections.singletonMap(
                    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                    "http://localhost:18081")),false);
            return new StreamsConfig(props);
        }


        @Bean
        public KStream<JsonNode,JsonNode> KStream(StreamsBuilder kStreamBuilder){

            KStream<JsonNode,JsonNode> stream = kStreamBuilder.stream("localtest",Consumed.with(jsonSerde, jsonSerde));
            try {
                KStream<JsonNode,SpecificRecord> avroStream = stream.flatMap((K,V)->actNationalPaperHelper.mapToCoreAvro(K, V));
                //avroStream.flatMap((K,V)->System.out.println(V); return avroStream));
                avroStream.through("serdetest16",Produced.with(jsonSerde, productValueSerde));
            }
            catch(Exception e) {
                System.out.println(e);
            }

            return stream;

        }
   T
EN

Stack Overflow用户

发布于 2018-10-11 10:40:41

似乎Serde在代码中明确地覆盖了配置中的默认值:

avroStream.through("serdetest16",Produced.with(jsonSerde, productValueSerde));

使用Producer.with()子句,写入的数据"serdetest16"将写为JSON,如代码中所指定。如果要through使用Serde配置中的默认值,则应省略第二个参数或传入Avro-Serdes。

票数 0
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/-100005065

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档