A look at Storm Trident operations

This article walks through the operations offered by Storm Trident, reading them from the storm-core 1.2.2 sources.

function filter projection

Function

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Function.java

public interface Function extends EachOperation {
    /**
     * Performs the function logic on an individual tuple and emits 0 or more tuples.
     *
     * @param tuple The incoming tuple
     * @param collector A collector instance that can be used to emit tuples
     */
    void execute(TridentTuple tuple, TridentCollector collector);
}
  • Function defines an execute method; the fields it emits are appended to the input tuple
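
To make the "appended to the input tuple" behaviour concrete, here is a minimal plain-Java sketch that deliberately uses no Storm classes. The names FunctionSketch, each and sumUnlessZero are made up for illustration. Tuples are modelled as List<Object>, and the function returns the value lists it would emit, where real Trident code would call a TridentCollector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of each(function) semantics; this is not Storm code.
class FunctionSketch {
    // For every input tuple, run fn; each emitted value list is appended
    // to a copy of the input tuple. Emitting nothing drops the tuple.
    static List<List<Object>> each(List<List<Object>> input,
                                   Function<List<Object>, List<List<Object>>> fn) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> tuple : input) {
            for (List<Object> emitted : fn.apply(tuple)) {
                List<Object> joined = new ArrayList<>(tuple);
                joined.addAll(emitted);
                out.add(joined);
            }
        }
        return out;
    }

    // Example function: emits one extra field (a + b) unless b is zero.
    static List<List<Object>> sumUnlessZero(List<Object> tuple) {
        int a = (Integer) tuple.get(0);
        int b = (Integer) tuple.get(1);
        List<List<Object>> emitted = new ArrayList<>();
        if (b != 0) {
            emitted.add(Arrays.<Object>asList(a + b));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Object>> input = Arrays.asList(
                Arrays.<Object>asList(1, 2), Arrays.<Object>asList(3, 0));
        // Prints [[1, 2, 3]]: the first tuple gains a field, the second emits nothing.
        System.out.println(each(input, FunctionSketch::sumUnlessZero));
    }
}
```

Since a function may emit zero tuples, it can also act as a filter, as the second input tuple shows.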

Filter

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Filter.java

public interface Filter extends EachOperation {

    /**
     * Determines if a tuple should be filtered out of a stream
     *
     * @param tuple the tuple being evaluated
     * @return `false` to drop the tuple, `true` to keep the tuple
     */
    boolean isKeep(TridentTuple tuple);
}
  • Filter provides an isKeep method that decides whether a tuple stays in the output
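
The isKeep contract can be modelled without Storm as a predicate over tuples. This is only a sketch: FilterSketch and filter are hypothetical names, and tuples are again plain List<Object> values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of each(filter): tuples for which isKeep is false are dropped.
class FilterSketch {
    static List<List<Object>> filter(List<List<Object>> input,
                                     Predicate<List<Object>> isKeep) {
        List<List<Object>> kept = new ArrayList<>();
        for (List<Object> tuple : input) {
            if (isKeep.test(tuple)) {
                kept.add(tuple); // a kept tuple passes through unchanged
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<List<Object>> input = Arrays.asList(
                Arrays.<Object>asList(1, "a"), Arrays.<Object>asList(2, "b"));
        // Keep only tuples whose first field is even; prints [[2, b]].
        System.out.println(filter(input, t -> (Integer) t.get(0) % 2 == 0));
    }
}
```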

projection

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
     *
     * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling"
     *
     * 
     * mystream.project(new Fields("b", "d"))
     * 
     *
     * would produce a stream containing only the fields `["b", "d"]`.
     *
     *
     * @param keepFields The fields in the Stream to keep
     * @return
     */
    public Stream project(Fields keepFields) {
        projectionValidation(keepFields);
        return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
    }
  • project uses ProjectedProcessor to perform the projection
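
The Javadoc example (fields ["a", "b", "c", "d"] projected onto ["b", "d"]) can be sketched in plain Java as follows. ProjectSketch and its project method are hypothetical and only model the effect on one tuple; they say nothing about how ProjectedProcessor is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of Stream.project on a single tuple; not Storm code.
class ProjectSketch {
    // Returns the values of keepFields, in the order keepFields lists them.
    static List<Object> project(List<String> fields, List<Object> tuple,
                                List<String> keepFields) {
        List<Object> out = new ArrayList<>();
        for (String name : keepFields) {
            int idx = fields.indexOf(name);
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            out.add(tuple.get(idx));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("a", "b", "c", "d");
        List<Object> tuple = Arrays.<Object>asList(1, 2, 3, 4);
        // Prints [2, 4]: only the values of "b" and "d" remain.
        System.out.println(project(fields, tuple, Arrays.asList("b", "d")));
    }
}
```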

repartitioning operations

partition

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * @param partitioner
     * @return
     */
    public Stream partition(CustomStreamGrouping partitioner) {
        return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
    }
  • partition takes a CustomStreamGrouping here, which is Java-serialized and wrapped in a custom_serialized Grouping

partitionBy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * @param fields
     * @return
     */
    public Stream partitionBy(Fields fields) {
        projectionValidation(fields);
        return partition(Grouping.fields(fields.toList()));
    }
  • partitionBy uses Grouping.fields
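
The idea behind a fields grouping is that tuples with equal values in the grouping fields always land on the same partition. A minimal sketch of that property, assuming a simple hash-modulo scheme (the exact hashing Storm applies may differ, and PartitionBySketch is a hypothetical name):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a fields grouping; the hashing scheme is an assumption.
class PartitionBySketch {
    // Maps the values of the grouping fields to a partition index in [0, numPartitions).
    static int targetPartition(List<Object> groupingValues, int numPartitions) {
        return Math.floorMod(groupingValues.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int p1 = targetPartition(Arrays.<Object>asList("user-1"), 4);
        int p2 = targetPartition(Arrays.<Object>asList("user-1"), 4);
        // Equal grouping values always map to the same partition.
        System.out.println(p1 == p2);
    }
}
```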

identityPartition

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * @return
     */
    public Stream identityPartition() {
        return partition(new IdentityGrouping());
    }
  • identityPartition uses IdentityGrouping

shuffle

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * Use random round robin algorithm to evenly redistribute tuples across all target partitions
     *
     * @return
     */
    public Stream shuffle() {
        return partition(Grouping.shuffle(new NullStruct()));
    }
  • shuffle uses Grouping.shuffle

localOrShuffle

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * Use random round robin algorithm to evenly redistribute tuples across all target partitions, with a preference
     * for local tasks.
     *
     * @return
     */
    public Stream localOrShuffle() {
        return partition(Grouping.local_or_shuffle(new NullStruct()));
    }
  • localOrShuffle uses Grouping.local_or_shuffle

global

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
     * @return
     */
    public Stream global() {
        // use this instead of storm's built in one so that we can specify a singleemitbatchtopartition
        // without knowledge of storm's internals
        return partition(new GlobalGrouping());
    }
  • global uses Trident's own GlobalGrouping rather than Storm's built-in global grouping

batchGlobal

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     *  All tuples in the batch are sent to the same partition. Different batches in the stream may go to different
     *  partitions.
     *
     * @return
     */
    public Stream batchGlobal() {
        // the first field is the batch id
        return partition(new IndexHashGrouping(0));
    }
  • batchGlobal uses IndexHashGrouping on index 0, which is the batch id, so repartitioning happens at the granularity of a whole batch
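
The difference between global and batchGlobal can be sketched as two target-selection rules. Everything here is an assumption made for illustration: BatchGlobalSketch is a hypothetical class, the batch id is modelled as a Long, global is modelled as "always partition 0", and the hash-modulo rule only approximates what IndexHashGrouping does.

```java
// Hypothetical model of global() versus batchGlobal(); not Storm code.
class BatchGlobalSketch {
    // global(): every tuple of every batch goes to one fixed partition.
    static int globalTarget(int numPartitions) {
        return 0;
    }

    // batchGlobal(): the partition depends only on the batch id, so all tuples
    // of one batch share a partition while different batches may differ.
    static int batchGlobalTarget(Object batchId, int numPartitions) {
        return Math.floorMod(batchId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(globalTarget(4));          // prints 0
        System.out.println(batchGlobalTarget(5L, 4)); // prints 1
        System.out.println(batchGlobalTarget(6L, 4)); // prints 2
    }
}
```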

broadcast

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Repartitioning Operation
     *
     * Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do
     * a stateQuery on every partition of data.
     *
     * @return
     */
    public Stream broadcast() {
        return partition(Grouping.all(new NullStruct()));
    }
  • broadcast uses Grouping.all

groupBy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    /**
     * ## Grouping Operation
     *
     * @param fields
     * @return
     */
    public GroupedStream groupBy(Fields fields) {
        projectionValidation(fields);
        return new GroupedStream(this, fields);
    }
  • groupBy returns a GroupedStream rather than a Stream

aggregators

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java

    //partition aggregate
    public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(CombinerAggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .partitionAggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
        return partitionAggregate(null, agg, functionFields);
    }

    public Stream partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .partitionAggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    //aggregate
    public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .aggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
               .aggregate(inputFields, agg, functionFields)
               .chainEnd();
    }

    public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return chainedAgg()
                .aggregate(inputFields, agg, functionFields)
                .chainEnd();
    }

    //persistent aggregate
    public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(spec, null, agg, functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        // replaces normal aggregation here with a global grouping because it needs to be consistent across batches 
        return new ChainedAggregatorDeclarer(this, new GlobalAggScheme())
                .aggregate(inputFields, agg, functionFields)
                .chainEnd()
               .partitionPersist(spec, functionFields, new CombinerAggStateUpdater(agg), functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(spec, null, agg, functionFields);
    }

    public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
    }

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
        projectionValidation(inputFields);
        return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
    }
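  • Trident aggregations fall into three kinds: partitionAggregate, aggregate and persistentAggregate. An aggregation changes the shape of the output: the resulting tuples carry the fields emitted by the aggregator instead of the input fields
  • partitionAggregate works at the granularity of each partition, not the whole batch
  • aggregate works at the granularity of a batch and accepts three kinds of aggregator: Aggregator, CombinerAggregator and ReducerAggregator. Calling stream.aggregate amounts to one global aggregation per batch. With an Aggregator or a ReducerAggregator, the stream first uses a global repartition to bring all tuples of the batch from every partition into a single partition and then aggregates there. With a CombinerAggregator, Trident optimizes the plan: it computes a partial aggregate inside each partition first, then moves the partial results to a single partition and combines them, which transfers less data over the network than the other two
  • persistentAggregate aggregates the tuples of all batches in the stream and stores the result in a state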
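
The network saving of the CombinerAggregator plan can be illustrated by counting how many values cross partitions in each plan. This is a plain-Java sketch with hypothetical names (AggregatePlanSketch and both methods); a batch is modelled as a list of partitions, each holding integers, and the aggregation is a sum.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical comparison of the two aggregate() plans; not Storm code.
class AggregatePlanSketch {
    // Aggregator / ReducerAggregator plan: move every tuple to one partition, then sum.
    // Returns {result, number of values sent to the target partition}.
    static int[] globalThenAggregate(List<List<Integer>> partitions) {
        int sent = 0;
        int sum = 0;
        for (List<Integer> partition : partitions) {
            for (int value : partition) {
                sent++;
                sum += value;
            }
        }
        return new int[] {sum, sent};
    }

    // CombinerAggregator plan: sum inside each partition, then send one partial per partition.
    static int[] partialThenCombine(List<List<Integer>> partitions) {
        int sent = 0;
        int sum = 0;
        for (List<Integer> partition : partitions) {
            int partial = 0;
            for (int value : partition) {
                partial += value;
            }
            sent++;
            sum += partial;
        }
        return new int[] {sum, sent};
    }

    public static void main(String[] args) {
        List<List<Integer>> batch = Arrays.asList(Arrays.asList(4, 7), Arrays.asList(8));
        System.out.println(Arrays.toString(globalThenAggregate(batch))); // prints [19, 3]
        System.out.println(Arrays.toString(partialThenCombine(batch)));  // prints [19, 2]
    }
}
```

Both plans give the same result; the second sends one value per partition instead of one per tuple, so the gap widens as partitions hold more tuples.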

Aggregator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Aggregator.java

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T val, TridentTuple tuple, TridentCollector collector);
    void complete(T val, TridentCollector collector);
}
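  • An Aggregator is first asked to init its state; that state is then passed as the val argument to aggregate and complete
  • aggregate runs once for every tuple in the batch partition; complete runs once all tuples in the batch partition have been aggregated
  • Suppose a custom Aggregator that sums its input. For the tuples [4], [7], [8], init yields 0; for [4], val=0 and 0+4=4; for [7], val=4 and 4+7=11; for [8], val=11 and 11+8=19. The batch then ends with val=19 and complete runs, where the collector can be used to emit the result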
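
The summing example can be traced with a simplified stand-in for the interface. This sketch does not depend on Storm: SimpleAggregator mirrors the three methods above but takes an int instead of a TridentTuple and a plain List instead of a TridentCollector, and all names are hypothetical. Because aggregate returns void, the state has to be mutable, so a one-element array is used here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Storm-free model of the Aggregator life cycle.
class AggregatorSketch {
    interface SimpleAggregator<T> {
        T init(Object batchId, List<Object> collector);
        void aggregate(T val, int tuple, List<Object> collector);
        void complete(T val, List<Object> collector);
    }

    // Sums the tuples of a batch and emits the total in complete.
    static class Sum implements SimpleAggregator<int[]> {
        public int[] init(Object batchId, List<Object> collector) {
            return new int[] {0};
        }

        public void aggregate(int[] val, int tuple, List<Object> collector) {
            val[0] += tuple; // mutate the state in place
        }

        public void complete(int[] val, List<Object> collector) {
            collector.add(val[0]); // emit the final value
        }
    }

    // Drives one batch partition: init once, aggregate per tuple, complete once.
    static <T> List<Object> runBatch(SimpleAggregator<T> agg, Object batchId,
                                     List<Integer> tuples) {
        List<Object> collector = new ArrayList<>();
        T val = agg.init(batchId, collector);
        for (int tuple : tuples) {
            agg.aggregate(val, tuple, collector);
        }
        agg.complete(val, collector);
        return collector;
    }

    public static void main(String[] args) {
        // Prints [19]
        System.out.println(runBatch(new Sum(), 1L, Arrays.asList(4, 7, 8)));
    }
}
```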

CombinerAggregator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/CombinerAggregator.java

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}
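  • For each tuple it receives, a CombinerAggregator calls init to obtain that tuple's value, then calls combine on the previous combine result (or the zero value if there is none) and the value from init; if the partition contains no tuples, the result is the value returned by zero
  • Suppose combine sums and zero returns 0. For the tuples [4], [7], [8], the init values are 4, 7 and 8. For [4] there is no previous result, so val1=0, val2=4 and combine yields 4; for [7], val1=4, val2=7 and combine yields 11; for [8], val1=11, val2=8 and combine yields 19
  • Because it allows partial aggregation before tuples are moved, a CombinerAggregator has relatively low network overhead and therefore tends to perform better than the other two kinds of aggregator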
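
The same trace can be written as a fold over a simplified combiner. As before this is a Storm-free sketch with hypothetical names: SimpleCombiner mirrors init, combine and zero, but init takes an int instead of a TridentTuple.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, Storm-free model of how a CombinerAggregator is applied.
class CombinerSketch {
    interface SimpleCombiner<T> {
        T init(int tuple);
        T combine(T val1, T val2);
        T zero();
    }

    static class Sum implements SimpleCombiner<Integer> {
        public Integer init(int tuple) { return tuple; }
        public Integer combine(Integer val1, Integer val2) { return val1 + val2; }
        public Integer zero() { return 0; }
    }

    // Starts from zero() and combines the init value of every tuple into the result.
    static <T> T fold(SimpleCombiner<T> agg, List<Integer> tuples) {
        T acc = agg.zero();
        for (int tuple : tuples) {
            acc = agg.combine(acc, agg.init(tuple));
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(fold(new Sum(), Arrays.asList(4, 7, 8)));          // prints 19
        System.out.println(fold(new Sum(), Collections.<Integer>emptyList())); // prints 0
    }
}
```

Because combine takes two values of the same type, partial results from different partitions can be combined with each other, which is what makes the partial-aggregation plan possible.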

ReducerAggregator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/ReducerAggregator.java

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}
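  • When a ReducerAggregator processes a group of tuples, init is called once to obtain the initial value and reduce is then applied to each tuple; curr is the result of the previous reduce call, or the init value if there is none
  • Suppose reduce sums and init returns 0. For the tuples [4], [7], [8]: for [4], curr=0 and 0+4=4; for [7], curr=4 and 4+7=11; for [8], curr=11 and 11+8=19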
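
A Storm-free sketch of the same trace follows. SimpleReducer is a hypothetical stand-in whose reduce takes an int instead of a TridentTuple.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Storm-free model of how a ReducerAggregator is applied.
class ReducerSketch {
    interface SimpleReducer<T> {
        T init();
        T reduce(T curr, int tuple);
    }

    static class Sum implements SimpleReducer<Integer> {
        public Integer init() { return 0; }
        public Integer reduce(Integer curr, int tuple) { return curr + tuple; }
    }

    // Calls init once, then threads the running value through reduce for each tuple.
    static <T> T run(SimpleReducer<T> agg, List<Integer> tuples) {
        T curr = agg.init();
        for (int tuple : tuples) {
            curr = agg.reduce(curr, tuple);
        }
        return curr;
    }

    public static void main(String[] args) {
        System.out.println(run(new Sum(), Arrays.asList(4, 7, 8))); // prints 19
    }
}
```

Unlike combine, reduce takes a running value and a tuple, so two partial results cannot be merged with each other. That is consistent with the partial-aggregation optimization applying only to CombinerAggregator.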

topology stream operations

join

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);        
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
        return join(streams, joinFields, outFields, JoinType.INNER);        
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);        
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
        return join(streams, joinFields, outFields, repeat(streams.size(), type));
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);

    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
        return join(streams, joinFields, outFields, mixed, JoinOutFieldsMode.COMPACT);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
        return join(streams, joinFields, outFields, JoinType.INNER, mode);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type, mode);
    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
        return join(streams, joinFields, outFields, repeat(streams.size(), type), mode);
    }

    public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed, mode);

    }

    public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
        switch (mode) {
            case COMPACT:
                return multiReduce(strippedInputFields(streams, joinFields),
                        groupedStreams(streams, joinFields),
                        new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
                        outFields);

            case PRESERVE:
                return multiReduce(strippedInputFields(streams, joinFields),
                        groupedStreams(streams, joinFields),
                        new PreservingFieldsOrderJoinerMultiReducer(mixed, joinFields.get(0).size(),
                                getAllOutputFields(streams), joinFields, strippedInputFields(streams, joinFields)),
                        outFields);

            default:
                throw new IllegalArgumentException("Unsupported out-fields mode: " + mode);
        }
    }
  • join ends up calling multiReduce; in COMPACT mode the GroupedMultiReducer is JoinerMultiReducer, and in PRESERVE mode it is PreservingFieldsOrderJoinerMultiReducer
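
What an INNER join computes within one batch can be sketched in plain Java. This is not how JoinerMultiReducer is written; JoinSketch is a hypothetical class that assumes the join key is the first field of each tuple and that the output is laid out as key, remaining left fields, remaining right fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an inner join over the tuples of a single batch.
class JoinSketch {
    static List<List<Object>> innerJoin(List<List<Object>> left,
                                        List<List<Object>> right) {
        // Index the right side by its join key (field 0).
        Map<Object, List<List<Object>>> rightByKey = new HashMap<>();
        for (List<Object> r : right) {
            rightByKey.computeIfAbsent(r.get(0), k -> new ArrayList<>()).add(r);
        }
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> l : left) {
            List<List<Object>> matches = rightByKey.getOrDefault(
                    l.get(0), Collections.<List<Object>>emptyList());
            for (List<Object> r : matches) {
                List<Object> joined = new ArrayList<>(l);   // key + left fields
                joined.addAll(r.subList(1, r.size()));      // right fields without the key
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> left = Arrays.asList(
                Arrays.<Object>asList("k1", 1), Arrays.<Object>asList("k2", 2));
        List<List<Object>> right = Arrays.asList(
                Arrays.<Object>asList("k1", "x"), Arrays.<Object>asList("k3", "y"));
        // Prints [[k1, 1, x]]: only k1 appears on both sides.
        System.out.println(innerJoin(left, right));
    }
}
```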

merge

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public Stream merge(Fields outputFields, Stream... streams) {
        return merge(outputFields, Arrays.asList(streams));
    }

    public Stream merge(Stream... streams) {
        return merge(Arrays.asList(streams));
    }

    public Stream merge(List<Stream> streams) {
        return merge(streams.get(0).getOutputFields(), streams);
    } 

    public Stream merge(Fields outputFields, List<Stream> streams) {
        return multiReduce(streams, new IdentityMultiReducer(), outputFields);
    }
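  • merge also ends up calling multiReduce, with IdentityMultiReducer as the MultiReducer; when no output fields are given, the output fields of the first stream are used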

multiReduce

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(s1, s2), function, outputFields);        
    }

    public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);        
    }    

    public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(s1, s2), function, outputFields);        
    }

    public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);        
    } 

    public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
    }

    public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
        return multiReduce(getAllOutputFields(streams), streams, function, outputFields);        
    }    

    public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
        List<Fields> fullInputFields = new ArrayList<>();
        List<Stream> streams = new ArrayList<>();
        List<Fields> fullGroupFields = new ArrayList<>();
        for(int i=0; i<groupedStreams.size(); i++) {
            GroupedStream gs = groupedStreams.get(i);
            Fields groupFields = gs.getGroupFields();
            fullGroupFields.add(groupFields);
            streams.add(gs.toStream().partitionBy(groupFields));
            fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));

        }
        return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
    }

    public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
        List<String> names = new ArrayList<>();
        for(Stream s: streams) {
            if(s._name!=null) {
                names.add(s._name);
            }
        }
        Node n = new ProcessorNode(getUniqueStreamId(), Utils.join(names, "-"), outputFields, outputFields, new MultiReducerProcessor(inputFields, function));
        return addSourcedNode(streams, n);
    }
  • multiReduce takes a MultiReducer parameter (a GroupedMultiReducer for grouped streams); join and merge both call multiReduce but pass different reducers
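
The relationship between multiReduce and merge can be sketched with a much reduced reducer interface. The real MultiReducer has more life-cycle methods; SimpleMultiReducer below keeps only a per-tuple callback, and all names are hypothetical. With an identity reducer that forwards every tuple, multiReduce degenerates into a merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Storm-free model of multiReduce and of merge as its identity case.
class MultiReduceSketch {
    interface SimpleMultiReducer {
        void execute(int streamIndex, List<Object> tuple, List<List<Object>> collector);
    }

    // Feeds every tuple of every input stream to the reducer, tagged with its stream index.
    static List<List<Object>> multiReduce(List<List<List<Object>>> streams,
                                          SimpleMultiReducer fn) {
        List<List<Object>> collector = new ArrayList<>();
        for (int i = 0; i < streams.size(); i++) {
            for (List<Object> tuple : streams.get(i)) {
                fn.execute(i, tuple, collector);
            }
        }
        return collector;
    }

    // merge: the reducer ignores the stream index and forwards each tuple unchanged.
    static List<List<Object>> merge(List<List<List<Object>>> streams) {
        return multiReduce(streams, (index, tuple, collector) -> collector.add(tuple));
    }

    public static void main(String[] args) {
        List<List<Object>> s1 = Arrays.asList(Arrays.<Object>asList("a"));
        List<List<Object>> s2 = Arrays.asList(
                Arrays.<Object>asList("b"), Arrays.<Object>asList("c"));
        System.out.println(merge(Arrays.asList(s1, s2))); // prints [[a], [b], [c]]
    }
}
```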

Summary

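  • Trident operations fall into a few groups: the basic function, filter and projection operations; the repartitioning operations, which are mostly groupings; the aggregation operations aggregate, partitionAggregate and persistentAggregate; and the join and merge operations that the topology applies to streams
  • A function appends any fields it emits to the original tuple; a filter drops tuples; a projection keeps only the selected fields
  • The repartitioning operations rely on Grouping.local_or_shuffle, Grouping.shuffle, Grouping.all, GlobalGrouping, CustomStreamGrouping, IdentityGrouping, IndexHashGrouping and so on; a partition operation can be seen as assigning incoming tuples to tasks, or equivalently as applying a grouping to the stream
  • For aggregation, the plain aggregate operation accepts three interfaces: Aggregator, CombinerAggregator and ReducerAggregator. Aggregator is the most general: it extends the Operation interface and receives a collector in its methods, which the other two do not. CombinerAggregator differs from Aggregator and ReducerAggregator in that, when stream.aggregate is called, Trident first aggregates partially inside each partition and only then brings the partial results into a single partition for the final aggregation, which saves network transfer; however, if a CombinerAggregator is chained with non-combiner aggregators, this optimization is lost. partitionAggregate operates at partition granularity, while persistentAggregate operates over the tuples of all batches in the whole stream and persists the result in a state
  • join and merge on streams are both implemented on top of multiReduce and differ only in the MultiReducer they pass. join needs fields to match on (the field names may differ between streams) and lets you choose a JoinType, INNER or OUTER, but it only joins within each small batch emitted by the spouts; merge simply gathers the tuples of several streams into one.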
