专栏首页NebulaGraph 技术文章Spark Connector Writer 原理与实践
原创

Spark Connector Writer 原理与实践

nebula-spark-connector-reader

《Spark Connector Reader 原理与实践》中我们提过 Spark Connector 是一个 Spark 的数据连接器,可以通过该连接器进行外部数据系统的读写操作,Spark Connector 包含两部分,分别是 Reader 和 Writer,而本文主要讲述如何利用 Spark Connector 进行 Nebula Graph 数据的写入。

Spark Connector Writer 原理

Spark SQL 允许用户自定义数据源,支持对外部数据源进行扩展。

Nebula 的 Spark Connector 单条数据写入是基于 DatasourceV2 实现的,需要以下几个步骤:

  1. 继承 WriteSupport 并重写 createWriter,创建自定义的 DataSourceWriter
  2. 继承 DataSourceWriter 创建 NebulaDataSourceVertexWriter 类和 NebulaDataSourceEdgeWriter 类,重写 createWriterFactory 方法并返回自定义的 DataWriterFactory,重写 commit 方法,用来提交整个事务。重写 abort 方法,用来做事务回滚。Nebula Graph 1.x 不支持事务操作,故该实现中 commitabort 无实质性操作。
  3. 继承 DataWriterFactory 创建 NebulaVertexWriterFactory 类和 NebulaEdgeWriterFactory 类,重写 createWriter 方法返回自定义的 DataWriter
  4. 继承 DataWriter 创建 NebulaVertexWriter 类和 NebulaEdgeWriter 类,重写 write 方法,用来将数据写出,重写 commit 方法用来提交事务,重写 abort 方法用来做事务回滚 ,同样 DataWriter 中的 commit 方法和 abort 方法无实质性操作。

Nebula 的 Spark Connector Writer 的实现类图如下:

nebula-spark-connector-writer

具体写入逻辑在 NebulaVertexWriter 和 NebulaEdgeWriter 的 write 方法中,一次写入的逻辑如下:

  1. 创建客户端,连接 Nebula 的 graphd 服务;
  2. 数据写入前先指定 graphSpace;
  3. 构造 Nebula 的数据写入 statement;
  4. 提交 statement,执行写入操作;
  5. 定义回调函数接收写入操作执行结果。

Nebula 的 Spark Connector 的批量数据写入与 Exchange 工具类似,是通过对 DataFrame 进行 map 操作批量数据累计提交实现的。

Spark Connector Writer 实践

Spark Connector 的 Writer 功能提供了两类接口供用户编程进行数据写入。写入的数据源为 DataFrame,Spark Writer 提供了单条写入批量写入两类接口。

拉取 GitHub 上 Spark Connector 代码:

git clone -b v1.0 https://github.com/vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true

将编译打成的包 copy 到本地 maven 库。

应用示例如下:

  1. 在 mvn 项目的 pom 文件中加入 nebula-spark 依赖
<dependency>
  <groupId>com.vesoft</groupId>
  <artifactId>nebula-spark</artifactId>
  <version>1.0.1</version>
</dependency>
  1. 在 Spark 程序中将 DataFrame 数据写入 Nebula
  • 2.1 逐条写入 Nebula:
// 构造点和边数据的 DataFrame ,示例数据在 nebula-java/examples/src/main/resources 目录下
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
    vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
		edgeDF.show()

// 写入点
vertexDF.write
  .nebula("127.0.0.1:3699", "nb", "100")
  .writeVertices("player", "vertexId", "hash")
  
// 写入边
edgeDF.write
	.nebula("127.0.0.1:3699", "nb", "100")
  .wirteEdges("follow", "source", "target")

配置说明:

  • nebula(address: String, space: String, partitionNum: String)
    • address:可以配置多个地址,以英文逗号分割,如“ip1:3699,ip2:3699”
    • space: Nebula 的 graphSpace
    • partitionNum:创建 space 时指定的 Nebula 中的 partitionNum,未指定则默认为 100
  • writeVertices(tag: String, vertexFiled: String, policy: String = "")
    • tag:Nebula 中点的 tag
    • vertexFiled:Dataframe 中可作为 Nebula 点 ID 的列,如 DataFrame 的列为 a,b,c,如果把 a 列作为点的 ID 列,则该参数设置为 a
    • policy:若 DataFrame 中 vertexFiled 列的数据类型非数值型,则需要配置 Nebula 中 VID 的映射策略
  • writeEdges(edge: String, srcVertexField: String, dstVertexField: String, policy: String = "")
    • edge:Nebula 中边的 edge
    • srcVertexField:DataFrame 中可作为源点的列
    • dstVertexField:DataFrame 中可作为边目标点的列
    • policy:若 DataFrame 中 srcVertexField 列或 dstVertexField 列的数据类型非数值型,则需要配置 Nebula 中 edge ID 的映射策略
  • 2.2 批量写入 Nebula
// 构造点和边数据的 DataFrame ,示例数据在 nebula-java/examples/src/main/resources 目录下
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
    vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
		edgeDF.show()

// 批量写入点
new NebulaBatchWriterUtils()
      .batchInsert("127.0.0.1:3699", "nb", 2000)
      .batchToNebulaVertex(vertexDF, "player", "vertexId")
  
// 批量写入边
new NebulaBatchWriterUtils()
      .batchInsert("127.0.0.1:3699", "nb", 2000)
      .batchToNebulaEdge(edgeDF, "follow", "source", "target")

配置说明:

  • batchInsert(address: String, space: String, batch: Int = 2000)
    • address:可以配置多个地址,以英文逗号分割,如“ip1:3699,ip2:3699”
    • space:Nebula 的 graphSpace
    • batch:批量写入时一批次的数据量,可不配置,默认为 2000
  • batchToNebulaVertex(data: DataFrame, tag: String, vertexField: String, policy: String = "")
    • data:待写入 Nebula 的 DataFrame 数据
    • tag:Nebula 中点的 tag
    • vertexField:Dataframe 中可作为 Nebula 点 ID 的列
    • policy:Nebula 中 VID 的映射策略,当 vertexField 列的值为数值时可不配置
  • batchToNebulaEdge(data: DataFrame,  edge: String, srcVertexField: String, dstVertexField: String, rankField: String = "",  policy: String = "")
    • data:待写入 Nebula 的 DataFrame 数据
    • edge:Nebula 中边的 edge
    • srcVertexField:DataFrame 中可作为源点的列
    • dstVertexField:DataFrame 中可作为边目标点的列
    • rankField:DataFrame 中可作为边 rank 值的列,可不配置
    • policy:edge 中点的映射策略,当 srcVertexField 和 dstVertexField 列的值为数值时可不配置

至此,Nebula Spark Connector Writer 讲解完毕,欢迎前往 GitHub:https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/nebula-spark 试用。

喜欢这篇文章?来来来,给我们的 GitHub 点个 star 表鼓励啦~~ 🙇‍♂️🙇‍♀️ 手动跪谢

交流图数据库技术?交个朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你进交流群~~

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark Connector Reader 原理与实践

    本文主要讲述如何利用 Spark Connector 进行 Nebula Graph 数据的读取。

    NebulaGraph
  • Nebula Flink Connector 的原理和实践

    摘要:本文所介绍 Nebula Graph 连接器 Nebula Flink Connector,采用类似 Flink 提供的 Flink Connector ...

    NebulaGraph
  • Nebula Exchange 工具 Hive 数据导入的踩坑之旅

    摘要:本文由社区用户 xrfinbj 贡献,主要介绍 Exchange 工具从 Hive 数仓导入数据到 Nebula Graph 的流程及相关的注意事项。

    NebulaGraph
  • 【问底】许鹏:使用Spark+Cassandra打造高性能数据分析平台(一)

    【导读】笔者(许鹏)看Spark源码的时间不长,记笔记的初衷只是为了不至于日后遗忘。在源码阅读的过程中秉持着一种非常简单的思维模式,就是努力去寻找一条贯穿全局的...

    CSDN技术头条
  • Apache Hudi在Linkflow构建实时数据湖的生产实践

    Linkflow 作为客户数据平台(CDP),为企业提供从客户数据采集、分析到执行的运营闭环。每天都会通过一方数据采集端点(SDK)和三方数据源,如微信,微博等...

    ApacheHudi
  • 【问底】许鹏:使用Spark+Cassandra打造高性能数据分析平台(二)

    【导读】笔者(许鹏)看Spark源码的时间不长,记笔记的初衷只是为了不至于日后遗忘。在源码阅读的过程中秉持着一种非常简单的思维模式,就是努力去寻找一条贯穿全局的...

    CSDN技术头条
  • Tomcat connector 实现原理

    java404
  • Apache Hudi 0.8.0版本重磅发布

    自从Hudi 0.7.0版本支持Flink写入后,Hudi社区又进一步完善了Flink和Hudi的集成。包括重新设计性能更好、扩展性更好、基于Flink状态索引...

    ApacheHudi
  • 基于HBase和Spark构建企业级数据处理平台

    Micro-Batch Processing:100ms延迟 ,Continuous Processing:1ms延迟

    用户1564362

扫码关注云+社区

领取腾讯云代金券