首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >PySpark RDD与Scala的转换

PySpark RDD与Scala的转换
EN

Stack Overflow用户
提问于 2016-09-12 20:40:14
回答 1查看 2.3K关注 0票数 5

TL;DR -我在PySpark应用程序中有一个看起来像字符串的DStream。我想将它作为一个DStream[String] 发送到Scala库。但是,Py4j不转换字符串.

我正在开发一个PySpark应用程序,它使用星火流从Kafka提取数据。我的消息是字符串,我想在Scala代码中调用一个方法,向它传递一个DStream[String]实例。但是,我无法在Scala代码中接收到适当的JVM字符串。在我看来,Python字符串不是转换成Java字符串,而是序列化的。

我的问题是:如何从DStream对象中获取Java?

下面是我想出的最简单的Python代码:

代码语言:javascript
运行
复制
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()

我正在PySpark中运行这段代码,将它传递到JAR的路径:

代码语言:javascript
运行
复制
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

在Scala方面,我有:

代码语言:javascript
运行
复制
package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}

现在,假设我把一些数据发送到Kafka:

代码语言:javascript
运行
复制
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN

Scala代码中的println语句打印如下所示:

代码语言:javascript
运行
复制
[B@758aa4d9

我本来想得到foo bar的。

现在,如果我将Scala代码中的简单println语句替换为以下内容:

代码语言:javascript
运行
复制
rdd.foreach(v => println(v.getClass.getCanonicalName))

我得到:

代码语言:javascript
运行
复制
java.lang.ClassCastException: [B cannot be cast to java.lang.String

这表明字符串实际上是以字节数组的形式传递的。

如果我只是尝试将这个字节数组转换成一个字符串(我知道我甚至没有指定编码):

代码语言:javascript
运行
复制
      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(bytes => println(new String(bytes)))
        })
      }

我得到的东西看起来很像(特殊人物可能会被剥去):

代码语言:javascript
运行
复制
�]qXfoo barqa.

这表明Python字符串是序列化的(泡沫化的?)。我如何检索适当的Java字符串呢?

EN

Stack Overflow用户

回答已采纳

发布于 2016-09-12 22:52:05

长话短说,没有人支持这样做。不要在生产中尝试这个。有人警告过你。

通常,除了驱动程序上的一些基本RPC调用之外,Spark不会使用Py4j进行任何其他操作,并且不会在任何其他机器上启动Py4j网关。当需要时(主要是MLlib和部分SQL),Spark使用软锰矿来序列化JVM和Python之间传递的对象。

API的这一部分要么是私有的(Scala),要么是内部的(Python),因此不打算用于一般用途。尽管理论上您无论如何都可以访问它,但每批都要访问它:

代码语言:javascript
运行
复制
package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
  def go(rdd: JavaRDD[Any]) = {
    rdd.rdd.collect {
      case s: String => s
    }.take(5).foreach(println)
  }
}

全流:

代码语言:javascript
运行
复制
object PythonDStreamHelper {
  def go(stream: JavaDStream[Any]) = {
    stream.dstream.transform(_.collect {
      case s: String => s
    }).print
  }
}

或者将单个批公开为DataFrames (可能是最不坏的选项):

代码语言:javascript
运行
复制
object PythonDataFrameHelper {
  def go(df: DataFrame) = {
    df.show
  }
}

并按以下方式使用这些包装:

代码语言:javascript
运行
复制
from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>
# This is the only option which allows further transformations
# on DStream
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD(  # Reserialize but keep as Python RDD
        _to_java_object_rdd(rdd), ssc.sparkContext
    ))._jdstream
)

# Convert to DataFrame and pass to Scala sink.
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
        rdd.map(lambda x: (x, )).toDF()._jdf
    )
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()

这是不受支持的,未经测试的,因此,除了使用Spark的实验之外,其他任何东西都是无用的。

票数 7
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39458465

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档