项目地址
https://gitee.com/dtstack_dev_0/chunjun
下载地址
https://github.com/DTStack/chunjun/releases
解压
mkdir -p /data/tools/bigdata/taier/chunjun
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz -C /data/tools/bigdata/taier/chunjun
创建配置文件
vi /etc/profile.d/chunjun.sh
加入:
export CHUNJUN_HOME=/data/tools/bigdata/taier/chunjun
export PATH=$CHUNJUN_HOME/bin:$PATH
配置立即生效
source /etc/profile
查看ZK_HOME
echo $CHUNJUN_HOME
本地提交
Local 模式不依赖Flink环境和Hadoop环境,在本地环境启动一个JVM进程执行纯钧任务。
sh $CHUNJUN_HOME/bin/chunjun-local.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json
Standalone模式依赖Flink Standalone环境,不依赖Hadoop环境。
将依赖文件复制到Flink lib目录下,例如
cp -r chunjun-dist $FLINK_HOME/lib
注意: 这个复制操作需要在所有Flink cluster机器上执行,否则部分任务会出现类找不到的错误。
启动Flink Standalone环境
sh $FLINK_HOME/bin/start-cluster.sh
运行
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
Yarn Session 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOME
和FLINK_HOME
我们需要使用yarn-session -t参数上传chunjun-dist
$FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_HOME -d
提交任务
Yarn监控页面查询:
http://192.168.7.102:8088/cluster
通过yarn web ui 查看session 对应的application $SESSION_APPLICATION_ID
,进入到本地chunjun-dist目录,执行命令
yarn.application.id
也可以在 flink-conf.yaml 中设置;提交成功之后,可以通过 yarn web ui 上观察任务情况。
成功后会打印
JobManager Web Interface: http://hadoop01:45685
2022-11-14 14:17:52,433 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1667981758965_0017
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1667981758965_0017
Note that killing Flink might not clean up all job artifacts and temporary files.
我们可以看到
yarn.application.id
为application_1667981758965_0017
yarn application -kill application_1667981758965_0017
运行
bash $CHUNJUN_HOME/bin/chunjun-yarn-session.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"application_1667981758965_0017\"}
如果有正在运行的可以这样杀掉
yarn application -kill application_1667981758965_0017
Yarn Per-Job 模式依赖Flink 和 Hadoop 环境,需要在提交机器中提前设置好HADOOPHOME
和FLINK_HOME
。
提交步骤
Yarn Per-Job 提交任务配置正确即可提交。
进入本地chunjun-dist目录,执行命令提交任务。
bash $CHUNJUN_HOME/bin/chunjun-yarn-perjob.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json
提交成功之后,可以通过 yarn web ui 上观察任务情况
Flink on Yarn 中的 Per Job 模式是指每次提交一个任务,然后任务运行完成之后资源就会被释放。
在了解了 Yarn 的原理之后,Per Job 的流程也就比较容易理解了,具体如下:
在 Per Job 模式中,执行完任务后整个资源就会释放,包括 JobManager、TaskManager 都全部退出。
而 Session 模式则不一样,它的 Dispatcher 和 ResourceManager 是可以复用的。
Session 模式下,当 Dispatcher 在收到请求之后,会启动 JobManager(A),让 JobManager(A) 来完成启动 TaskManager,接着会启动 JobManager(B) 和对应的 TaskManager 的运行。
当 A、B 任务运行完成后,资源并不会释放。
Session 模式也称为多线程模式,其特点是资源会一直存在不会释放,多个 JobManager 共享一个 Dispatcher,而且还共享 Flink-YARN ResourceManager。
Session 模式和 Per Job 模式的应用场景不一样。
Per Job 模式比较适合那种对启动时间不敏感,运行时间较长的任务。
Seesion 模式适合短时间运行的任务,一般是批处理任务。若用 Per Job 模式去运行短时间的任务,那就需要频繁的申请资源,运行结束后,还需要资源释放,下次还需再重新申请资源才能运行。显然,这种任务会频繁启停的情况不适用于 Per Job 模式,更适合用 Session 模式。
{
"job": {
"content": [
{
"reader": {
"parameter" : {
"username" : "username",
"password" : "password",
"cat" : "insert,delete,update",
"jdbcUrl" : "jdbc:mysql://ip:3308/tudou?useSSL=false",
"host" : "ip",
"port" : 3308,
"start" : {
},
"table" : [ "tudou.kudu" ],
"splitUpdate" : false,
"pavingData" : true
},
"name" : "binlogreader"
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://ip:10000/tudou",
"username" : "",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"compress" : "",
"charsetName" : "UTF-8",
"maxFileSize" : 1073741824,
"analyticalRules" : "test_${schema}_${table}",
"schema" : "tudou",
"tablesColumn" : "{\"kudu\":[{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"type\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"schema\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"table\"},{\"comment\":\"\",\"type\":\"bigint\",\"key\":\"ts\"},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_id\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_id\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"before_name\"},{\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"after_name\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_age\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_age\",\"part\":false}]}",
"partition" : "pt",
"partitionType" : "MINUTE",
"defaultFS" : "hdfs://ns",
"hadoopConfig" : {
"dfs.ha.namenodes.ns": "nn1,nn2",
"fs.defaultFS": "hdfs://ns",
"dfs.namenode.rpc-address.ns.nn2": "ip:9000",
"dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"dfs.namenode.rpc-address.ns.nn1": "ip:9000",
"dfs.nameservices": "ns",
"fs.hdfs.impl.disable.cache": "true",
"hadoop.user.name": "root",
"fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}