前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据开发之ChunJun使用

大数据开发之ChunJun使用

作者头像
码客说
发布2022-11-22 16:27:04
1K0
发布2022-11-22 16:27:04
举报
文章被收录于专栏:码客码客

前言

项目地址

https://gitee.com/dtstack_dev_0/chunjun

下载地址

https://github.com/DTStack/chunjun/releases

解压

代码语言:javascript
复制
mkdir -p /data/tools/bigdata/taier/chunjun
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz -C /data/tools/bigdata/taier/chunjun

配置环境变量

创建配置文件

代码语言:javascript
复制
vi /etc/profile.d/chunjun.sh

加入:

代码语言:javascript
复制
export CHUNJUN_HOME=/data/tools/bigdata/taier/chunjun 
export PATH=$CHUNJUN_HOME/bin:$PATH

配置立即生效

代码语言:javascript
复制
source /etc/profile

查看ZK_HOME

代码语言:javascript
复制
echo $CHUNJUN_HOME

提交任务

Local

本地提交

Local 模式不依赖Flink环境和Hadoop环境,在本地环境启动一个JVM进程执行纯钧任务。

代码语言:javascript
复制
sh $CHUNJUN_HOME/bin/chunjun-local.sh  -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json

Standalone

Standalone模式依赖Flink Standalone环境,不依赖Hadoop环境。

将依赖文件复制到Flink lib目录下,例如

代码语言:javascript
复制
cp -r chunjun-dist $FLINK_HOME/lib

注意: 这个复制操作需要在所有Flink cluster机器上执行,否则部分任务会出现类找不到的错误。

启动Flink Standalone环境

代码语言:javascript
复制
sh $FLINK_HOME/bin/start-cluster.sh

运行

代码语言:javascript
复制
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json

Yarn Session

Yarn Session 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOMEFLINK_HOME

我们需要使用yarn-session -t参数上传chunjun-dist

代码语言:javascript
复制
$FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_HOME -d

提交任务

Yarn监控页面查询:

http://192.168.7.102:8088/cluster

通过yarn web ui 查看session 对应的application $SESSION_APPLICATION_ID,进入到本地chunjun-dist目录,执行命令

yarn.application.id 也可以在 flink-conf.yaml 中设置;提交成功之后,可以通过 yarn web ui 上观察任务情况。

成功后会打印

代码语言:javascript
复制
JobManager Web Interface: http://hadoop01:45685
2022-11-14 14:17:52,433 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1667981758965_0017
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1667981758965_0017
Note that killing Flink might not clean up all job artifacts and temporary files.

我们可以看到

运行

代码语言:javascript
复制
bash $CHUNJUN_HOME/bin/chunjun-yarn-session.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"application_1667981758965_0017\"}

如果有正在运行的可以这样杀掉

代码语言:javascript
复制
yarn application -kill application_1667981758965_0017

Yarn Per-Job

Yarn Per-Job 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOMEFLINK_HOME

提交步骤

Yarn Per-Job 提交任务配置正确即可提交。

进入本地chunjun-dist目录,执行命令提交任务。

代码语言:javascript
复制
bash $CHUNJUN_HOME/bin/chunjun-yarn-perjob.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json

提交成功之后,可以通过 yarn web ui 上观察任务情况

Yarn-Per-Job和Yarn-Session对比

Flink on Yarn-Per Job

edbe4f0ced94f1ead87a8da3bcf77da6.png
edbe4f0ced94f1ead87a8da3bcf77da6.png

Flink on Yarn 中的 Per Job 模式是指每次提交一个任务,然后任务运行完成之后资源就会被释放。

在了解了 Yarn 的原理之后,Per Job 的流程也就比较容易理解了,具体如下:

  • 首先 Client 提交 Yarn App,比如 JobGraph 或者 JARs。
  • 接下来 Yarn 的 ResourceManager 会申请第一个 Container。这个 Container 通过 Application Master 启动进程,Application Master 里面运行的是 Flink 程序,即 Flink-Yarn ResourceManager 和 JobManager。
  • 最后 Flink-Yarn ResourceManager 向 Yarn ResourceManager 申请资源。当分配到资源后,启动 TaskManager。 TaskManager 启动后向 Flink-Yarn ResourceManager 进行注册,注册成功后 JobManager 就会分配具体的任务给 TaskManager 开始执行。

Flink on Yarn-Session

4c625a6ebac4d04b110718701d70df03.png
4c625a6ebac4d04b110718701d70df03.png

在 Per Job 模式中,执行完任务后整个资源就会释放,包括 JobManager、TaskManager 都全部退出。

而 Session 模式则不一样,它的 Dispatcher 和 ResourceManager 是可以复用的。

Session 模式下,当 Dispatcher 在收到请求之后,会启动 JobManager(A),让 JobManager(A) 来完成启动 TaskManager,接着会启动 JobManager(B) 和对应的 TaskManager 的运行。

当 A、B 任务运行完成后,资源并不会释放。

Session 模式也称为多线程模式,其特点是资源会一直存在不会释放,多个 JobManager 共享一个 Dispatcher,而且还共享 Flink-YARN ResourceManager。

应用场景

Session 模式和 Per Job 模式的应用场景不一样。

Per Job 模式比较适合那种对启动时间不敏感,运行时间较长的任务。

Seesion 模式适合短时间运行的任务,一般是批处理任务。若用 Per Job 模式去运行短时间的任务,那就需要频繁的申请资源,运行结束后,还需要资源释放,下次还需再重新申请资源才能运行。显然,这种任务会频繁启停的情况不适用于 Per Job 模式,更适合用 Session 模式。

配置

mysql to hive

代码语言:javascript
复制
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter" : {
            "username" : "username",
            "password" : "password",
            "cat" : "insert,delete,update",
            "jdbcUrl" : "jdbc:mysql://ip:3308/tudou?useSSL=false",
            "host" : "ip",
            "port" : 3308,
            "start" : {
            },
            "table" : [ "tudou.kudu" ],
            "splitUpdate" : false,
            "pavingData" : true
          },
          "name" : "binlogreader"
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://ip:10000/tudou",
            "username" : "",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "compress" : "",
            "charsetName" : "UTF-8",
            "maxFileSize" : 1073741824,
            "analyticalRules" : "test_${schema}_${table}",
            "schema" : "tudou",
            "tablesColumn" : "{\"kudu\":[{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"type\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"schema\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"table\"},{\"comment\":\"\",\"type\":\"bigint\",\"key\":\"ts\"},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_id\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_id\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"before_name\"},{\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"after_name\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_age\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_age\",\"part\":false}]}",
            "partition" : "pt",
            "partitionType" : "MINUTE",
            "defaultFS" : "hdfs://ns",
            "hadoopConfig" : {
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "fs.defaultFS": "hdfs://ns",
              "dfs.namenode.rpc-address.ns.nn2": "ip:9000",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns.nn1": "ip:9000",
              "dfs.nameservices": "ns",
              "fs.hdfs.impl.disable.cache": "true",
              "hadoop.user.name": "root",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-11-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 配置环境变量
  • 提交任务
    • Local
      • Standalone
        • Yarn Session
          • Yarn Per-Job
          • Yarn-Per-Job和Yarn-Session对比
            • Flink on Yarn-Per Job
              • Flink on Yarn-Session
                • 应用场景
                • 配置
                  • mysql to hive
                  相关产品与服务
                  大数据
                  全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档