前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkSQL使用UDF函数代替MySQL空间函数读取MySQL空间字段

SparkSQL使用UDF函数代替MySQL空间函数读取MySQL空间字段

原创
作者头像
静谧星空TEL
修改2021-06-25 10:26:50
2.2K0
修改2021-06-25 10:26:50
举报

一、问题描述

SparkSQL虽然可以访问MySQL数据,但是对于MySQL的空间字段,SparkSQL并没有提供内置函数去解析

二、问题分析

SparkSQL没有内置函数解析空间类型,需要手动编写UDF函数实现

SparkSQL网络传输的数据格式是Byte数组,返回的数据格式中没有Geometry类型,需要将Geometry类型转成String类型返回

三、代码实现

1、自定义UDF函数

代码语言:javascript
复制
      @throws[Exception]
      def sparkUDFSTAsText(geometryAsBytes: Array[Byte]): Geometry = {
        var dbGeometry: Geometry = null
        if (geometryAsBytes.length < 5) throw new Exception("Invalid geometry inputStream - less than five bytes")
        //first four bytes of the geometry are the SRID,
        //followed by the actual WKB.  Determine the SRID
        //here
        val sridBytes = new Array[Byte](4)
        System.arraycopy(geometryAsBytes, 0, sridBytes, 0, 4)
        val bigEndian: Boolean = geometryAsBytes(4) == 0x00
        var srid = 0
        if (bigEndian) for (i <- 0 until sridBytes.length) {
          srid = (srid << 8) + (sridBytes(i) & 0xff)
        }
        else for (i <- 0 until sridBytes.length) {
          srid += (sridBytes(i) & 0xff) << (8 * i)
        }
        //use the JTS WKBReader for WKB parsing
        val wkbReader = new WKBReade
        //copy the byte array, removing the first fou
        //SRID bytes
        val wkb = new Array[Byte](geometryAsBytes.length - 4)
        System.arraycopy(geometryAsBytes, 4, wkb, 0, wkb.length)
        dbGeometry = wkbReader.read(wkb)
        dbGeometry.setSRID(srid)
        dbGeometry
      }

java版

代码语言:javascript
复制
    public Geometry sparkUDFSTAsText(byte[] geometryAsBytes) throws Exception {
        Geometry dbGeometry = null;
        if (geometryAsBytes.length < 5) throw new Exception("Invalid geometry inputStream - less than five bytes");
        byte[] sridBytes = new byte[4];
        System.arraycopy(geometryAsBytes, 0, sridBytes, 0, 4);
        boolean bigEndian = geometryAsBytes[4] == 0x00;
        int srid = 0;
        if(bigEndian) {
            for(int i=0; i<sridBytes.length; i++) {
                srid = (srid << 8) + (sridBytes[i] & 0xff);
            }
        } else {
            for(int i=0; i<sridBytes.length; i++) {
                srid += (sridBytes[i] & 0xff) << (8 * i);
            }
        }
        WKBReader wkbReader = new WKBReader();
        byte[] wkb = new byte[geometryAsBytes.length - 4];
        System.arraycopy(geometryAsBytes, 4, wkb, 0, wkb.length);
        dbGeometry = wkbReader.read(wkb);
        dbGeometry.setSRID(srid);
        return dbGeometry;
    }

2、SparkSQL调用UDF函数

代码语言:javascript
复制
        def toGeometryText(binary: Array[Byte]) = sparkUDFSTAsText(binary).toText
        spark.udf.register("ST_ASTEXT",toGeometryText(_))
        val rddROW: RDD[Row] = spark.sql("SELECT id, ST_ASTEXT(point), ST_ASTEXT(polygon) FROM t_point_polygon").limit(10).rdd

四、知识拓展

1、MySQL中的空间扩展

https://www.mysqlzh.com/doc/172.html

http://dcx.sap.com/1201/zh/dbspatial/pg-api-spatial-st-geometry-type.html

2、MySQL中的空间类型

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、问题描述
  • 二、问题分析
  • 三、代码实现
    • 1、自定义UDF函数
      • 2、SparkSQL调用UDF函数
      • 四、知识拓展
        • 1、MySQL中的空间扩展
          • 2、MySQL中的空间类型
          相关产品与服务
          大数据
          全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档