前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pyspark 原理、源码解析与优劣势分析(2) ---- Executor 端进程间通信和序列化

pyspark 原理、源码解析与优劣势分析(2) ---- Executor 端进程间通信和序列化

作者头像
流川疯
发布2021-12-06 16:14:29
1.5K0
发布2021-12-06 16:14:29
举报
文章被收录于专栏:流川疯编写程序的艺术

文章大纲

系列文章:

Executor 端进程间通信和序列化

对于 Spark 内置的算子,在 Python 中调用 RDD、DataFrame 的接口后,从上文可以看出会通过 JVM 去调用到 Scala 的接口,最后执行和直接使用 Scala 并无区别。而 对于需要使用 UDF 的情形,在 Executor 端就需要启动一个 Python worker 子进程,然后执行 UDF 的逻辑。那么 Spark 是怎样判断需要启动子进程的呢?

在 Spark 编译用户的 DAG 的时候,Catalyst Optimizer 会创建 BatchEvalPython 或者 ArrowEvalPython 这样的 Logical Operator,随后会被转换成 PythonEvals 这个 Physical Operator。在 PythonEvals(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala)中:

代码语言:javascript
复制
object PythonEvals extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ArrowEvalPython(udfs, output, child, evalType) =>
      ArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil
    case BatchEvalPython(udfs, output, child) =>
      BatchEvalPythonExec(udfs, output, planLater(child)) :: Nil
    case _ =>
      Nil
  }
}

创建了 ArrowEvalPythonExec 或者 BatchEvalPythonExec,而这二者内部会创建 ArrowPythonRunner、PythonUDFRunner 等类的对象实例,并调用了它们的 compute 方法。由于它们都继承了 BasePythonRunner,基类的 compute 方法中会去启动 Python 子进程:

代码语言:javascript
复制
def compute(
      inputIterator: Iterator[IN],
      partitionIndex: Int,
      context: TaskContext): Iterator[OUT] = {
  // ......
 
  val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
  // Start a thread to feed the process input from our parent's iterator
  val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)
  writerThread.start()
  val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
 
  val stdoutIterator = newReaderIterator(
    stream, writerThread, startTime, env, worker, releasedOrClosed, context)
  new InterruptibleIterator(context, stdoutIterator)

这里 env.createPythonWorker 会通过 PythonWorkerFactory

(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala)去启动 Python 进程。

Executor 端启动 Python 子进程后,会创建一个 socket 与 Python 建立连接。所有 RDD 的数据都要序列化后,通过 socket 发送,而结果数据需要同样的方式序列化传回 JVM。

对于直接使用 RDD 的计算,或者没有开启 spark.sql.execution.arrow.enabled 的 DataFrame,是将输入数据按行发送给 Python,可想而知,这样效率极低。

在 Spark 2.2 后提供了基于 Arrow 的序列化、反序列化的机制(从 3.0 起是默认开启),从 JVM 发送数据到 Python 进程的代码在 sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala。这个类主要是重写了 newWriterThread 这个方法,使用了 ArrowWriter 向 socket 发送数据:

代码语言:javascript
复制
val arrowWriter = ArrowWriter.create(root)
val writer = new ArrowStreamWriter(root, null, dataOut)
writer.start()
 
while (inputIterator.hasNext) {
val nextBatch = inputIterator.next()
 
while (nextBatch.hasNext) {
    arrowWriter.write(nextBatch.next())
}
 
arrowWriter.finish()
writer.writeBatch()
arrowWriter.reset()

可以看到,每次取出一个 batch,填充给 ArrowWriter,实际数据会保存在 root 对象中,然后由 ArrowStreamWriter 将 root 对象中的整个 batch 的数据写入到 socket 的 DataOutputStream 中去。ArrowStreamWriter 会调用 writeBatch 方法去序列化消息并写数据,代码参考 ArrowWriter.java#L131。

代码语言:javascript
复制
protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
  ArrowBlock block = MessageSerializer.serialize(out, batch, option);
  LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
      block.getOffset(), block.getMetadataLength(), block.getBodyLength());
  return block;
}

在 MessageSerializer 中,使用了 flatbuffer 来序列化数据。flatbuffer 是一种比较高效的序列化协议,它的主要优点是反序列化的时候,不需要解码,可以直接通过裸 buffer 来读取字段,可以认为反序列化的开销为零。我们来看看 Python 进程收到消息后是如何反序列化的。

Python 子进程实际上是执行了 worker.py 的 main 函数 (python/pyspark/worker.py):

代码语言:javascript
复制
if __name__ == '__main__':
    # Read information about how to connect back to the JVM from the environment.
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
    (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
    main(sock_file, sock_file)

这里会去向 JVM 建立连接,并从 socket 中读取指令和数据。对于如何进行序列化、反序列化,是通过 UDF 的类型来区分:

代码语言:javascript
复制
eval_type = read_int(infile)
if eval_type == PythonEvalType.NON_UDF:
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
else:
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)

在 read_udfs 中,如果是 PANDAS 类的 UDF,会创建 ArrowStreamPandasUDFSerializer,其余的 UDF 类型创建 BatchedSerializer。我们来看看 ArrowStreamPandasUDFSerializer(python/pyspark/serializers.py):

代码语言:javascript
复制
def dump_stream(self, iterator, stream):
    import pyarrow as pa
    writer = None
    try:
        for batch in iterator:
            if writer is None:
                writer = pa.RecordBatchStreamWriter(stream, batch.schema)
            writer.write_batch(batch)
    finally:
        if writer is not None:
            writer.close()
 
def load_stream(self, stream):
    import pyarrow as pa
    reader = pa.ipc.open_stream(stream)
    for batch in reader:
        yield batch

可以看到,这里双向的序列化、反序列化,都是调用了 PyArrow 的 ipc 的方法,和前面看到的 Scala 端是正好对应的,也是按 batch 来读写数据。对于 Pandas 的 UDF,读到一个 batch 后,会将 Arrow 的 batch 转换成 Pandas Series。

代码语言:javascript
复制
def arrow_to_pandas(self, arrow_column):
    from pyspark.sql.types import _check_series_localize_timestamps
 
    # If the given column is a date type column, creates a series of datetime.date directly
    # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
    # datetime64[ns] type handling.
    s = arrow_column.to_pandas(date_as_object=True)
 
    s = _check_series_localize_timestamps(s, self._timezone)
    return s
 
def load_stream(self, stream):
    """
    Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
    """
    batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
    import pyarrow as pa
    for batch in batches:
        yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]

Pandas UDF

前面我们已经看到,PySpark 提供了基于 Arrow 的进程间通信来提高效率,那么对于用户在 Python 层的 UDF,是不是也能直接使用到这种高效的内存格式呢?答案是肯定的,这就是 PySpark 推出的 Pandas UDF。区别于以往以行为单位的 UDF,Pandas UDF 是以一个 Pandas Series 为单位,batch 的大小可以由 spark.sql.execution.arrow.maxRecordsPerBatch 这个参数来控制。这是一个来自官方文档的示例:

代码语言:javascript
复制
def multiply_func(a, b):
    return a * b
 
multiply = pandas_udf(multiply_func, returnType=LongType())
 
df.select(multiply(col("x"), col("x"))).show()

上文已经解析过,PySpark 会将 DataFrame 以 Arrow 的方式传递给 Python 进程,Python 中会转换为 Pandas Series,传递给用户的 UDF。在 Pandas UDF 中,可以使用 Pandas 的 API 来完成计算,在易用性和性能上都得到了很大的提升。

参考文献

https://www.mobvista.com/cn/blog/2019-12-27-2/ https://blog.csdn.net/Pysamlam/article/details/115683424

pyspark 启动原理: https://blog.csdn.net/wangyaninglm/article/details/114038572

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/05/24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章大纲
  • Executor 端进程间通信和序列化
  • Pandas UDF
  • 参考文献
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档