We currently run our computation with the DataStream API and plan to upgrade to the Table API.
The problem: when the objects produced by flatMap are converted to a Table, printSchema() shows only a single column (the whole UserCommon object), so the Table API cannot be used for any further operations.
Question: when converting stream -> table, how can the object's member variables be mapped to individual table columns so that subsequent computations can operate on them?
// Load the Kafka configuration
Properties properties = getProperties();
// Parser class for this topic
TopicCSVParsers topicCsvParser = new TopicCSVParsers();
// Sample data: asdc|213132|af23|dwqd1|sdfwef3|... ; fields are separated by '|',
// and each line of text is parsed into one object
FlinkKafkaConsumer010<String> consumer010 =
        new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), properties);
consumer010.setCommitOffsetsOnCheckpoints(true);
// UeFunction overrides flatMap and converts each string into a UserCommon object
DataStream<UserCommon> stream2 = env.addSource(consumer010, "ue-source-".concat(topic)) //.setParallelism(128)
        .flatMap(new UeFunction(topic, topicCsvParser)).name("ue-flatMap-r_lte_http"); //.setParallelism(128);
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
tEnv.registerDataStream("httpTable", stream2);
Table httpTable = tEnv.scan("httpTable");
httpTable.printSchema();
try {
    env.execute();
} catch (Exception e) {
    e.printStackTrace();
}
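For reference, the per-record parsing done inside `processRecord` can be sketched in plain Java. This is a hypothetical, simplified version: the field names `userId` and `url`, the field count check, and the delimiter handling are all assumptions, and the real `ITopicCsvParser` may differ.

```java
// Minimal sketch of parsing one '|'-delimited record into an object.
// Hypothetical fields; the real UserCommon has its own member variables.
public class ParseSketch {
    public static class UserCommon {
        public String userId;
        public String url;
    }

    // Returns null for malformed records, mirroring the null check in UeFunction.flatMap.
    public static UserCommon parse(String record) {
        if (record == null) {
            return null;
        }
        // limit -1 keeps trailing empty fields instead of dropping them
        String[] fields = record.split("\\|", -1);
        if (fields.length < 2) {
            return null;
        }
        UserCommon u = new UserCommon();
        u.userId = fields[0];
        u.url = fields[1];
        return u;
    }
}
```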
*****************************************
public UeFunction(String topic, ITopicCsvParser topicCsvParser) {
    this.topic = topic;
    this.topicCsvParser = topicCsvParser;
}

@Override
public void flatMap(String record, Collector<UserCommon> collector) throws Exception {
    UserCommon userCommon = (UserCommon) processRecord(topic, record, topicCsvParser);
    if (userCommon == null) {
        return;
    }
    collector.collect(userCommon);
}
********************************
Console output:
root
 |-- f0: GenericType<com.eastcom.pm.stream.data.UserCommon>
Only one column.
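`GenericType<...>` in the printed schema means Flink could not analyze `UserCommon` as a POJO, so it falls back to generic (Kryo) serialization and exposes the whole object as a single column `f0`. Flink expands an object into individual columns only for composite types it understands: Tuples, Row, and classes that satisfy its POJO rules (the class is public, has a public no-argument constructor, and every field is either public or reachable through a public getter/setter). A minimal sketch of a class that satisfies those rules follows; the field names here are assumptions, not the real `UserCommon` members.

```java
// Sketch of a class that meets Flink's POJO rules: public class,
// public no-arg constructor, fields reachable via public getters/setters.
public class UserCommonPojo {
    private String userId;
    private String url;

    // A public no-argument constructor is required
    public UserCommonPojo() { }

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
}
```

Once `UserCommon` meets these rules, `tEnv.registerDataStream("httpTable", stream2)` should expand each field into its own column, and the three-argument overload (e.g. `tEnv.registerDataStream("httpTable", stream2, "userId, url")`, with hypothetical field names) can additionally select and rename fields.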