首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何在NiFi ValidateRecord processor / JsonRecordSetWriter中将时间戳序列化为Json字段

如何在NiFi ValidateRecord processor / JsonRecordSetWriter中将时间戳序列化为Json字段
EN

Stack Overflow用户
提问于 2018-10-19 01:22:59
回答 1查看 1.1K关注 0票数 1

如何在NiFi ValidateRecord processor / JsonRecordSetWriter中将时间戳序列化为Json字段。

在输入时,我有一个CSV文件,其中包含一个格式为yyyy-MM-dd HH:mm:ss.SSS的时间戳列。在我的NiFi流中,我有一个ValidateRecord处理器,它使用CSVReader进行读取,使用JsonRecordSetWriter作为编写器。它们都使用Avro模式,时间戳字段定义如下

代码语言:javascript
复制
"fields" : [ {
    "name" : "timestamp",
    "type" : {
            "type" : "long",
            "logicalType" : "timestamp-millis"
        },
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
    }, {
    ...

当有像2016-10-08 07:51:00.000这样的字段值的记录通过时,我在NiFi日志中得到一个异常:

代码语言:javascript
复制
2018-10-18 17:05:59,135 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.standard.ValidateRecord ValidateRecord[id=3d44915d-a52a-3eb0-1ae1-7b0cbe4b1a03] Failed to write MapRecord[{timestamp=2016-10-08 07:51:00.0, ...  ] with schema {"type":"record","name":"redfunnel","doc":"Schema generated by Kite","fields":[{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"},"doc":"Type inferred from '2016/10/08 07:51:00.000'"},{ .... }]} as a JSON Object due to java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp): java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
    at org.codehaus.jackson.impl.JsonGeneratorBase._writeSimpleObject(JsonGeneratorBase.java:556)
    at org.codehaus.jackson.impl.JsonGeneratorBase.writeObject(JsonGeneratorBase.java:317)
    at org.apache.nifi.json.WriteJsonResult.writeRawValue(WriteJsonResult.java:267)
    at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:201)
    at org.apache.nifi.json.WriteJsonResult.writeRawRecord(WriteJsonResult.java:149)
    at org.apache.nifi.processors.standard.ValidateRecord.onTrigger(ValidateRecord.java:342)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

在我的JsonRecordSetWriter的属性中,我尝试指定一种将时间戳写为yyyy-MM-dd HH:mm:ss.SSS的格式

但不幸的是,没有成功,我仍然在NiFi日志中得到相同的异常。

这是否意味着JsonRecordSetWriter在默认情况下不能序列化java.time.Timestamp,即使它具有用于配置看似完全正确的Timestamp Format属性?

是否可以使用开箱即用的NiFi组件根据自定义格式编写时间戳,或者我必须修改JsonRecordSetWriter

更新

在代码之后,我的异常从this code branch抛出。它似乎是未通过验证的无效记录的分支。也许我的错误只发生在无效的记录上。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-10-19 19:46:49

看起来我找到了一个在我的情况下有效的配置。

我必须将模式一分为二:一个用于输入,另一个用于输出。

因此,schema1将时间戳字段定义为:

代码语言:javascript
复制
{
    "name" : "timestamp",
    "type" : "string",
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}

schema2将时间戳字段定义为

代码语言:javascript
复制
{
    "name" : "timestamp",
    "type" : {
        "type" : "long",
        "logicalType" : "timestamp-millis"
    },
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}

现在,我正在使用以下命令配置ValidateRecord处理器

  • CSVReader,它使用schema1 schema2
  • ValidateRecord的"Schema Text“字段和ValidateRecord

之后,记录毫无错误地通过我的ValidateRecord处理器,并进入Postgres数据库的timestamp字段,该数据库使用PutDatabaseRecord处理器,该处理器使用配置了schema2JsonTreeReader

同样重要的是,使用正确的字符串格式配置JsonTreeReader的时间戳格式属性,例如在我的示例中为'yyyy-MM-dd HH:mm:ss.SSS‘。

希望这能在类似的情况下帮助某些人。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52879418

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档