前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink中DataStream和Table互相转换

Flink中DataStream和Table互相转换

作者头像
码客说
发布2023-01-13 13:49:29
1.7K0
发布2023-01-13 13:49:29
举报
文章被收录于专栏:码客码客

前言

Flink 为处理一列转多列的场景提供了两种返回类型 Tuple 和 Row

  • Tuple 只支持1~25个字段,且不能为null,不支持拓展
  • Row 支持null同时也无限制字段数,但如果需要使用Row,必须重载实现getResultType方法

DataStream=>Table

代码语言:javascript
复制
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class TCS002 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        List<Row> list = new ArrayList<Row>();
        list.add(Row.of("张三",18,"男"));
        list.add(Row.of("xiaohong",16,"女"));

        RowTypeInfo rowTypeInfo = getRowTypeInfo(list.get(0));
        DataStream<Row> ds = env.fromCollection(list,rowTypeInfo);
        tabEnv.registerDataStream("table01",ds);
        tabEnv.from("table01").execute().print();
    }

    private static RowTypeInfo getRowTypeInfo(Row row) {
        TypeInformation[] types = new TypeInformation[row.getArity()];
        String[] fieldNames = new String[row.getArity()];
        for (int i = 0; i < row.getArity(); i++) {
            Object field = row.getField(i);
            if(field instanceof Integer){
                types[i] = BasicTypeInfo.INT_TYPE_INFO;
            }else{
                types[i] = BasicTypeInfo.STRING_TYPE_INFO;
            }
            fieldNames[i] = "f"+i;
        }
        return new RowTypeInfo(types, fieldNames);
    }
}

Table=>DataStream

代码语言:javascript
复制
DataStream<Row> ds02 = tabEnv.toAppendStream(tb01, rowTypeInfo);
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-01-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • DataStream=>Table
  • Table=>DataStream
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档