首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >fiink中datastream转换成table后,流中对象在表中无法使用?

fiink中datastream转换成table后,流中对象在表中无法使用?

提问于 2019-12-31 14:26:13
回答 0关注 0查看 298
代码语言:javascript
复制
目前是用datastream跑数据进行计算,打算升级一下API,切换到table.
问题是stream中通过flatMap解析出的对象转换成table时候,打印schema只有一列(UserCommon对象),无法后续使用tableAPI进行操作;
想问问有什么办法能在stream->table的时候,把对象的成员变量映射成table的列,才好进行后续的计算操作.

//获取Kafka配置
Properties properties = getProperties();
//获取topic解析类
TopicCSVParsers topicCsvParser = new TopicCSVParsers();
//数据样例:asdc|213132|af23|dwqd1|sdfwef3|...  ; | 分隔每个成员变量,每一行字符串解析为一个对象
FlinkKafkaConsumer010<String> consumer010 =
        new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), properties);
consumer010.setCommitOffsetsOnCheckpoints(true);
//UeFunction重写了faltMAp 将字符串转换成对应的UserCommon对象
DataStream<UserCommon> stream2 = env.addSource(consumer010, "ue-source-".concat(topic))//.setParallelism(128)
        .flatMap(new UeFunction(topic, topicCsvParser)).name("ue-flatMap-r_lte_http");//.setParallelism(128);
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
tEnv.registerDataStream("httpTable", stream2);
Table httpTable = tEnv.scan("httpTable");
httpTable.printSchema();
try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
        *****************************************
    public UeFunction(String topic, ITopicCsvParser topicCsvParser) {
        this.topic = topic;
        this.topicCsvParser = topicCsvParser;
    }
    @Override
    public void flatMap(String record, Collector<UserCommon> collector) throws Exception {
        UserCommon userCommon = (UserCommon) processRecord(topic, record, topicCsvParser);
        if (userCommon == null) {
            return;
        }
        collector.collect(userCommon);
    }
    
    
********************************
控制台打印结果:
    
    root
 |-- f0: GenericType<com.eastcom.pm.stream.data.UserCommon>
 
 仅一列

回答

和开发者交流更多问题细节吧,去 写回答
相关文章

相似问题

相关问答用户
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档