How flink achieves compatibility with StormTopology

This article examines how flink provides compatibility with StormTopology.

Example

    @Test
    public void testStormWordCount() throws Exception {
        //NOTE 1 build Topology the Storm way
        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomWordSpout(), 1);
        builder.setBolt("count", new WordCountBolt(), 5)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");

        //NOTE 2 convert StormTopology to FlinkTopology
        FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);

        //NOTE 3 execute program locally using FlinkLocalCluster
        Config conf = new Config();
        // only required to stabilize integration test
        conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);

        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("stormWordCount", conf, flinkTopology);
        cluster.shutdown();
    }
  • Here FlinkLocalCluster.getLocalCluster() is used to create or obtain a FlinkLocalCluster; the topology is then submitted via FlinkLocalCluster.submitTopology, and the cluster is closed at the end via FlinkLocalCluster.shutdown
  • The RandomWordSpout built here extends Storm's BaseRichSpout, WordCountBolt extends Storm's BaseBasicBolt, and PrintBolt extends Storm's BaseRichBolt (since flink uses its own checkpointing mechanism and does not translate Storm's ack operations, it makes no particular difference whether BaseBasicBolt or BaseRichBolt is used); an illustrative sketch of these components follows this list
  • The topology passed to FlinkLocalCluster.submitTopology is the FlinkTopology converted from the StormTopology
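
The RandomWordSpout, WordCountBolt, and PrintBolt classes themselves are not shown in this article. The sketch below illustrates what the spout and the counting bolt might look like; the class bodies are illustrative assumptions (each class would live in its own file), not the actual implementations used by the test.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // Illustrative only: a spout that endlessly emits random words on the "word" field.
    public class RandomWordSpout extends BaseRichSpout {
        private static final String[] WORDS = {"apple", "banana", "cherry"};
        private final Random random = new Random();
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            collector.emit(new Values(WORDS[random.nextInt(WORDS.length)]));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    // Illustrative only: a bolt that keeps a running count per word; BaseBasicBolt
    // would normally ack automatically, but flink ignores Storm acks anyway.
    public class WordCountBolt extends BaseBasicBolt {
        private final Map<String, Integer> counts = new HashMap<>();

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word = input.getStringByField("word");
            collector.emit(new Values(word, counts.merge(word, 1, Integer::sum)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }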

LocalClusterFactory

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java

    // ------------------------------------------------------------------------
    //  Access to default local cluster
    // ------------------------------------------------------------------------

    // A different {@link FlinkLocalCluster} to be used for execution of ITCases
    private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

    /**
     * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
     * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
     *
     * @return a {@link FlinkLocalCluster} to be used for execution
     */
    public static FlinkLocalCluster getLocalCluster() {
        return currentFactory.createLocalCluster();
    }

    /**
     * Sets a different factory for FlinkLocalClusters to be used for execution.
     *
     * @param clusterFactory
     *      The LocalClusterFactory to create the local clusters for execution.
     */
    public static void initialize(LocalClusterFactory clusterFactory) {
        currentFactory = Objects.requireNonNull(clusterFactory);
    }

    // ------------------------------------------------------------------------
    //  Cluster factory
    // ------------------------------------------------------------------------

    /**
     * A factory that creates local clusters.
     */
    public interface LocalClusterFactory {

        /**
         * Creates a local Flink cluster.
         * @return A local Flink cluster.
         */
        FlinkLocalCluster createLocalCluster();
    }

    /**
     * A factory that instantiates a FlinkLocalCluster.
     */
    public static class DefaultLocalClusterFactory implements LocalClusterFactory {

        @Override
        public FlinkLocalCluster createLocalCluster() {
            return new FlinkLocalCluster();
        }
    }
  • flink provides a static method getLocalCluster in FlinkLocalCluster for obtaining a FlinkLocalCluster; it creates the FlinkLocalCluster through a LocalClusterFactory
  • The LocalClusterFactory used here is the DefaultLocalClusterFactory implementation class, whose createLocalCluster method simply news up a FlinkLocalCluster
  • With the current implementation, every call to FlinkLocalCluster.getLocalCluster creates a new FlinkLocalCluster, which is worth keeping in mind when calling it; a custom factory can be installed to share a single instance, as sketched below
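
Because of this, a test that wants all getLocalCluster() calls to return one and the same cluster can install its own factory through the initialize(...) method shown above. A minimal sketch, with a made-up factory class name:

    import org.apache.flink.storm.api.FlinkLocalCluster;

    // Hypothetical factory that always hands out the same FlinkLocalCluster instance,
    // so repeated FlinkLocalCluster.getLocalCluster() calls no longer create new clusters.
    public class SharedLocalClusterFactory implements FlinkLocalCluster.LocalClusterFactory {
        private final FlinkLocalCluster instance = new FlinkLocalCluster();

        @Override
        public FlinkLocalCluster createLocalCluster() {
            return instance;
        }
    }

    // usage, e.g. in a test setup method:
    // FlinkLocalCluster.initialize(new SharedLocalClusterFactory());
    // FlinkLocalCluster c1 = FlinkLocalCluster.getLocalCluster();
    // FlinkLocalCluster c2 = FlinkLocalCluster.getLocalCluster(); // same instance as c1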

FlinkTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

    /**
     * Creates a Flink program that uses the specified spouts and bolts.
     * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
     * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
     */
    public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
        return new FlinkTopology(stormBuilder);
    }

    private FlinkTopology(TopologyBuilder builder) {
        this.builder = builder;
        this.stormTopology = builder.createTopology();
        // extract the spouts and bolts
        this.spouts = getPrivateField("_spouts");
        this.bolts = getPrivateField("_bolts");

        this.env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kick off the translation immediately
        translateTopology();
    }
  • FlinkTopology provides a static factory method createTopology for creating a FlinkTopology
  • FlinkTopology first stores the TopologyBuilder, then uses getPrivateField (a reflective call via getDeclaredField) to read the private spouts and bolts fields and keeps them for the later topology translation; a sketch of such a reflective read follows this list
  • It then obtains the StreamExecutionEnvironment, and finally calls translateTopology to translate the entire StormTopology
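
getPrivateField itself is not shown in the excerpt above; judging from its name and usage, it presumably performs a reflective read of TopologyBuilder's private _spouts/_bolts maps, roughly like the sketch below (an approximation, not the verbatim flink source):

    import java.lang.reflect.Field;
    import java.util.Map;

    // Sketch of a reflective read of a private field from the Storm TopologyBuilder;
    // the real getPrivateField in FlinkTopology may differ in details.
    @SuppressWarnings("unchecked")
    private <T> Map<String, T> getPrivateField(String field) {
        try {
            Field f = builder.getClass().getDeclaredField(field);
            f.setAccessible(true); // _spouts and _bolts are private in TopologyBuilder
            return (Map<String, T>) f.get(builder);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Could not read " + field + " from TopologyBuilder", e);
        }
    }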

translateTopology

flink-release-1.6.2/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java

    /**
     * Creates a Flink program that uses the specified spouts and bolts.
     */
    private void translateTopology() {

        unprocessdInputsPerBolt.clear();
        outputStreams.clear();
        declarers.clear();
        availableInputs.clear();

        // Storm defaults to parallelism 1
        env.setParallelism(1);

        /* Translation of topology */

        for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
            final String spoutId = spout.getKey();
            final IRichSpout userSpout = spout.getValue();

            final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
            userSpout.declareOutputFields(declarer);
            final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
            this.outputStreams.put(spoutId, sourceStreams);
            declarers.put(spoutId, declarer);

            final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
            final DataStreamSource<?> source;

            if (sourceStreams.size() == 1) {
                final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
                spoutWrapperSingleOutput.setStormTopology(stormTopology);

                final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];

                DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
                        declarer.getOutputType(outputStreamId));

                outputStreams.put(outputStreamId, src);
                source = src;
            } else {
                final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
                        userSpout, spoutId, null, null);
                spoutWrapperMultipleOutputs.setStormTopology(stormTopology);

                @SuppressWarnings({ "unchecked", "rawtypes" })
                DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
                        spoutWrapperMultipleOutputs, spoutId,
                        (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));

                SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
                        .split(new StormStreamSelector<Tuple>());
                for (String streamId : sourceStreams.keySet()) {
                    SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId)
                            .map(new SplitStreamMapper<Tuple>());
                    outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
                    outputStreams.put(streamId, outStream);
                }
                source = multiSource;
            }
            availableInputs.put(spoutId, outputStreams);

            final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
            if (common.is_set_parallelism_hint()) {
                int dop = common.get_parallelism_hint();
                source.setParallelism(dop);
            } else {
                common.set_parallelism_hint(1);
            }
        }

        /**
         * 1. Connect all spout streams with bolts streams
         * 2. Then proceed with the bolts stream already connected
         *
         * <p>Because we do not know the order in which an iterator steps over a set, we might process a consumer before
         * its producer
         * ->thus, we might need to repeat multiple times
         */
        boolean makeProgress = true;
        while (bolts.size() > 0) {
            if (!makeProgress) {
                StringBuilder strBld = new StringBuilder();
                strBld.append("Unable to build Topology. Could not connect the following bolts:");
                for (String boltId : bolts.keySet()) {
                    strBld.append("\n  ");
                    strBld.append(boltId);
                    strBld.append(": missing input streams [");
                    for (Entry<GlobalStreamId, Grouping> streams : unprocessdInputsPerBolt
                            .get(boltId)) {
                        strBld.append("'");
                        strBld.append(streams.getKey().get_streamId());
                        strBld.append("' from '");
                        strBld.append(streams.getKey().get_componentId());
                        strBld.append("'; ");
                    }
                    strBld.append("]");
                }

                throw new RuntimeException(strBld.toString());
            }
            makeProgress = false;

            final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
            while (boltsIterator.hasNext()) {

                final Entry<String, IRichBolt> bolt = boltsIterator.next();
                final String boltId = bolt.getKey();
                final IRichBolt userBolt = copyObject(bolt.getValue());

                final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();

                Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
                if (unprocessedBoltInputs == null) {
                    unprocessedBoltInputs = new HashSet<>();
                    unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
                    unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
                }

                // check if all inputs are available
                final int numberOfInputs = unprocessedBoltInputs.size();
                int inputsAvailable = 0;
                for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
                    final String producerId = entry.getKey().get_componentId();
                    final String streamId = entry.getKey().get_streamId();
                    final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
                    if (streams != null && streams.get(streamId) != null) {
                        inputsAvailable++;
                    }
                }

                if (inputsAvailable != numberOfInputs) {
                    // traverse other bolts first until inputs are available
                    continue;
                } else {
                    makeProgress = true;
                    boltsIterator.remove();
                }

                final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);

                for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
                    final GlobalStreamId streamId = input.getKey();
                    final Grouping grouping = input.getValue();

                    final String producerId = streamId.get_componentId();

                    final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);

                    inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
                }

                final SingleOutputStreamOperator<?> outputStream = createOutput(boltId,
                        userBolt, inputStreams);

                if (common.is_set_parallelism_hint()) {
                    int dop = common.get_parallelism_hint();
                    outputStream.setParallelism(dop);
                } else {
                    common.set_parallelism_hint(1);
                }

            }
        }
    }
  • The translation handles the spouts first and then the bolts; the spouts and bolts information it relies on was extracted from Storm's TopologyBuilder object via reflection in the constructor
  • flink uses FlinkOutputFieldsDeclarer (which implements Storm's OutputFieldsDeclarer interface) to hold the declareOutputFields information configured in Storm's IRichSpout and IRichBolt; note, however, that flink does not support direct emit; calling userSpout.declareOutputFields transfers the original spout's declarations into the FlinkOutputFieldsDeclarer
  • flink wraps each spout in a SpoutWrapper, turning it into a RichParallelSourceFunction, with different handling depending on whether the spout has more than one output stream; the RichParallelSourceFunction is then passed to StreamExecutionEnvironment.addSource to create a flink DataStreamSource, which is added to availableInputs, and the DataStreamSource's parallelism is set from the spout's parallelism hint
  • For the bolt translation, unprocessdInputsPerBolt is maintained, keyed by boltId, with the GlobalStreamIds and Grouping modes the bolt connects to as value; because a map is used for the iteration, the bolts may be visited out of order: if a bolt's input GlobalStreamIds are all available, it is translated and removed from bolts, while a bolt whose input GlobalStreamId is not yet in availableInputs is skipped for now and not removed; since the outer loop keeps running as long as bolts is non-empty, this is the mechanism that copes with the out-of-order traversal
  • An important method in the bolt translation is processInput, which translates the bolt's grouping into the corresponding operation on the producer's DataStream (for example shuffleGrouping becomes rebalance, fieldsGrouping becomes keyBy, globalGrouping becomes global, and allGrouping becomes broadcast; see the sketch after this list); createOutput is then called to translate the bolt's execution logic, using BoltWrapper or MergedInputsBoltWrapper to turn the bolt into a flink OneInputStreamOperator that is passed to transform on the stream, yielding a flink SingleOutputStreamOperator; the resulting SingleOutputStreamOperator is added to availableInputs, and its parallelism is set from the bolt's parallelism hint
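
To make that grouping mapping concrete, the sketch below expresses the core rules against a DataStream. This is a simplified illustration, not the actual processInput code, which additionally rejects direct emit and handles custom groupings:

    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.storm.generated.Grouping;
    import org.apache.storm.tuple.Fields;

    // Simplified illustration of the grouping -> DataStream mapping described above.
    private static DataStream<Tuple> applyGrouping(
            DataStream<Tuple> producer, Grouping grouping, Fields producerFields) {
        if (grouping.is_set_shuffle()) {
            return producer.rebalance();                       // shuffleGrouping
        } else if (grouping.is_set_fields()) {
            List<String> fields = grouping.get_fields();
            if (fields.isEmpty()) {
                return producer.global();                      // globalGrouping
            }
            // fieldsGrouping: key by the positions of the grouping fields
            int[] keyPositions = new int[fields.size()];
            for (int i = 0; i < keyPositions.length; i++) {
                keyPositions[i] = producerFields.fieldIndex(fields.get(i));
            }
            return producer.keyBy(keyPositions);
        } else if (grouping.is_set_all()) {
            return producer.broadcast();                       // allGrouping
        }
        throw new UnsupportedOperationException("grouping not covered by this sketch");
    }

In Storm, globalGrouping is declared as a fieldsGrouping with an empty field list, which is why the empty-fields case maps to global() here.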

FlinkLocalCluster

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/api/FlinkLocalCluster.java

/**
 * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
 */
public class FlinkLocalCluster {

    /** The log used by this mini cluster. */
    private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);

    /** The Flink mini cluster on which to execute the programs. */
    private FlinkMiniCluster flink;

    /** Configuration key to submit topology in blocking mode if flag is set to {@code true}. */
    public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";

    public FlinkLocalCluster() {
    }

    public FlinkLocalCluster(FlinkMiniCluster flink) {
        this.flink = Objects.requireNonNull(flink);
    }

    @SuppressWarnings("rawtypes")
    public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
            throws Exception {
        this.submitTopologyWithOpts(topologyName, conf, topology, null);
    }

    @SuppressWarnings("rawtypes")
    public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
        LOG.info("Running Storm topology on FlinkLocalCluster");

        boolean submitBlocking = false;
        if (conf != null) {
            Object blockingFlag = conf.get(SUBMIT_BLOCKING);
            if (blockingFlag instanceof Boolean) {
                submitBlocking = ((Boolean) blockingFlag).booleanValue();
            }
        }

        FlinkClient.addStormConfigToTopology(topology, conf);

        StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
        streamGraph.setJobName(topologyName);

        JobGraph jobGraph = streamGraph.getJobGraph();

        if (this.flink == null) {
            Configuration configuration = new Configuration();
            configuration.addAll(jobGraph.getJobConfiguration());

            configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
            configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

            this.flink = new LocalFlinkMiniCluster(configuration, true);
            this.flink.start();
        }

        if (submitBlocking) {
            this.flink.submitJobAndWait(jobGraph, false);
        } else {
            this.flink.submitJobDetached(jobGraph);
        }
    }

    public void killTopology(final String topologyName) {
        this.killTopologyWithOpts(topologyName, null);
    }

    public void killTopologyWithOpts(final String name, final KillOptions options) {
    }

    public void activate(final String topologyName) {
    }

    public void deactivate(final String topologyName) {
    }

    public void rebalance(final String name, final RebalanceOptions options) {
    }

    public void shutdown() {
        if (this.flink != null) {
            this.flink.stop();
            this.flink = null;
        }
    }

    //......
}
  • FlinkLocalCluster's submitTopology method delegates to submitTopologyWithOpts, which mainly sets a few parameters, calls topology.getExecutionEnvironment().getStreamGraph() to build the StreamGraph from the transformations, obtains the JobGraph from it, creates and starts a LocalFlinkMiniCluster, and finally submits the JobGraph via LocalFlinkMiniCluster's submitJobAndWait or submitJobDetached

Summary

  • Through FlinkTopology, flink offers a degree of compatibility with Storm, which is very helpful when migrating from Storm to flink
  • Running a Storm topology on flink takes a few steps: build the topology with Storm's native TopologyBuilder, convert the StormTopology into a FlinkTopology via FlinkTopology.createTopology(builder), and finally submit the FlinkTopology through the submitTopology method of FlinkLocalCluster (local mode) or FlinkSubmitter (remote submission); a remote-submission sketch follows this list
  • FlinkTopology is the core of flink's Storm compatibility; it translates the StormTopology into the corresponding flink constructs: SpoutWrapper turns a spout into a RichParallelSourceFunction, which is added to the StreamExecutionEnvironment to create a DataStream; the bolt's grouping is translated into the corresponding operation on the producer's DataStream (shuffleGrouping to rebalance, fieldsGrouping to keyBy, globalGrouping to global, allGrouping to broadcast); and BoltWrapper or MergedInputsBoltWrapper turns a bolt into a flink OneInputStreamOperator that is passed to transform on the stream
  • Once the FlinkTopology is built, it is submitted for local execution via FlinkLocalCluster, or for remote execution via FlinkSubmitter
  • FlinkLocalCluster's submitTopology method builds the StreamGraph from the StreamExecutionEnvironment the FlinkTopology operates on, obtains the JobGraph from it, then creates and starts a LocalFlinkMiniCluster and finally submits the JobGraph through it
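
For completeness, the remote submission mentioned above goes through FlinkSubmitter instead of FlinkLocalCluster. A sketch, under the assumption that FlinkSubmitter reuses Storm's nimbus host/port settings to locate the JobManager (the host and port below are placeholders):

    import org.apache.flink.storm.api.FlinkSubmitter;
    import org.apache.flink.storm.api.FlinkTopology;
    import org.apache.storm.Config;
    import org.apache.storm.topology.TopologyBuilder;

    public class RemoteSubmitExample {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... setSpout/setBolt exactly as in the test at the top of this article ...

            Config conf = new Config();
            conf.put(Config.NIMBUS_HOST, "jobmanager-host"); // placeholder JobManager address
            conf.put(Config.NIMBUS_THRIFT_PORT, 6123);       // placeholder JobManager port

            FlinkSubmitter.submitTopology("stormWordCount", conf,
                    FlinkTopology.createTopology(builder));
        }
    }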
