前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Flink 实践教程:入门(2):写入 Elasticsearch

Flink 实践教程:入门(2):写入 Elasticsearch

作者头像
腾讯云大数据
发布于 2021-11-01 02:10:18
发布于 2021-11-01 02:10:18
60300
代码可运行
举报
文章被收录于专栏:腾讯云大数据腾讯云大数据
运行总次数:0
代码可运行

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用 datagen 连接器生成随机数据,经过流计算 Oceanus,最终将计算数据存入 Elasticsearch

前置准备
创建 流计算 Oceanus 集群

进入流计算 Oceanus 控制台(https://console.cloud.tencent.com/oceanus/overview),点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档创建独享集群(https://cloud.tencent.com/document/product/849/48298)。

创建 Elasticsearch 集群

进入Elasticsearch 控制台(https://console.cloud.tencent.com/es),点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问创建 Elasticsearch 集群(https://cloud.tencent.com/document/product/845/19536)

!创建流计算 Oceanus 集群和 Elasticsearch 集群时所选 VPC 必须是同一 VPC。

流计算 Oceanus 作业
1. 创建 Source
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

CREATE TABLE random_source ( 
  f_sequence INT, 
  f_random INT, 
  f_random_str VARCHAR 
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second'='1',  -- 每秒产生的数据条数
      
  'fields.f_sequence.kind'='sequence',   -- 有界序列(结束后自动停止输出)
  'fields.f_sequence.start'='1',         -- 序列的起始值
  'fields.f_sequence.end'='10000',       -- 序列的终止值
      
  'fields.f_random.kind'='random',       -- 无界的随机数
  'fields.f_random.min'='1',             -- 随机数的最小值
  'fields.f_random.max'='1000',          -- 随机数的最大值
      
  'fields.f_random_str.length'='10'      -- 随机字符串的长度
);
2. 创建 Sink
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- Elasticsearch 只能作为数据目的表(Sink)写入
-- 注意! 如果您启用了 Elasticsearch 的用户名密码鉴权功能, 目前只能使用 Flink 1.10 的旧语法。若无需鉴权, 则可以使用 Flink 1.11 的新语法。
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

CREATE TABLE Student (
    `user_id`   INT,
    `user_name` VARCHAR
) WITH (
    'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch

    'connector.version' = '6',            -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 注意务必要和所选的内置 Connector 版本一致
    'connector.hosts' = 'http://10.0.0.175:9200',  -- Elasticsearch 的连接地址
    'connector.index' = 'Student',        -- Elasticsearch 的 Index 名
    'connector.document-type' = 'stu',    -- Elasticsearch 的 Document 类型
    'connector.username' = 'elastic',     -- 可选参数: 请替换为实际 Elasticsearch 用户名
    'connector.password' = 'xxxxxxxxxx',  -- 可选参数: 请替换为实际 Elasticsearch 密码

    'update-mode' = 'append',             -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式     
    'connector.key-delimiter' = '$',      -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
    'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'
    'connector.failure-handler' = 'retry-rejected',   -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)

    'connector.flush-on-checkpoint' = 'true',   -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
    'connector.bulk-flush.max-actions' = '42',  -- 可选参数, 每批次最多的条数
    'connector.bulk-flush.max-size' = '42 mb',  -- 可选参数, 每批次的累计最大大小 (只支持 mb)
    'connector.bulk-flush.interval' = '60000',  -- 可选参数, 批量写入的间隔 (ms)
    'connector.connection-max-retry-timeout' = '300',     -- 每次请求的最大超时时间 (ms)

    'format.type' = 'json'        -- 输出数据格式, 目前只支持 'json'
);
3. 编写业务 SQL
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INSERT INTO Student
SELECT
f_sequence   AS user_id,
f_random_str AS user_name
FROM random_source;
4. 选择 Connector

点击【作业参数】,在【内置 Connector】选择 flink-connector-elasticsearch6,点击【保存】>【发布草稿】运行作业。

?新版 Flink 1.13 集群不需要用户选择内置 Connector。其他版本集群请根据实际购买的 Elasticsearch 版本选择对应的 Connector。

5. 数据查询

进入Elasticsearch 控制台(https://console.cloud.tencent.com/es),点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考通过 Kibana 访问集群(https://cloud.tencent.com/document/product/845/19541)

总结

本示例用 Datagen 连接器随机生成数据,经过 流计算 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中,用户无需提前在 Elasticsearch 中创建索引。

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Python安装PyGraphics包简
Python还比较年轻,这几个小软件很小,但找齐不那么容易,官网上可能由于对方有部分库遗失下载不下来,各处收集,全部收齐上传在此,且在命名时已按照逐一安装的顺序编好了号,需要的下载下来按序号安装!
py3study
2020/01/09
8270
windows环境下搭建python+nltk开发环境
nltk(Natural Language Tookit)实际上是python的一个开发包。对于自然语言处理任务非常有用。 =============================================================================================== NLTK 2.0官网:http://nltk.org/install.html ==========================================================
NateHuang
2018/03/14
8630
windows环境下搭建python+nltk开发环境
Selenium win7+selenium2.0+python+JetBrains PyCharm环境搭建
担心最新版的支持不太好,这里我下载的是python 2.7(selenium之前不支持python3.x)
授客
2019/09/11
6180
Selenium win7+selenium2.0+python+JetBrains PyCharm环境搭建
Python安装MySQL库详解(解决Microsoft Visual C++ 9.0 is required )
前面我们介绍的Python网络爬虫通常将抓取的数据存储至TXT或CSV文件,而当数据量增加之时,就需要将其存储至本地数据库了。Python访问数据库需要对应的接口程序,我们可以把接口程序理解为Python的一个模块,它提供了数据库客户端的接口供您访问。本文主要讲述Python操作MySQL数据库,通过调用MySQLdb扩展包实现。而在安装过程中会遇到各种各样的错误,这里给出安装MySQL库的详细步骤及错误的解决方法。
统计学家
2019/04/10
1.7K0
Python安装MySQL库详解(解决Microsoft Visual C++ 9.0 is required )
windows python flas
是一个轻量级的Web应用框架, 使用Python编写。基于 WerkzeugWSGI工具箱和 Jinja2模板引擎。Flask使用 BSD 授权。
py3study
2020/01/10
4980
selenium win7+selenium2.0+python环境搭建
担心最新版的支持不太好,这里我下载的是python 2.7(selenium之前不支持python3.x)
授客
2019/09/12
1.2K0
【Python基础】08、Python模
 可以将代码量较大的程序分割成多个有组织的、彼此独立但又能互相交互的代码片段,这些自我包含的有组织的代码段就是模块
py3study
2020/01/06
1.8K0
Python包管理整理:setuptoo
setuptool管理python相关的包 一、介绍 setuptool管理python相关的包的工具。这些包是zip格式发布,但是后缀一般都是.egg setuptool能解决python包的依赖关系 setuptool安装的包默认安装到/usr/local/lib/pythonX.X/site-packages/目录下 下载包默认到http://pypi.python.org/pypi下载 pypi为Python PackageIndex 二、安装setuptool工具 1、rhel/centos #yum -y install python-setuptools 2、freebsd #cd /usr/ports/devel/py-setuptools && make install clean 3、debian/ubuntu #sudo apt-get install python-setuptools 以上使用系统包管理系统安装后需要更新一下: # easy_install -U setuptools 4、通用方式 Download ez_setup.py , and then run: ez_setup.py -Zf http://peak.telecommunity.com/snapshots/ RuleDispatch #fetch http://peak.telecommunity.com/dist/ez_setup.py #python2.7 ez_setup.py python2.7指定版本号,以表示setuptool使用的python版本。未指定版本则使用默认,也表示默认安装的版本是最新版本。 这一约定方便,旧版本也可以继续使用 三、通过easy_install安装python包 (一)普通安装 #easy_install Babel (二)安装本地或网络文件系统中安装egg文件 #easy_install /net/src/eggs/py2.5.egg (三)指定包的下载路径安装 #easy_install http://trac-hacks.org/svn/iniadminplugin/0.11/ #easy_install http://trac-hacks.org/svn/accountmanagerplugin/trunk (四)从URL源码包安装 #easy_install  http://pypi.python.org/simple/asp/asp-0.1.2.4.tar.gz 条件asp-0.1.2.4.tar.gz包中的根目录中必须包括setup.py文件 (五)web上面搜索包,并自动安装 # easy_install -f http://pypi.python.org/simple/ asp (六)指定包的版本 # easy_install asp==0.1.2.1 如果指定的版本高于现有已安装的保本就是升级了 (七)升级包 升级到最新版本(不指定版本就会升级到最新版本 # easy_install -U asp 升级到指定版本 # easy_install -U asp==0.1.2.2 四、认证和配置文件 1、有些需要认证的python站点 easy_install -f http://uid@password@pypi.python.org/simple/packages 2、使用配置文件定义下载的站点和安装的目录 配置文件位置 当前目录/setup.cfg 或当前目录/.pydistutils.cfg 配置文件内容 find-links=http://pypi.python.org/simple/ #特定搜索包的URL allow=*.python.org #搜索的域名 install_dir=/src/lib/python    #这个目录需要在PYTHONPATH中 (sys.path) 更多帮助请看easy_install --help
py3study
2020/01/07
6780
python2.7.12源码编译
下载python源码包:  https://www.python.org/downloads/release/python-2712/ 下载setuptool包:  https://pypi.python.org/pypi/setuptools#code-of-conduct 下载pip包:  http://pypi.python.org/packages/source/p/pip/pip-1.0.tar.gz python官网上说2.7.9之后pip就在默认的包里,反正我发现2.7.12是没
BGBiao
2018/02/26
1.1K0
windows平台python 2.7环境编译安装zbar
最近一个项目需要识别二维码,找来找去找到了zbar和zxing,中间越过无数坑,总算基本上弄明白,分享出来给大家。
黯然销魂掌
2018/09/27
1.1K0
python监控windows的CPU,
有一批windows系统需要监控,无论是zabbix、nagios都需要安装相关插件,操作起来比较麻烦。
py3study
2020/01/07
1.5K0
selenium + python自动化测试环境搭建
-------------------------------------------------------------
流柯
2018/08/30
7450
Python+Selenium2 搭建自动化测试环境
米扑科技的许多项目都用到了爬虫采集网页数据,突破反爬虫、自动化测试、回归测试也要求米扑考虑构建自动化,来提高整个团队的极致工作效率。 由于忙于需求以及产品的流程规范,现在对于测试技术方面的研究也积累了很多。不过不管做什么,做好最重要! 搞自动化主要是出于团队建设考虑,一方面为了提供测试部门的工作效率,保障产品质量;另一方面,也是为了提升团队成员的测试技能,保证Team良性发展。不过不管如何,自动化是必须要搞,不然繁琐的回归测试是没有任何效率保证和质量保障的。 初步计划通过Python作为脚本语言,Selen
阳光岛主
2018/05/17
1.2K0
将打飞机游戏打包成 exe
发现很多朋友在写了 pygame 的打飞机游戏之后,都很想打包成 exe 文件分享给别人玩。但是在打包的过程中,可能遇到一些问题。今天我就来整理一下 pygame 打包 exe 的一些注意事项。 另外,pygame 打飞机游戏的相关资源,包括图片和代码,我放在了论坛的帖子里,需要的朋友可去下载。 在这里,我介绍两种较为常见的打包 exe 工具,cx_freeze 和 py2exe。先说 cx_freeze: cx_freeze 打包的过程比较简单,安装好之后运行一条命令就可以了。基本步骤就是: 1.搜索并下
Crossin先生
2018/04/17
1.5K0
python2.x和python3.x共
   下载完成之后,解压到python3所在的安装目录,用CMD控制台进入解压目录,输入:
py3study
2020/01/08
5130
升级python2.6.6到python
python2.6.6 升级到python2.7.14,此处不再描述 可以参考如下文档升级: https://blog.csdn.net/see_you_see_me/article/details/78550977
py3study
2020/01/08
7240
2018-03-24python3.6.2 Tensorflow环境配置(win10 64位)
一、机器配置 win10 64位 二、软件下载及安装(cmd以管理员身份运行) 1、python下载安装 https://www.python.org/ 版本3.6.2 https://www.pyt
用户1733354
2018/05/22
4790
Python-工具安装
Windows 2003平台,安装Python2.7.4,Python3.3,setuptools,pip,virtualenv。
py3study
2020/01/13
4760
python环境配置
第一步、安装python 的开发环境包,选择需要安装路径进行安装,笔者下载的是目前最新的 python2.7.5版本,安装目录为:C:\Python27。 第二步、安装setuptools 通过前面提供的setuptools 的连接 https://pypi.python.org/pypi/setuptools, 拖动页面到底部找到,https://pypi.python.org/packages/source/s/setuptools/setuptools-12.0.3.tar.gz#md5=f07e4b0f4c1c9368fcd980d888b29a65setuptools-1.3.2.tar.gz    文件(版本随着时间版本会有更新),对文件进行解压,找到ez_install.py 文件,进入windows 命令提示(开始--运行--cmd 命令,回车)下执行ez_install.py: C:\setuptools-1.3>python ez_install.py
py3study
2020/01/10
8190
电脑上同时安装Python2和Pytho
1.1、到Python的官网 https://www.python.org/ 下载Python的安装文件
py3study
2020/01/03
7930
相关推荐
Python安装PyGraphics包简
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文