
Airflow DataprocSubmitJobOperator - ValueError: Protocol message Job has no "python_file_uris" field

Stack Overflow user
Asked on 2022-07-17 18:36:08
1 answer · 211 views · 0 followers · 1 vote

I am using the DataprocSubmitJobOperator in Airflow to schedule pyspark jobs, but I am unable to pass pyfiles to the pyspark job.

Below is the code I am using:

DAG

# working - passing jars
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris" : ["gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar",
                               'gs://dataproc-spark-jars/bson-4.0.5.jar','gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar','gs://dataproc-spark-jars/mongodb-driver-core-4.0.5.jar',
                               'gs://dataproc-spark-jars/mongodb-driver-sync-4.0.5.jar','gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar','gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar',
                           'gs://dataproc-spark-jars/spark-token-provider-kafka-0-10_2.12-3.1.3.jar','gs://dataproc-spark-jars/htrace-core4-4.1.0-incubating.jar','gs://dataproc-spark-jars/hadoop-client-3.3.1.jar','gs://dataproc-spark-jars/spark-sql-kafka-0-10_2.12-3.1.3.jar','gs://dataproc-spark-jars/hadoop-client-runtime-3.3.1.jar','gs://dataproc-spark-jars/hadoop-client-3.3.1.jar','gs://dataproc-spark-jars/kafka-clients-3.2.0.jar','gs://dataproc-spark-jars/commons-pool2-2.11.1.jar'],
        "file_uris":['gs://kafka-certs/versa-kafka-gke-ca.p12','gs://kafka-certs/syslog-vani.p12',
                     'gs://kafka-certs/alarm-compression-user.p12','gs://kafka-certs/appstats-user.p12',
                     'gs://kafka-certs/insights-user.p12','gs://kafka-certs/intfutil-user.p12',
                     'gs://kafka-certs/reloadpred-chkpoint-user.p12','gs://kafka-certs/reloadpred-user.p12',
                     'gs://dataproc-spark-configs/topic-customer-map.cfg','gs://dataproc-spark-configs/params.cfg','gs://kafka-certs/issues-user.p12','gs://kafka-certs/anomaly-user.p12','gs://kafka-certs/appstat-anomaly-user.p12','gs://kafka-certs/appstat-agg-user.p12','gs://kafka-certs/alarmblock-user.p12']
        },
        "python_file_uris": ['gs://dagger-mongo/move2mongo_api.zip']
}

path = "gs://dataproc-spark-configs/pip_install.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-highmem-8",
    worker_machine_type="n1-highmem-8",
    num_workers=2,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[path],
    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl kafka-python google-cloud-storage pyspark'},
).make()

with models.DAG(
        'Versa-kafka2mongo_api',
        # Continue to run DAG twice per day
        default_args=default_dag_args,
        #schedule_interval='*/10 * * * *',
        schedule_interval=None,
        catchup=False,
        ) as dag:

    # create_dataproc_cluster
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name=CLUSTER_NAME,
        region=REGION,
        cluster_config=CLUSTER_GENERATOR_CONFIG
    )

    run_dataproc_spark = DataprocSubmitJobOperator(
        task_id="run_dataproc_spark",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )

    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule=trigger_rule.TriggerRule.ALL_SUCCESS
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_spark >> delete_dataproc_cluster

Below is the python (call_kafka2mongo_api.py) code:

from move2mongo_api import alarmBlock_api
from pyspark.sql import SparkSession



spark = SparkSession.builder.appName('kafka2mongo_api').getOrCreate()
print(f" spark : {spark}")
spark.sparkContext.addPyFile("move2mongo_api.zip")

cust = ['versa']
for c in cust:
    ap = alarmBlock_api(spark, c)
    ap.readFromKafka()

Please note: before running the Airflow job, I upload move2mongo_api.zip to the storage bucket (gs://dagger-mongo/move2mongo_api.zip). move2mongo_api.zip contains the python file -> alarmBlock_api.py, which is referenced in the job file 'call_kafka2mongo.py'.
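
For reference, a minimal sketch of what alarmBlock_api.py inside the zip presumably provides (only the class name, its constructor arguments, and readFromKafka() are visible in the question; the body shown here is hypothetical):

# alarmBlock_api.py -- hypothetical sketch of the interface used by call_kafka2mongo_api.py
class alarmBlock_api:
    def __init__(self, spark, customer):
        # spark: an existing SparkSession; customer: a customer name such as 'versa'
        self.spark = spark
        self.customer = customer

    def readFromKafka(self):
        # Placeholder: the real implementation reads the customer's Kafka topic
        # and writes to MongoDB (per the jar dependencies in the job definition).
        raise NotImplementedError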

When I run this workflow, I get the error shown below:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 1849, in execute
    job_object = self.hook.submit_job(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 439, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 870, in submit_job
    return client.submit_job(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataproc_v1/services/job_controller/client.py", line 493, in submit_job
    request = jobs.SubmitJobRequest(request)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 516, in __init__
    pb_value = marshal.to_proto(pb_type, value)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/marshal.py", line 211, in to_proto
    pb_value = rule.to_proto(value)
  File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
    return self._descriptor(**value)
ValueError: Protocol message Job has no "python_file_uris" field.
[2022-07-17, 18:07:00 UTC] {taskinstance.py:1279} INFO - Marking task as UP_FOR_RETRY. dag_id=Versa-kafka2mongo_api, task_id=run_dataproc_spark, execution_date=20220717T180647, start_date=20220717T180659, end_date=20220717T180700

What am I doing wrong here? Any ideas on how to debug/fix this issue?

TIA!


1 answer

Stack Overflow user

Accepted answer

Answered on 2022-07-17 19:32:09

It looks like you have a nesting problem in PYSPARK_JOB.

You have:

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris" : []
        },
        "python_file_uris": []
}

You want:

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris" : []
        "python_file_uris": []
    },
}
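
Applied to the job definition from the question, the corrected dict would look roughly like this (the jar and file lists are trimmed here for brevity; keep the full lists from the original job):

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        # ... keep the full jar_file_uris / file_uris lists from the original job ...
        "jar_file_uris": ["gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar"],
        "file_uris": ["gs://kafka-certs/versa-kafka-gke-ca.p12"],
        # moved inside "pyspark_job": this is where the PySparkJob message defines python_file_uris
        "python_file_uris": ["gs://dagger-mongo/move2mongo_api.zip"],
    },
}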
2 votes
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/73014392