学自:Spark机器学习实战
https://book.douban.com/subject/35280412/
环境:Windows 10 + Java 1.8.0_281 + Scala 2.11.11 + Hadoop 2.7.7 + Spark 2.4.7
// Build a DenseVector from an existing array of doubles.
val customerFeatures = Array[Double](
  1, 3, 5, 7, 9, 1, 3, 2, 4, 5,
  6, 1, 2, 5, 3, 7, 4, 3, 4, 1
)
val featureVector = Vectors.dense(customerFeatures)
println(featureVector) // [1.0,3.0,5.0,7.0,9.0,1.0,3.0,2.0,4.0,5.0,6.0,1.0,2.0,5.0,3.0,7.0,4.0,3.0,4.0,1.0]

// Build a DenseVector from values parsed out of strings.
val parsedVector = Vectors.dense(Array("24", "8", "001").map(_.toDouble))
println(parsedVector) // [24.0,8.0,1.0]

// Dense vs. sparse: the same eight values held in both representations.
val denseVec1 = Vectors.dense(5.0, 0.0, 3.0, 0.0, 0.0, 0.0, 7.0, 8.0)
// Prints size, numActives and numNonzeros on separate lines: 8, 8, 4
Seq(denseVec1.size, denseVec1.numActives, denseVec1.numNonzeros).foreach(println)
println(denseVec1) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]

// A sparse vector stores only the positions and values of its explicit entries.
val activeIndices = Array(0, 2, 6, 7)
val activeValues = Array(5.0, 3.0, 7.0, 8.0)
val sparseVec1 = Vectors.sparse(8, activeIndices, activeValues)
// Prints size, numActives and numNonzeros on separate lines: 8, 4, 4
Seq(sparseVec1.size, sparseVec1.numActives, sparseVec1.numNonzeros).foreach(println)
println(sparseVec1) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])  i.e. (size, indices, values)

// The two representations convert into each other.
val densified = sparseVec1.toDense
println(densified) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]
val sparsified = denseVec1.toSparse
println(sparsified) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])
// Note: DenseVector and SparseVector are both local vectors, not distributed ones.
// Local DenseMatrix built from a flat array of values.
val flatValues = Array[Double](10, 11, 20, 30.3, 24, 8)
val denseMat1 = Matrices.dense(2, 3, flatValues) // rows, cols, values
println(denseMat1)
// 10.0 20.0 24.0
// 11.0 30.3 8.0

// The values can also be supplied inline.
val denseMat2 = Matrices.dense(3, 2, Array[Double](1, 2, 3, 4, 5, 6))
println(denseMat2)
// 1.0 4.0
// 2.0 5.0
// 3.0 6.0

// Build a matrix by concatenating several vectors; each vector ends up as one column.
val columnVectors = Seq(
  Vectors.dense(1, 2, 3, 4),
  Vectors.dense(5, 6, 7, 8),
  Vectors.dense(9, 10, 11, 12)
)
val matFromVectors = Matrices.dense(4, 3, columnVectors.flatMap(_.toArray).toArray)
println(matFromVectors)
// 1.0 5.0 9.0
// 2.0 6.0 10.0
// 3.0 7.0 11.0
// 4.0 8.0 12.0
// Remember: the values are laid out column by column (column-major order).

// Local SparseMatrix.
// Argument order: rows, cols, prefix sums of the per-column entry counts
// (here the two columns hold 1-0 = 1 and 3-1 = 2 entries), row indices, values.
// Illustration: https://www.cnblogs.com/zhangbojiangfeng/p/7456961.html
val columnPointers = Array(0, 1, 3)
val entryRows = Array(0, 1, 2)
val entryValues = Array(11.0, 22.0, 33.0)
val sparseMat1 = Matrices.sparse(3, 2, columnPointers, entryRows, entryValues)
// Prints numRows, numCols, numActives and numNonzeros on separate lines: 3, 2, 3, 3
Seq(sparseMat1.numRows, sparseMat1.numCols, sparseMat1.numActives, sparseMat1.numNonzeros)
  .foreach(println)
println(sparseMat1)
// 3 x 2 CSCMatrix
// (0,0) 11.0
// (1,1) 22.0
// (2,1) 33.0
// Vector arithmetic: per the original note, Spark 2.0 offers no arithmetic on its own
// Vector type, so the values are moved into Breeze vectors first.
val leftOperand = Vectors.dense(1.0, 2.0, 3.0)
val rightOperand = Vectors.dense(4.0, -5.0, 6.0)
// Wrap the backing arrays in Breeze DenseVectors to get access to the Breeze API.
val breezeLeft = new BreezeVector(leftOperand.toArray)
val breezeRight = new BreezeVector(rightOperand.toArray)
println(breezeLeft + breezeRight) // DenseVector(5.0, -3.0, 9.0)
println(breezeLeft - breezeRight) // DenseVector(-3.0, 7.0, -3.0)
println(breezeLeft dot breezeRight) // 12.0

// Dot product between a sparse and a dense Spark vector, again via Breeze.
val sv1 = Vectors.sparse(10, Array(0, 2, 9), Array(5.0, 3.0, 13.0))
val sv2 = Vectors.dense(1, 0, 1, 1, 0, 0, 1, 0, 0, 13)
println(sv1) // (10,[0,2,9],[5.0,3.0,13.0])
println(sv2) // [1.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,13.0]
val breezeSv1 = new BreezeVector(sv1.toArray)
val breezeSv2 = new BreezeVector(sv2.toArray)
println(breezeSv1 dot breezeSv2) // 5*1 + 3*1 + 13*13 = 177.0
// Spark operates on SparseMatrix and DenseMatrix directly; no conversion to the
// corresponding Breeze types is required.
// Matrix-vector product with a sparse matrix.
val sparseMat2 = Matrices.sparse(
  3,
  3,
  Array(0, 2, 3, 6),
  Array(0, 2, 1, 0, 1, 2),
  Array[Double](1, 2, 3, 4, 5, 6)
)
val featureVector3 = Vectors.dense(1.0, 2.0, 1.0)
val sparseProduct = sparseMat2.multiply(featureVector3)
println(sparseProduct) // [5.0,11.0,8.0]

// Matrix-vector product with a dense 3x3 identity matrix: the vector comes back unchanged.
val identity3 = Matrices.dense(3, 3, Array[Double](1, 0, 0, 0, 1, 0, 0, 0, 1))
val denseVec3 = Vectors.dense(5.0, 3.0, 0.0)
println(identity3)
println(denseVec3)
println(identity3.multiply(denseVec3)) // [5.0,3.0,0.0]

// Matrix transpose.
println(sparseMat2)
// 3 x 3 CSCMatrix
// (0,0) 1.0
// (2,0) 2.0
// (1,1) 3.0
// (0,2) 4.0
// (1,2) 5.0
// (2,2) 6.0
val transposed = sparseMat2.transpose
println(transposed)
// 3 x 3 CSCMatrix
// (0,0) 1.0
// (2,0) 4.0
// (1,1) 3.0
// (2,1) 5.0
// (0,2) 2.0
// (2,2) 6.0

// Matrix-matrix product; the two orders give different results.
val dMat1 = new DenseMatrix(2, 2, Array[Double](1, 3, 2, 4))
val dMat2 = new DenseMatrix(2, 2, Array[Double](2, 1, 0, 2))
println(dMat1)
// 1.0 2.0
// 3.0 4.0
println(dMat2)
// 2.0 0.0
// 1.0 2.0
val leftTimesRight = dMat1.multiply(dMat2)
println(leftTimesRight)
// 4.0 4.0
// 10.0 8.0
val rightTimesLeft = dMat2.multiply(dMat1)
println(rightTimesLeft)
// 2.0 4.0
// 7.0 10.0
// RowMatrix: a distributed matrix built from an RDD of row vectors.
val dataVectors = Seq(
  Vectors.dense(0, 1, 0),
  Vectors.dense(3, 1, 5),
  Vectors.dense(0, 7, 0)
)
val identityVectors = Seq(
  Vectors.dense(1, 0, 0),
  Vectors.dense(0, 1, 0),
  Vectors.dense(0, 0, 1)
)
// Turn the local sequence into an RDD and wrap it in a RowMatrix.
val distMat3 = new RowMatrix(spark.sparkContext.parallelize(dataVectors))
println(distMat3) // object identity, e.g. org.apache.spark.mllib.linalg.distributed.RowMatrix@352ed70d
// Compute the column summary once and reuse it: every call to
// computeColumnSummaryStatistics() runs another aggregation over the rows.
val summary3 = distMat3.computeColumnSummaryStatistics()
println(summary3.count) // 3
println(summary3.mean) // per-column mean: [1.0,3.0,1.6666666666666665]
println(summary3.variance) // per-column variance: [3.0,12.0,8.333333333333334]
println(distMat3.computeCovariance()) // covariance matrix
// 3.0 -3.0 5.0
// -3.0 12.0 -4.999999999999999
// 5.0 -4.999999999999999 8.333333333333334

// Flatten the identity rows into one array. Matrices.dense fills the matrix column by
// column, so in general this would produce the transpose of the row layout; for the
// identity matrix that makes no difference.
val dd = identityVectors.flatMap(_.toArray).toArray
dd.foreach(println)
val dmIdentity = Matrices.dense(3, 3, dd)
println(dmIdentity) // local matrix
// 1.0 0.0 0.0
// 0.0 1.0 0.0
// 0.0 0.0 1.0

// Distributed matrix * local matrix yields a new distributed matrix.
// Use case: multiplying by a tall, narrow local matrix shrinks the data and reduces
// the dimensionality of the result.
val distMat4 = distMat3.multiply(dmIdentity)
println(distMat4) // object identity, e.g. org.apache.spark.mllib.linalg.distributed.RowMatrix@205df5dc
// The multiplier is the identity, so the statistics are expected to match distMat3's.
val summary4 = distMat4.computeColumnSummaryStatistics()
println(summary4.count) // 3
println(summary4.mean)
println(summary4.variance)
println(distMat4.computeCovariance())
// IndexedRow: a row vector tagged with a Long row index; here an RDD of such rows.
// NOTE(review): the second and third rows both carry index 1L. If these rows are meant
// to back an IndexedRowMatrix, the last index should presumably be 2L - confirm intent.
val distIdxMat1 = spark.sparkContext.parallelize(
  List(
    IndexedRow(0L, dataVectors.head),
    IndexedRow(1L, dataVectors(1)),
    IndexedRow(1L, dataVectors(2))
  )
)
distIdxMat1.foreach(println)
// IndexedRow(0,[0.0,1.0,0.0])   (index, row vector)
// IndexedRow(1,[3.0,1.0,5.0])
// IndexedRow(1,[0.0,7.0,0.0])
// Build the message explicitly: println(label, value) silently wrapped both arguments
// in a tuple and printed "(distinct elements=,3)".
println(s"distinct elements=${distIdxMat1.distinct().count()}") // distinct elements=3
// CoordinateMatrix: a distributed matrix described by (row, column, value) entries.
val CoordinateEntries = Seq(
  MatrixEntry(1, 6, 300.0), // (row, col, value): row and col are Long, value is Double
  MatrixEntry(3, 1, 5),
  MatrixEntry(1, 7, 10)
)
val distCordMat1 = new CoordinateMatrix(spark.sparkContext.parallelize(CoordinateEntries.toList))
// Build the message explicitly: println(label, value) silently wrapped both arguments
// in a tuple. The label typo "MarixEntry" is corrected as well.
println(s"First Row (MatrixEntry) = ${distCordMat1.entries.first()}")
// First Row (MatrixEntry) = MatrixEntry(1,6,300.0)

// BlockMatrix: converted from the coordinate matrix and cached.
val distBlkMat1 = distCordMat1.toBlockMatrix().cache()
// validate() checks the block matrix info against the matrix data (blocks)
// and throws an exception if any error is found.
distBlkMat1.validate()
println(s"Is block empty = ${distBlkMat1.blocks.isEmpty()}")
// Is block empty = false
package spark.ml.cookbook.chapter2
import breeze.linalg.{DenseVector => BreezeVector}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, MatrixEntry, RowMatrix}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Vectors}
import org.apache.spark.sql.SparkSession
/**
 * Tour of the Spark MLlib linear-algebra types: local dense/sparse vectors and
 * matrices, Breeze interop for vector arithmetic, matrix products and transposes,
 * and the distributed RowMatrix / IndexedRow / CoordinateMatrix / BlockMatrix types.
 *
 * Every example prints its result; the expected output is noted in the comments.
 */
object MyVectorMatrix {

  /**
   * Starts a local SparkSession, runs every example in order and stops the session
   * afterwards, even when one of the examples throws.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Only show errors from these logger hierarchies so the example output stays readable.
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .master("local[*]")
      .appName("MyVectorMatrix")
      .config("spark.sql.warehouse.dir", ".")
      .config("spark.io.compression.codec", "snappy")
      .getOrCreate()

    try {
      localVectorExamples()
      localMatrixExamples()
      breezeVectorExamples()
      matrixOperationExamples()
      distributedMatrixExamples(spark)
    } finally {
      // Stop the session on both the success and the failure path.
      spark.stop()
    }
  }

  /** Creates local dense and sparse vectors, compares them and converts between them. */
  private def localVectorExamples(): Unit = {
    // Build a DenseVector from an existing array.
    val customerFeatures: Array[Double] =
      Array(1, 3, 5, 7, 9, 1, 3, 2, 4, 5, 6, 1, 2, 5, 3, 7, 4, 3, 4, 1)
    val x = Vectors.dense(customerFeatures)
    println(x) // [1.0,3.0,5.0,7.0,9.0,1.0,3.0,2.0,4.0,5.0,6.0,1.0,2.0,5.0,3.0,7.0,4.0,3.0,4.0,1.0]

    // Build a DenseVector from values parsed out of strings.
    val y = Vectors.dense("24".toDouble, "8".toDouble, "001".toDouble)
    println(y) // [24.0,8.0,1.0]

    // Dense vs. sparse: the same eight values held in both representations.
    val denseVec1 = Vectors.dense(5, 0, 3, 0, 0, 0, 7, 8)
    println(denseVec1.size) // 8
    println(denseVec1.numActives) // 8
    println(denseVec1.numNonzeros) // 4
    println(denseVec1) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]

    // A sparse vector stores only the positions and values of its explicit entries.
    val sparseVec1 = Vectors.sparse(8, Array(0, 2, 6, 7), Array(5, 3, 7, 8))
    println(sparseVec1.size) // 8
    println(sparseVec1.numActives) // 4
    println(sparseVec1.numNonzeros) // 4
    println(sparseVec1) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])  i.e. (size, indices, values)

    // The two representations convert into each other.
    val convertedDenseVector = sparseVec1.toDense
    println(convertedDenseVector) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]
    val convertedSparseVector = denseVec1.toSparse
    println(convertedSparseVector) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])
    // Note: DenseVector and SparseVector are both local vectors, not distributed ones.
  }

  /** Creates local dense and sparse matrices from flat arrays and from vectors. */
  private def localMatrixExamples(): Unit = {
    // Local DenseMatrix built from a flat array of values.
    val flatValues = Array(10, 11, 20, 30.3, 24, 8)
    val denseMat1 = Matrices.dense(2, 3, flatValues) // rows, cols, values
    println(denseMat1)
    // 10.0 20.0 24.0
    // 11.0 30.3 8.0

    // The values can also be supplied inline.
    val denseMat2 = Matrices.dense(3, 2, Array(1, 2, 3, 4, 5, 6))
    println(denseMat2)
    // 1.0 4.0
    // 2.0 5.0
    // 3.0 6.0

    // Build a matrix by concatenating several vectors; each vector ends up as one column.
    val v1 = Vectors.dense(1, 2, 3, 4)
    val v2 = Vectors.dense(5, 6, 7, 8)
    val v3 = Vectors.dense(9, 10, 11, 12)
    val matFromVectors = Matrices.dense(4, 3, v1.toArray ++ v2.toArray ++ v3.toArray)
    println(matFromVectors)
    // 1.0 5.0 9.0
    // 2.0 6.0 10.0
    // 3.0 7.0 11.0
    // 4.0 8.0 12.0
    // Remember: the values are laid out column by column (column-major order).

    // Local SparseMatrix.
    // Argument order: rows, cols, prefix sums of the per-column entry counts
    // (here the two columns hold 1-0 = 1 and 3-1 = 2 entries), row indices, values.
    // Illustration: https://www.cnblogs.com/zhangbojiangfeng/p/7456961.html
    val sparseMat1 = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(11, 22, 33))
    println(sparseMat1.numRows) // 3
    println(sparseMat1.numCols) // 2
    println(sparseMat1.numActives) // 3
    println(sparseMat1.numNonzeros) // 3
    println(sparseMat1)
    // 3 x 2 CSCMatrix
    // (0,0) 11.0
    // (1,1) 22.0
    // (2,1) 33.0
  }

  /** Performs vector arithmetic by moving Spark vectors into Breeze vectors. */
  private def breezeVectorExamples(): Unit = {
    // Per the original note, Spark 2.0 offers no arithmetic on its own Vector type,
    // so the values are wrapped in Breeze DenseVectors to use the Breeze API.
    val w1 = Vectors.dense(1, 2, 3)
    val w2 = Vectors.dense(4, -5, 6)
    val w3 = new BreezeVector(w1.toArray)
    val w4 = new BreezeVector(w2.toArray)
    println(w3 + w4) // DenseVector(5.0, -3.0, 9.0)
    println(w3 - w4) // DenseVector(-3.0, 7.0, -3.0)
    println(w3.dot(w4)) // 12.0

    // Dot product between a sparse and a dense Spark vector, again via Breeze.
    val sv1 = Vectors.sparse(10, Array(0, 2, 9), Array(5, 3, 13))
    val sv2 = Vectors.dense(1, 0, 1, 1, 0, 0, 1, 0, 0, 13)
    println(sv1) // (10,[0,2,9],[5.0,3.0,13.0])
    println(sv2) // [1.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,13.0]
    println(new BreezeVector(sv1.toArray).dot(new BreezeVector(sv2.toArray))) // 177.0
  }

  /** Shows matrix-vector products, transposition and matrix-matrix products. */
  private def matrixOperationExamples(): Unit = {
    // Spark operates on SparseMatrix and DenseMatrix directly; no conversion to the
    // corresponding Breeze types is required.
    // Matrix-vector product with a sparse matrix.
    val sparseMat2 = Matrices.sparse(
      3,
      3,
      Array(0, 2, 3, 6),
      Array(0, 2, 1, 0, 1, 2),
      Array(1, 2, 3, 4, 5, 6)
    )
    val denseFeatureVector = Vectors.dense(1, 2, 1)
    val ans0 = sparseMat2.multiply(denseFeatureVector)
    println(ans0) // [5.0,11.0,8.0]

    // Matrix-vector product with a dense 3x3 identity matrix: the vector comes back unchanged.
    val denseVec3 = Vectors.dense(5, 3, 0)
    val denseMat3 = Matrices.dense(3, 3, Array(1, 0, 0, 0, 1, 0, 0, 0, 1))
    println(denseMat3)
    println(denseVec3)
    println(denseMat3.multiply(denseVec3)) // [5.0,3.0,0.0]

    // Matrix transpose.
    println(sparseMat2)
    // 3 x 3 CSCMatrix
    // (0,0) 1.0
    // (2,0) 2.0
    // (1,1) 3.0
    // (0,2) 4.0
    // (1,2) 5.0
    // (2,2) 6.0
    val transposeSparseMat2 = sparseMat2.transpose
    println(transposeSparseMat2)
    // 3 x 3 CSCMatrix
    // (0,0) 1.0
    // (2,0) 4.0
    // (1,1) 3.0
    // (2,1) 5.0
    // (0,2) 2.0
    // (2,2) 6.0

    // Matrix-matrix product; the two orders give different results.
    val dMat1 = new DenseMatrix(2, 2, Array(1, 3, 2, 4))
    val dMat2 = new DenseMatrix(2, 2, Array(2, 1, 0, 2))
    println(dMat1)
    // 1.0 2.0
    // 3.0 4.0
    println(dMat2)
    // 2.0 0.0
    // 1.0 2.0
    println(dMat1.multiply(dMat2))
    // 4.0 4.0
    // 10.0 8.0
    println(dMat2.multiply(dMat1))
    // 2.0 4.0
    // 7.0 10.0
  }

  /**
   * Builds the distributed types (RowMatrix, IndexedRow RDD, CoordinateMatrix,
   * BlockMatrix) on top of the given session's SparkContext and prints their properties.
   *
   * @param spark the active session whose SparkContext parallelizes the sample data
   */
  private def distributedMatrixExamples(spark: SparkSession): Unit = {
    val sc = spark.sparkContext

    // RowMatrix: a distributed matrix built from an RDD of row vectors.
    val dataVectors = Seq(
      Vectors.dense(0, 1, 0),
      Vectors.dense(3, 1, 5),
      Vectors.dense(0, 7, 0)
    )
    val identityVectors = Seq(
      Vectors.dense(1, 0, 0),
      Vectors.dense(0, 1, 0),
      Vectors.dense(0, 0, 1)
    )
    // Turn the local sequence into an RDD and wrap it in a RowMatrix.
    val distMat3 = new RowMatrix(sc.parallelize(dataVectors))
    println(distMat3) // object identity, e.g. org.apache.spark.mllib.linalg.distributed.RowMatrix@352ed70d
    // Compute the column summary once and reuse it: every call to
    // computeColumnSummaryStatistics() runs another aggregation over the rows.
    val summary3 = distMat3.computeColumnSummaryStatistics()
    println(summary3.count) // 3
    println(summary3.mean) // per-column mean: [1.0,3.0,1.6666666666666665]
    println(summary3.variance) // per-column variance: [3.0,12.0,8.333333333333334]
    println(distMat3.computeCovariance()) // covariance matrix
    // 3.0 -3.0 5.0
    // -3.0 12.0 -4.999999999999999
    // 5.0 -4.999999999999999 8.333333333333334

    // Flatten the identity rows into one array. Matrices.dense fills the matrix column
    // by column, so in general this would produce the transpose of the row layout; for
    // the identity matrix that makes no difference.
    val identityValues = identityVectors.flatMap(_.toArray).toArray
    identityValues.foreach(println)
    val dmIdentity = Matrices.dense(3, 3, identityValues)
    println(dmIdentity) // local matrix
    // 1.0 0.0 0.0
    // 0.0 1.0 0.0
    // 0.0 0.0 1.0

    // Distributed matrix * local matrix yields a new distributed matrix.
    // Use case: multiplying by a tall, narrow local matrix shrinks the data and reduces
    // the dimensionality of the result.
    val distMat4 = distMat3.multiply(dmIdentity)
    println(distMat4) // object identity, e.g. org.apache.spark.mllib.linalg.distributed.RowMatrix@205df5dc
    // The multiplier is the identity, so the statistics are expected to match distMat3's.
    val summary4 = distMat4.computeColumnSummaryStatistics()
    println(summary4.count) // 3
    println(summary4.mean)
    println(summary4.variance)
    println(distMat4.computeCovariance())

    // IndexedRow: a row vector tagged with a Long row index; here an RDD of such rows.
    // NOTE(review): the second and third rows both carry index 1L. If these rows are
    // meant to back an IndexedRowMatrix, the last index should presumably be 2L - confirm.
    val distIdxMat1 = sc.parallelize(
      List(
        IndexedRow(0L, dataVectors.head),
        IndexedRow(1L, dataVectors(1)),
        IndexedRow(1L, dataVectors(2))
      )
    )
    distIdxMat1.foreach(println)
    // IndexedRow(0,[0.0,1.0,0.0])   (index, row vector)
    // IndexedRow(1,[3.0,1.0,5.0])
    // IndexedRow(1,[0.0,7.0,0.0])
    // Messages are built explicitly: println(label, value) silently wrapped both
    // arguments in a tuple and printed e.g. "(distinct elements=,3)".
    println(s"distinct elements=${distIdxMat1.distinct().count()}") // distinct elements=3

    // CoordinateMatrix: a distributed matrix described by (row, column, value) entries.
    val coordinateEntries = Seq(
      MatrixEntry(1, 6, 300.0), // (row, col, value): row and col are Long, value is Double
      MatrixEntry(3, 1, 5),
      MatrixEntry(1, 7, 10)
    )
    val distCordMat1 = new CoordinateMatrix(sc.parallelize(coordinateEntries.toList))
    println(s"First Row (MatrixEntry) = ${distCordMat1.entries.first()}")
    // First Row (MatrixEntry) = MatrixEntry(1,6,300.0)

    // BlockMatrix: converted from the coordinate matrix and cached.
    val distBlkMat1 = distCordMat1.toBlockMatrix().cache()
    // validate() checks the block matrix info against the matrix data (blocks)
    // and throws an exception if any error is found.
    distBlkMat1.validate()
    println(s"Is block empty = ${distBlkMat1.blocks.isEmpty()}")
    // Is block empty = false
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor (POM model 4.0.0) for the example project, packaged as a jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates: org.example:examples:1.0-SNAPSHOT. -->
<groupId>org.example</groupId>
<artifactId>examples</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<!-- The only declared dependency is the LZ4 compression library. -->
<!-- NOTE(review): no Spark, Breeze or log4j dependency is declared here; presumably they
     reach the classpath some other way (e.g. jars attached in the IDE) - confirm. -->
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
<properties>
<!-- Java language level 8 for both source and generated bytecode. -->
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>