首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >闪烁CassandraSink.addSink(输入)类型不匹配

闪烁CassandraSink.addSink(输入)类型不匹配
EN

Stack Overflow用户
提问于 2017-12-01 03:50:25
回答 1查看 558关注 0票数 0

我是Flink的新手。我正在尝试使用Flink 1.3.2来读取我们的Kinesis流,并将输出写入Cassandra表。该程序能够从Kinesis流式传输数据。

问题是,当我执行“CassandraSink.addSink(CountsStreaming)”时,它给出“类型不匹配。预期: DataStreamNotInferredIN,实际: DataStream(String,Long)”。我已经阅读了文档和源代码,并注意到addSink使用DataStreamIN。

有没有人能帮我理解一下“IN”类型是什么,以及如何解决这个问题?

提前感谢!

代码语言:javascript
运行
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
val mapper = new ObjectMapper
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
  "kinesis-stream", new SimpleStringSchema, ConsumerConfig))

//DataStream[(String, Long)]
val countsStreaming: DataStream[(String, Long)] = kinesis.map(x => mapper.readValue(x,classOf[java.util.Map[String,String]]))
  .map(x => x.get("game_name"))
  .map({x => (x,1L) })
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

countsStreaming.print()

CassandraSink.addSink(countsStreaming)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.setClusterBuilder(new ClusterBuilder() {
  override def buildCluster(builder: Cluster.Builder): Cluster = {
    builder.addContactPoint("0.0.0.0").build()
  }
}).build()

env.execute("StreamingExample")
EN

回答 1

Stack Overflow用户

发布于 2018-04-10 17:05:02

问题是CassandraSink.addSink只接受Java DataStream。

您需要在scala DataStream之后添加.javaStream,然后类型不匹配应该会消失。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47581521

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档