首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >火花参数在SparkSubmitOperator - AirFlow中不起作用。

火花参数在SparkSubmitOperator - AirFlow中不起作用。
EN

Stack Overflow用户
提问于 2019-11-27 21:59:50
回答 1查看 970关注 0票数 0

我已经通过下面的火花参数在火花提交操作符在conf中,但是看起来这些参数在运行作业时不工作。

代码语言:javascript
运行
复制
my_conf = {
        'spark.io.compression.codec' : 'snappy',
        'spark.scheduler.listenerbus.eventqueue.size' : '30000',
        'spark.yarn.queue' : 'pixel',
        'spark.driver.cores' : '5',
        'spark.dynamicAllocation.minExecutors' : '100',
        'spark.dynamicAllocation.maxExecutors' : '300',
        'spark.shuffle.compress' : 'false',
        'spark.sql.tungsten.enabled' : 'true',
        'spark.shuffle.spill' : 'true',
        'spark.sql.parquet.compression.codec' : 'snappy',
        'spark.speculation' : 'true',
        'spark.kryo.referenceTracking' : 'false',
        'spark.hadoop.parquet.block.size' : '134217728',
        'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version' : '2',
        'spark.executor.memory' : '22g',
        'spark.hadoop.dfs.blocksize' : '134217728',
        'spark.shuffle.manager' : 'sort',
        'spark.driver.memory' : '25g',
        'spark.hadoop.mapreduce.input.fileinputformat.split.minsize' : '134217728',
        'spark.akka.frameSize' : '1024',
        'spark.yarn.executor.memoryOverhead' : '3120',
        'spark.sql.parquet.filterPushdown' : 'true',
        'spark.sql.inMemoryColumnarStorage.compressed' : 'true',
        'spark.hadoop.parquet.enable.summary-metadata' : 'false',
        'spark.serializer' : 'org.apache.spark.serializer.KryoSerializer',
        'spark.rdd.compress' : 'true',
        'spark.task.maxFailures' : '50',
        'spark.yarn.max.executor.failures' : '30',
        'spark.yarn.maxAppAttempts' : '1',
        'spark.default.parallelism' : '2001',
        'spark.network.timeout' : '1200s',
        'spark.hadoop.dfs.client.read.shortcircuit' : 'true',
        'spark.dynamicAllocation.enabled' : 'true',
        'spark.executor.cores' : '5',
        'spark.yarn.driver.memoryOverhead' : '5025',
        'spark.shuffle.consolidateFiles' : 'true',
        'spark.sql.parquet.mergeSchema' : 'false',
        'spark.sql.avro.compression.codec' : 'snappy',
        'spark.hadoop.dfs.domain.socket.path' : '/var/lib/hadoop-hdfs/dn_socket',
        'spark.shuffle.spill.compress' : 'false',
        'spark.sql.caseSensitive' : 'true',
        'spark.hadoop.mapreduce.use.directfileoutputcommitter' : 'true',
        'spark.shuffle.service.enabled' : 'true',
        'spark.driver.maxResultSize' : '0',
        'spark.sql.shuffle.partitions' : '2001'}

下面是在AirFlow中用来运行火花作业的类

代码语言:javascript
运行
复制
SparkSubmitOperator(
                                 task_id='ml_agg',
                                 application='/home/hdfs/airflow/dags/ML_Agg/ML_Aggregation-assembly-1.0.jar',
                                 conf=my_conf,
                                 conn_id='spark_default',
                 files=None,
                 py_files=None,
                 archives=None,
                 driver_class_path=None,
                 jars=None,
                 java_class='com.pubmatic.ml.MLAggregation_v2',
                 packages='com.databricks:spark-csv_2.11:1.3.0,com.databricks:spark-avro_2.11:2.0.1',
                 exclude_packages=None,
                 repositories=None,
                 keytab=None,
                 principal=None,
                 name='test_airflow_ml_aggregation',
                 application_args=application_args,
                 env_vars=None,
                 verbose=False,
                 spark_binary="spark-submit",
                 dag=my_dag
                 )

此外,还提到了spark_default配置。

代码语言:javascript
运行
复制
{"queue":"default","deploy_mode": "cluster", "spark_home": "", "spark_binary": "spark-submit", "namespace": "default"}

尽管如此,作业仍然在纱线上的默认队列上运行。

我还需要做点什么吗?

EN

回答 1

Stack Overflow用户

发布于 2019-11-28 05:03:18

spark.yarn.queue被注释掉。您需要取消注释才能在Pixel队列中运行它。

要在submit中使用队列,可以运行spark-submit命令,如下所示:

代码语言:javascript
运行
复制
spark-submit --master yarn --conf spark.executor.memory=XG --conf spark.driver.memory=YG --packages [packages separated by ,] --queue [queue_name] --class [class_name] [jar_file] [arguments]
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59079020

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档