作业开发和发布

最近更新时间:2019-10-23 17:33:02

步骤1:创建 SQL 作业

在共享集群上创建一个 SQL 作业:网站流量统计

  1. 进入 流计算 Oceanus 控制台,单击左侧导航【流计算】下的【作业管理】,进入作业管理页面。
  2. 单击【新建 SQL 作业】,进入【创建SQL作业】页面,输入相关信息后单击【立即购买】。
    新建SQL作业
  3. 作业创建成功,新创建的 SQL 作业可在【作业管理】查看,此时状态为“未初始化”。

步骤2:流计算服务委托授权

当进入作业【分析开发】时,会提示访问授权。

选择【作业管理】中名称为网站流量统计的作业,单击【分析开发】。在未授权时,弹出访问授权对话框如下,单击【前往授权】,授权流计算作业访问您的 CKafka、TencentDB 等资源。
角色授权弹框

说明:

此授权的详细说明参见 流计算服务委托授权

步骤3:编写 SQL 语句

完成授权操作后即可在【分析开发】的代码编辑框中输入 SQL 语句。
分析开发页面
操作过程如下:

  1. 创建数据源表、维表和数据结果表。
  2. 编写 SQL 代码,计算 PV/UV 值。
    /** 创建输入流表,从 CKafka 获取数据。
    *
    * type: 表示数据来源类型
    * instanceId:购买的 CKafka 实例的 ID,请到 CKafka 控制台获取
    * encoding: 输入数据编码格式为 json
    * topic: 前面建好的输入数据来源主题名称
    **/
    CREATE TABLE `page_visits` (
    `record_time` VARCHAR,
    `user_id` VARCHAR,
    `page_id` VARCHAR
    ) WITH (
    `type` = 'ckafka',
    `instanceId` = 'ckafka-xxxxxxxx',/*您自己的 id 与这里显示不同*/
    `encoding` = 'json',
    `topic` = 'page_visits'
    );
    /** 创建维表数据,定时从云数据库 MySQL 获取数据。
    * 
    * type: 表示维表数据来源类型
    * instanceId:购买的云数据库 MySQL 实例的 ID,请到 MySQL 控制台获取
    * user: 登录 MySQL 的用户名
    * password: 使用上述用户名登录 MySQL 实例的密码
    * database: 在购买的 MySQL 实例中新建的数据库名
    * table: 在上述新建的数据库中新建的维表名
    **/
    CREATE TABLE `page_info` (
    `page_id` VARCHAR,
    `page_name` VARCHAR,
    PRIMARY KEY ( `page_id` )
    ) WITH (
    `type` = 'mysql',
    `instanceId` = 'cdb-xxxxxxxx',/*您自己的 id 与这里显示不同*/
    `user` = 'root',
    `password` = '********',
    `database` = 'mydatabase',
    `table` = 'page_info'
    );
    /** 创建输出流表,结果输出到云数据库 MySQL 中。
    *
    * type: 表示输出数据目的地类型
    * instanceId:购买的云数据库 MySQL 实例的 ID,请到 MySQL 控制台获取
    * user: 登录 MySQL 的用户名
    * password: 使用上述用户名登录 MySQL 实例的密码
    * database: 在购买的 MySQL 实例中新建的数据库名
    * table: 在上述新建的数据库中新建的结果表名
    **/
    CREATE TABLE `pv_uv_results` (
    `record_time` VARCHAR,
    `page_name` VARCHAR,
    `pv` BIGINT,
    `uv` BIGINT,
    PRIMARY KEY ( `record_time`,`page_name` )
    ) WITH (
    `type` = 'mysql',
    `instanceId` = 'cdb-xxxxxxxx',/*您自己的 id 与这里显示不同*/
    `user` = 'root',
    `password` = '********',
    `database` = 'mydatabase',
    `table` = 'pv_uv_results'
    );
    /** 计算每个页面 每分钟的 PV 和 UV
    * 将它与保存页面名映射关系的维表实时 join
    * 得到每个页面名对应的每分钟 PV 和 UV
    * 将结果输出到云数据库 MySQL 的结果表中
    **/
    INSERT INTO pv_uv_results
    SELECT
    SUBSTRING(a.record_time, 1, 16) as record_time,
    b.page_name as page_name,
    count(a.user_id) as pv,
    count(DISTINCT a.user_id) as uv
    FROM page_visits a
    JOIN page_info b
    on a.page_id = b.page_id
    GROUP BY SUBSTRING(a.record_time, 1, 16),b.page_name;
  3. 单击【语法检查】,检查语法是否完全正确。如果提示错误,请根据提示修改 SQL 语句。

步骤4:调试 SQL 作业

编写完代码后,可选择在发布之前先进行调试。我们为用户提供了一套独立的模拟环境,用户可以在该环境中上传数据,模拟运行并检查输出结果。

  1. 在【分析开发】中单击【调试】,若编写 SQL 语句时语法检查无误,则会进入调试页面。
    上传调试数据界面
  2. 单击对应数据源的【上传】,从本地上传数据文件,上传的样例数据如下。
    • page_visits.txt文件,每行一条记录 ( record_time,user_id,page_id ),数据之间以逗号分隔, 示例如下:
      2018-05-23 18:00:48,user_5,pid_9
      2018-05-23 18:00:38,user_4,pid_2
      2018-05-23 18:00:28,user_6,pid_2
      2018-05-23 18:00:18,user_1,pid_0
      2018-05-23 18:01:08,user_4,pid_2
      2018-05-23 18:01:28,user_1,pid_0
      2018-05-23 18:01:18,user_2,pid_8
      2018-05-23 18:01:08,user_9,pid_5
    • page_info.txt文件,每行一条记录 ( page_id,page_name ),数据之间以逗号分隔,示例如下:
      pid_0,page0
      pid_1,page1
      pid_2,page2
      pid_3,page3
      pid_4,page4
      pid_5,page5
      pid_6,page6
      pid_7,page7
      pid_8,page8
      pid_9,page9
  3. 单击【开始调试】,界面显示如下。
    等待调试结果界面
  4. 单击【确定】,等待片刻后,调试结果显示如下。可单击【下载结果】下载调试结果。
    调试结果
    根据调试结果判断执行的 SQL 代码已达到预期效果后,即可进行发布操作。

步骤5:发布运行 SQL 作业

  1. 在【分析开发】页的【参数设置】中,设置是否开启 checkpoint(可默认开启)和时间间隔(如60秒)。
    保存并发布运行
  2. 单击【保存并发布运行】后,弹出对话框如下,允许指定数据消费时间点来运行作业,可选择默认值。选择数据消费时间点并单击【确定】,提交运行作业。
    指定时间运行

步骤6:导入数据到云数据库 MySQL 的维表 page_info

登录前面购买的云数据库 MySQL 实例,在新创建的数据库中,向维表 page_info 中插入(page_id,page_name)记录,命令如下:

insert into page_info values ("pid_0","page0"),("pid_1","page1"),("pid_2","page2"),("pid_3","page3"),("pid_4","page4"),("pid_5","page5"),("pid_6","page6"),("pid_7","page7"),("pid_8","page8"),("pid_9","page9");

步骤7:发送数据到消息队列 CKafka

前面已经购买消息队列 CKafka 作为数据源,提交作业后,用户可以将数据上传至 Ckafka 实例,实现向流计算 SQL 作业提供实时流数据源。

在本示例中,将构造本地数据并通过 Kafka 工具包上传至 Ckafka 实例,Kafka 工具包的使用请参见 下载 Kafka 工具包

发送数据到 Kafka 的命令如下:

# broker-list 中的 $ip $port 变量,指的是 CKafka 的接入点的 IP 和 port, 接入点信息可以在 CKafka 实例详情页查看
# topicName 为 CKafka 实例中的 topic 名称
./kafka-console-producer.sh --broker-list $ip:$port --topic page_visits

发送的样例数据示例如下,页面访问信息,每条记录为 json 格式,包含3个 key-value 对,即访问时间、访客 ID、页面 ID:

{record_time:2018-05-23 18:00:48, user_id:user_5, page_id:pid_9} 
{record_time:2018-05-23 18:00:38, user_id:user_4, page_id:pid_2}
{record_time:2018-05-23 18:00:28, user_id:user_6, page_id:pid_2}
{record_time:2018-05-23 18:00:18, user_id:user_1, page_id:pid_0}

步骤8:查看作业运行结果

单击作业的【线上运维】,可以查看运行作业的实例情况,包括线上正执行的 SQL 语句、抽样的源数据记录和计算结果记录,这里展示的都是明细数据。

抽样的计算结果如下:

page_name pv record_time uv
page9 1 2018-05-23 18:00 1
page2 1 2018-05-23 18:00 1
page2 2 2018-05-23 18:00 2
page0 1 2018-05-23 18:00 1
说明:

控制台展示的抽样的计算结果是针对每一个条输入数据的计算结果,最终聚合后的结果还需要登录云数据库 MySQL,查看 pv_uv_results 表 。