1、在 scala 中,方法和函数几乎可以等同(比如他们的定义、使用、运行机制都一样的),只是函数的使用方式更加的灵活多样。 ...4、在 scala 中函数式编程和面向对象编程融合在一起了。...6.4.3 Scala 构造器的介绍+基本语法+快速入门 Scala 构造器的介绍 和 Java 一样,Scala 构造对象也需要调用构造方法,并且可以有任意多个构造方法(即 scala 中构造器也支持重载...".foreach((_) => {res *= _.toLong}) "Hello".foreach(res1 *= _.toLong) println("res1=" + res1)...Java 与 Scala 在函数层面上的不同体现: // 在 Java 中 函数(接收参数) // 在 Scala 中 集合.函数(函数) 如下图所示: ?
1.将kafka streaming 和 redis整合 实现词频统计 Producer.class 生成数据daokafka package day14; /** * 创建一个生产者 生成随机的...key 和 字母 * 用于实现实时流统计词频 并 存储到redis */ import org.apache.kafka.clients.producer.KafkaProducer; import...val redis = JPools.getJedis rdd.foreach({x => redis.hincrBy("wordcount",x._1,x._2.toLong)})...topicPL._1.split("[-]") fromDbOffset += (new TopicPartition(split(0), split(1).toInt) -> topicPL._2.toLong...val redis = JPools.getJedis rdd.foreach({x => redis.hincrBy("wordcount",x._1,x._2.toLong)})
(已解决,最后可以当成事实) 暂时理解成:来自网络 1、i686和x86_64有什么不同?...2、linux系统中的i386/i686和x86_64有什么区别 回答 i386对应的是32位系统、而i686是i386的一个子集,i686仅对应P6及以上级别的CPU,i386则广泛适用于80386以上的各种...CPU;x86_64主要是64位系统。...i686 在 pentun II 以后的 Intel 系列 CPU ,及 K7 以后等级的 CPU 都属于这个 686 等级! 而x86_64就是64位的x(代表不确定。...这个编译链带的i686或者x86_64和Linux开发板没关系,和宿主的Ubuntu是64还是32有关系。
demo2:使用Scala 在客户端造数据,测试Spark Sql: ?...Spark SQL 映射实体类的方式读取HDFS方式和字段,注意在Scala的Objcet最上面有个case 类定义,一定要放在 这里,不然会出问题: ?...val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar" /**Spark SQL映射的到实体类的方式**/ def mapSQL2()...: Unit ={ //使用一个类,参数都是可选类型,如果没有值,就默认为NULL //SparkConf指定master和任务名 val conf = new SparkConf...split("\1")) .map( p => ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong
(注意: 这里我们只关注点击次数, 不关心下单和支付次数) 这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 sessionId。 ...).toLong, fields(7).toLong, fields(8), fields(9), fields(10),...{Partitioner, SparkContext} import org.apache.spark.rdd.RDD import scala.collection.mutable /** **...排序,scala排序必须把所有数据全部加载到内存才能排。...(println) } /* 使用scala的排序,会导致内存溢出 问题解决方案: 方案2: 1.
项目需求: ip.txt:包含ip起始地址,ip结束地址,ip所属省份 access.txt:包含ip地址和各种访问数据 需求:两表联合查询每个省份的ip数量 SparkCore 使用广播,将小表广播到...=> { val splited = x.split("[|]") val startNum = splited(2).toLong val endNum = splited...ipDriver: Array[(Long, Long, String)] = ipRules.collect() //4.将IP通过广播的方式发送到executor //广播之后,在Driver...(println) //9.将数据存储到mysql中 /** * reduceRDD.foreach(x => { * * val conn = DriverManager.getConnection...iter.foreach(x =>{ ps.setString(1,x._1) ps.setInt(2,x._2) ps.executeUpdate() })
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0" import scala.util.matching.Regex...val matcher = options.get // 构建返回值 ApacheAccessLog( matcher.group(1), // 获取匹配字符串中第一个小括号中的值...matcher.group(6), matcher.group(7), matcher.group(8).toInt, matcher.group(9).toLong...var ipNum =0L for(i<- 0 until fragments.length){ ipNum = fragments(i).toLong | ipNum foreach(x =>{ ps.setString(1,x._1) ps.setInt(2,x._2) ps.executeUpdate() })
如果搜索关键字是 null, 表示这次不是搜索 如果点击的品类 id 和产品 id 是 -1 表示这次不是点击 下单行为来说一次可以下单多个产品, 所以品类 id 和产品 id 都是多个, id 之间使用逗号...遍历全部日志数据, 根据品类 id 和操作类型分别累加....需要用到累加器 定义累加器 当碰到订单和支付业务的时候注意拆分字段才能得到品类 id 遍历完成之后就得到每个每个品类 id 和操作类型的数量. 按照点击下单支付的顺序来排序 取出 Top10 ?...// 如果是可变map,则所有的变化都是在原集合中发生变化,最后的值可以不用再一次添加 // 如果是不可变map,则计算的结果,必须重新赋值给原来的map变量...).toLong, fields(7).toLong, fields(8), fields(9), fields(10),
0.5KB 左右,下单量数据大概在 15GB 左右....2.1 Flume监控文件夹收集数据传给kafka 实时监控文件夹,有新的文件产生的时候,就会传给kafka。这里kafka的Topic,会自动创建。...因为业务的处理是在Executor处理的,而Offset的存储是在Driver端存储的。所以,处理完业务之后,首先将业务数据保存在一个临时的数据库中。...在Driver端保存offset的时候,从临时数据库中读取业务处理数据,和保存offset做一个事务。同时保存到数据库。 三、代码实现 3.1 收集,分析,处理,保存数据。...", tp._2(1).toLong) redis.hincrBy("A-" + tp._1, "money", tp._2(2).toLong) redis.hincrBy
遍历全部日志数据, 根据品类 id 和操作类型分别累加....> scala-maven-plugin 3.4.6中 // 合并map /*other match { case o: CategoryAcc => o.map.foreach {...).toLong, fields(7).toLong, fields(8), fields(9), fields(10),...) //关闭项目(sc) sc.stop() } } 2.计算Top10 热门品类的具体代码(在APP中创建) import acc.CategoryAcc import bean
akka在alpakka工具包里提供了对cassandra数据库的streaming功能。...可以直接接通Flow[Row,Row,NotUsed]和Sink来使用。...val sts = jdbcSource.take(100).via(actionFlow).to(sink).run() 下面的例子里我们用CassandraStream的流元素更新h2数据库中的数据...0x0003 val QUORUM : CONSISTENCY_LEVEL = 0x0004 val ALL...0x0007 val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A val LOCAL_SERIAL
对应方案内容 通常我们的数据内容,涵盖很多的唯一ID字段,如:用户id, 电子邮件,以及在移动APP中经常有使用的唯一设备信息内容(imei, imsi,mac, Device ID, etc..., id) }) vertices.foreach(ele => println(ele._1 + " : " + ele._2)) val edges: RDD[Edge[String...=> tp._2 > 2) .map(x => x._1) //用 点集合 和 边集合 构造一张图 使用Graph算法 val graph = Graph(vertices..., id) }) vertices.foreach(ele => println(ele._1 + " : " + ele._2)) val edges: RDD[Edge[...<5(经验阈值)的边去掉, .filter(tp => tp._2 > 2) .map(x => x._1) //从初次的guid读取 val firstIdmap
运算符本质 在Scala中其实是没有运算符的,所有运算符都是方法。...函数和方法的区别 方法定义在类中可以实现重载,函数不可以重载。 方法是保存在方法区,函数是保存在堆中。 定义在方法中的方法可以称之为函数,不可以重载。 方法可以转成函数, 转换语法: 方法名 _。...中属性和方法的默认访问权限为public,但Scala中无public关键字。...private为私有权限,只在类的内部和伴生对象中可用。 protected为受保护权限,Scala中受保护权限比Java中更严格,同类、子类可以访问,同包无法访问。...我们发现经典的数据结构比如Queue和Stack被归属到LinearSeq(线性序列)。 大家注意Scala中的Map体系有一个SortedMap,说明Scala的Map可以支持排序。
ssc.checkpoint("hdfs://cdh1.macro.com:8020/user/catelf/sparkstreaming/checkpoint") //查询mysql中是否有偏移量...下游Spark Streaming对接kafka实现实时计算做题正确率和掌握度,将正确率和掌握度存入mysql中,用户点击交卷后刷新页面能立马看到自己做题的详情。...{DbConfig, DbSearcher} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer..., Array(page2ToPage3Rate, 3)) } } 实时统计学员播放视频各时长 用户在线播放视频进行学习课程,后台记录视频播放开始区间和结束区间,及播放开始时间和播放结束时间,后台手机数据传输...{Seconds, StreamingContext} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer
Spark Mllib的矩阵有多种形式,分布式和非分布式,非分布式在这里浪尖就不讲了,很简单,因为他是基于数组的。而分布式存储是基于RDD的,那么问题就又变成了如何将一个RDD进行转置。...2,针对RDD的每一行,转化为(value, colIndex),并整理的到(colIndex.toLong, (rowIndex, value)) 3,进行flatmap 4,步骤3完成后,我们只需要按照...5,完成步骤4后,我们就可以按照每一行的(rowIndex, value),使用下标和其值构建新的行,保证每一行转换后的顺序。 到此转换完成。...=> x) // (newRowIndex, (newColIndex, value)) .groupByKey .sortByKey().map(_._2) // 对row进行排序,...Double)]): Vector = { val resArr = new Array[Double](rowWithIndexes.size) rowWithIndexes.foreach
flatMap=map + flatten 例1: scala> val test=List("hello java","hello python","hello hadoop") test: List...[String] = List(hello java, hello python, hello hadoop) map输出结果: scala> test.map(line=>line).foreach(...x=>println(x)) hello java hello python hello hadoop flatMap输出结果1: scala> test.flatMap(line=>line.split...(" ")).foreach(x=>println(x)) hello java hello python hello hadoop flatMap输出结果2: scala> text.flatMap(...line=>line).foreach(x=>println(x)) h e l l o j a v a h ……
offset lag的监控 方案 利用官方的类库 ConsumerOffsetChecker ConsumerGroupCommand 利用官方的JMX ConsumerOffsetChecker 在0.8.2.2.../kafka/tools/ConsumerOffsetChecker.scala object ConsumerOffsetChecker extends Logging { private val...zkConnect, 30000, 30000, ZKStringSerializer) val topicList = topics match { case Some(x)...ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong.../kafka/admin/ConsumerGroupCommand.scala object ConsumerGroupCommand extends Logging { //...
基本概念 Lambda 表达式是 Java 在 JDK 8 中引入的一种新的语法元素和操作符(操作符为“->”,也称Lambda操作符或箭头操作符)它将 Lambda 分为两个部分: 左侧:小括号内 指定了...; 有返回值且只返回为单行代码可以不写return和{}:(x,y) -> Integer.compare(x,y); //使用Lambda表达式遍历HashMap中的键值对 HashMap.forEach... com = (x,y) -> Integer.compare(x,y); 方法引用 方法引用是函数式接口的一个实例,通过方法的名字来指向一个方法。...语法如下: 类或对象名::方法名 包括如下3种情况: 对象::非静态方法 类::静态方法 类::非静态方法 针对第一第二种情况要求:接口中抽象方法的形参列表 和 返回值类型 与 方法引用中的形参列表 和...map ToLong(ToLongFunction f) 接收一个函数作为参数,该函数会被应用到每个元素上,产生一个新的 LongStream。
= sc.parallelize(arr,2) /* * 将RDD中的数据写入到数据库中,绝大部分使用mapPartitions算子来实现 */ rdd.mapPartitions(x => {...以下scala程序可以说明map函数、flatMap函数和flatten函数的区别和联系: scala> val arr = Array("hello hadoop","hello hive","hello...spark") arr: Array[String] = Array(hello hadoop, hello hive, hello spark) scala> val map = arr.map(..., spark)) scala> map.flatten res1: Array[String] = Array(hello, hadoop, hello, hive, hello, spark)...scala> arr.flatMap(_.split(" ")) res2: Array[String] = Array(hello, hadoop, hello, hive, hello, spark
当然,JDBC-Engine的功能是基于ScalikeJDBC的,所有的操作和属性都包嵌在SQL这个类型中: /** * SQL abstraction....Update功能置于下面这几个子类中: /** * SQL which execute java.sql.Statement#executeUpdate()....这个示范在一个完整的Transaction里包括了两条DDL语句。...这些主键一般在构建表时注明,包括:serial, auto_increment等。如果不返回主键则返回update语句的更新状态如更新数据条数等。...} } 这个update函数又被细分为单条语句singleTxUpdate和多条语句multiTxUpdates。
领取专属 10元无门槛券
手把手带您无忧上云