前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >快速体验 Flink Table Store

快速体验 Flink Table Store

作者头像
一见
发布2022-11-23 16:24:59
5050
发布2022-11-23 16:24:59
举报
文章被收录于专栏:蓝天蓝天

在本地安装单机版本,能够实现快速体验 Flink Table Store 的目的,本文以 Flink 1.15.2、flink-table-store-dist-0.2.1 和 flink-shaded-hadoop-2-uber-2.8.3-10.0 为例,系统为 Centos 3.10。

下载

  • 1、下载 Flink 1.15.2
代码语言:javascript
复制
https:///closer.lua/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz 
  • 2、下载 Table Store 包
代码语言:javascript
复制
https:///closer.lua/flink/flink-table-store-0.2.1/flink-table-store-dist-0.2.1.jar 
  • 3、下载 hadoop 包
代码语言:javascript
复制

安装

将下载好的 flink-1.15.2-bin-scala_2.12.tgz、flink-table-store-dist-0.2.1.jar 和 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 都放到安装目录下,执行以下步骤完成安装:

  • tar -xzf flink-1.15.2-bin-scala_2.12.tgz
  • cp flink-table-store-dist-0.2.1.jar flink-1.15.2/lib/
  • cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar flink-1.15.2/lib/

配置

编辑 conf 目录下的配置文件 flink-conf.yaml,将配置项 taskmanager.numberOfTaskSlots 的值改为 2 。

代码语言:javascript
复制
vi ./conf/flink-conf.yaml

taskmanager.numberOfTaskSlots: 2 

启动本地(单机模式)集群

代码语言:javascript
复制
./bin/start-cluster.sh 

如果启动成功,则执行 jps 命令可看到 TaskManagerRunner 和 StandaloneSessionClusterEntrypoint 两个进程,执行“netstat -lpnt”可看到开启了 127.0.0.1:8081 看板端口。如果想修改看板端口,则只需要编辑配置文件 conf/flink-conf.yaml,将配置项 rest.port 改为其它值,然后重启(执行 bin/stop-cluster.sh 停止本地集群)即可。默认看板的地址为 localhost,如果需要远程查看,则还需要修改配置项 rest.bind-address,比如可将 localhost 改为 0.0.0.0 。

如果启动报错“Unsupported major.minor version 52.0”,这是因为 JDK 的版本不匹配,可执行命令“java -version”查看 JDK 的版本。根据 flink-1.15 的发布说明,建议 Java 11,但 Java 8 也可以。本文使用“Tencent Kona JDK 11.0.17”,下载地址:

代码语言:javascript
复制

可将 TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz 上传到 /usr/local 目录,然后解压,再建立软链接:

代码语言:javascript
复制
cd /usr/local
tar xzf TencentKona-11.0.17.b1-jdk_linux-x86_64.tar.gz ln -s TencentKona-11.0.17.b1 jdk 

将 JDK 加入 PATH 中:

代码语言:javascript
复制
export JAVA_HOME=/usr/local/jdk export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=$JAVA_HOME/lib/tools.jar 

启动成功后,就可开始体验了。

体验 Flink Table Store

以单机模式执行“./bin/sql-client.sh embedded”,进入 SQL 操作界面,然后可按如下步骤开始体验:

  • 1、创建表*
代码语言:javascript
复制
CREATE CATALOG my_catalog WITH ( 'type'='table-store', 'warehouse'='file:/tmp/table_store' );

USE CATALOG my_catalog; -- create a word count table CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT ); 
  • 2、写数据
代码语言:javascript
复制
-- create a word data generator table CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.word.length' = '1' ); -- table store requires checkpoint interval in streaming mode SET 'execution.checkpointing.interval' = '10 s'; -- write streaming data to dynamic table -- 在 flink 的看板可看到产生了一个 job INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word; 

注意,对表 word_table 只能执行流查询,执行批查询时会报如下所示的错误:

代码语言:javascript
复制
org.apache.flink.table.api.ValidationException: Querying an unbounded table 'my_catalog.default.word_table' in batch mode is not allowed. The table source is unbounded. 
  • 3、批量查询
代码语言:javascript
复制
-- 设置结果输出模式为 tableau,即表哥方式输出 SET 'sql-client.execution.result-mode' = 'tableau'; -- 切换到批模式 RESET 'execution.checkpointing.interval'; SET 'execution.runtime-mode' = 'batch'; SELECT * FROM word_count; 
  • 4、流式查询
代码语言:javascript
复制
-- 切换到流模式 SET 'execution.runtime-mode' = 'streaming'; SELECT * FROM word_count; 
  • 5、结束体验
代码语言:javascript
复制
-- drop the dynamic table, clear the files DROP TABLE word_count; -- exit sql-client EXIT; 
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-11-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 下载
  • 安装
  • 配置
  • 启动本地(单机模式)集群
  • 体验 Flink Table Store
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档