Storm作业转化为Flink作业流程分析

一、 Storm的topology作业可以转化为Flink Job放到Flink上运行,需要修改Storm作业的代码。以wordcount为例,代码修改成可以在Flink上运行的作业后,如下:

public class WordCountTopology {
   static class WordCountSpout extends BaseRichSpout {
      private SpoutOutputCollector collector;
      public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
         this.collector = spoutOutputCollector;
      }
      public void nextTuple() {
         for(String word : WordCountData.WORDS) {
            this.collector.emit(new Values(new String[]{word}));
            Utils.sleep(100);
         }
      }
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields(new String[]{"word"}));
      }
   }
   static class WordCountBolt extends BaseRichBolt {
      private OutputCollector outputCollector;
      private Map<String,Integer> wordCountMap = new ConcurrentHashMap<String, Integer>();
      public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
         this.outputCollector = outputCollector;
      }
      public void execute(Tuple tuple) {
         String words = tuple.getStringByField("word");
         String[] wordArray = words.toLowerCase().split("\\W+");
         for(String word : wordArray) {
            if(!wordCountMap.containsKey(word)){
               wordCountMap.put(word,1);
            }
            else{
               int wordCount = wordCountMap.get(word);
               wordCount = wordCount + 1 ;
               wordCountMap.put(word,wordCount);
            }
         }
      }
      public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
   }
   static class WordCountData {
      public static final String[] WORDS = new String[] {
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffe"
      };
   }
   public static void main(String[] args) throws Exception {
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("spout", new WordCountSpout());
      builder.setBolt("bolt", new WordCountBolt(), 1).shuffleGrouping("spout");
      Config conf = new Config();
      conf.setDebug(false);
      if (args != null && args.length > 0) {
         conf.setNumWorkers(1);
         // -- 代码修改前的使用StormSubmitter提交作业到远端Storm集群
         //StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
         // -- 代码修改后使用FlinkSubmitter提交作业到远端Flink集群
         FlinkSubmitter.submitTopologyWithProgressBar(args[0], conf, FlinkTopology.createTopology(builder));
      } else {
         // --代码修改前使用LocalCluster提交作业到本地运行
         //StormTopology topology = builder.createTopology();
			   //LocalCluster localCluster = new LocalCluster();
			   //localCluster.submitTopology("test",conf,topology);
         // -- 代码修改后使用FlinkLocalCluster提交作业到本地运行
         FlinkLocalCluster cluster = new FlinkLocalCluster();
         cluster.submitTopology("test", conf, FlinkTopology.createTopology(builder));
         Utils.sleep(40000);
         cluster.killTopology("test");
         cluster.shutdown();
      }
   }
}

Storm的作业中会定义Spout,Bolt这些组件对输入数据的处理逻辑:Spout对数据的处理在nextTuple中完成,Bolt对数据的处理在execute中;以及组件之间的grouping规则:包括shuffle_grouping(数据随机分发下游bolt组件),fields_grouping(按照字段取值不同分发到不同bolt组件),all_grouping(数据分发到下游的每一个bolt组件)等;当将其转化为Flink的Job时候,对应组件的数据处理逻辑转化为Flink Job内部的DataSource,Operator等算子的处理逻辑,对应的grouping规则转化为Flink 流式作业的上下游DataSource,Operator等之间数据的分发规则,这个转化操作是通过FlinkTopology.createTopology这一步完成的;

二、Storm topology解析成为Flink job主要是在FlinkTopology这个类中完成的,以上的wordcount作业中是通过createTopology方法进入的,如下:

private FlinkTopology(TopologyBuilder builder) {
   this.builder = builder;
   this.stormTopology = builder.createTopology();
   // extract the spouts and bolts
   this.spouts = getPrivateField("_spouts");
   this.bolts = getPrivateField("_bolts");

   this.env = StreamExecutionEnvironment.getExecutionEnvironment();

   // Kick off the translation immediately
   translateTopology();
}

/**
 * Creates a Flink program that uses the specified spouts and bolts.
 * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
 * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
 */
public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
   return new FlinkTopology(stormBuilder);
}

在以上代码中,首先调用createTopology方法,生成一个FlinkTopology对象,在FlinkTopology构造方法里面调用translateTopology进行作业的拓扑转换,在该方法中完成主要的转换工作。在FlinkToplogy中进行作业转化解析的主要流程如下:

作业转化流程

1. 首先获取Flink流式作业的执行环境,以及Storm作业中定义的Spout,Bolt组件集合;这些都是在FlinkTopology的构造方法中完成,代码如下:

this.spouts = getPrivateField("_spouts");
this.bolts = getPrivateField("_bolts");
this.env = StreamExecutionEnvironment.getExecutionEnvironment();

对于Storm中Spouts,Bolts组件的获取,具体实现方式是通过java反射获取私有变量的方式来完成的,略;

2. 然后完成对Spout组件的解析,挨个遍历Spout组件,使用SpoutWrapper封装起来:spout对象作为一个参数传入到SpoutWrapper,生成一个SpoutWrapper对象,然后再将SpoutWrapper转为Flink job的DatStreamSource对象;Spoutwrapper的代码实现如下:

public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction
...............
private final IRichSpout spout;
@Override
	public final void run(final SourceContext<OUT> ctx) throws Exception {
		final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
				.getGlobalJobParameters();
		StormConfig stormConfig = new StormConfig();
		........
		final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
				(StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
				this.stormTopology, stormConfig);
	  .............
	  //打开数据源
		this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
		this.spout.activate();
		//读取和发送数据
		if (numberOfInvocations == null) {
			if (this.spout instanceof FiniteSpout) {
				final FiniteSpout finiteSpout = (FiniteSpout) this.spout;

				while (this.isRunning && !finiteSpout.reachedEnd()) {
					finiteSpout.nextTuple();
				}
			} else {
				while (this.isRunning) {
					this.spout.nextTuple();
				}
			}
		} else {
			int counter = this.numberOfInvocations;
			if (counter >= 0) {
				while ((--counter >= 0) && this.isRunning) {
					this.spout.nextTuple();
				}
			} else {
				do {
					collector.tupleEmitted = false;
					this.spout.nextTuple();
				} while (collector.tupleEmitted && this.isRunning);
			}
		}
	}

SpoutWrapper类继承了Flink的RichParallelSourceFunction类,该类是实现了Flink的SourceFunction接口,用于连数据源;如上,在SpoutWrapper类中实现SourceFunction的run方法,在该方法中调用了Storm的组件方法进行数据源的连接和读取,发送操作:spout.open(...)进行数据源的连接,通过spout.nextTuple()数据的读取和发送,构造SpoutWrapper时候传入三个参数,构造样例如下:

new SpoutWrapper<String>(new WordCountSpout(),new String[]{Utils.DEFAULT_STREAM_ID})

第一个参数是要进行封装的Spout组件的对象,第二个参数是封装后该组件的数据流的ID;

然后通过env.addSource方法将传入的SpoutWrapper转为为Flink的DataStreamSource对象;

//调用env.addSource方法
DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
      declarer.getOutputType(outputStreamId));
...............
//env.addSource方法的具体实现
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)
...............
if (function instanceof StoppableFunction) {
			sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
		} else {
			sourceOperator = new StreamSource<>(function);
		}

		return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);

addSource传入三个参数,第一个是实现了SourceFunction的SpoutWrapper,用于实现对数据的处理逻辑,第二个参数是spoutId,对应转化后的DataStreamSource的sourceName,第三个参数是该sourceFunction的数据输出类型;在addSource的具体实现中,直接将传入的SourceFunction参数,即SpoutWrapper对象封装成了一个数据源的操作的operator,然后将其封装到DataStreamSource中返回;

3. 根据以上操作就将Storm作业的spout组件转为成了Flink作业的DataStreamSource了,然后将转化出来的dataStream放入到availableInputs中;其中availableInputs的数据结构定义如下:

HashMap<String, HashMap<String, DataStream<Tuple>>>

第一个参数表示对应的组件Id,第二个参数表示组件对应处理的数据流的Id,第三个参数表示要处理的数据流;

4. 然后进行Bolt组件的解析,对Bolt组件的解析主要完成:1)对上游输入流的解析,转换上游输入与Bolt组件之间的数据分发规则,为Flink的数据分发规则;2)获取Bolt组件数据输出的schema,并构造Bolt组件的数据输出;主要在两个方法中完成呢个:一个是processInput,一个是createOutput;

5. processInput:主要实现获取Bolt组件的输出流的schema,以及对Bolt组件的输入流根据grouping规则进行处理:设置相应的数据流分发规则

private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt,
      GlobalStreamId streamId, Grouping grouping,
      Map<String, DataStream<Tuple>> producer) {
   ...............
   final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
   declarers.put(boltId, declarer);
   userBolt.declareOutputFields(declarer);
   //获取Bolt组件的输出流schema
   this.outputStreams.put(boltId, declarer.outputStreams);
   ..................
   // if producer was processed already
   // 根据grouping规则设置相应的数据分发逻辑
   if (grouping.is_set_shuffle()) {
      // Storm uses a round-robin shuffle strategy
      inputStream = inputStream.rebalance();
   } else if (grouping.is_set_fields()) {
      // global grouping is emulated in Storm via an empty fields grouping list
      final List<String> fields = grouping.get_fields();
      if (fields.size() > 0) {
         FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
         inputStream = inputStream.keyBy(prodDeclarer
               .getGroupingFieldIndexes(inputStreamId,
                     grouping.get_fields()));
      } else {
         inputStream = inputStream.global();
      }
   } else if (grouping.is_set_all()) {
      inputStream = inputStream.broadcast();
   } else if (!grouping.is_set_local_or_shuffle()) {
      throw new UnsupportedOperationException(
            "Flink only supports (local-or-)shuffle, fields, all, and global grouping");
   }
   ......
}

在该段代码中,通过FlinkOutputFieldsDeclarer获取了Bolt组件的输出的schema,在构造数据流输出的时候使用;然后进行了storm作业中的grouping规则与Flink中数据流的操作进行了相似转换:shuffle grouping对应于rebalance操作,将数据流进行随机分发;field grouping对应于keyby操作,按照字段取值进行数据分发;all grouping对应于boardcast操作,将数据分发到每一个下游的operator;以上就实现了输入流到Bolt组件分发规则的变换;

6. createOutput:在这个方法中,主要完成对待解析Bolt组件的三个处理操作,将上游的input的DataStream进行合并;然后构造Bolt组件的OutputStream,并与上游的DataStream连接;最后将Bolt组件解析出来OutputStream作为输入放入到availableInput中,作为下游Bolt组件的输入,并继续下一个Bolt组件的解析;

首先,获取上游的inputStreams,并挨个遍历:如果只有一个输入,则直接转换到singleInputStream中,如果有多个输入,则使用DataStream的Connect操作,将各个输入流进行合并连接,放到mergedInputStream中,代码逻辑实现如下:

//只有一个输入将放在singleInputStream中
DataStream<Tuple> singleInputStream = input1.getValue();
DataStream<StormTuple<Tuple>> mergedInputStream = null;
//得带上游输入
while (iterator.hasNext()) {
   Entry<GlobalStreamId, DataStream<Tuple>> input2 = iterator.next();
   GlobalStreamId streamId2 = input2.getKey();
   DataStream<Tuple> inputStream2 = input2.getValue();
   //如果有多于一个输入流,则进行输入流的合并
   if (mergedInputStream == null) {
      mergedInputStream = singleInputStream
            .connect(inputStream2)
            .flatMap(
                  new TwoFlinkStreamsMerger(streamId1, inputSchema1,
                        streamId2, this.outputStreams.get(
                              streamId2.get_componentId()).get(
                                    streamId2.get_streamId())))
                                    .returns(StormTuple.class);
   } else {
      mergedInputStream = mergedInputStream
            .connect(inputStream2)
            .flatMap(
                  new StormFlinkStreamMerger(streamId2, this.outputStreams.get(
                        streamId2.get_componentId()).get(streamId2.get_streamId())))
                        .returns(StormTuple.class);
   }
}

进行两个输入流的connect操作后,然后在执行一个FlatMap操作,将多个输入流进行合并成一个,然后输出到下游一个flatmap的operator中,并返回对应的数据流,合并操作后的数据流如下所示:

Storm作业转化为Flink作业的拓扑图

然后,根据processInput中获取Bolt组件的输出信息schema判断其的输出的个数,如果是单个输出,则直接使用一个BoltWrapper<Tuple,Tuple>对Y进行封装,表示接收到一个Tuple类型的消息,也同样以Tuple类型转发出去;如果是多个输出,则使用BoltWrapper<Tuple,SplitStreamType<Tuple>>对Y进行封装,表示接受到一个Tuple类型的消息,则进行split到多个下游的Bolt组件的Operator上进行输出;其中BoltWrapper的代码定义如下

public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements 
OneInputStreamOperator<IN, OUT>
.....
@Override
	public void processElement(final StreamRecord<IN> element) throws Exception {
		this.flinkCollector.setTimestamp(element);
		IN value = element.getValue();
		if (this.stormTopology != null) {
			Tuple tuple = (Tuple) value;
			Integer producerTaskId = tuple.getField(tuple.getArity() - 1);
			this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(producerTaskId),
					producerTaskId, this.inputStreamIds.get(producerTaskId), this.inputComponentIds
					.get(producerTaskId), MessageId.makeUnanchored()));
		} else {
			this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null,
					MessageId.makeUnanchored()));
		}
	}

BoltWrapper继承Flink中的AbstractStreamOperator类,该类是实现了Flink的StreamOperator接口;这样传入的Bolt组件对象封装到BoltWrapper中,就转化为Flink的Operator对象了。在BoltWrapper中实现了StreamOperator的processElement方法,用于对输入元素进行处理:将输入的StreamRecord封装成可以供Bolt组件处理的StormTuple,并通过bolt将该数据分发出去;

将Bolt组件使用BoltWrapper封装后,根据上面构造的singleInputStream,或者mergedInputStream的输入合并,执行transform操作,传入对应的boltId,以及BoltWrapper参数,将其Bolt组件的处理逻辑应用到对应的输入流上,转换成对应的opeator,如下:

final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<>(

bolt, boltId, inputStreamId1, inputComponentId1, inputSchema1, null);

..................................

multiStream = singleInputStream.transform(boltId, outType, boltWrapperMultipleOutputs);

这一步转化操作会将对应的BoltWrapper的参数转为对应的StreamTransformation,并放入到Flink执行环境的transmations变量中,transmations用于生成作业执行的streamGraph;

public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo,
       OneInputStreamOperator<T, R> operator) {
   transformation.getOutputType();
   //operator生成Transform
   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,
         operatorName,operator,outTypeInfo,environment.getParallelism());
   //传入的输入流operator进行转化;
   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
   //执行环境的transmations中加入transmation
   getExecutionEnvironment().addOperator(resultTransform);
   return returnStream;
}

最后,将转化出来的outStream放入到availableInputs中,作为解析下一个组件的输入,继续进行下一个组件的解析,直到整个拓扑解析完成;等到整个作业解析完成,则Storm作业中组件将全部转化为Flink的Transmation,放入到执行环境的transmations中,提交作业运行的时候,transmations转化StreamGraph,再转为JobGraph,提交作业后在服务端转为ExecutationGraph执行,从而Storm的整个Topology就转化为了Flink的Job执行了;

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏H2Cloud

Future Pattern

Started: 俗话说一年之计在于春,一天之计在于晨,当我起床的时候,看见表正指向九点钟,十一点下班,十点上班,这是我现在的工作节奏。来北京马上就一个月了,近...

3445
来自专栏智能大石头

线程池ThreadPool及Task调度机制分析

近1年,偶尔发生应用系统启动时某些操作超时的问题,特别在使用4核心Surface以后。笔记本和台式机比较少遇到,服务器则基本上没有遇到过。

680
来自专栏牛客网

深信服一面C++

1012
来自专栏更流畅、简洁的软件开发方式

通过“访问多种数据库”的代码来学习多态!(.net2.0版)

本帖子针对初学者,如果您是老鸟可以略过。 语言环境: asp.net2.0 。数据库没什么了,反正是要到达访问多种数据库的目的,但是语言一定是.net2.0。...

19310
来自专栏程序之美

第七节 netty前传-NIO 几种channel介绍

FileChannel.write()方法将数据写入FileChannel,该方法将Buffer作为参数。

1004
来自专栏小鄧子的技术博客专栏

All RxJava - 为Retrofit添加重试

在我们的日常开发中离不开I/O操作,尤其是网络请求,但并不是所有的请求都是可信赖的,因此我们必须为APP添加请求重试功能。

1121
来自专栏我是攻城师

简述ElasticSearch里面复杂关系数据的存储方式

4357
来自专栏Golang语言社区

Go-简洁的并发

多核处理器越来越普及。有没有一种简单的办法,能够让我们写的软件释放多核的威力?是有的。随着Golang, Erlang, Scala等为并发设计的程序语言的兴起...

39812
来自专栏互联网高可用架构

白话阿里巴巴Java开发手册(异常日志)

1822
来自专栏张善友的专栏

.NET 4 System.Threading.Barrier 类

在Visual Studio 2010 and .NET Framework 4 Training Kit中有个System.Threading.Barrier...

1929

扫码关注云+社区