首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >java.io.IOException:无法将语句写入batch_layer.test。最新的异常是关键字不能为空

java.io.IOException:无法将语句写入batch_layer.test。最新的异常是关键字不能为空
EN

Stack Overflow用户
提问于 2021-05-09 04:22:50
回答 1查看 70关注 0票数 1

我正在尝试计算文本中的单词数,并将结果保存到Cassandra数据库中。Producer从文件中读取数据并将其发送给kafka。消费者使用spark streaming读取和处理数据,然后将计算结果发送到表。

我的producer看起来像这样:

代码语言:javascript
复制
object ProducerPlayground extends App {

  val topicName = "test"
  private def createProducer: Properties = {
    val producerProperties = new Properties()
    producerProperties.setProperty(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "localhost:9092"
    )
    producerProperties.setProperty(
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      classOf[IntegerSerializer].getName
    )
    producerProperties.setProperty(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      classOf[StringSerializer].getName
    )
    producerProperties
  }

  val producer = new KafkaProducer[Int, String](createProducer)

  val source = Source.fromFile("G:\\text.txt", "UTF-8")

  val lines = source.getLines()

  var key = 0
  for (line <- lines) {
    producer.send(new ProducerRecord[Int, String](topicName, key, line))
    key += 1
  }
  source.close()
  producer.flush()

}

消费者看起来像这样:

代码语言:javascript
复制
object BatchLayer {
  def main(args: Array[String]) {

    val brokers = "localhost:9092"
    val topics = "test"
    val groupId = "groupId-1"

    val sparkConf = new SparkConf()
      .setAppName("BatchLayer")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    val sc = ssc.sparkContext
    sc.setLogLevel("OFF")

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
    )
    val stream =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
      )

   
    val cass = CassandraConnector(sparkConf)

    cass.withSessionDo { session =>
      session.execute(
        s"CREATE KEYSPACE IF NOT EXISTS batch_layer WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
      )
      session.execute(s"CREATE TABLE IF NOT EXISTS batch_layer.test (key VARCHAR PRIMARY KEY, value INT)")
      session.execute(s"TRUNCATE batch_layer.test")
    }

    stream
      .map(v => v.value())
      .flatMap(x => x.split(" "))
      .filter(x => !x.contains(Array('\n', '\t')))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .saveToCassandra("batch_layer", "test", SomeColumns("key", "value"))

    ssc.start()
    ssc.awaitTermination()
  }

}

启动producer后,程序会因此错误而停止工作。我做错了什么?

EN

Stack Overflow用户

回答已采纳

发布于 2021-05-09 16:42:37

在2021年使用传统流媒体几乎没有意义--使用起来非常麻烦,而且你还需要跟踪Kafka的偏移量,等等。最好使用Structured Streaming instead -它将通过检查点跟踪你的偏移量,你将使用高级数据集API,等等。

在您的示例中,代码可能如下所示(没有测试,但它是从this working example中采用的):

代码语言:javascript
复制
val streamingInputDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

val wordsCountsDF = streamingInputDF.selectExpr("CAST(value AS STRING) as value")
  .selectExpr("split(value, '\\w+', -1) as words")
  .selectExpr("explode(words) as word")
  .filter("word != ''")
  .groupBy($"word")
  .count()
  .select($"word", $"count")

// create table ...

val query = wordsCountsDF.writeStream
   .outputMode(OutputMode.Update)
   .format("org.apache.spark.sql.cassandra")
   .option("checkpointLocation", "path_to_checkpoint)
   .option("keyspace", "test")
   .option("table", "<table_name>")
   .start()

query.awaitTermination()

附注:在你的例子中,最可能的错误是你试图直接在DStream上使用.saveToCassandra -它不是这样工作的。

票数 0
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67451827

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档