我有一个用例,我们是流事件,对于每个事件,我必须做一些查找。查找是在Redis中,我想知道创建连接的最佳方法是什么。火花流将运行40个执行程序,我有5个这样的流作业,全部连接到同一个Redis集群。因此,我不知道应该采取什么方法来创建Redis连接。
val update = xyz.transform(rdd => { // on driver if (xyz.isNewDay) { ..... } rdd }) update.foreachRDD(rdd => { rdd.foreachPartition(partition => { partition.foreach(Key_trans => { // perform some lookups logic here } } })
因此,如果我在每个分区内创建一个连接,就意味着对于每个RDD和RDD中的每个分区,我将创建一个新的连接。
有没有办法为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接了吗?
如果需要,我可以添加更多的上下文/信息。
发布于 2019-03-15 14:09:16
1.在驱动程序上创建一个连接对象,并将其广播给执行器(不确定它是否真的工作,因为我必须使这个对象可序列化)。我能用广播变量做这件事吗?
回答-不。由于与连接相关的机器相关数据,大多数连接对象是不可序列化的。
2.是否可以为每个分区维护一个连接并缓存该对象,这样我就不必一次又一次地创建连接了吗?
Ans-是的,创建一个连接池并在分区中使用。这是风格。您可以创建一个类似于以下https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala的连接池
然后再用它
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
请检查这个:使用foreachRDD的设计模式
https://stackoverflow.com/questions/55190315
复制