前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 实践教程:入门2-写入 Elasticsearch

Flink 实践教程:入门2-写入 Elasticsearch

原创
作者头像
吴云涛
修改2021-12-08 16:05:27
1.1K0
修改2021-12-08 16:05:27
举报

Oceanus简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用 datagen 连接器生成随机数据,经过流计算 Oceanus,最终将计算数据存入 Elasticsearch 。

通过 Flink 生成数据写入到 Elasticsearch

前置准备

创建 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群

进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群

创建 Elasticsearch 集群

进入 Elasticsearch 控制台,点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问 创建 Elasticsearch 集群

!创建 Oceanus 集群和 Elasticsearch 集群时所选 VPC 必须是同一 VPC。

Oceanus 作业

1. 创建 Source
代码语言:txt
复制
-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

CREATE TABLE random_source ( 
  f_sequence INT, 
  f_random INT, 
  f_random_str VARCHAR 
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second'='1',  -- 每秒产生的数据条数
  
  'fields.f_sequence.kind'='sequence',   -- 有界序列(结束后自动停止输出)
  'fields.f_sequence.start'='1',         -- 序列的起始值
  'fields.f_sequence.end'='10000',       -- 序列的终止值
  
  'fields.f_random.kind'='random',       -- 无界的随机数
  'fields.f_random.min'='1',             -- 随机数的最小值
  'fields.f_random.max'='1000',          -- 随机数的最大值
      
  'fields.f_random_str.length'='10'      -- 随机字符串的长度
);
2. 创建 Sink
代码语言:txt
复制
-- Elasticsearch 只能作为数据目的表(Sink)写入
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

CREATE TABLE Student (
    `user_id`   INT,
    `user_name` VARCHAR
) WITH (
    'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch

    'connector.version' = '6',            -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 注意务必要和所选的内置 Connector 版本一致
    'connector.hosts' = 'http://10.0.0.175:9200',  -- Elasticsearch 的连接地址
    'connector.index' = 'Student',        -- Elasticsearch 的 Index 名
    'connector.document-type' = 'stu',    -- Elasticsearch 的 Document 类型
    'connector.username' = 'elastic',     -- 可选参数: 请替换为实际 Elasticsearch 用户名
    'connector.password' = 'xxxxxxxxxx',  -- 可选参数: 请替换为实际 Elasticsearch 密码

    'update-mode' = 'append',             -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式     
    'connector.key-delimiter' = '$',      -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
    'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'
    'connector.failure-handler' = 'retry-rejected',   -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)
    'connector.connection-max-retry-timeout' = '300',     -- 每次请求的最大超时时间 (ms)

    'format.type' = 'json'        -- 输出数据格式, 目前只支持 'json'
);
3. 编写业务 SQL
代码语言:txt
复制
INSERT INTO Student
SELECT
    f_sequence   AS user_id,
    f_random_str AS user_name
FROM random_source;
4. 选择 Connector

点击【作业参数】,在【内置 Connector】选择 flink-connector-elasticsearch6,点击【保存】>【发布草稿】运行作业。

?新版 Flink 1.13 集群不需要用户选择内置 Connector。其他版本集群请根据实际购买的 Elasticsearch 版本选择对应的 Connector。

5. 数据查询

进入 Elasticsearch 控制台,点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考 通过 Kibana 访问集群

总结

本示例用 Datagen 连接器随机生成数据,经过 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中,用户无需提前在 Elasticsearch 中创建索引。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Oceanus简介
  • 前置准备
    • 创建 Oceanus 集群
      • 创建 Elasticsearch 集群
      • Oceanus 作业
        • 1. 创建 Source
          • 2. 创建 Sink
            • 3. 编写业务 SQL
              • 4. 选择 Connector
                • 5. 数据查询
                • 总结
                相关产品与服务
                流计算 Oceanus
                流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档