
Spark Error Notes: Overloaded method foreachBatch with alternatives

WHYBIGDATA
Published: 2023-01-31 11:42:12



Structured Streaming Error Notes: Overloaded method foreachBatch with alternatives

0. Preface

  • Spark: 3.0.0
  • Scala: 2.12 (a matching sbt build is sketched below)
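
For reference, a minimal build.sbt sketch that pins these versions; the exact patch versions and the MySQL connector coordinates are assumptions, not from the original post:

 // build.sbt (sketch): keep the Scala binary version aligned with the Spark
 // artifacts; Spark 3.0.0 is pre-built against Scala 2.12.
 ThisBuild / scalaVersion := "2.12.10"  // assumed patch version

 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-sql" % "3.0.0",
   // The examples write to MySQL via JDBC, so a driver is needed at runtime.
   "mysql" % "mysql-connector-java" % "8.0.28"  // assumed version
 )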

1. The Error

overloaded method value foreachBatch with alternatives:

2. Code and Error Message

Error:(48, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
    .foreachBatch((df, batchId) => {

 import java.util.Properties
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
 import org.apache.spark.sql.{DataFrame, SparkSession}

 object ForeachBatchSink1 {
     def main(args: Array[String]): Unit = {
         val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ForeachSink1")
            .getOrCreate()
         import spark.implicits._

         val lines: DataFrame = spark.readStream
            .format("socket") // set the data source
            .option("host", "cluster01")
            .option("port", 10000)
            .load

         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "1234")

         val query: StreamingQuery = lines.writeStream
            .outputMode("update")
            // Does not compile: with no parameter type annotations, the compiler
            // cannot pick a foreachBatch overload and reports the lambda as
            // (Dataset[Row], Any) => Dataset[Row].
            .foreachBatch((df, batchId) => {
                 val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()

                 result.persist()
                 result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
                 result.write.mode("overwrite").json("./foreach1")
                 result.unpersist()
            })
 //          .trigger(Trigger.ProcessingTime(0))
            .trigger(Trigger.Continuous(10))
            .start
         query.awaitTermination()
     }
 }


A second variant (ForeachBatchSink, using complete output mode) fails with the same ambiguity:

 Error:(43, 12) overloaded method value foreachBatch with alternatives:
   (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
   (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
     .foreachBatch((df, batchId) => {
 import java.util.Properties
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
 import org.apache.spark.sql.{DataFrame, SparkSession}

 object ForeachBatchSink {
     def main(args: Array[String]): Unit = {
         val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ForeachSink")
            .getOrCreate()
         import spark.implicits._

         val lines: DataFrame = spark.readStream
            .format("socket") // set the data source
            .option("host", "cluster01")
            .option("port", 10000)
            .load

         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "1234")

         val query: StreamingQuery = lines.writeStream
            .outputMode("complete")
            // Fails with the same ambiguity: the untyped lambda is reported
            // as (Dataset[Row], Any) => DataFrame.
            .foreachBatch((df, batchId) => {
                 // Word-count step restored; it was missing from the original listing.
                 val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
                 result.persist()
                 result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
                 result.write.mode("overwrite").json("./foreach")
                 result.unpersist()
            })
            .start
         query.awaitTermination()
     }
 }

3. Cause and Fix

The behavior differs between Scala 2.11 and Scala 2.12. Under Scala 2.12, a function literal can also be converted to a Java functional interface (SAM conversion), so an untyped lambda such as (df, batchId) => ... is a candidate for both overloads of foreachBatch(): the Java one taking a VoidFunction2[Dataset[Row], java.lang.Long] and the Scala one taking (Dataset[Row], Long) => Unit. The compiler therefore cannot infer the parameter types and rejects the call. Annotating the parameter types explicitly, and keeping the body's result type Unit, pins the Scala overload, as the minimal sketch below shows.
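
A minimal sketch of the fix (localhost and port 9999 are placeholders; the pattern is the same for any streaming DataFrame):

 import org.apache.spark.sql.{Dataset, Row, SparkSession}

 object ForeachBatchMinimal {
     def main(args: Array[String]): Unit = {
         val spark = SparkSession.builder().master("local[*]").appName("ForeachBatchMinimal").getOrCreate()

         val lines = spark.readStream
            .format("socket")
            .option("host", "localhost") // placeholder host
            .option("port", 9999)        // placeholder port
            .load()

         lines.writeStream
            .outputMode("append")
            // Explicit parameter types plus a Unit-returning body select the
            // Scala overload (Dataset[Row], Long) => Unit unambiguously.
            .foreachBatch((df: Dataset[Row], batchId: Long) => {
                println(s"batch $batchId contains ${df.count()} rows")
            })
            .start()
            .awaitTermination()
     }
 }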

The full corrected programs follow.

 import java.util.Properties
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 
 object ForeachBatchSink {
 
     def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
         println("BatchId" + batchId)
         if (df.count() != 0) {
             df.persist()
             df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
             df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
             df.unpersist()
        }
    }
 
     def main(args: Array[String]): Unit = {
 
         val spark: SparkSession = SparkSession
          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink")
          .getOrCreate()
         import spark.implicits._
 
         val lines: DataFrame = spark.readStream
          .format("socket") // TODO 设置数据源
          .option("host", "cluster01")
          .option("port", 10000)
          .load
 
         val wordCount: DataFrame = lines.as[String]
          .flatMap(_.split("\\W+"))
          .groupBy("value")
          .count()  // value count
 
         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "1234")
 
         val query: StreamingQuery = wordCount.writeStream
          .outputMode("complete")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
               myFun(df, batchId, props)
          })
          .start
 
         query.awaitTermination()
 
    }
 }
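
To try the program above, assuming netcat is available on cluster01, publish lines with nc -lk 10000 and type a few words; the counts land in the MySQL wc table and in the JSON output directory. A second variant performs the aggregation inside foreachBatch and writes with update mode: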

 import java.util.Properties
 
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 object ForeachBatchSink1 {
 
     def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark : SparkSession): Unit = {
         import spark.implicits._
         println("BatchId = " + batchId)
         if (df.count() != 0) {
             val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
             result.persist()
             result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
             result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
             result.unpersist()
        }
    }
 
     def main(args: Array[String]): Unit = {
 
         val spark: SparkSession = SparkSession
          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink1")
          .getOrCreate()
         import spark.implicits._
 
         val lines: DataFrame = spark.readStream
          .format("socket") // TODO 设置数据源
          .option("host", "cluster01")
          .option("port", 10000)
          .load
 
         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "1234")
 
         val query: StreamingQuery = lines.writeStream
          .outputMode("update")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
                 myFun(df, batchId, props, spark)
          })
          // foreachBatch runs only in micro-batch mode and is not supported with
          // Trigger.Continuous, so use a micro-batch trigger here.
          .trigger(Trigger.ProcessingTime(0))
          .start
         query.awaitTermination()
 
    }
 }
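
If you would rather not rely on lambda type annotations, you can target the Java overload of foreachBatch directly by passing a VoidFunction2; that overload is part of Spark's public API, and the program below is only a sketch with placeholder host and port:

 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql.{Dataset, Row, SparkSession}

 object ForeachBatchJavaOverload {
     def main(args: Array[String]): Unit = {
         val spark = SparkSession.builder().master("local[*]").appName("ForeachBatchJavaOverload").getOrCreate()

         val lines = spark.readStream
            .format("socket")
            .option("host", "localhost") // placeholder host
            .option("port", 9999)        // placeholder port
            .load()

         // A VoidFunction2 matches only the Java overload of foreachBatch,
         // which also removes the ambiguity.
         val handler = new VoidFunction2[Dataset[Row], java.lang.Long] {
             override def call(df: Dataset[Row], batchId: java.lang.Long): Unit = {
                 println(s"batch $batchId: ${df.count()} rows")
             }
         }

         lines.writeStream
            .outputMode("append")
            .foreachBatch(handler)
            .start()
            .awaitTermination()
     }
 }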

4. Reference

https://blog.csdn.net/Shockang/article/details/120961968


Originally published 2022-07-28 on the WHYBIGDATA WeChat official account.
