
Unable to perform a transformation and extract JSON values from a Flink DataStream reading a Kafka topic

Stack Overflow user
Asked on 2021-12-27 15:38:47
1 answer · 83 views · 0 followers · 0 votes

I am trying to read data from a Kafka topic, and that part works. However, I want to extract the fields and return them as a tuple, so I tried to apply a map operation, but it fails with cannot resolve overloaded method 'map'. Here is my code:

package KafkaAsSource

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object ReadAndValidateJSON {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    //env.enableCheckpointing(5000)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    val data:DataStream[String] = getDataFromKafkaTopic(properties,env)


    val mappedData: DataStream[jsonData] = data.map(v => v)
      .map {
        v =>
          val id = v["id"]
          val category = v["category"]
          val eventTime = v["eventTime"]
          jsonData(id,category,eventTime)
      }
    
    data.print()


    env.execute("ReadAndValidateJSON")
  }

  def getDataFromKafkaTopic(properties: Properties,env:StreamExecutionEnvironment): DataStream[String] = {

    val consumer = new FlinkKafkaConsumer[String]("maddy1", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()
    val src: DataStream[String] = env.addSource(consumer)
    return src
  }

}
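
The code above refers to a jsonData type that the question never shows; presumably it is a plain case class with one String field per extracted value, along these lines (an assumption, not part of the original post):

// Assumed shape of the jsonData type referenced in the map step above (not shown in the question).
case class jsonData(id: String, category: String, eventTime: String)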

pom.xml:

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>

    </dependencies>

Kafka topic data:

{
  "id":"7",
  "Category":"Flink",
  "eventTime":"2021-12-27 20:52:58.708"
  
}
{
  "id":"9",
  "Category":"Flink",
  "eventTime":"2021-12-27 20:52:58.727"
  
}
{
  "id":"10",
  "Category":"Flink",
  "eventTime":"2021-12-27 20:52:58.734"
  
}

Where exactly am I going wrong? Are the dependencies correct? My Flink version is 1.12.2.


1 Answer

Stack Overflow user

Accepted answer

Posted on 2021-12-27 15:50:09

Try adding

import org.apache.flink.streaming.api.scala._
1 vote
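
For context, here is a minimal sketch of how the mapping step could look once that wildcard import is in place. The import brings Flink's implicit TypeInformation derivation into scope, which is what the cannot resolve overloaded method 'map' error is about. The v["id"] access in the question is not valid Scala, so the sketch parses each record with Jackson's ObjectMapper instead; the jsonData case class, the jackson-databind dependency, and data being a Scala-API DataStream[String] are assumptions, not part of the original post.

import org.apache.flink.streaming.api.scala._   // provides the implicit TypeInformation needed by map
import com.fasterxml.jackson.databind.ObjectMapper

val mappedData: DataStream[jsonData] = data.map { v =>
  // Parse one JSON record from the Kafka topic. Creating the mapper per record keeps the
  // sketch simple; a RichMapFunction could hold a single reusable instance instead.
  val node = new ObjectMapper().readTree(v)
  jsonData(
    node.get("id").asText(),
    node.get("Category").asText(),   // the sample records use "Category", not "category"
    node.get("eventTime").asText()
  )
}

Note that with the Scala API import in scope, the DataStream and StreamExecutionEnvironment types from org.apache.flink.streaming.api.scala should be used consistently, rather than the Java org.apache.flink.streaming.api.datastream.DataStream imported in the question, otherwise the implicits still will not apply.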
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/70497515
