首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如何在SPARK scala中创建两列邻接矩阵及其计数

在SPARK Scala中创建两列邻接矩阵及其计数的方法如下:

  1. 导入所需的SPARK库和模块:
代码语言:txt
复制
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector
  1. 创建SPARK会话:
代码语言:txt
复制
val spark = SparkSession.builder().appName("AdjacencyMatrix").getOrCreate()
  1. 准备数据集: 假设我们有一个包含两列数据的DataFrame,分别为source和target,表示两个节点之间的连接关系。可以通过读取数据源或手动创建DataFrame来准备数据集。
  2. 创建CountVectorizer模型:
代码语言:txt
复制
val countVectorizer = new CountVectorizer()
  .setInputCol("source")
  .setOutputCol("sourceVector")
  .setVocabSize(1000)  // 设置词汇表大小,根据实际情况调整
  1. 对source列进行向量化转换:
代码语言:txt
复制
val sourceVectorizerModel = countVectorizer.fit(data)
val sourceVectorized = sourceVectorizerModel.transform(data)
  1. 创建CountVectorizer模型并对target列进行向量化转换:
代码语言:txt
复制
val targetVectorizerModel = countVectorizer.setInputCol("target").setOutputCol("targetVector").fit(data)
val targetVectorized = targetVectorizerModel.transform(sourceVectorized)
  1. 创建邻接矩阵:
代码语言:txt
复制
val adjacencyMatrix = targetVectorized.select("sourceVector", "targetVector")
  1. 计算邻接矩阵的计数:
代码语言:txt
复制
val adjacencyMatrixCount = adjacencyMatrix.groupBy("sourceVector", "targetVector").count()

完整代码示例:

代码语言:txt
复制
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector

val spark = SparkSession.builder().appName("AdjacencyMatrix").getOrCreate()

// 准备数据集
val data = spark.createDataFrame(Seq(
  (1, 2),
  (1, 3),
  (2, 3),
  (3, 4),
  (4, 5)
)).toDF("source", "target")

// 创建CountVectorizer模型
val countVectorizer = new CountVectorizer()
  .setInputCol("source")
  .setOutputCol("sourceVector")
  .setVocabSize(1000)

// 对source列进行向量化转换
val sourceVectorizerModel = countVectorizer.fit(data)
val sourceVectorized = sourceVectorizerModel.transform(data)

// 创建CountVectorizer模型并对target列进行向量化转换
val targetVectorizerModel = countVectorizer.setInputCol("target").setOutputCol("targetVector").fit(data)
val targetVectorized = targetVectorizerModel.transform(sourceVectorized)

// 创建邻接矩阵
val adjacencyMatrix = targetVectorized.select("sourceVector", "targetVector")

// 计算邻接矩阵的计数
val adjacencyMatrixCount = adjacencyMatrix.groupBy("sourceVector", "targetVector").count()

adjacencyMatrixCount.show()

这段代码使用了SPARK的ML库中的CountVectorizer模型来将source和target列中的数据转换为向量表示,然后通过对向量化后的数据进行分组计数,得到了邻接矩阵及其计数。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

SparkR:数据科学家的新利器

随后,来自工业界的Alteryx、Databricks、Intel等公司和来自学术界的普渡大学,以及其它开发者积极参与到开发来,最终在2015年4月成功地合并进Spark代码库的主干分支,并在Spark...只提供了Spark组API的R语言封装,即Spark Core的RDD API和Spark SQL的DataFrame API。...需要指出的是,在Spark 1.4版本,SparkR的RDD API被隐藏起来没有开放,主要是出于点考虑: RDD API虽然灵活,但比较底层,R用户可能更习惯于使用更高层的API; RDD API...目前SparkR RDD实现了Scala RDD API的大部分方法,可以满足大多数情况下的使用需求: SparkR支持的创建RDD的方式有: 从R list或vector创建RDD(parallelize...Scala API RDD的每个分区的数据由iterator来表示和访问,而在SparkR RDD,每个分区的数据用一个list来表示,应用到分区的转换操作,mapPartitions(),接收到的分区数据是一个

4.1K20

【数据科学家】SparkR:数据科学家的新利器

随后,来自工业界的Alteryx、Databricks、Intel等公司和来自学术界的普渡大学,以及其它开发者积极参与到开发来,最终在2015年4月成功地合并进Spark代码库的主干分支,并在Spark...只提供了Spark组API的R语言封装,即Spark Core的RDD API和Spark SQL的DataFrame API。...需要指出的是,在Spark 1.4版本,SparkR的RDD API被隐藏起来没有开放,主要是出于点考虑: RDD API虽然灵活,但比较底层,R用户可能更习惯于使用更高层的API; RDD API...目前SparkR RDD实现了Scala RDD API的大部分方法,可以满足大多数情况下的使用需求: SparkR支持的创建RDD的方式有: 从R list或vector创建RDD(parallelize...Scala API RDD的每个分区的数据由iterator来表示和访问,而在SparkR RDD,每个分区的数据用一个list来表示,应用到分区的转换操作,mapPartitions(),接收到的分区数据是一个

3.5K100
  • Spark之【SparkSQL编程】系列(No1)——《SparkSession与DataFrame》

    上一篇博客已经为大家介绍完了SparkSQL的基本概念以及其提供的个编程抽象:DataFrame和DataSet,本篇博客,博主要为大家介绍的是关于SparkSQL编程的内容。...SparkSession 在老的版本,SparkSQL提供种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive...DataFrame 2.1 创建Spark SQLSparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的...注意使用全局表时需要全路径访问,:global_temp:people。...全局的临时视图存在于系统数据库 global_temp,我们必须加上库名去引用它 5)对于DataFrame创建一个全局表 scala> df.createGlobalTempView("people

    1.5K20

    大数据入门与实战-Spark上手

    Spark的主要特性是其内存的集群计算,可以提高应用程序的处理速度。 Spark旨在涵盖广泛的工作负载,批处理应用程序,迭代算法,交互式查询和流式处理。...有种方法可以创建RDD - 在驱动程序并行化现有集合,或在外部存储系统引用数据集,例如共享文件系统,HDFS,HBase或提供Hadoop输入格式的任何数据源。...Spark很懒,所以除非你调用一些会触发作业创建和执行的转换或动作,否则不执行任何操作。请查看以下单词计数示例的片段。...其他的这里不再一一举,想要了解更多的,大家可以看下:Spark核心编程 4.5 RDD 操作 -reduce(func):使用函数func(它接受个参数并返回一个)来聚合数据集的元素。...5.2 打开Spark-Shell 以下命令用于打开spark shell。通常,使用Scala构建spark。因此,Spark程序在Scala环境运行。

    1.1K20

    Spark入门基础深度解析图解

    1、Scala解析   Ⅰ、Scala解析器   Scala解析器会快速编译Scala代码为字节码然后交给JVM运行; REPL -> Read(取值) -> Evaluation(求值) -> Print...2、Spark体系概览 – Spark的地位图解 ? 3、Spark vs MapReduce的计算模型图解   Spark相对于Hadoop最大的不同在于迭代式计算模型; ?...广播变量会为每个节点拷贝一份变量,累加器则可以让多个task共同操作同一份变量进行累加计数;   广播变量是只读的;   累加器只提供了累加功能,只有Driver可以获取累加器的值; 12、Spark杂谈...  Ⅰ、Spark自定义二次排序: 需要Javabean实现Ordered 和 Serializable接口,然后在自定义的JavaBean里面定义需要进行排序的, 并为属性提供构造方法...14、RDD以及其特性 ? 15、Spark核心编程原理 ? ?

    51820

    Apache Spark:大数据时代的终极解决方案

    以下部分将介绍如何在Ubuntu 14.04或更高版本上安装单机模式的Spark 2.0.0。...可以通过种方法创建它们 - 通过在应用程序获取现有集合并通过Spark Context将其并行化或通过从HDFS,HBase,AWS等外部存储系统创建引用。...分配后,每个作业的执行者会收到用于执行作业的应用程序代码及其任务。每个Spark应用程序都有自己的可多线程的执行程序。数据需要存储在不同的Spark应用程序的外部存储以便共享。...(这是我第一个使用Spark的小字数计数程序。我将使用一个在Scala制作的简单MapReduce程序来计算每个单词的频率。)...Shopify、阿里巴巴和eBay都使用了这些技术。由于Spark能够快速诊断并过滤出具有健康风险状态的个人,医疗行业可从Spark数据分析受益。

    1.8K30

    Spark研究】用Apache Spark进行大数据处理第二部分:Spark SQL

    Spark SQL组件 使用Spark SQL时,最主要的个组件就是DataFrame和SQLContext。 首先,我们来了解一下DataFrame。...可以通过如下数据源创建DataFrame: 已有的RDD 结构化数据文件 JSON数据集 Hive表 外部数据库 Spark SQL和DataFrame API已经在下述几种程序设计语言中实现: Scala...JDBC数据源 Spark SQL库的其他功能还包括数据源,JDBC数据源。 JDBC数据源可用于通过JDBC API读取关系型数据库的数据。...Spark SQL示例应用 在上一篇文章,我们学习了如何在本地环境安装Spark框架,如何启动Spark框架并用Spark Scala Shell与其交互。...Spark SQL是一个功能强大的库,组织的非技术团队成员,业务分析师和数据分析师,都可以用Spark SQL执行数据分析。

    3.3K100

    Apache Spark大数据分析入门(一)

    Spark SQL使得用户使用他们最擅长的语言查询结构化数据,DataFrame位于Spark SQL的核心,DataFrame将数据保存为行的集合,对应行的各都被命名,通过使用DataFrame,...下载Spark并河演示如何使用交互式Shell命令行 动手实验Apache Spark的最好方式是使用交互式Shell命令行,Spark目前有Python Shell和Scala Shell种交互式命令行...在Scala Shell,执行下列操作: 在Spark中使用README 文件创建textFileRDD val textFile = sc.textFile("README.md") 获取textFile...想像每均为一个分区(partition ),你可以非常方便地将分区数据分配给集群的各个节点。...例如,我们可以使用Spark的文本文件README.md创建一个RDD textFile,文件包含了若干文本行,将该文本文件读入RDD textFile时,其中的文本行数据将被分区以便能够分发到集群并被并行化操作

    99550

    Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

    创建 DataFrames Scala Java Python R 在一个 SparkSession, 应用程序可以从一个 已经存在的 RDD, 从hive表, 或者从 Spark数据源创建一个...当 hive-site.xml 未配置时,上下文会自动在当前目录创建 metastore_db,并创建由 spark.sql.warehouse.dir 配置的目录,该目录默认为Spark应用程序当前目录的...但是,这意味着如果你的列名包含任何圆点,你现在必须避免使用反引号( table.column.with.dots.nested)。 在内存存储分区修剪默认是开启的。...在 Spark 1.3 ,Java API 和 Scala API 已经统一。种语言的用户可以使用 SQLContext 和 DataFrame。...一般来说论文类尝试使用种语言的共有类型( Array 替代了一些特定集合)。在某些情况下不通用的类型情况下,(例如,passing in closures 或 Maps)使用函数重载代替。

    26K80

    BigData--大数据技术之SparkStreaming

    数据输入后可以用Spark的高度抽象原语:map、reduce、join、window等进行运算。而结果也能保存在很多地方,HDFS,数据库等。 ? 1、SparkStreaming架构 ?...无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream的每一个RDD。部分无状态转化操作在了下表。...一个例子是随着窗口滑动对keys的“加”“减”计数。...与RDD的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。...其中 参数传入的函数func应该实现将每一个RDD数据推送到外部系统,将RDD存入文件或者通过网络将其写入数据库。

    86120

    Spark SQL,DataFrame以及 Datasets 编程指南 - For 2.0

    如上所述,在 Spark 2.0 ,DataFrames 是元素为 Row 的 Dataset 在 Scala 和 Java API 。...创建 Datasets 的第二种方法通过接口构造一个模式来应用于现有的 RDD。虽然这种方法要少复杂一些,但允许在及其类型直到运行时才知道的情况下构造 Datasets。...举个例子,我们可以使用下列目录结构存储上文中提到的人口属性数据至一个分区的表,将额外的 gender 和 country 作为分区: path └── to └── table...如果用户即只想访问 path/to/table/gender=male 下的数据,又希望 gender 能成为分区,可以使用 basePath 选项,将 basePath 设置为 path/to/table...Spark SQL会只会缓存需要的并且会进行压缩以减小内存消耗和 GC 压力。可以调用 spark.uncacheTable("tableName") 将表内存移除。

    4K20

    使用ReduceByKey在Spark中进行词频统计

    Spark采用Local模式运行,Spark版本3.2.0,Scala版本2.12,集成idea开发环境。 实验代码 import org.apache.spark....{SparkConf, SparkContext} object ReduceByKey { def main(args: Array[String]): Unit = { // 创建...() } } 在执行 reduceByKey(_ + _) 这一步后,生成的 RDD 将包含每个单词及其对应的累加值,数据结构类似于 (单词, 累加值)。...在上下文中,_ + _ 表示一个匿名函数,用于对个相同类型的值进行相加操作。在这里,这个值是指 reduceByKey 函数对于相同键的个值。具体来说: 第一个 _ 表示相同键的第一个值。...在这个例子,键是单词,而值是累加的次数。所以 _ + _ 表示将相同键的值(即累加的次数)相加,以得到该键对应的总累加值。

    7410

    原 荐 SparkSQL简介及入门

    但是,随着Spark的发展,对于野心勃勃的Spark团队来说,Shark对于hive的太多依赖(采用hive的语法解析器、查询优化器等等),制约了Spark的One Stack rule them all...2)在应用程序可以混合使用不同来源的数据,可以将来自HiveQL的数据和来自SQL的数据进行Join操作。     ...比如针对二元数据,可以用字节编码压缩来实现(010101)     这样,每个创建一个JVM对象,从而可以快速的GC和紧凑的数据存储;额外的,还可以使用低廉CPU开销的高效压缩方法(字典编码、行长度编码等压缩方法...3、行存储VS存储     目前大数据存储有种方案可供选择:行存储(Row-Based)和存储(Column-Based)。...scala> res0.printSchema #查看的类型等属性 root |-- id: integer (nullable = true)     创建DataFrame对象     DataFrame

    2.5K60

    如何管理Spark的分区

    以下操作是将数据合并到个分区: scala> val numsDF2 = numsDF.coalesce(2) numsDF2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row...] = [num: int] 我们可以验证上述操作是否创建了只有个分区的新DataFrame:可以看出,分区数变为了2 scala> numsDF2.rdd.partitions.size res13...] = [name: string, gender: string] 按进行分区时,Spark默认会创建200个分区。...如果要将数据写出到文件系统,则可以选择一个分区大小,以创建合理大小的文件。 该使用哪种方法进行重分区呢?...总结 本文主要介绍了Spark是如何管理分区的,分别解释了Spark提供的种分区方法,并给出了相应的使用示例和分析。最后对分区情况及其影响进行了讨论,并给出了一些实践的建议。希望本文对你有所帮助。

    1.9K10

    SparkSQL极简入门

    但是,随着Spark的发展,对于野心勃勃的Spark团队来说,Shark对于hive的太多依赖(采用hive的语法解析器、查询优化器等等),制约了Spark的One Stack rule them all...2)在应用程序可以混合使用不同来源的数据,可以将来自HiveQL的数据和来自SQL的数据进行Join操作。 3)内嵌了查询优化框架,在把SQL解析成逻辑执行计划之后,最后变成RDD的计算。...显然这种内存存储方式对于基于内存计算的spark来说,很昂贵也负担不起) 2、SparkSql的存储方式 对于内存存储来说,将所有原生数据类型的采用原生数组来存储,将Hive支持的复杂数据类型(array...比如针对二元数据,可以用字节编码压缩来实现(010101) 这样,每个创建一个JVM对象,从而可以快速的GC和紧凑的数据存储;额外的,还可以使用低廉CPU开销的高效压缩方法(字典编码、行长度编码等压缩方法...SparkSql将RDD封装成一个DataFrame对象,这个对象类似于关系型数据库的表。 1、创建DataFrame对象 DataFrame就相当于数据库的一张表。

    3.8K10

    4.4 共享变量

    □广播变量:可以在内存的所有节点中被访问,用于缓存变量(只读); □累加器:只能用来做加法的变量,计数和求和。...4.4.2 累加器 累加器是一种只能通过关联操作进行“加”操作的变量,因此可以在并行计算得到高效的支持。类似MapReduce的counter,可以用来实现计数和求和等功能。...Spark原生支持Int和Double类型的累加器,程序员可以自己添加新的支持类型。 累加器可以通过调用SparkContext.accumulator(v)方法从一个初始值v创建。...该AccumulatorParam接口有个方法:提供了一个“zero”值进行初始化,以及一个addInPlace方法将个值相加,如果需要可以自己尝试需要的类型,Vector。...本章重点讲解了如何创建Spark的RDD,以及RDD的一系列转换和执行操作,并给出一些基于Scala编程语言的支持。

    1.2K120

    SparkSql的优化器-Catalyst

    一,概述 为了实现Spark SQL,基于Scala的函数编程结构设计了一个新的可扩展优化器Catalyst。Catalyst可扩展的设计有个目的。...最后,规则条件及其本身可以包含任意的Scala代码。这使得Catalyst比优化器的域特定语言更强大,同时保持简洁的简单规则。 在经验,对不变树的功能转换使得整个优化器非常容易推理和调试。...2),将命名的属性(“col”)映射到给定操作符的子节点的输入。...物理计划还可以执行基于规则的物理优化,比如将裁剪和过滤操在一个Spark的Map算子以pipeline方式执行。此外,它可以将逻辑计划的操作下推到支持谓词或projection 下推的数据源。...后面也会举例讲解,如何在我们的应用中使用。

    2.7K90
    领券