geotrellis使用(三十六)瓦片入库更新图层

前言

Geotrellis 是针对大数据量栅格数据进行分布式空间计算的框架,这一点毋庸置疑,并且无论采取何种操作,其实都是先将大块的数据切割成一定大小的小数据(专业术语为瓦片),这是分治的思想,也是分布式计算的精髓,所以使用 Geotrellis 的第一步工作就是要将数据切片(无论是存储在内存中还是进行持久化),然而即使其能力再“大”在实际工作中也难以处理以下几种需求:

  1. 全球(大范围)高分辨率遥感影像数据,数据量在 TB 级;
  2. 局部地区数据更新;
  3. 不同时间数据融合。

这几种情况下我们都很难或者没有办法同时对这些数据进行处理,可行的方案就是执行更新操作或者分批处理。在 Geotrellis 框架中提供了数据的 ETL 接口,但是只能进行 write 操作,并不能进行 update 操作,write 操作会覆盖掉此图层中已有数据,并且相邻数据之间无法进行拼接,导致接边处数据缺失,所以分批处理只能写到不同的图层,这又给数据的调用计算等处理造成很大的麻烦。本文在原有 ETL 的基础上简单介绍如何实现同层瓦片的 update 操作。

一、原生 ETL

1.1 ETL 工作流程介绍

ETL 完成的工作是将数据切割成瓦片并进行持久化,在 Geotrellis 中你可以将数据直接放在内存中(虽然也未提供现成的解决方案,我前面的文章简单介绍了如何实现),也可以将数据放在 Accumulo、HBASE 等分布式数据库或者是 HDFS 和 普通文件系统中。实现代码在 geotrellis.spark.etl 包下的 Etl 类中,调用 ingest 方法的时候传入不同的参数即可实现数据入库的操作,此部分前面也已经介绍过,这里不再赘述。ingest 方法主要代码如下:

val etl = Etl(conf, modules)
val sourceTiles = etl.load[I, V]
val (zoom, tiled) = etl.tile(sourceTiles)
etl.save[K, V](LayerId(etl.input.name, zoom), tiled)

整个流程为首先使用 load 函数读取原始数据,再调用 tile 函数对数据进行切割,而后调用 save 函数将切割后的瓦片进行持久化。所以只要在 save 方法中判断要存放数据的图层是否存在,如果不存在执行已有操作,如果存在则执行 update 操作。

1.2 save 方法介绍

原生 save 方法如下:

def save[
K: SpatialComponent: TypeTag,
V <: CellGrid: TypeTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
saveAction: SaveAction[K, V, TileLayerMetadata[K]] = SaveAction.DEFAULT[K, V, TileLayerMetadata[K]]
): Unit = {
implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]

val outputPlugin =
  combinedModule
    .findSubclassOf[OutputPlugin[K, V, TileLayerMetadata[K]]]
    .find { _.suitableFor(output.backend.`type`.name) }
    .getOrElse(sys.error(s"Unable to find output module of type '${output.backend.`type`.name}'"))

def savePyramid(zoom: Int, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]]): Unit = {
  val currentId = id.copy(zoom = zoom)
  outputPlugin(currentId, rdd, conf, saveAction)

  scheme match {
    case Left(s) =>
      if (output.pyramid && zoom >= 1) {
        val (nextLevel, nextRdd) = Pyramid.up(rdd, s, zoom, output.getPyramidOptions)
        savePyramid(nextLevel, nextRdd)
      }
    case Right(_) =>
      if (output.pyramid)
        logger.error("Pyramiding only supported with layoutScheme, skipping pyramid step")
  }
}

savePyramid(id.zoom, rdd)
logger.info("Done")
}

主要逻辑在 savePyramid 函数中(scala 支持内部函数),其中 outputPlugin(currentId, rdd, conf, saveAction) 是将瓦片持久化的关键操作,val outputPlugin = ... 是取到持久化的种类,这里无需过多考虑,只要考虑成是 Accumulo 或者其他种类即可,所以 outputPlugin(currentId, rdd, conf, saveAction) 调用了 OutputPlugin 类型的 apply 方法,如下:

def apply(
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
conf: EtlConf,
saveAction: SaveAction[K, V, M] = SaveAction.DEFAULT[K, V, M]
): Unit = {
    implicit val sc = rdd.sparkContext
    saveAction(attributes(conf), writer(conf), id, rdd)
}

saveAction 默认取了 SaveAction.DEFAULT[K, V, M],这是定义在 ETL 类中的一个方法,是的,此处传入了一个方法, saveAction(attributes(conf), writer(conf), id, rdd) 实际执行了下述方法:

def DEFAULT[K, V, M] = {
  (_: AttributeStore, writer: Writer[LayerId, RDD[(K, V)] with Metadata[M]], layerId: LayerId, rdd: RDD[(K, V)] with Metadata[M]) =>
    writer.write(layerId, rdd)
}

可以看到最后调用的是 writer.write(layerId, rdd),此处 writer 根据持久化对象不同而不同,在 Accumulo 中为 AccumuloLayerWriter。

到此我们便清楚了 save 方法的工作流程以及整个 ETL 操作的工作流程,下面开始对其进行改造。

二、改造 ETL

本文仅针对瓦片数据持久化放到 Accumulo 数据库中进行介绍,并未如原代码一样对所有情况进行自动适配,其他持久化方式只需判断和修改对应的 LayerWriter 实例即可。

2.1 改造 save 方法

首先判断持久化对象中是否已存在此图层,代码如下:

val currentId: LayerId = ...
val instance = conf.outputProfile.get.asInstanceOf[AccumuloProfile].getInstance.get
val attributeStore = AccumuloAttributeStore(instance)
val exist = attributeStore.layerExists(currentId)

首先取到持久化的实例,本文直接指定为 Accumulo 类型,而后获取 AccumuloAttributeStore 对象,此对象相当于是元数据,其中存储图层的范围层级等信息,最后通过 layerExists 方法即可得到图层是否存在。

如果图层不存在则直接调用原生的 outputPlugin(currentId, rdd, conf) 即可,如果图层已经存在则执行下述操作:

AccumuloLayerWriter(instance = instance, conf.output.backend.path.toString, AccumuloLayerWriterOptions(SocketWriteStrategy()))
          .update(currentId, rdd, (v1: V, v2: V) => v1.merge(v2))

此处需要特别指出的是 AccumuloLayerWriterOptions(SocketWriteStrategy()),此句指明了 Accumulo 的操作策略,按照官方说法,使用 SocketWriteStrategy 会导致操作变慢,切不能针对大量数据的导入操作,使用 HdfsWriteStrategy 支持 Accumulo 大批量导入操作(个人猜测是 Accumulo 数据存放在 HDFS 中,首先把数据写入 HDFS 然后再并行持久化到 Accumulo,所以可以进行大量数据操作)。虽然看上去 HdfsWriteStrategy 非常完美,但是问题在于使用此策略无法执行 update 操作,会报错。鱼和熊掌不能兼得,需要根据实际情况进行选择和设计。

这样就可实现图层中瓦片的更新操作。

2.2 Key Index

当然写到这并没有完成工作,如果仅在 save 函数中完成上述改造,再真正的 update 的时候会报错,提示 key index 超出定义的范围,需要重新定义。还记得上面说的 attributeStore 吧,通过此方法可以取到元数据信息,此处的 key index 也写在元数据中,key index 说白了就是瓦片编号的范围,我们都知道瓦片是根据编号进行请求的,那么一块数据就会有一个编号范围,所以图层不存在的时候执行的是 write 方法,写入的是当时数据瓦片编号范围,但是真正执行 update 的时候一般肯定是跟第一次数据范围不同的,于是提示你需要更新编号的范围。这个问题很容易解决,我们只需要在第一次写入的时候将数据范围设置成全球即可。

在 tile 方法的 resizingTileRDD 方法定义如下:

def resizingTileRDD(
  rdd: RDD[(I, V)],
  floatMD: TileLayerMetadata[K],
  targetLayout: LayoutDefinition
): RDD[(K, V)] with Metadata[TileLayerMetadata[K]] = {
  // rekey metadata to targetLayout
  val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent))
  val tiledMD = floatMD.copy(
    bounds = floatMD.bounds.setSpatialBounds(newSpatialBounds),
    layout = targetLayout
  )

  // > 1 means we're upsampling during tiling process
  val resolutionRatio = floatMD.layout.cellSize.resolution / targetLayout.cellSize.resolution
  val tilerOptions = Tiler.Options(
    resampleMethod = method,
    partitioner = new HashPartitioner(
      partitions = (math.pow(2, (resolutionRatio - 1) * 2) * rdd.partitions.length).toInt))

  val tiledRDD = rdd.tileToLayout[K](tiledMD, tilerOptions)
  ContextRDD(tiledRDD, tiledMD)
}

val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent)) 是获取到当前数据在此 zoom 下的瓦片编号范围,那么我们只需要将此处改成整个范围即可,如下:

val newSpatialBounds = KeyBounds(
    SpatialKey(0, 0),
    SpatialKey(
      col = targetLayout.layoutCols,
      row = targetLayout.layoutRows
    ))

这样即可实现正常的 update 操作。

三、总结

阅读此文需要对 Geotrellis 框架有整体了解并熟悉其基本使用,可以参考本系列博客,使用 geotrellis 也需要对 scala 有所掌握,scala 语法在我接触过的所有语言中应当是比较灵活的,灵活就导致麻烦。。。。

本文简单介绍了如何实现 ETL 过程的 update 操作。这是我失业后写的第一篇博客,失业后整个人对所有事情的理解更上了一步,无论是对技术还是生活都有更多的感悟,生活和技术都需要慢慢品味。

Geotrellis系列文章链接地址http://www.cnblogs.com/shoufengwei/p/5619419.html

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java学习

Java基础第一天学习笔记

01.01_计算机基础知识(计算机概述)(了解) * A:什么是计算机?计算机在生活中的应用举例 * 计算机(Computer)全称:电子计算机,俗称电脑。是...

38150
来自专栏大闲人柴毛毛

使用Eclipse插件提高Java编码质量

代码质量概述 ? 怎样辨别一个项目代码写得好还是坏?优秀的代码和腐化的代码区别在哪里?怎么让自己写的代码既漂亮又有生命力?接下来将对代码质量的问题进行...

39070
来自专栏Jimoer

JVM学习记录-线程安全与锁优化(二)

高效并发是程序员们写代码时一直所追求的,HotSpot虚拟机开发团队也为此付出了很多努力,为了在线程之间更高效地共享数据,以及解决竞争问题,HotSpot开发团...

11120
来自专栏编程微刊

2018年各大互联网前端面试题四(美团)

18820
来自专栏张善友的专栏

无特性的 MEF 配置方法

Managed Extensibility Framework (MEF) 旨在为 Microsoft .NET Framework 开发人员提供一种简便的方法...

22350
来自专栏Python爱好者

Java基础笔记01

33760
来自专栏王清培的专栏

WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)——所有webapi似乎都缺失的一个功能

最近的工作我在做一个有关于消息发送和接受封装工作。大概流程是这样的,消息中间件是采用rabbitmq,为了保证消息的绝对无丢失,我们需要在发送和接受前对消息进行...

26490
来自专栏NetCore

Fluent Nhibernate之旅(五)--利用AutoMapping进行简单开发

Fluent Nhibernate(以下简称FN)发展到如今,已经相当成熟了,在Nhibernate的书中也相应的推荐了使用FN来进行映射配置,之前写的FN之旅...

26560
来自专栏猿人谷

PHP程序员应该掌握的10个技能

php程序员应该掌握的10个技能,看看你都掌握了哪些:   1、语法:必须熟练掌握 ,写代码的时候IDE的编辑器对某一行报错应该能够根据报错信息知道是什么样的语...

25870
来自专栏java学习

学习java需要会哪些知识才能够去应聘工作?

Java基础 | 数据库 | Android | 学习视频 | 学习资料下载 按照我去培训机构的学习经历,给初学还有自学Java 的同学一个基本的学习脉络,希望...

39460

扫码关注云+社区

领取腾讯云代金券