(end goal)在测试我是否最终可以使用火花流从汇合平台读取avro数据之前,如下所述:Integrating Spark Structured Streaming with the Confluent Schema Registry
我将验证是否可以使用下面的命令来读取它们:
$ kafka-avro-console-consumer \
> --topic my-topic-produced-using-file-pulse-xml \
> --from-beginning \
> --bootstrap-server localhost:9092 \
> --property schema.registry.url=http://localhost:8081我收到这个错误消息,未知魔法字节
Processed a total of 1 messages
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!注意,可以这样读消息(使用console-而不是avro-console-):
kafka-console-consumer \
--bootstrap-server localhost:9092 --group my-group-console \
--from-beginning \
--topic my-topic-produced-using-file-pulse-xml该消息是使用汇流连接文件-脉冲(1.5.2)读取xml文件(streamthoughts/kafka- connect file -脉冲)生成的。
,请在这里帮忙:,我用错了kafka-avro-console-consumer吗?我尝试了这里描述的“反序列化器”属性选项:https://stackoverflow.com/a/57703102/4582240,没有帮助
我还不想勇敢地启动火花流来读取数据。
我使用的文件脉冲1.5.2属性如下所示:添加了11/09/2020以完成.
name=connect-file-pulse-xml
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic= my-topic-produced-using-file-pulse-xml
tasks.max=1
# File types
fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.xml$
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader
force.array.on.fields=sometagNameInXml
# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/tmp/kafka-connect/xml/
fs.scan.interval.ms=10000
# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-xml
internal.kafka.reporter.topic=connect-file-pulse-status
# Track file by name
offset.strategy=name发布于 2020-09-10 20:11:19
如果您正在从消费者那里得到未知的Magic,那么生产者就没有使用合流AvroSerializer,并且可能已经推送了不使用架构注册表的Avro数据。
如果不看到生产者代码或使用和检查二进制格式的数据,就很难知道是哪种情况。
消息是使用汇流连接文件-脉冲产生的。
value.converter和AvroConverter类一起使用了吗?
https://stackoverflow.com/questions/63828704
复制相似问题