首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >在JAAS配置中找不到'KafkaClient‘条目。未设置系统属性'java.security.auth.login.config‘

在JAAS配置中找不到'KafkaClient‘条目。未设置系统属性'java.security.auth.login.config‘
EN

Stack Overflow用户
提问于 2018-08-25 23:19:26
回答 1查看 5.1K关注 0票数 0

我正在尝试从spark structured streaming连接到Kafka。

这是可行的:

代码语言:javascript
复制
spark-shell --master local[1] \
       --files /mypath/jaas_mh.conf \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
       --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_mh.conf" \
       --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_mh.conf" \
       --num-executors 1  --executor-cores 1 

但是,当我尝试以编程方式执行相同的操作时..

代码语言:javascript
复制
object SparkHelper {
  def getAndConfigureSparkSession() = {
    val conf = new SparkConf()
      .setAppName("Structured Streaming from Message Hub to Cassandra")
      .setMaster("local[1]")
      .set("spark.driver.extraJavaOptions", "-Djava.security.auth.login.config=jaas_mh.conf")
      .set("spark.executor.extraJavaOptions", "-Djava.security.auth.login.config=jaas_mh.conf")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    getSparkSession()
  }

  def getSparkSession() : SparkSession = {
    val spark = SparkSession
      .builder()
      .getOrCreate()

    spark.sparkContext.addFile("/mypath/jaas_mh.conf")

    return spark
  }
}

我得到了错误:

代码语言:javascript
复制
 Could not find a 'KafkaClient' entry in the JAAS configuration. 
    System property 'java.security.auth.login.config' is not set

有什么建议吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-08-26 01:23:23

即使在配置文件中,也应该提供.conf文件的完整路径或相对路径。此外,当您创建SparkConf时,我发现您没有将其应用于当前的SparkSession。

代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Driver extends App {

  val confPath: String = "/Users/arcizon/IdeaProjects/spark/src/main/resources/jaas_mh.conf"

  def getAndConfigureSparkSession(): SparkSession = {
    val conf = new SparkConf()
      .setAppName("Structured Streaming from Message Hub to Cassandra")
      .setMaster("local[1]")
      .set("spark.driver.extraJavaOptions", s"-Djava.security.auth.login.config=$confPath")
      .set("spark.executor.extraJavaOptions", s"-Djava.security.auth.login.config=$confPath")

    getSparkSession(conf)
  }

  def getSparkSession(conf: SparkConf): SparkSession = {
    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    spark.sparkContext.addFile(confPath)

    spark.sparkContext.setLogLevel("WARN")

    spark
  }

  val sparkSession: SparkSession = getAndConfigureSparkSession()

  println(sparkSession.conf.get("spark.driver.extraJavaOptions"))
  println(sparkSession.conf.get("spark.executor.extraJavaOptions"))

  sparkSession.stop()
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52018621

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档