
Structured Streaming error py4j.protocol.Py4JNetworkError: Answer from Java side is empty

Stack Overflow user
Asked on 2018-05-04 17:18:19
Answers: 1 · Views: 4.1K · Followers: 0 · Votes: 3

I am trying to do a left outer join between two Kafka streams using PySpark and Structured Streaming (Spark 2.3).

import os
import time

from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode, get_json_object
from ast import literal_eval
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

spark = SparkSession \
    .builder \
    .appName("Spark Kafka Structured Streaming") \
    .getOrCreate()

schema_impressions = StructType() \
    .add("id_req", StringType()) \
    .add("ts_imp_request", TimestampType()) \
    .add("country", StringType()) \
    .add("TS_IMPRESSION", TimestampType()) 

schema_requests = StructType() \
    .add("id_req", StringType()) \
    .add("page", StringType()) \
    .add("conntype", StringType()) \
    .add("TS_REQUEST", TimestampType()) 

impressions = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
  .option("subscribe", "ssp.datascience_impressions") \
  .load()

requests = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
  .option("subscribe", "ssp.datascience_requests") \
  .option("startingOffsets", "latest") \
  .load()

query_requests = requests \
        .select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_requests).alias("parsed")) \
        .select(col("timestamp").alias("timestamp_req"), "parsed.id_req", "parsed.page", "parsed.conntype", "parsed.TS_REQUEST") \
        .withWatermark("timestamp_req", "120 seconds") 

query_impressions = impressions \
        .select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_impressions).alias("parsed")) \
        .select(col("timestamp").alias("timestamp_imp"), col("parsed.id_req").alias("id_imp"), "parsed.ts_imp_request", "parsed.country", "parsed.TS_IMPRESSION") \
        .withWatermark("timestamp_imp", "120 seconds") 

query_requests.printSchema()        
query_impressions.printSchema()

root
 |-- timestamp_req: timestamp (nullable = true)
 |-- id_req: string (nullable = true)
 |-- page: string (nullable = true)
 |-- conntype: string (nullable = true)
 |-- TS_REQUEST: timestamp (nullable = true)

root
 |-- timestamp_imp: timestamp (nullable = true)
 |-- id_imp: string (nullable = true)
 |-- ts_imp_request: timestamp (nullable = true)
 |-- country: string (nullable = true)
 |-- TS_IMPRESSION: timestamp (nullable = true)

To summarize, I am getting the data from the two Kafka streams, and in the next lines I try to do the join:

rawQuery = query_requests.join(query_impressions,  expr(""" 
    (id_req = id_imp AND 
    timestamp_imp >= timestamp_req AND 
    timestamp_imp <= timestamp_req + interval 5 minutes) 
    """), 
  "leftOuter")

rawQuery = rawQuery \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/home/jovyan/streaming/applicationHistory") \
        .option("path", "/home/jovyan/streaming").start()
print(rawQuery.status)

{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33968)

文件"/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py",行2910,在run_code exec(code_obj,self.user_global_ns,self.user_ns)文件"",行3,在打印(rawQuery.status)文件"/opt/conda/lib/python3.6/site-packages/pyspark/sql/streaming.py",行114,在status return "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py",(self._jsq.status().json())文件"/opt/conda/lib/python3.6/site-packages/pyspark/sql/utils.py",行1160中,在call answer,self.gateway_client,self.target_id,self.name中,在deco return f(*a,**kw)文件名称行328,格式为get_return_value (target_id,".","/opt/conda/lib/python3.6/site-packages/py4j/protocol.py",)) py4j.protocol.Py4JError:调用o92.status出错

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 1828, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'Py4JError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 852, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

I am running Spark locally using a Jupyter Notebook. In spark/conf/spark-defaults.conf I have:

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory             15g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

If I try to use Spark after the previous error, I get this error:

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


1 Answer

Stack Overflow user

Accepted answer

Posted on 2018-05-15 16:57:31

I solved the problem! Basically, the problem was related to Jupyter Notebook. I removed the following line from the code above:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'
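
(Setting PYSPARK_SUBMIT_ARGS from inside a notebook only helps if it runs before the JVM gateway starts. If you want to stay inside a notebook, a commonly used alternative is to declare the Kafka connector through the spark.jars.packages config on the builder; the sketch below is my assumption, not part of the original fix, and it likewise only takes effect if no SparkContext has been started yet.)

# Sketch of a notebook-friendly alternative (assumption, not the original fix):
# pass the Kafka source package on the SparkSession builder itself.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark Kafka Structured Streaming") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0") \
    .getOrCreate()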

In my case, I ran the code from the console instead:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 spark_structured.py

That way, I was able to run all the code without any problem.
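
One caveat when moving from a notebook to spark-submit (my assumption about the script layout, not something stated in the original answer): start() returns immediately, so the script should block on the query, otherwise the driver can exit before anything is written. A minimal sketch reusing the sink options from the question:

# Minimal sketch; assumes the rawQuery DataFrame defined in the question above.
query = rawQuery \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/home/jovyan/streaming/applicationHistory") \
    .option("path", "/home/jovyan/streaming") \
    .start()

# Block until the streaming query terminates; without this, a script run via
# spark-submit reaches the end of the file and shuts the driver down.
query.awaitTermination()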

If you run into the same problem, you can also edit spark-defaults.conf and increase spark.driver.memory and spark.executor.memory.
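
For example (the sizes below are illustrative assumptions, not values from the original answer):

# spark-defaults.conf: give the local driver and executors more heap
spark.driver.memory      15g
spark.executor.memory    15g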

Votes: 2
The original content of this page was provided by Stack Overflow.
Source: https://stackoverflow.com/questions/50171431