如何在应用转换之前将每个火花输入流中的数据集组合为一个。我用的是火花-2.0.0
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val lines = ssc.textFileStream("input")
lines.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
val dataSet = sqlContext.read.json(rdd)
中提到的示例允许我在TCP流中接收数据包并监听端口9999。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 core
我的问题是,当我将代码转换为流模式并将数据帧放入foreach循环时,数据帧会显示空表!我不填!我也不能将它放入assembler.transform()中。错误是:
Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U].
Unspecified value parameter mapFunc.
v
我在火星上部署了师父和工人。当我尝试使用SparkStreaming进行一些计算时,它失败了。我在sbt控制台中创建了StreamingContext。
请参见下面的错误消息、示例代码、build.sbt和运行程序的命令。
错误信息
ERROR StreamingContext: Error starting the context,
marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations
registered, so nothing to exe
如何使用apache来流HDFS中已经存在的文件?
我有一个非常具体的用例,我有数百万的客户数据,我想使用apache流在客户级别处理这些数据。目前,我要做的是获取整个客户数据集和重新分区 it on customerId,并创建100个这样的分区,并确保在单个流中传递唯一的客户多个记录。
现在我有了HDFS位置中的所有数据。
hdfs:/tmp/数据集
现在,使用上面的HDFS位置,我想要流文件,它将读取拼花文件,获取数据集。我试过以下几样东西,但没有运气。
// start stream
val sparkConf = new SparkConf().setAppName("
在使用foreachRDD进行CSV数据处理时,我得到了异常。这是我的密码
case class Person(name: String, age: Long)
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("CassandraExample").set("spark.driver.allowMultipleContexts", "true")
val ssc = new StreamingContext(conf,