作业开发和发布

最近更新时间:2019-10-23 17:33:35

步骤1:JAR 包开发

根据业务逻辑编写 JAR 包。

  1. 创建一个流计算作业,根据上报到 Ckafka 的监控数据,计算内存使用率。
    基于 Flink 提供的 Table API 编写流计算作业,在 Ckafka 中创建两个 Topic,分别作为流计算作业的 source 和 sink,即 CKafka 源表和中间结果表, 指定维表为 TencentDB,代码示例如下。可保存为TestJob.jar文件。
    //指定源数据表为流表CKafka
    tableEnv.connect(new Kafka().version("0.10").topic("DemoSource")
                  .property("bootstrap.servers", "******")
                  .property("group.id", "Demo").startFromEarliest())
                  .withFormat(new Json())
                  .withSchema(new Schema()
                     .field("JobName", Types.STRING)
                     .field("UsedGB", Types.DOUBLE)
                     .field("TotalGB", Types.DOUBLE)
                     .field("Type", Types.INT)
                      )
                  .inAppendMode().registerTableSource("MoniteDataSource");
    //指定源数据表为维表TencentDB
    String sql = "SELECT * FROM Threshold";
    JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                 .setDrivername("com.mysql.cj.jdbc.Driver")
                 .setDBUrl("**********")
                 .setUsername("******")
                 .setPassword("******")
                 .setQuery(sql)
                 .setRowTypeInfo(
                     new RowTypeInfo(
                         new TypeInformation<?>[]{                                                                           BasicTypeInfo.DOUBLE_TYPE_INFO,
                         BasicTypeInfo.INT_TYPE_INFO}
                  )).finish();
    DataStream<Row> mysqlStream = env.createInput(inputFormat);
    tableEnv.registerDataStream("Threshold", mysqlStream, "Value, Type");
    //指定目的表为流表CKafka
    tableEnv.connect(new Kafka().version("0.10").topic("DemoSink")
             .property("bootstrap.servers", "*******")
             .sinkPartitionerFixed())
             .inAppendMode()
             .withFormat(new Json())
             .withSchema(new Schema()
                     .field("JobName", Types.STRING)
                     .field("UsedRatio", Types.DOUBLE)
                     .field("UsedThreshold", Types.DOUBLE))
             .registerTableSink("MoniteDataSink");
    //计算内存使用率的逻辑
    tableEnv.sqlUpdate("insert into MoniteDataSink select a.JobName, a.UsedGB * 1.0 / a.TotalGB, b.Value from MoniteDataSource as a, Threshold as b where a.Type=b.Type");
  2. 创建另一个流计算作业,用于根据内存使用情况判断是否要触发阈值告警。
    将上述作业生成的 Ckafka 中间结果表作为新作业的源表,同时在此 CKafka 中创建一个 Topic 作为新作业的最终结果表来保存计算的告警结果。定义新作业的数据源、数据目的,以及告警计算的逻辑,代码示例如下,可以保存为TestJob2.jar文件。
    //指定源表为流表CKafka,使用第一个作业的计算结果的Topic作为第二个作业的数据源
    tableEnv.connect(new Kafka().version("0.10").topic("DemoSink")
         .property("bootstrap.servers", "*******")
         .sinkPartitionerFixed())
         .inAppendMode()
         .withFormat(new Json())
         .withSchema(new Schema()
                 .field("JobName", Types.STRING)
                 .field("UsedRatio", Types.DOUBLE)
                 .field("UsedThreshold", Types.DOUBLE))
                 .registerTableSink("MoniteDataSource_1");
    //指定目的表为ckafka
    tableEnv.connect(new Kafka().version("0.10").topic("DemoSink_1")
         .property("bootstrap.servers", "*******")
         .sinkPartitionerFixed())
         .inAppendMode()
         .withFormat(new Json())
         .withSchema(new Schema()
             .field("JobName", Types.STRING)
             .field("AlertOrNot", Types.INT)).registerTableSink("MoniteDataSink_1");
    //计算告警结果
    tableEnv.sqlUpdate("insert into MoniteDataSink_1 select JobName, case when UsedRatio - UsedThreshold > 0 then 1 else 0 end  from MoniteSource_1");

步骤2:创建 JAR 作业

在独享集群上创建 JAR 作业。

  1. 进入 流计算 Oceanus 控制台,单击左侧导航【流计算】下的【作业管理】,进入作业管理页面。
  2. 单击【新建JAR作业】,进入【新建作业】页面,选择您的独享集群,然后单击【下一步】。
    新建作业1
    说明:

    如果这一步没有可选的独享集群,请参见 准备独享集群 创建独享集群。

  3. 输入作业名称作业监控告警1,算子默认并行度选择默认值,然后单击【完成创建】,即可在【作业管理】页面看到新创建的作业。
    新建作业2
  4. 创建另一个流计算作业作业监控告警2,步骤同上。

步骤3:流计算服务委托授权

当进入作业【分析开发】时,会提示访问授权。

选择【作业管理】中名称为作业监控告警1的作业,单击【分析开发】。在未授权时,弹出访问授权对话框如下,单击【前往授权】,授权流计算作业访问您的 CKafka、TencentDB 等资源。
角色授权弹框

说明:

此授权的详细说明参见 流计算服务委托授权

步骤4:JAR 作业开发和发布

  1. 选择【作业管理】中作业名称为作业监控告警1的作业,然后单击【分析开发】,进入到作业开发页面。上传TestJob1.jar,输入下图 MainClass 和主类入参,单击【保存并发布运行】。
    JAR作业开发
  2. 选择【作业管理】中作业名称为作业监控告警2的作业,然后单击【分析开发】,进入到作业开发页面。上传TestJob2.jar,MainClass 填写com.tencent.demo.TestJob2,主类入参填写同上。

步骤5:结果数据查看

作业提交完成后,即可向数据源 CKafka 的 Topic 中导入数据。数据会经过流计算作业处理,并将结果写入到数据目的 CKafka 的 Topic 中。您可以通过官方 Kafka 的客户端,来消费目的 CKafka 中对应的 Topic 来获得相应的结果。