首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >视频:JDBCRDD源码及自定义JDBCRDD的分区策略

视频:JDBCRDD源码及自定义JDBCRDD的分区策略

作者头像
Spark学习技巧
发布2018-08-01 11:36:33
7290
发布2018-08-01 11:36:33
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

jdbcRDD虽然是鸡肋,但是也值得一讲。帮助大家更进一步理解RDD。

1,JDBCRDD使用

val data = new JdbcRDD(sc, getConnection

, "SELECT id,aa FROM bbb where ? <= ID AND ID <= ?", lowerBound = 3, upperBound =5, numPartitions = 1, mapRow = extractValues)

参数解释:

1,sparkcontext。

2,一个创建链接的函数。

3,sql。必须有? <= ID AND ID <= ?。

4,要取数据的id最小行。

5,要取数据的id最大行号。

6,分区数。

7,一个将ResultSet转化为需要类型的方法。

2,JdbcRDD的getPartition方法

override def getPartitions: Array[Partition] = { // bounds are inclusive, hence the + 1 here and - 1 on end val length = BigInt(1) + upperBound - lowerBound (0 until numPartitions).map(i => { val start = lowerBound + ((i * length) / numPartitions) val end = lowerBound + (((i + 1) * length) / numPartitions) - 1 new JdbcPartition(i, start.toLong, end.toLong) }).toArray }

3,JdbcRDD的compute方法

就是一个通过jdbc获取指定范围数据的过程。

val part = thePart.asInstanceOf[JdbcPartition] val conn = getConnection() val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) stmt.setLong(1, part.lower) stmt.setLong(2, part.upper) val rs = stmt.executeQuery()

4,重写JDBC方法

重写分区的方法即可。

如:

CustomizedJdbcRDD[T: ClassTag]( sc: SparkContext, getConnection: () => Connection, sql: String, getCustomizedPartitions: () => Array[Partition], prepareStatement: (PreparedStatement, CustomizedJdbcPartition) => PreparedStatement, mapRow: (ResultSet) => T = CustomizedJdbcRDD.resultSetToObjectArray _)

同时把getPartition方法重写为:

override def getPartitions: Array[Partition] = { getCustomizedPartitions(); }

视频内容
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档