前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Seatunnel - 架构解析

Apache Seatunnel - 架构解析

作者头像
tyrantlucifer
发布2022-08-30 14:28:31
5.1K0
发布2022-08-30 14:28:31
举报
文章被收录于专栏:Tyrant LuciferTyrant Lucifer

概述

Seatunnel 是一个非常易用,高性能、支持实时流式离线批处理的海量数据处理产品,架构于Apache SparkApache Flink之上,开源项目地址:https://github.com/apache/incubator-seatunnel

版本演变

Seatunnel原名为Waterdrop,在更名之后正式孵化为Apache项目,同时对于两个名字也对应了不同的版本,Waterdrop指1.x版本,Seatunnel指2.x版本,对于1.x和2.x有以下区别:

关键功能

1.x

2.x

支持spark

yes

yes

支持flink

no

yes

主要开发语言

scala

java

主要构建工具

sbt

maven

为什么我们需要Seatunnel

Apache Spark和Apache Flink对于分布式数据处理和流式数据处理来说是一个伟大的进步,但较高的使用门槛让数据处理人员需要学习spark和flink复杂的运行机制和api才能够使用的更加顺畅,为降低数据处理门槛,且让spark和flink变得更加易用,减少学习成本,加快分布式数据处理在生产环境的落地,Seatunnel应运而生。

基于当前大多数数据处理工作的一些思考

  1. 更多的数据处理是重复的
  2. 数据处理的代码是冗余的
  3. 在数据处理工作中有一部分的比例是数据同步工作,在离线数仓计算完成之后,往往会将ads层数据同步至对查询专门优化过的OLAP数据库(ck、es等)中以提供前端报表展示的功能,这些功能是否可以沉淀?是否可以复用?
  4. 在数据处理过程中,可能会有多种异构数据源接入的需求,例如file、redis、hdfs、kafka、mysql….,在面对这种异构数据源集成的需求时如何去更好的应对?
  5. 在当前越来越多大数据框架面世的基础上,大数据处理的方向慢慢变向了sql化和低代码化,在业务看来无论底层有多少数据都会是落成一张表或是多张表,如果可以使用sql就能够计算海量数据并快速获取正确结果,对于整个业务部门对于数据的利用将更加高效
  6. 假设企业中需要组建数据中台,如何对外快速提供数据处理的中台能力

Seatunnel可以解决的业务痛点

  1. 背靠spark和flink两大分布式数据框架,天生具有分布式数据处理的能力,使业务可以更加专注于数据的价值挖掘与处理,而不是专注于底层技术对于大数据的兼容和开发
  2. 利用spark和flink分布式框架对于异构数据源的兼容,可以实现快速的异构数据源同步和接入
  3. 高度抽象业务处理逻辑,减少代码的冗余和重复开发

Seatunnel优势与缺点

优势

  1. 简单易用,灵活配置,无需开发
  2. 模块化和插件化
  3. 支持利用SQL做数据处理和聚合
  4. 由于其高度封装的计算引擎架构,可以很好的与中台进行融合,对外提供分布式计算能力

缺点

  1. Spark支持2.2.0 - 2.4.8,不支持spark3.x
  2. Flink支持1.9.0,目前flink已经迭代至1.14.x,无法向上兼容
  3. Spark作业虽然可以很快配置,但相关人员还需要懂一些参数的调优才能让作业效率更优

相关竞品及对比

  • FlinkX,现已更名为chunjun
  • StreamX
  • DataX

关键功能

Seatunnel

FlinkX

StreamX

DataX

spark是否支持

yes

no

yes

no

flink是否支持

yes,高版本兼容性不好

yes,高版本兼容性不好

yes,高版本兼容性好

no

部署难度

轻松

中等

较难

容易

主要功能对比

etl、数据同步

数据同步

flink任务可视化部署

数据同步

Seatunnel核心理念与内核原理

核心概念

  1. 整个Seatunnel设计的核心是利用设计模式中的“控制翻转”或者叫“依赖注入”,主要概括为以下两点:
    1. 上层不依赖底层,两者都依赖抽象
    2. 流程代码与业务逻辑应该分离
  2. 对于整个数据处理过程,大致可以分为以下几个流程:输入 -> 转换 -> 输出,对于更复杂的数据处理,实质上也是这几种行为的组合:
  1. Seatunnel对于这几种数据处理的行为进行高度的抽象,在基于这层比较完善的抽象之上,对业务数据处理过程中的80%重复操作进行沉淀,做成可热插拔的插件,这样业务处理逻辑与整个数据处理实现了解耦,让用户更专注于业务的落地与实施。

内核原理

  1. 插件的动态注册使用了java spi技术,保证了框架的灵活扩展,设计思路参考了presto、es等,有兴趣的同学可以下去自行研究,es使用了google guice,presto使用的就是上面提到的java spi
  2. 在以上理论基础上,数据的转换需要做一个统一的抽象与转化,很契合的是spark或者flink都已经为我们做好了这个工作,spark的DataSet,flink的DataSet、DataStream都已经是对接入数据的一个高度抽象,本质上对数据的处理就是对这些数据结构的转换,同时这些数据在接入进来之后可以注册成上下文中的表,基于表就可以使用SQL进行处理
  3. 整个Seatunnel通过配置文件生成的是一个spark job或者flink job
  4. 技术栈包括以下:
    1. Java
    2. Scala
    3. Flink
    4. Spark
    5. Java spi

Spark插件体系架构设计

Flink插件体系架构设计

程序执行流程

最上层插件抽象实现细节

代码语言:javascript
复制
public interface Plugin<T> extends Serializable {
    // 配置文件的key
    String RESULT_TABLE_NAME = "result_table_name";
    String SOURCE_TABLE_NAME = "source_table_name";
    
    // 设置每个插件的config
    void setConfig(Config config);
    
    // 获取插件的配置 
    Config getConfig();
    
    // 对于config的校验
    CheckResult checkConfig();
    
    // 插件前准备
    void prepare(T prepareEnv);
}

Spark插件上层抽象实现细节

批处理Batch

Source
代码语言:javascript
复制
trait BaseSparkSource[Data] extends BaseSource[SparkEnvironment] {

  protected var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = this.config = config

  override def getConfig: Config = config

  def getData(env: SparkEnvironment): Data;

}
Transform
代码语言:javascript
复制
trait BaseSparkTransform extends BaseTransform[SparkEnvironment] {

  protected var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = this.config = config

  override def getConfig: Config = config

  def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row];

}
Output
代码语言:javascript
复制
trait BaseSparkSink[OUT] extends BaseSink[SparkEnvironment] {

  protected var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = this.config = config

  override def getConfig: Config = config

  def output(data: Dataset[Row], env: SparkEnvironment): OUT;

}

流处理Stream

代码语言:javascript
复制
trait SparkStreamingSource[T] extends BaseSparkSource[DStream[T]] {

  def beforeOutput(): Unit = {}

  def afterOutput(): Unit = {}

  def rdd2dataset(sparkSession: SparkSession, rdd: RDD[T]): Dataset[Row]

  def start(env: SparkEnvironment, handler: Dataset[Row] => Unit): Unit = {
    getData(env).foreachRDD(rdd => {
      val dataset = rdd2dataset(env.getSparkSession, rdd)
      handler(dataset)
    })
  }

}

Flink插件上层抽象实现细节

批处理Batch

Source
代码语言:javascript
复制
public interface FlinkBatchSource<T> extends BaseFlinkSource {

    DataSet<T> getData(FlinkEnvironment env);
}
Transform
代码语言:javascript
复制
public interface FlinkBatchTransform<IN, OUT> extends BaseFlinkTransform {

    DataSet<OUT> processBatch(FlinkEnvironment env, DataSet<IN> data);

}
Output
代码语言:javascript
复制
public interface FlinkBatchSink<IN, OUT> extends BaseFlinkSink {

    DataSink<OUT> outputBatch(FlinkEnvironment env, DataSet<IN> inDataSet);

}

流处理Stream

Source
代码语言:javascript
复制
public interface FlinkStreamSource<T> extends BaseFlinkSource {

    DataStream<T> getData(FlinkEnvironment env);

}
Transform
代码语言:javascript
复制
public interface FlinkStreamTransform<IN, OUT> extends BaseFlinkTransform {

    DataStream<OUT> processStream(FlinkEnvironment env, DataStream<IN> dataStream);
}
Output
代码语言:javascript
复制
public interface FlinkStreamSink<IN, OUT> extends BaseFlinkSink {

    DataStreamSink<OUT> outputStream(FlinkEnvironment env, DataStream<IN> dataStream);

}

自定义插件步骤

  1. 针对不同的框架和插件类型继承对应的接口,接口中的核心处理方法
  2. 在java spi中注册
  3. 将自己定义的jar包放在Seatunnel主jar包的plugins目录下

Java spi原理解析

概念

SPI全称Service Provider Interface,是Java提供的一套用来被第三方实现或者扩展的接口,它可以用来启用框架扩展和替换组件,SPI的作用就是为这些被扩展的API寻找服务实现

API和SPI的区别

API-(Application Programming Interface)大多数情况下,都是实现方制定接口并完成对接口的实现,调用方仅仅依赖接口调用,且无权选择不同实现。从使用人员上来说,API 直接被应用开发人员使用,SPI-(Service Provider Interface)是调用方来制定接口规范,提供给外部来实现调用方选择自己需要的外部实现。从使用人员上来说,SPI 被框架扩展人员使用

实现demo

  1. 定义接口
代码语言:javascript
复制
 package com.tyrantlucifer;
 
 public interface Animal {
     void shut();
 }
  1. 定义main函数,使用service loader进行动态加载
代码语言:javascript
复制
 package com.tyrantlucifer;
 
 import java.util.ServiceLoader;
 
 public class Main {
     public static void main(String[] args) {
         ServiceLoader<Animal> services = ServiceLoader.load(Animal.class);
         for (Animal service : services) {
             service.shut();
         }
     }
 }
  1. 实现接口
代码语言:javascript
复制
 package com.tyrantlucifer;
 
 public class Cat implements Animal {
     public void shut() {
         System.out.println("cat shut miao miao!!!");
     }
 }
代码语言:javascript
复制
 package com.tyrantlucifer;
 
 public class Dog implements Animal{
     public void shut() {
         System.out.println("dog shut wang wang!!!");
     }
 }

注册spi,需要在resources/META-INF/services下新建以接口全类名的文件,比如我们这次的接口com.tyrantlucifer.Animal,那么就新建一个com.tyrantlucifer.Animal文件,并在文件中添加自己的实现类:

代码语言:javascript
复制
com.tyrantlucifer.Cat
com.tyrantlucifer.Dog

Seatunnel demo演示

  1. Spark
代码语言:javascript
复制
 spark {
   spark.streaming.batchDuration = 5
   spark.app.name = "seatunnel"
   spark.ui.port = 13000
 }
 
 input {
   socketStream {}
 }
 
 filter {
   split {
     fields = ["msg", "name"]
     delimiter = ","
   }
 }
 
 output {
   stdout {}
 }
  1. Flink
代码语言:javascript
复制
 env {
   execution.parallelism = 1
 }
 
 source {
     SocketStream{
           result_table_name = "fake"
           field_name = "info"
     }
 }
 
 transform {
   Split{
     separator = "#"
     fields = ["name","age"]
   }
   sql {
     sql = "select * from (select info,split(info) as info_row from fake) t1"
   }
 }
 
 sink {
   ConsoleSink {}
 }
  1. 自定义插件
代码语言:javascript
复制
 class MyStdout extends BaseOutput {
 
   var config: Config = ConfigFactory.empty()
 
   /**
    * Set Config.
    * */
   override def setConfig(config: Config): Unit = {
     this.config = config
   }
 
   /**
    * Get Config.
    * */
   override def getConfig(): Config = {
     this.config
   }
 
   override def checkConfig(): (Boolean, String) = {
     if (!config.hasPath("limit") || (config.hasPath("limit") && config.getInt("limit") >= -1)) {
       (true, "")
     } else {
       (false, "please specify [limit] as Number[-1, " + Int.MaxValue + "]")
     }
   }
 
   override def prepare(spark: SparkSession): Unit = {
     super.prepare(spark)
 
     val defaultConfig = ConfigFactory.parseMap(
       Map(
         "limit" -> 100,
         "format" -> "plain" // plain | json | schema
       )
     )
     config = config.withFallback(defaultConfig)
   }
 
   override def process(df: Dataset[Row]): Unit = {
 
     val limit = config.getInt("limit")
 
     var format = config.getString("format")
     if (config.hasPath("serializer")) {
       format = config.getString("serializer")
     }
     format match {
       case "plain" => {
         if (limit == -1) {
           df.show(Int.MaxValue, false)
         } else if (limit > 0) {
           df.show(limit, false)
         }
       }
       case "json" => {
         if (limit == -1) {
           df.toJSON.take(Int.MaxValue).foreach(s => println(s))
 
         } else if (limit > 0) {
           df.toJSON.take(limit).foreach(s => println(s))
         }
       }
       case "schema" => {
         df.printSchema()
       }
     }
   }
 }

Q&A

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-04-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Tyrant Lucifer 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 版本演变
  • 为什么我们需要Seatunnel
  • 基于当前大多数数据处理工作的一些思考
  • Seatunnel可以解决的业务痛点
  • Seatunnel优势与缺点
    • 优势
      • 缺点
        • 相关竞品及对比
        • Seatunnel核心理念与内核原理
          • 核心概念
            • 内核原理
            • Spark插件体系架构设计
            • Flink插件体系架构设计
            • 程序执行流程
            • 最上层插件抽象实现细节
            • Spark插件上层抽象实现细节
              • 批处理Batch
                • Source
                • Transform
                • Output
              • 流处理Stream
              • Flink插件上层抽象实现细节
                • 批处理Batch
                  • Source
                  • Transform
                  • Output
                • 流处理Stream
                  • Source
                  • Transform
                  • Output
              • 自定义插件步骤
              • Java spi原理解析
                • 概念
                  • API和SPI的区别
                    • 实现demo
                      • Seatunnel demo演示
                      • Q&A
                      相关产品与服务
                      腾讯云 BI
                      腾讯云 BI(Business Intelligence,BI)提供从数据源接入、数据建模到数据可视化分析全流程的BI能力,帮助经营者快速获取决策数据依据。系统采用敏捷自助式设计,使用者仅需通过简单拖拽即可完成原本复杂的报表开发过程,并支持报表的分享、推送等企业协作场景。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档