前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 机器学习中的线性代数库

Spark 机器学习中的线性代数库

作者头像
Michael阿明
发布2021-09-06 11:01:35
4280
发布2021-09-06 11:01:35
举报
文章被收录于专栏:Michael阿明学习之路

文章目录

学自:Spark机器学习实战

https://book.douban.com/subject/35280412/

环境:win 10 + java 1.8.0_281 + Scala 2.11.11 + Hadoop 2.7.7 + Spark2.4.7

1. DenseVector、SparseVector

代码语言:javascript
复制
		// 通过数组来创建 DenseVector
        val CustomerFeatures1: Array[Double] = Array(1, 3, 5, 7, 9, 1, 3, 2, 4, 5, 6, 1, 2, 5, 3, 7, 4, 3, 4, 1)
        val x = Vectors.dense(CustomerFeatures1)
        println(x) // [1.0,3.0,5.0,7.0,9.0,1.0,3.0,2.0,4.0,5.0,6.0,1.0,2.0,5.0,3.0,7.0,4.0,3.0,4.0,1.0]

        // 通过字符串转化为 double
        val y = Vectors.dense("24".toDouble, "8".toDouble, "001".toDouble)
        println(y) // [24.0,8.0,1.0]

        // 创建 SparseVector稀疏向量, 下面进行对比两种Vector
        val denseVec1 = Vectors.dense(5, 0, 3, 0, 0, 0, 7, 8)
        println(denseVec1.size) // 8
        println(denseVec1.numActives) // 8
        println(denseVec1.numNonzeros) // 4
        println(denseVec1) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]

        val sparseVec1 = Vectors.sparse(8, Array(0, 2, 6, 7), Array(5, 3, 7, 8))
        println(sparseVec1.size) // 8
        println(sparseVec1.numActives) // 4
        println(sparseVec1.numNonzeros) // 4
        println(sparseVec1) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0]), size, idx, values
        // 稀疏向量仅存储 非零元素的 位置 和 值

        //相互转换
        val ConvertedDenseVector = sparseVec1.toDense
        println(ConvertedDenseVector) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]
        val ConvertedSparseVector = denseVec1.toSparse
        println(ConvertedSparseVector) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])

        // 注意 : DenseVector, SparseVector 都是本地的 Vector, 不是分布式的

2. DenseMatrix

代码语言:javascript
复制
// 本地的 DenseMatrix
        val MyArr1 = Array(10, 11, 20, 30.3, 24, 8)
        val denseMat1 = Matrices.dense(2, 3, MyArr1) // 行、 列、 values
        println(denseMat1)
        // 10.0  20.0  24.0
        // 11.0  30.3  8.0

        // 也可以使用 内联的方式
        val denseMat2 = Matrices.dense(3, 2, Array(1, 2, 3, 4, 5, 6))
        println(denseMat2)
        // 1.0  4.0
        // 2.0  5.0
        // 3.0  6.0

        // 使用 多个 vector 内联方式创建 Matrix
        val v1 = Vectors.dense(1, 2, 3, 4)
        val v2 = Vectors.dense(5, 6, 7, 8)
        val v3 = Vectors.dense(9, 10, 11, 12)
        val Mat1 = Matrices.dense(4, 3, v1.toArray ++ v2.toArray ++ v3.toArray)
        println(Mat1)
        // 1.0  5.0  9.0
        // 2.0  6.0  10.0
        // 3.0  7.0  11.0
        // 4.0  8.0  12.0
        // 记住 : Spark 内部使用 列优先 存储机制,性能更好

3. SparseMatrix

代码语言:javascript
复制
// 本地 SparseMatrix
        val sparseMat1 = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(11, 22, 33))
        // 参数顺序 : 行,列,每列的元素个数的前缀和(上面例子表示的两列各有1-0,3-1个元素)、行索引、实际值
        // 参考图片理解:https://www.cnblogs.com/zhangbojiangfeng/p/7456961.html
        println(sparseMat1.numRows) // 3
        println(sparseMat1.numCols) // 2
        println(sparseMat1.numActives) // 3
        println(sparseMat1.numNonzeros) // 3
        println(sparseMat1)
        // 3 x 2 CSCMatrix
        // (0,0) 11.0
        // (1,1) 22.0
        // (2,1) 33.0

4. Vector 运算

代码语言:javascript
复制
// Vector 运算, spark 2.0 缺少vector运算支持,需要先转成 breezeVector
        val w1 = Vectors.dense(1,2,3)
        val w2 = Vectors.dense(4,-5,6)
        // 将 Spark 支持的 Vector 转换为 Breeze库所支持的Vector,可以使用丰富的库API操作
        val w3 = new BreezeVector(w1.toArray)
        val w4 = new BreezeVector(w2.toArray)
        println(w3+w4) // DenseVector(5.0, -3.0, 9.0)
        println(w3-w4) // DenseVector(-3.0, 7.0, -3.0)
        println(w3.dot(w4)) // 12.0

        val sv1 = Vectors.sparse(10, Array(0,2,9), Array(5,3,13))
        val sv2 = Vectors.dense(1,0,1,1,0,0,1,0,0,13)
        println(sv1)
        println(sv2)
        println(new BreezeVector(sv1.toArray).dot(new BreezeVector(sv2.toArray)))

5. 矩阵运算

代码语言:javascript
复制
// spark 支持 SparseMatrix 和 DenseMatrix 运算,不需要转成 Breeze 库中相应类型
        // 创建 Matrix, 矩阵和向量相乘
        val sparseMat2 = Matrices.sparse(3,3,Array(0,2,3,6), Array(0,2,1,0,1,2),Array(1,2,3,4,5,6))
        val denseFeatureVector = Vectors.dense(1,2,1)
        val ans0 = sparseMat2.multiply(denseFeatureVector)
        println(ans0) // 稀疏矩阵可以和稠密矩阵相互转换  [5.0,11.0,8.0]

        val denseVec3 = Vectors.dense(5,3,0)
        val denseMat3 = Matrices.dense(3,3,Array(1,0,0,0,1,0,0,0,1))
        println(denseMat3)
        println(denseVec3)
        println(denseMat3.multiply(denseVec3)) // [5.0,3.0,0.0]

        // 矩阵转置
        println(sparseMat2)
        // 3 x 3 CSCMatrix
        // (0,0) 1.0
        // (2,0) 2.0
        // (1,1) 3.0
        // (0,2) 4.0
        // (1,2) 5.0
        // (2,2) 6.0
        val transposeSparseMat2 = sparseMat2.transpose
        println(transposeSparseMat2)
        //  3 x 3 CSCMatrix
        // (0,0) 1.0
        // (2,0) 4.0
        // (1,1) 3.0
        // (2,1) 5.0
        // (0,2) 2.0
        // (2,2) 6.0

        // 矩阵相乘
        val dMat1 = new DenseMatrix(2,2,Array(1,3,2,4))
        val dMat2 = new DenseMatrix(2,2,Array(2,1,0,2))
        println(dMat1)
        // 1.0  2.0
        // 3.0  4.0
        println(dMat2)
        // 2.0  0.0
        // 1.0  2.0
        println(dMat1.multiply(dMat2))
        // 4.0   4.0
        // 10.0  8.0
        println(dMat2.multiply(dMat1))
        // 2.0  4.0
        // 7.0  10.0

6. RowMatrix

  • 面向行的 Matrix,缺点是 没有行索引用来追踪,它是由本地 Vector 作为行组成的
代码语言:javascript
复制
// RowMatrix
        val dataVectors = Seq(
            Vectors.dense(0, 1, 0),
            Vectors.dense(3, 1, 5),
            Vectors.dense(0, 7, 0)
        )
        val identityVectors = Seq(
            Vectors.dense(1, 0, 0),
            Vectors.dense(0, 1, 0),
            Vectors.dense(0, 0, 1)
        )
        // 获取原始序列,转成 RDD
        val distMat3 = new RowMatrix(spark.sparkContext.parallelize(dataVectors))
        println(distMat3) // org.apache.spark.mllib.linalg.distributed.RowMatrix@352ed70d
        println(distMat3.computeColumnSummaryStatistics().count) // 3
        println(distMat3.computeColumnSummaryStatistics().mean) // 列的属性 均值 [1.0,3.0,1.6666666666666665]
        println(distMat3.computeColumnSummaryStatistics().variance) // 列的方差 [3.0,12.0,8.333333333333334]
        println(distMat3.computeCovariance()) // 协方差
        // 3.0   -3.0                5.0
        // -3.0  12.0                -4.999999999999999
        // 5.0   -4.999999999999999  8.333333333333334

        val dd = identityVectors.map(x => x.toArray).flatten.toArray
        dd.foreach(println)
        val dmIdentity = Matrices.dense(3,3,dd)
        println(dmIdentity) // 本地矩阵
        // 1.0  0.0  0.0
        // 0.0  1.0  0.0
        // 0.0  0.0  1.0
        // 分布式矩阵 * 本地矩阵 得到 一个新的分布式矩阵
        // 应用:通过乘以 一个细长、竖直 或者 狭长的矩阵,实现数据量的降低和结果的维度约减
        val distMat4 = distMat3.multiply(dmIdentity)
        println(distMat4) // org.apache.spark.mllib.linalg.distributed.RowMatrix@205df5dc
        println(distMat4.computeColumnSummaryStatistics().count) // 3
        println(distMat4.computeColumnSummaryStatistics().mean)
        println(distMat4.computeColumnSummaryStatistics().variance)
        println(distMat4.computeCovariance())

7. IndexedRowMatrix

  • 可以携带 索引 和 数据行 RDD,可以随机访问,定位数据
代码语言:javascript
复制
// IndexedRowMatrix
        val distIdxMat1 = spark.sparkContext.parallelize(List(IndexedRow(0L, dataVectors.head),
            IndexedRow(1L, dataVectors(1)), IndexedRow(1L, dataVectors(2))))
        distIdxMat1.foreach(println)
        // IndexedRow(0,[0.0,1.0,0.0])  index , RDD row
        // IndexedRow(1,[3.0,1.0,5.0])
        // IndexedRow(1,[0.0,7.0,0.0])
        println("distinct elements=", distIdxMat1.distinct().count()) // 3

8. CoordinateMatrix

  • 涉及大量 3D 坐标系统数据时,这个形式的矩阵非常有用
代码语言:javascript
复制
// CoordinateMatrix
        val CoordinateEntries = Seq(
            MatrixEntry(1, 6, 300.0), // 对应坐标,x,y类型必须 long,  z double
            MatrixEntry(3, 1, 5),
            MatrixEntry(1, 7, 10)
        )

        val distCordMat1 = new CoordinateMatrix(spark.sparkContext.parallelize(CoordinateEntries.toList))
        println("First Row (MarixEntry) =",distCordMat1.entries.first())
        // (First Row (MarixEntry) =,MatrixEntry(1,6,300.0))

9. BlockMatrix

代码语言:javascript
复制
// BlockMatrix
        val distBlkMat1 =  distCordMat1.toBlockMatrix().cache()
        distBlkMat1.validate()
        // Validates the block matrix info against the matrix data (blocks)
        // and throws an exception if any error is found.
        println("Is block empty =", distBlkMat1.blocks.isEmpty())
        // (Is block empty =,false)

完整代码

代码语言:javascript
复制
package spark.ml.cookbook.chapter2


import breeze.linalg.{DenseVector => BreezeVector}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, MatrixEntry, RowMatrix}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Vectors}
import org.apache.spark.sql.SparkSession


object MyVectorMatrix {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        Logger.getLogger("akka").setLevel(Level.ERROR)
        val spark = SparkSession.builder.master("local[*]")
            .appName("MyVectorMatrix")
            .config("spark.sql.warehouse.dir", ".")
            .config("spark.io.compression.codec", "snappy")
            .getOrCreate()

        // 通过数组来创建 DenseVector
        val CustomerFeatures1: Array[Double] = Array(1, 3, 5, 7, 9, 1, 3, 2, 4, 5, 6, 1, 2, 5, 3, 7, 4, 3, 4, 1)
        val x = Vectors.dense(CustomerFeatures1)
        println(x) // [1.0,3.0,5.0,7.0,9.0,1.0,3.0,2.0,4.0,5.0,6.0,1.0,2.0,5.0,3.0,7.0,4.0,3.0,4.0,1.0]

        // 通过字符串转化为 double
        val y = Vectors.dense("24".toDouble, "8".toDouble, "001".toDouble)
        println(y) // [24.0,8.0,1.0]

        // 创建 SparseVector稀疏向量, 下面进行对比两种Vector
        val denseVec1 = Vectors.dense(5, 0, 3, 0, 0, 0, 7, 8)
        println(denseVec1.size) // 8
        println(denseVec1.numActives) // 8
        println(denseVec1.numNonzeros) // 4
        println(denseVec1) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]

        val sparseVec1 = Vectors.sparse(8, Array(0, 2, 6, 7), Array(5, 3, 7, 8))
        println(sparseVec1.size) // 8
        println(sparseVec1.numActives) // 4
        println(sparseVec1.numNonzeros) // 4
        println(sparseVec1) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0]), size, idx, values
        // 稀疏向量仅存储 非零元素的 位置 和 值

        //相互转换
        val ConvertedDenseVector = sparseVec1.toDense
        println(ConvertedDenseVector) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]
        val ConvertedSparseVector = denseVec1.toSparse
        println(ConvertedSparseVector) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])

        // 注意 : DenseVector, SparseVector 都是本地的 Vector, 不是分布式的

        // 本地的 DenseMatrix
        val MyArr1 = Array(10, 11, 20, 30.3, 24, 8)
        val denseMat1 = Matrices.dense(2, 3, MyArr1) // 行、 列、 values
        println(denseMat1)
        // 10.0  20.0  24.0
        // 11.0  30.3  8.0

        // 也可以使用 内联的方式
        val denseMat2 = Matrices.dense(3, 2, Array(1, 2, 3, 4, 5, 6))
        println(denseMat2)
        // 1.0  4.0
        // 2.0  5.0
        // 3.0  6.0

        // 使用 多个 vector 内联方式创建 Matrix
        val v1 = Vectors.dense(1, 2, 3, 4)
        val v2 = Vectors.dense(5, 6, 7, 8)
        val v3 = Vectors.dense(9, 10, 11, 12)
        val Mat1 = Matrices.dense(4, 3, v1.toArray ++ v2.toArray ++ v3.toArray)
        println(Mat1)
        // 1.0  5.0  9.0
        // 2.0  6.0  10.0
        // 3.0  7.0  11.0
        // 4.0  8.0  12.0
        // 记住 : Spark 内部使用 列优先 存储机制,性能更好

        // 本地 SparseMatrix
        val sparseMat1 = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(11, 22, 33))
        // 参数顺序 : 行,列,每列的元素个数的前缀和(上面例子表示的两列各有1-0,3-1个元素)、行索引、实际值
        // 参考图片理解:https://www.cnblogs.com/zhangbojiangfeng/p/7456961.html
        println(sparseMat1.numRows) // 3
        println(sparseMat1.numCols) // 2
        println(sparseMat1.numActives) // 3
        println(sparseMat1.numNonzeros) // 3
        println(sparseMat1)
        // 3 x 2 CSCMatrix
        // (0,0) 11.0
        // (1,1) 22.0
        // (2,1) 33.0

        // Vector 运算, spark 2.0 缺少vector运算支持,需要先转成 breezeVector
        val w1 = Vectors.dense(1, 2, 3)
        val w2 = Vectors.dense(4, -5, 6)
        // 将 Spark 支持的 Vector 转换为 Breeze库所支持的Vector,可以使用丰富的库API操作
        val w3 = new BreezeVector(w1.toArray)
        val w4 = new BreezeVector(w2.toArray)
        println(w3 + w4) // DenseVector(5.0, -3.0, 9.0)
        println(w3 - w4) // DenseVector(-3.0, 7.0, -3.0)
        println(w3.dot(w4)) // 12.0

        val sv1 = Vectors.sparse(10, Array(0, 2, 9), Array(5, 3, 13))
        val sv2 = Vectors.dense(1, 0, 1, 1, 0, 0, 1, 0, 0, 13)
        println(sv1)
        println(sv2)
        println(new BreezeVector(sv1.toArray).dot(new BreezeVector(sv2.toArray)))

        // spark 支持 SparseMatrix 和 DenseMatrix 运算,不需要转成 Breeze 库中相应类型
        // 创建 Matrix, 矩阵和向量相乘
        val sparseMat2 = Matrices.sparse(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1, 2, 3, 4, 5, 6))
        val denseFeatureVector = Vectors.dense(1, 2, 1)
        val ans0 = sparseMat2.multiply(denseFeatureVector)
        println(ans0) // 稀疏矩阵可以和稠密矩阵相互转换  [5.0,11.0,8.0]

        val denseVec3 = Vectors.dense(5, 3, 0)
        val denseMat3 = Matrices.dense(3, 3, Array(1, 0, 0, 0, 1, 0, 0, 0, 1))
        println(denseMat3)
        println(denseVec3)
        println(denseMat3.multiply(denseVec3)) // [5.0,3.0,0.0]

        // 矩阵转置
        println(sparseMat2)
        // 3 x 3 CSCMatrix
        // (0,0) 1.0
        // (2,0) 2.0
        // (1,1) 3.0
        // (0,2) 4.0
        // (1,2) 5.0
        // (2,2) 6.0
        val transposeSparseMat2 = sparseMat2.transpose
        println(transposeSparseMat2)
        //  3 x 3 CSCMatrix
        // (0,0) 1.0
        // (2,0) 4.0
        // (1,1) 3.0
        // (2,1) 5.0
        // (0,2) 2.0
        // (2,2) 6.0

        // 矩阵相乘
        val dMat1 = new DenseMatrix(2, 2, Array(1, 3, 2, 4))
        val dMat2 = new DenseMatrix(2, 2, Array(2, 1, 0, 2))
        println(dMat1)
        // 1.0  2.0
        // 3.0  4.0
        println(dMat2)
        // 2.0  0.0
        // 1.0  2.0
        println(dMat1.multiply(dMat2))
        // 4.0   4.0
        // 10.0  8.0
        println(dMat2.multiply(dMat1))
        // 2.0  4.0
        // 7.0  10.0

        // RowMatrix
        val dataVectors = Seq(
            Vectors.dense(0, 1, 0),
            Vectors.dense(3, 1, 5),
            Vectors.dense(0, 7, 0)
        )
        val identityVectors = Seq(
            Vectors.dense(1, 0, 0),
            Vectors.dense(0, 1, 0),
            Vectors.dense(0, 0, 1)
        )
        // 获取原始序列,转成 RDD
        val distMat3 = new RowMatrix(spark.sparkContext.parallelize(dataVectors))
        println(distMat3) // org.apache.spark.mllib.linalg.distributed.RowMatrix@352ed70d
        println(distMat3.computeColumnSummaryStatistics().count) // 3
        println(distMat3.computeColumnSummaryStatistics().mean) // 列的属性 均值 [1.0,3.0,1.6666666666666665]
        println(distMat3.computeColumnSummaryStatistics().variance) // 列的方差 [3.0,12.0,8.333333333333334]
        println(distMat3.computeCovariance()) // 协方差
        // 3.0   -3.0                5.0
        // -3.0  12.0                -4.999999999999999
        // 5.0   -4.999999999999999  8.333333333333334

        val dd = identityVectors.flatMap(x => x.toArray).toArray
        dd.foreach(println)
        val dmIdentity = Matrices.dense(3,3,dd)
        println(dmIdentity) // 本地矩阵
        // 1.0  0.0  0.0
        // 0.0  1.0  0.0
        // 0.0  0.0  1.0
        // 分布式矩阵 * 本地矩阵 得到 一个新的分布式矩阵
        // 应用:通过乘以 一个细长、竖直 或者 狭长的矩阵,实现数据量的降低和结果的维度约减
        val distMat4 = distMat3.multiply(dmIdentity)
        println(distMat4) // org.apache.spark.mllib.linalg.distributed.RowMatrix@205df5dc
        println(distMat4.computeColumnSummaryStatistics().count) // 3
        println(distMat4.computeColumnSummaryStatistics().mean)
        println(distMat4.computeColumnSummaryStatistics().variance)
        println(distMat4.computeCovariance())

        // IndexedRowMatrix
        val distIdxMat1 = spark.sparkContext.parallelize(List(IndexedRow(0L, dataVectors.head),
            IndexedRow(1L, dataVectors(1)), IndexedRow(1L, dataVectors(2))))
        distIdxMat1.foreach(println)
        // IndexedRow(0,[0.0,1.0,0.0])  index , RDD row
        // IndexedRow(1,[3.0,1.0,5.0])
        // IndexedRow(1,[0.0,7.0,0.0])
        println("distinct elements=", distIdxMat1.distinct().count()) // 3

        // CoordinateMatrix
        val CoordinateEntries = Seq(
            MatrixEntry(1, 6, 300.0), // 对应坐标,x,y类型必须 long,  z double
            MatrixEntry(3, 1, 5),
            MatrixEntry(1, 7, 10)
        )

        val distCordMat1 = new CoordinateMatrix(spark.sparkContext.parallelize(CoordinateEntries.toList))
        println("First Row (MarixEntry) =",distCordMat1.entries.first())
        // (First Row (MarixEntry) =,MatrixEntry(1,6,300.0))


        // BlockMatrix
        val distBlkMat1 =  distCordMat1.toBlockMatrix().cache()
        distBlkMat1.validate()
        // Validates the block matrix info against the matrix data (blocks)
        // and throws an exception if any error is found.
        println("Is block empty =", distBlkMat1.blocks.isEmpty())
        // (Is block empty =,false)
        spark.stop()
    }
}

pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>examples</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.3.0</version>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>


</project>
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/05/06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 1. DenseVector、SparseVector
  • 2. DenseMatrix
  • 3. SparseMatrix
  • 4. Vector 运算
  • 5. 矩阵运算
  • 6. RowMatrix
  • 7. IndexedRowMatrix
  • 8. CoordinateMatrix
  • 9. BlockMatrix
  • 完整代码
    • pom.xml
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档