前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink sql fromDataSet fromDataStream 使用row

flink sql fromDataSet fromDataStream 使用row

原创
作者头像
stys35
发布2019-03-23 14:54:04
6.7K0
发布2019-03-23 14:54:04
举报
文章被收录于专栏:工作笔记精华工作笔记精华

源代码

代码语言:txt
复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.getConfig().disableSysoutLogging();
    StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
    DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() {
        private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}";
        private long count = 0L;
        private volatile boolean isRunning = true;
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning && count<2){
                synchronized (ctx.getCheckpointLock()){
                    ctx.collect(str1);
                    count++;
                }
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    });
    DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
        @Override
        public JsonNode map(String s) throws Exception {
            ObjectMapper objectMapper = new ObjectMapper();
            JsonNode node = objectMapper.readTree(s);
            return node;
        }
    });
    DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
        @Override
        public Row map(JsonNode jsonNode) throws Exception {
            int pos = 0;
            Row row = new Row(jsonNode.size());
            Iterator<String> iterator = jsonNode.fieldNames();
            while (iterator.hasNext()){
                String key = iterator.next();
                row.setField(pos,jsonNode.get(key).asText());
                pos++;
            }

            return row;
        }


    });

    dataStreamRow.addSink(new SinkFunction<Row>() {
        @Override
        public void invoke(Row value) throws Exception {
            System.out.println(value.getField(0));
        }
    });

    Table myTable = tableEnvironment.fromDataStream(dataStreamRow);

    Table result = myTable.select("f0");

    DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class);

    dataStreamResult.print();

    environment.execute();
}

运行时报错

提示org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.

处理

代码语言:txt
复制
DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
        ...
    }).returns(new RowTypeInfo(Types.STRING, Types.STRING, ...)); // Add as many fields as your Row has

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档