The problem occurs during the case class extraction in the map function: the case class is reported as not serializable, even though I have already defined DefaultFormats implicitly.
package org.apache.flink.quickstart

import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s.DefaultFormats
import org.json4s._
import org.json4s.native.JsonMethods

import scala.util.Try

case class CC(key: String)

object WordCount {
  def main(args: Array[String]) {
    implicit val formats = org.json4s.DefaultFormats

    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)
    // .map(_.extract[CC])

    val l = st.map(_.extract[CC])
    st.print()
    env.execute()
  }
}
Error:

INFO main - No fields detected for class org.json4s.JsonAST$JValue. Cannot be used as a PojoType. Will be handled as GenericType.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:994)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:519)
    at org.apache.flink.quickstart.WordCount$.main(WordCount.scala:38)
    at org.apache.flink.quickstart.WordCount.main(WordCount.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$$anon$4
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
    ... 6 more

Process finished with exit code 1
Posted on 2017-05-11 22:36:00
Solution

Define

implicit val formats = org.json4s.DefaultFormats

outside the main function, e.g.

object WordCount {
  implicit val formats = org.json4s.DefaultFormats
  def main(args: Array[String]) {
    ...
  }
}

or initialize the formats lazily, i.e.

implicit lazy val formats = org.json4s.DefaultFormats

inside the main function, e.g.

def main(args: Array[String]) {
  implicit lazy val formats = org.json4s.DefaultFormats
  ...
}
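
Applying the first suggestion to the question's code, a minimal sketch of a corrected job could look like the following. It assumes the same Flink 1.x / json4s versions and FlinkKafkaConsumer09 connector as above; the broker address, topic name "new", and case class CC are copied from the question, and the lazy-val alternative is shown only as a comment.

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods

case class CC(key: String)

object WordCount {

  // Option 1: declare the formats on the object. The map closure then resolves it
  // through the WordCount singleton at runtime instead of capturing it as a field;
  // capturing a local val is what pulled the non-serializable anonymous class inside
  // DefaultFormats (DefaultFormats$$anon$4 in the stack trace) into serialization.
  implicit val formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // Option 2 (alternative from the answer): keep it inside main, but make it lazy
    // implicit lazy val formats = org.json4s.DefaultFormats

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption) // keep only inputs that parse as JSON
      .map(_.extract[CC])                              // extraction now passes the closure cleaner
      .print()

    env.execute()
  }
}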
https://stackoverflow.com/questions/43731397