今天知识星球球友,微信问浪尖了一个spark源码阅读中的类型限定问题。这个在spark源码很多处出现,所以今天浪尖就整理一下scala类型限定的内容。希望对大家有帮助。 scala类型参数要点 1....泛型与约束实战 1 泛型函数 ClassTag[T]保存了泛型擦除后的原始类型T,提供给被运行时的。...} } 2 类型变量界定 泛型参数类型限定,限定具体类的可以调用特定的方法。...def main(args: Array[String]): Unit = { /* * 定义方法array * Manifest:需要运行时存储T的实际类型,运行时是做为参数运行在方法的上下文中...关键字 ClassTag[T]保存了泛型擦除后的原始类型T,提供给被运行时的。
scala runtime-reflection有以下几项主要功能: 1、动态检验对象类型,包括泛类型 2、实时构建类型实例 3、实时调用类型的运算方法 反射功能可以在两种环境下体现:compile-time...我们可能经常碰到TypeTag的调用例子,还有WeakTypeTag和ClassTag。...这个extract函数的目的是把T类型的值过滤出来。上面的例子里list里的String元素被筛选出来了。但是如果我们像下面这样使用extract呢?...那么如果我们为extract函数提供一个TypeTag又如何呢?...在上面的例子里调用meth函数时我们提供了一个实质类型如:List[Int],List[String],List[List[Int]]等。
注意:对于顶点的属性是使用提供的默认属性。...返回当前图的顶点的数量 3、graph.inDegrees 返回当前图每个顶点入度的数量,返回类型为 VertexRDD[Int] 4、graph.outDegrees 返回当前图每个顶点出度的数量...,返回的类型为 VertexRDD[Int] 5、graph.degrees 返回当前图每个顶点入度和出度的和,返回的类型为 VertexRDD[Int] ========== Spark...,通过传入 epred 函数来过滤边,通过传入 vpred 函数来过滤顶点,返回满足 epred 函数值为 true 的边和满足 vpred 函数值为 true 顶点组成子图。...mergeMsg 是每一个顶点都会在接受到所有消息之后调用,主要用于所有接收到的消息的聚合。然后整个函数返回消息的顶点集合 VertexRDD[A]。
, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。...以c++为例,应先用srand()设置不同种子,否则每次调用rand()得到的值是一样的。...union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。...函数定义: persist(newLevel:StorageLevel) StorageLevel 是枚举类型,代表存储模式。...3 collect: collect相当于toArray,不过已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数据,在这个数组上运用 scala 的函数式操作。
并行化集合 由一个已经存在的 Scala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。...{SparkConf, SparkContext} /** * Spark 采用并行化的方式构建Scala集合Seq中的数据为RDD * - 将Scala集合转换为RDD * sc.parallelize...实际使用最多的方法:textFile,读取HDFS或LocalFS上文本文件,指定文件路径和RDD分区数目。 范例演示:从文件系统读取数据,设置分区数目为2,代码如下。...println) // 应用程序运行结束,关闭资源 sc.stop() } } 其中文件路径:可以指定文件名称,可以指定文件目录,可以使用通配符指定...小文件读取 在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用
它的效率比普通的JavaSerializer更高,但是会有一定的限制,比如原生支持的类型比较少,如果必须使用自定义的类型,需要提前注册。...因为泛型类型在编译期会被擦除(即type erasure),故ClassTag在Scala中用来在运行期指定无法识别的泛型类型。...canUseKryo()方法判断要序列化的对象类型是否落在8种Scala基本类型与String类型中。...如果存储块的ID对应的数据类型支持压缩,调用wrapForCompression()方法可以将流数据用指定的编解码器压缩。判断是否可压缩的shouldCompress()方法代码如下。...并且当存储块ID的类型为StreamBlockId(Spark Streaming中用到的块ID)时,SerializerManager就不会自动判别该使用哪种序列化器,而是完全采用用户指定的类型。
T 可以是 Scala 里面的基本类型或数据结构,不限于 (K, V)。但如果是 (K, V),K 不能是 Array 等复杂类型(因为难以在复杂类型上定义 partition 函数)。...数据计算过程 下面的代码段,展现了RDD.flatmap()和MapPartitionsRDD的实现,在代码中,我们看到,当调用RDD的map并传入一个函数f的时候,Spark 并没有做什么运算,而是用...而在MapPartitionsRDD.scala中,我们也看到只有当compute方法被调用的时候,我们之前传入的函数f才会真正的被执行 // RDD.scala ... /** * Return...以之前flatmap操作生成得到的MapPartitionsRDD类为例。...数据不在存储介质当中,可能是数据已经丢失,或者 RDD 经过持久化操作,但是是当前分区数据是第一次被计算,因此会出现拉取得到数据为 None 的情况。
这个支持key-valued类型的流数据 ,支持的操作算子,如,groupByKeyAndWindow,join。...这些操作,在有key-value类型的流上是自动识别的。 对于dstream -> PairDStreamFunctions自动转换的过程大家肯定想到的是scala的隐式转换。...后面我们主要是关注该函数封装及调用。 其实,看过浪尖的Spark Streaming的视频的朋友或者度过浪尖关于Spark Streaming相关源码讲解的朋友应该有所了解的是。...这个生成RDD的函数应该是在 DStream的compute方法中在生成RDD的时候调用。假设你不了解也不要紧。 我们跟着代码轨迹前进,验证我们的想法。...然后调用,前面步骤封装的函数进行join。 val transformedRDD = transformFunc(parentRDDs, validTime) 以上就是join的全部过程。
RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。...它的函数定义为: def mapPartitionsU: ClassTag: RDD[U] f即为输入函数,它处理每个分区里面的内容。...: (T, A) => U): RDD[U] 第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A; 第二个函数f是把二元组(T, A)...作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。...与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD
,所以使用 Geotrellis 的第一步工作就是要将数据切片(无论是存储在内存中还是进行持久化),然而即使其能力再“大”在实际工作中也难以处理以下几种需求: 全球(大范围)高分辨率遥感影像数据,数据量在...load 函数读取原始数据,再调用 tile 函数对数据进行切割,而后调用 save 函数将切割后的瓦片进行持久化。...(scala 支持内部函数),其中 outputPlugin(currentId, rdd, conf, saveAction) 是将瓦片持久化的关键操作,val outputPlugin = ......,本文直接指定为 Accumulo 类型,而后获取 AccumuloAttributeStore 对象,此对象相当于是元数据,其中存储图层的范围层级等信息,最后通过 layerExists 方法即可得到图层是否存在...三、总结 阅读此文需要对 Geotrellis 框架有整体了解并熟悉其基本使用,可以参考本系列博客,使用 geotrellis 也需要对 scala 有所掌握,scala 语法在我接触过的所有语言中应当是比较灵活的
,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 (Int, Interator[T]) => Iterator[U]。...通过查看源码发现 cache 最终也是调用了 persist 方法,默认的存储级别都是仅在内存存储一份,Spark 的存储级别还有好多种,存储级别在 object StorageLevel 中定义的。...一般如果从一个普通的 RDD 转 为 pair RDD 时,可以调用 map() 函数来实现,传递的函数需要返回键值对。...foldByKey() 则与 fold() 相当类似,它们都使用一个与 RDD 和合并函数中的数据类型相同的零值作为初始值。...与 fold() 一样,foldByKey() 操作所使用的合并函数对零值与另一个元素进行合并,结果仍为该元素。
• 图的分布式存储采用点分割模式,而且使用 partitionBy 方法,由用户指定不同的划分策略(PartitionStrategy)。...2.1.2 GraphX 存储模式 Graphx 借鉴 PowerGraph,使用的是 Vertex-Cut(点分割)方式存储图,用三个 RDD 存储图数据信息: VertexTable(id,...,最终都是使用 GraphImpl 来构建的,即调用了 GraphImpl 的 apply 方法。...这些特征是通过在连接顶点的结果上使用用户定义的 map 函数获得的。没有匹配的顶点保留其原始值。下面详细地来分析这两个函数。...• (2)对等合并 attr, 聚合函数使用传入的 mergeMs g函数。
2.3 编程模型 在Spark中,RDD被表示为对象,通过这些对象上的方法(或函数)调用转换。 定义RDD之后,程序员就可以在动作(注:即action操作)中使用RDD了。...Spark编程接口 Spark用Scala[5]语言实现了RDD的API。Scala是一种基于JVM的静态类型、函数式、面向对象的语言。...我们选择Scala是因为它简洁(特别适合交互式使用)、有效(因为是静态类型)。但是,RDD抽象并不局限于函数式语言,也可以使用其他语言来实现RDD,比如像Hadoop[2]那样用类表示用户函数。...不过,我们举的例子几乎都省略了这个类型参数,因为Scala支持类型推断。 虽然在概念上使用Scala实现RDD很简单,但还是要处理一些Scala闭包对象的反射问题。...另外,函数名与Scala及其他函数式语言中的API匹配,例如map是一对一的映射,而flatMap是将每个输入映射为一个或多个输出(与MapReduce中的map类似)。
: ClassTag[T] } 其中,size表示该MemoryEntry代表的块大小,memoryMode表示块存储在堆内内存还是堆外内存,classTag则是该块所存储的对象的类型标记。...只能用堆内内存存储,其数据是T类型的对象的数组。...如果内存管理器为StaticMemoryManager,该值为定值;如果内存管理器为UnifiedMemoryManager,该值会浮动。 memoryUsed:已经使用了的堆内与堆外存储内存之和。...:首先调用MemoryManager.acquireStorageMemory()方法申请所需的内存,然后调用参数中传入的偏函数_bytes,获取已经转化为ChunkedByteBuffer的数据。...注意这个方法返回值的类型是Either类型,它在Scala中表示不相交的两个结果的集合,即可能返回错误的结果(Left),或者正确的结果(Right)。
First Class Functions 函数式编程的核心就是函数应当是首类的。首类表示函数不仅能得到声明和调用,还可以作为一个数据类型用在这个语言的任何地方。...首类函数与其他数据类型一样,可以采用字面量创建;或者存储在值、变量、或数据结构等容器中;还可以作为一个函数的参数或返回值。...这个myDouble是一个函数值了,可以调用刚刚的函数。 与定义value的格式是一样的:val myDouble: 参数类型 = double。这里的参数类型就是函数类型。...用通配符为函数赋值 通配符下划线相当于占位符,表示将来的一个函数调用。要么使用显式的类型,要么使用通配符_定义函数值以及用函数赋值。...,函数字面量为(x:Int) => x*2,定义了一个有类型的输入和函数体。
前面我们讨论过FSM,一种专门为维护内部状态而设计的Actor,它的特点是一套特殊的DSL能很方便地进行状态转换。...为了实现FSM的可用性,就必须为FSM再增加自我修复能力,PersistentFSM是FSM和PersistentActor的合并,是在状态机器模式的基础上再增加了状态转变事件的持久化,从而实现内部状态的自我修复功能的...与FSM比较:PersistentFSM除增加了event参数外,State类型是以FSMState类型为基础的,方便对State进行序列化(serialization): /** * FSM...这是一个ClassTag[E]实例,用来解决泛型E的模式匹配问题(由scala语言类型擦拭type-erasure造成): /** * Enables to pass a ClassTag of...applyEvent函数是如下调用的: override def receiveRecover: Receive = { case domainEventTag(event) ⇒ startWith
一、函数指针做函数参数 1、使用函数指针间接调用函数 在上一篇博客 【C++】函数指针 ③ ( 函数指针语法 | 函数名直接调用函数 | 定义函数指针变量 | 使用 typedef 定义函数类型 | 使用..., 调用的函数可以动态指定 ; 2、函数指针做参数 定义了 如下 函数指针类型 pFun_add , 其类型为 int (*)(int, int) , 该指针指向一个 类型为 int (int, int..., int); 定义函数 接收 pFun_add 类型的形参作为参数 , 该类型是 函数指针类型 , 也就是 函数接收一个 函数指针类型参数 , 在该函数中调用 函数指针 指向的 函数 ; // 传入函数指针...C 语言 中模拟面向对象用法 ; 可以将特定的 函数指针类型 定义为 结构体 的一部分 , 并使用该 结构体 来传递具有特定行为的对象的地址 ; 该操作有助于更好地组织代码 , 使代码更易于理解和维护...; 错误处理 : 使用函数指针 , 将错误处理函数作为参数传递给其他函数 , 在发生错误时立即调用适当的错误处理函数 , 无需返回到调用堆栈中的较高层次 ; 二、代码示例 - 函数指针做函数参数 代码示例
转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/5443789.html 最近由于使用sparkstreaming的同学需要对接到部门内部的的...kafka集群,由于官方的spark-streaming-kafka包和现有公司的kafka集群权限系统无法对接,需要研究下spark-streaming-kafka包原有代码以便改造,本文研究的代码版本为...调用的都是KafkaUtils内重载实现的createStream方法。...createStream调用了第一个createStream,而第一个createStream最后调用的是第二个createStream,所以所有的rdd数据流都是从下面这句代码产生的: new KafkaInputDStream...sparkconf属性 spark.streaming.receiver.writeAheadLog.enable为真(默认值是假) 这个receiver会把收到的kafka数据首先存储到日志上,然后才会向
Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型,在2.0.0之前的版本中,通过继承AccumulatorParam来实现,而2.0.0之后的版本需要继承AccumulatorV2来实现自定义类型的累加器...如下图: [image.png] 在2.0.0之前版本中,累加器的声明使用方式如下: scala> val accum = sc.accumulator(0, "My Accumulator")...10 累加器的声明在2.0.0发生了变化,到2.1.0也有所变化,具体可以参考官方文档,我们这里以2.1.0为例将代码贴一下: scala> val accum = sc.longAccumulator...res0: Array[Int] = Array(1, 2, 3) 从上文我们可以看出广播变量的声明很简单,调用broadcast就能搞定,并且scala中一切可序列化的对象都是可以进行广播的,这就给了我们很大的想象空间...上文是从spark官方文档摘抄出来的,我们可以看出,正常来说每个节点的数据是不需要我们操心的,spark会自动按照LRU规则将老数据删除,如果需要手动删除可以调用unpersist函数。
map,实际上分发到Worker节点后,执行的任然是scala的map函数。...,可以看到传入的函数在compute函数中调用,从上可以看出,TaskContext传入的是节点的上下文,Int是分区id, Iterator[T]传入的是父节点调用迭代器。...map实质是一样的,只是在调用函数时,直接调用函数,返回一个迭代器。...,然后创建MapPartitionsRDD, 在传入函数时调用scala的filter函数。...distinct是由其他基础的算子组合实现的,其原理是使用map将其转换为(key,null),调用reduceBykey进行聚合去重,最后再使用map转换为key。
领取专属 10元无门槛券
手把手带您无忧上云