I am using the DataprocSubmitJobOperator in Airflow to schedule PySpark jobs, but I'm unable to pass pyfiles to the PySpark job.
Here is the code I'm using:
DAG
# working - passing jars
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": [
            'gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar', 'gs://dataproc-spark-jars/bson-4.0.5.jar',
            'gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar', 'gs://dataproc-spark-jars/mongodb-driver-core-4.0.5.jar',
            'gs://dataproc-spark-jars/mongodb-driver-sync-4.0.5.jar', 'gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar',
            'gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar', 'gs://dataproc-spark-jars/spark-token-provider-kafka-0-10_2.12-3.1.3.jar',
            'gs://dataproc-spark-jars/htrace-core4-4.1.0-incubating.jar', 'gs://dataproc-spark-jars/hadoop-client-3.3.1.jar',
            'gs://dataproc-spark-jars/spark-sql-kafka-0-10_2.12-3.1.3.jar', 'gs://dataproc-spark-jars/hadoop-client-runtime-3.3.1.jar',
            'gs://dataproc-spark-jars/hadoop-client-3.3.1.jar', 'gs://dataproc-spark-jars/kafka-clients-3.2.0.jar',
            'gs://dataproc-spark-jars/commons-pool2-2.11.1.jar'
        ],
        "file_uris": [
            'gs://kafka-certs/versa-kafka-gke-ca.p12', 'gs://kafka-certs/syslog-vani.p12',
            'gs://kafka-certs/alarm-compression-user.p12', 'gs://kafka-certs/appstats-user.p12',
            'gs://kafka-certs/insights-user.p12', 'gs://kafka-certs/intfutil-user.p12',
            'gs://kafka-certs/reloadpred-chkpoint-user.p12', 'gs://kafka-certs/reloadpred-user.p12',
            'gs://dataproc-spark-configs/topic-customer-map.cfg', 'gs://dataproc-spark-configs/params.cfg',
            'gs://kafka-certs/issues-user.p12', 'gs://kafka-certs/anomaly-user.p12',
            'gs://kafka-certs/appstat-anomaly-user.p12', 'gs://kafka-certs/appstat-agg-user.p12',
            'gs://kafka-certs/alarmblock-user.p12'
        ]
    },
    "python_file_uris": ['gs://dagger-mongo/move2mongo_api.zip']
}
path = "gs://dataproc-spark-configs/pip_install.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-highmem-8",
    worker_machine_type="n1-highmem-8",
    num_workers=2,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[path],
    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl kafka-python google-cloud-storage pyspark'},
).make()
with models.DAG(
    'Versa-kafka2mongo_api',
    # Continue to run DAG twice per day
    default_args=default_dag_args,
    #schedule_interval='*/10 * * * *',
    schedule_interval=None,
    catchup=False,
) as dag:
    # create_dataproc_cluster
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name=CLUSTER_NAME,
        region=REGION,
        cluster_config=CLUSTER_GENERATOR_CONFIG
    )
    run_dataproc_spark = DataprocSubmitJobOperator(
        task_id="run_dataproc_spark",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )
    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule=trigger_rule.TriggerRule.ALL_SUCCESS
    )
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_spark >> delete_dataproc_cluster

Below is the Python code (call_kafka2mongo_api.py):
from move2mongo_api import alarmBlock_api
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('kafka2mongo_api').getOrCreate()
print(f" spark : {spark}")
spark.sparkContext.addPyFile("move2mongo_api.zip")

cust = ['versa']
for c in cust:
    ap = alarmBlock_api(spark, c)
    ap.readFromKafka()

Please note: before running the Airflow job, I upload move2mongo_api.zip to the storage bucket (gs://dagger-mongo/move2mongo_api.zip). move2mongo_api.zip contains the Python file alarmBlock_api.py, which is referenced in the job file 'call_kafka2mongo.py'.
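For reference, this is roughly how I build and upload that archive before triggering the DAG. It is only a sketch: it assumes alarmBlock_api.py sits in a local move2mongo_api/ directory and that the google-cloud-storage client (the same one listed in PIP_PACKAGES above) is installed locally; the local paths are illustrative.

import zipfile
from google.cloud import storage

# Package the module so Spark can load it via python_file_uris / addPyFile.
# Assumed local layout: ./move2mongo_api/alarmBlock_api.py (illustrative).
with zipfile.ZipFile("move2mongo_api.zip", "w") as zf:
    zf.write("move2mongo_api/__init__.py")
    zf.write("move2mongo_api/alarmBlock_api.py")

# Upload the archive to the bucket referenced by the job config.
client = storage.Client()
bucket = client.bucket("dagger-mongo")
bucket.blob("move2mongo_api.zip").upload_from_filename("move2mongo_api.zip")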
When I run this workflow, I get the error shown below:
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 1849, in execute
job_object = self.hook.submit_job(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 439, in inner_wrapper
return func(self, *args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 870, in submit_job
return client.submit_job(
File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/dataproc_v1/services/job_controller/client.py", line 493, in submit_job
request = jobs.SubmitJobRequest(request)
File "/opt/python3.8/lib/python3.8/site-packages/proto/message.py", line 516, in __init__
pb_value = marshal.to_proto(pb_type, value)
File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/marshal.py", line 211, in to_proto
pb_value = rule.to_proto(value)
File "/opt/python3.8/lib/python3.8/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
return self._descriptor(**value)
ValueError: Protocol message Job has no "python_file_uris" field.
[2022-07-17, 18:07:00 UTC] {taskinstance.py:1279} INFO - Marking task as UP_FOR_RETRY. dag_id=Versa-kafka2mongo_api, task_id=run_dataproc_spark, execution_date=20220717T180647, start_date=20220717T180659, end_date=20220717T180700

What am I doing wrong here? Is there a way to debug/fix this issue?
TIA!
Posted on 2022-07-17 19:32:09
It looks like you have a layout problem in PYSPARK_JOB.
You have:
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": []
    },
    "python_file_uris": []
}

You want:
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": [],
        "python_file_uris": []
    },
}

https://stackoverflow.com/questions/73014392
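Applied to the job config from the question, the corrected layout would look roughly like this (URIs abbreviated; only the placement of "python_file_uris" changes):

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": [
            'gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar',
            # ... the remaining jars from the question ...
        ],
        "file_uris": [
            'gs://kafka-certs/versa-kafka-gke-ca.p12',
            # ... the remaining certs and config files from the question ...
        ],
        # Now a sibling of main_python_file_uri, so the Job proto accepts it.
        "python_file_uris": ['gs://dagger-mongo/move2mongo_api.zip'],
    },
}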