在foreachRDD()中,有一些使用Spark的示例。但是如果我想在tranform()中使用SQL
case class AlertMsg(host:String, count:Int, sum:Double)
val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
if (rdd.count > 0) {
val t = sqc.jsonRDD(rdd)
t.registerTempTable("logstash")
我的目标是在每个数据点上有k个最近的邻居。我想避免使用带查找的for循环,并且在每个rdd_distance点上同时使用其他的东西,但是我想不出如何做到这一点。
parsedData = RDD[Object]
//Object have an id and a vector as attribute
//sqdist1 output is a Double
var rdd_distance = parsedData.cartesian(parsedData)
.flatMap { case (x,y) =>
if(x.get_id != y.get_id)
我想连接两个数据集,第一个数据集是4.5 GB,第二个数据集是5MB。
下面是我的问题,
val data= rdd1.join(rdd2,regexp_replace($"rdd2.SUBSCRIBER_ID","^0*","") === regexp_replace($"rdd1.subscriberid","^0*", "" ) or
((substring($"rdd2.FIRST_NAME",0,3) === $"rdd1.firstName") a
我正在尝试使用Kafka DirectStream,处理每个分区的RDDs,并将处理后的值写入DB。当我尝试执行reduceByKey(每个分区,也就是没有随机)时,我得到以下错误。通常在驱动节点上,我们可以使用sc.parallelize(迭代器)来解决这个问题。但我想用spark streaming来解决这个问题。
value reduceByKey is not a member of Iterator[((String, String), (Int, Int))]
有没有办法在分区内的Iterator上执行转换?
myKafkaDS
.foreachRDD { rdd =>
我试图在类似于下面的星火数据框架上进行多项式曲线拟合(使用Spark版本2.4.0.7.1.5,ScalaVersion2.11.12 (OpenJDK 64位服务器VM,1.8.0_232))。
我为此编写了一个联非新议程,它可以注册,但在运行时得到一个错误。
我是斯卡拉和联非新议程的新手。你能帮我看看我的功能,看看它有什么问题吗?
谢谢,
示例df
val n = 2
val data = Seq(
(1,80.0,-0.361982467), (1,70.0,0.067847447), (1,50.0,-0.196768255),
(1,40.0,-0.135489192)
我在println行得到错误消息SPARK-5063
val d.foreach{x=> for(i<-0 until x.length)
println(m.lookup(x(i)))}
D是RDD[Array[String]],m是RDD[(String, String)]。有没有办法按我想要的方式打印?或者如何将d从RDD[Array[String]]转换为Array[String]?
我有一个RDD,我想将这个RDD与另一个(具有相同类型的内容)“连接”在一起,而union是一个合适的方法。但是,在连接rdd之前,我希望确保我的集合满足某些要求(因此使用if-语句),然后合并rdd。不幸的是,下面代码中描述的联合不能在循环之外持久。有没有办法做到这一点呢?从if-else内部逐步向原始RDD rdd添加更多条目?如果没有if_else,联合就能正常工作。
var rdd = sc.parallelize(Seq[String]())
val (!collection.isEmpty) {
val value = collection.map(_._2)
r
希望在我的Windows机器上运行一个GraphX示例,使用SparklyR安装Hadoop/Spark的SparklyR。首先可以从安装目录启动shell:
start C:\\Users\\eyeOfTheStorm\\AppData\\Local\\rstudio\\spark\\Cache\\spark-2.0.0-bin-hadoop2.7\\bin\\spark-shell
输出:
17/01/02 12:21:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... u
我有一些关于服务提供商客户的数据(~1MB)。我正在尝试根据几个特性来预测是否会终止订阅(PySpark on Databricks)。
单特征模型
首先,我只试了一个功能,并看到了成功的培训:
# Create vector assembler to merge independent features (in this case just one) into one feature as a list
vectorAssembler = VectorAssembler(inputCols=['MonthlyCharges'], outputCol='Charges&
我按照的第一部分创建了一个外部配置单元表,并将其指向特定的S3 Bucket。在Hue界面中,我可以在成功创建后浏览数据示例。如果我切换到齐柏林飞艇并运行以下命令:%sql show tables,我可以看到我的表列在default数据库旁边。
现在,如果我实际尝试查询表,就会得到一个java.io.IOException: Not a file: s3://my-bucket/my-subdirectory错误。这个错误是有道理的,但是Hive会让你指定一个S3存储桶,而不是一个实际的S3文件,所以我不知道如何让两者都满意!
请注意,此目录中只有一个文件,并且我没有尝试任何分区。该文件经过压
我想过滤一个JavaRdd到三个不同的rdd,基于一个特定的condition.Right,现在我正在阅读相同的rdd三次并过滤it.Is,还有其他有效的方法来实现这一点吗?
Example:
Like I have an rdd of type string and I want to filter it based on name 'anshu','suman' and 'neeraj'
rdd1=rdd.filter(s->{s.contains("anshu")?return true; else return
我正在尝试在我的开发环境中使用独立安装的Spark 2.2进行一些测试。
我使用databricks库读取csv文件,然后创建临时视图。在我使用spark.sql()运行select语句之后。如果我在该DataFrame上执行collect()或任何其他稍后需要生成执行器操作,我将收到NullPointerException。
我使用spark-shell BTW。
这是我使用的代码:
val dir = "Downloads/data.csv"
val da = spark.read.format("com.databricks.spark.csv").opt