本文主要研究一下storm trident的operations
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Function.java
/**
 * A Trident "each" operation that maps a single input tuple to any number of output tuples.
 */
public interface Function extends EachOperation {
    /**
     * Applies this function to one tuple.
     *
     * <p>Output is produced only through {@code collector}; emitting nothing is allowed,
     * as is emitting several tuples.
     *
     * @param tuple     the tuple being processed
     * @param collector sink for any tuples this function emits
     */
    void execute(TridentTuple tuple, TridentCollector collector);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Filter.java
/**
 * A Trident "each" operation that decides, tuple by tuple, what stays in a stream.
 */
public interface Filter extends EachOperation {
    /**
     * Decides whether {@code tuple} survives this filter.
     *
     * @param tuple the tuple under evaluation
     * @return {@code true} to keep the tuple in the stream, {@code false} to drop it
     */
    boolean isKeep(TridentTuple tuple);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Narrows this stream to the fields listed in {@code keepFields}, discarding all others.
 *
 * <p>Example: given a stream {@code mystream} with fields {@code ["a", "b", "c", "d"]},
 * {@code mystream.project(new Fields("b", "d"))} yields a stream whose only fields are
 * {@code ["b", "d"]}.
 *
 * @param keepFields the fields of this stream to retain
 * @return a new stream node, sourced from this stream, that carries only {@code keepFields}
 */
public Stream project(Fields keepFields) {
    projectionValidation(keepFields);
    String streamId = _topology.getUniqueStreamId();
    ProcessorNode projectionNode =
            new ProcessorNode(streamId, _name, keepFields, new Fields(), new ProjectedProcessor(keepFields));
    return _topology.addSourcedNode(this, projectionNode);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Repartitioning operation driven by a user-supplied {@link CustomStreamGrouping}.
 *
 * <p>The grouping object is Java-serialized and wrapped in a {@code custom_serialized}
 * {@link Grouping}, so it must be serializable.
 *
 * @param partitioner the custom grouping that picks target partitions
 * @return the repartitioned stream
 */
public Stream partition(CustomStreamGrouping partitioner) {
    byte[] serializedPartitioner = Utils.javaSerialize(partitioner);
    return partition(Grouping.custom_serialized(serializedPartitioner));
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Repartitioning operation using a fields grouping over {@code fields}.
 *
 * <p>The fields are validated against this stream's output fields before the grouping is built.
 *
 * @param fields the fields whose values select the target partition
 * @return the repartitioned stream
 */
public Stream partitionBy(Fields fields) {
    projectionValidation(fields);
    Grouping byFieldValues = Grouping.fields(fields.toList());
    return partition(byFieldValues);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Repartitioning operation that routes tuples with Trident's {@code IdentityGrouping}.
 *
 * <p>See {@code IdentityGrouping} for the exact source-to-target task mapping.
 *
 * @return the repartitioned stream
 */
public Stream identityPartition() {
    IdentityGrouping identity = new IdentityGrouping();
    return partition(identity);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Repartitioning operation that spreads tuples evenly over all target partitions using a
 * random round-robin (shuffle) grouping.
 *
 * @return the repartitioned stream
 */
public Stream shuffle() {
    Grouping roundRobin = Grouping.shuffle(new NullStruct());
    return partition(roundRobin);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Repartitioning operation that spreads tuples evenly over all target partitions using a
 * random round-robin grouping that prefers tasks local to the emitter.
 *
 * @return the repartitioned stream
 */
public Stream localOrShuffle() {
    Grouping localFirst = Grouping.local_or_shuffle(new NullStruct());
    return partition(localFirst);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Repartitioning operation that sends every tuple to one partition; the same partition is
 * used for all batches in the stream.
 *
 * @return the repartitioned stream
 */
public Stream global() {
    // Trident's own GlobalGrouping is used instead of Storm's built-in global grouping so that a
    // "single emit batch to partition" behavior can be requested without relying on Storm internals.
    GlobalGrouping singleTarget = new GlobalGrouping();
    return partition(singleTarget);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Repartitioning operation that keeps each batch together: all tuples of a batch go to one
 * partition, while different batches may land on different partitions.
 *
 * @return the repartitioned stream
 */
public Stream batchGlobal() {
    // Field 0 of every tuple is the batch id, so hashing on it keeps a batch on one partition.
    final int batchIdIndex = 0;
    return partition(new IndexHashGrouping(batchIdIndex));
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Repartitioning operation that replicates every tuple to all target partitions.
 *
 * <p>This can be useful with DRPC, for example when a stateQuery has to run against every
 * partition of the data.
 *
 * @return the repartitioned stream
 */
public Stream broadcast() {
    Grouping toEveryPartition = Grouping.all(new NullStruct());
    return partition(toEveryPartition);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
/**
 * Grouping operation: wraps this stream in a {@link GroupedStream} keyed on {@code fields}.
 *
 * <p>The fields are validated against this stream's output fields first.
 *
 * @param fields the grouping key fields
 * @return a grouped view of this stream
 */
public GroupedStream groupBy(Fields fields) {
    projectionValidation(fields);
    GroupedStream grouped = new GroupedStream(this, fields);
    return grouped;
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/Stream.java
// partition aggregate: aggregates within each partition via a chained aggregator.

/** Partition-aggregates with an {@link Aggregator}, passing {@code null} as the input fields. */
public Stream partitionAggregate(Aggregator agg, Fields functionFields) {
    return partitionAggregate(null, agg, functionFields);
}

/** Partition-aggregates with a {@link CombinerAggregator}, passing {@code null} as the input fields. */
public Stream partitionAggregate(CombinerAggregator agg, Fields functionFields) {
    return partitionAggregate(null, agg, functionFields);
}

/**
 * Partition-aggregates {@code inputFields} with a {@link CombinerAggregator}, emitting
 * {@code functionFields}.
 */
public Stream partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
    projectionValidation(inputFields);
    return chainedAgg().partitionAggregate(inputFields, agg, functionFields).chainEnd();
}

/** Partition-aggregates with a {@link ReducerAggregator}, passing {@code null} as the input fields. */
public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
    return partitionAggregate(null, agg, functionFields);
}

/**
 * Partition-aggregates {@code inputFields} with a {@link ReducerAggregator}, emitting
 * {@code functionFields}.
 */
public Stream partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
    projectionValidation(inputFields);
    return chainedAgg().partitionAggregate(inputFields, agg, functionFields).chainEnd();
}
// aggregate: aggregates through chainedAgg() (each call validates inputFields first).

/** Aggregates {@code inputFields} with an {@link Aggregator}, emitting {@code functionFields}. */
public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
    projectionValidation(inputFields);
    return chainedAgg().aggregate(inputFields, agg, functionFields).chainEnd();
}

/** Aggregates {@code inputFields} with a {@link CombinerAggregator}, emitting {@code functionFields}. */
public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
    projectionValidation(inputFields);
    return chainedAgg().aggregate(inputFields, agg, functionFields).chainEnd();
}

/** Aggregates {@code inputFields} with a {@link ReducerAggregator}, emitting {@code functionFields}. */
public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
    projectionValidation(inputFields);
    return chainedAgg().aggregate(inputFields, agg, functionFields).chainEnd();
}
// persistent aggregate: aggregates and then persists the result into a state via partitionPersist.

/** Wraps {@code stateFactory} in a {@link StateSpec} and persists a combiner aggregation. */
public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
    StateSpec spec = new StateSpec(stateFactory);
    return persistentAggregate(spec, agg, functionFields);
}

/** Persists a combiner aggregation, passing {@code null} as the input fields. */
public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
    return persistentAggregate(spec, null, agg, functionFields);
}

/** Wraps {@code stateFactory} in a {@link StateSpec} and persists a combiner aggregation of {@code inputFields}. */
public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
    StateSpec spec = new StateSpec(stateFactory);
    return persistentAggregate(spec, inputFields, agg, functionFields);
}

/**
 * Aggregates {@code inputFields} with a {@link CombinerAggregator} under a global aggregation
 * scheme, then persists the aggregated {@code functionFields} through a
 * {@code CombinerAggStateUpdater}.
 */
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
    projectionValidation(inputFields);
    // A global scheme replaces the usual aggregation here because the result must stay
    // consistent across batches.
    Stream aggregated = new ChainedAggregatorDeclarer(this, new GlobalAggScheme())
            .aggregate(inputFields, agg, functionFields)
            .chainEnd();
    return aggregated.partitionPersist(spec, functionFields, new CombinerAggStateUpdater(agg), functionFields);
}

/** Wraps {@code stateFactory} in a {@link StateSpec} and persists a reducer aggregation. */
public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
    StateSpec spec = new StateSpec(stateFactory);
    return persistentAggregate(spec, agg, functionFields);
}

/** Persists a reducer aggregation, passing {@code null} as the input fields. */
public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
    return persistentAggregate(spec, null, agg, functionFields);
}

/** Wraps {@code stateFactory} in a {@link StateSpec} and persists a reducer aggregation of {@code inputFields}. */
public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
    StateSpec spec = new StateSpec(stateFactory);
    return persistentAggregate(spec, inputFields, agg, functionFields);
}

/**
 * Routes the stream through {@link #global()} and persists a {@link ReducerAggregator}
 * aggregation of {@code inputFields} through a {@code ReducerAggStateUpdater}.
 */
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
    projectionValidation(inputFields);
    Stream singlePartition = global();
    return singlePartition.partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/Aggregator.java
/**
 * General-purpose Trident aggregator with explicit init / aggregate / complete callbacks.
 *
 * @param <T> type of the aggregation state
 */
public interface Aggregator<T> extends Operation {
    /**
     * Creates the aggregation state.
     *
     * @param batchId   identifier of the batch being aggregated
     * @param collector sink that may be used to emit tuples
     * @return the initial state (presumably the value later handed to aggregate and complete)
     */
    T init(Object batchId, TridentCollector collector);

    /**
     * Folds one tuple into the state. Because this method returns nothing, any update must be
     * applied to {@code val} itself or expressed as emitted tuples.
     *
     * @param val       the current aggregation state
     * @param tuple     the tuple to incorporate
     * @param collector sink that may be used to emit tuples
     */
    void aggregate(T val, TridentTuple tuple, TridentCollector collector);

    /**
     * Finishes aggregation for {@code val}; results are emitted through {@code collector}.
     *
     * @param val       the final aggregation state
     * @param collector sink for the aggregation result
     */
    void complete(T val, TridentCollector collector);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/CombinerAggregator.java
/**
 * Aggregator that maps each tuple to a value and merges values pairwise.
 *
 * @param <T> type of the aggregated value
 */
public interface CombinerAggregator<T> extends Serializable {
    /**
     * Converts a single tuple into a partial aggregate.
     *
     * @param tuple the input tuple
     * @return the value contributed by {@code tuple}
     */
    T init(TridentTuple tuple);

    /**
     * Merges two partial aggregates into one.
     *
     * @param val1 the first partial aggregate
     * @param val2 the second partial aggregate
     * @return the combined aggregate
     */
    T combine(T val1, T val2);

    /**
     * Supplies the starting value, which is also the result when a partition contains no tuples.
     *
     * @return the neutral ("zero") aggregate
     */
    T zero();
}
CombinerAggregator 对每个 tuple 调用 init 得到一个值，然后将之前 combine 的结果（如果有的话；没有的话取 zero 的值）与 init 取得的值进行新的 combine 操作；如果该 partition 中没有 tuple，则返回 zero 方法的值。
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/operation/ReducerAggregator.java
/**
 * Aggregator that folds tuples one at a time into an accumulated value.
 *
 * @param <T> type of the accumulated value
 */
public interface ReducerAggregator<T> extends Serializable {
    /**
     * Produces the starting value for the fold.
     *
     * @return the initial accumulated value
     */
    T init();

    /**
     * Folds {@code tuple} into {@code curr}.
     *
     * @param curr  the value accumulated so far
     * @param tuple the next tuple
     * @return the new accumulated value
     */
    T reduce(T curr, TridentTuple tuple);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
/** Inner-joins two streams on their respective join fields using the COMPACT out-fields mode. */
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
    List<Stream> pair = Arrays.asList(s1, s2);
    List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
    return join(pair, keys, outFields);
}

/** Inner-joins several streams; {@code joinFields.get(i)} are the join fields of {@code streams.get(i)}. */
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
    JoinType defaultType = JoinType.INNER;
    return join(streams, joinFields, outFields, defaultType);
}

/** Joins two streams, applying {@code type} to each of them. */
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
    List<Stream> pair = Arrays.asList(s1, s2);
    List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
    return join(pair, keys, outFields, type);
}

/** Joins several streams, repeating a single {@code type} once per stream. */
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
    int streamCount = streams.size();
    return join(streams, joinFields, outFields, repeat(streamCount, type));
}

/** Joins two streams with one {@link JoinType} per stream. */
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
    List<Stream> pair = Arrays.asList(s1, s2);
    List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
    return join(pair, keys, outFields, mixed);
}

/** Joins several streams with per-stream join types using the COMPACT out-fields mode. */
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
    JoinOutFieldsMode defaultMode = JoinOutFieldsMode.COMPACT;
    return join(streams, joinFields, outFields, mixed, defaultMode);
}

/** Inner-joins two streams with an explicit out-fields mode. */
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinOutFieldsMode mode) {
    List<Stream> pair = Arrays.asList(s1, s2);
    List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
    return join(pair, keys, outFields, mode);
}

/** Inner-joins several streams with an explicit out-fields mode. */
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinOutFieldsMode mode) {
    JoinType defaultType = JoinType.INNER;
    return join(streams, joinFields, outFields, defaultType, mode);
}

/** Joins two streams with a single join type and an explicit out-fields mode. */
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
    List<Stream> pair = Arrays.asList(s1, s2);
    List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
    return join(pair, keys, outFields, type, mode);
}

/** Joins several streams, repeating a single join type per stream, with an explicit out-fields mode. */
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type, JoinOutFieldsMode mode) {
    int streamCount = streams.size();
    return join(streams, joinFields, outFields, repeat(streamCount, type), mode);
}

/** Joins two streams with per-stream join types and an explicit out-fields mode. */
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
    List<Stream> pair = Arrays.asList(s1, s2);
    List<Fields> keys = Arrays.asList(joinFields1, joinFields2);
    return join(pair, keys, outFields, mixed, mode);
}

/**
 * Core join: groups every stream on its join fields and reduces them together with a joiner.
 *
 * <p>COMPACT uses a {@code JoinerMultiReducer}. PRESERVE uses a
 * {@code PreservingFieldsOrderJoinerMultiReducer}, which additionally receives every stream's
 * full output fields and the join fields.
 *
 * @throws IllegalArgumentException if {@code mode} is neither COMPACT nor PRESERVE
 */
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed, JoinOutFieldsMode mode) {
    switch (mode) {
        case COMPACT: {
            List<Fields> strippedFields = strippedInputFields(streams, joinFields);
            List<GroupedStream> grouped = groupedStreams(streams, joinFields);
            return multiReduce(strippedFields, grouped,
                    new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
                    outFields);
        }
        case PRESERVE: {
            List<Fields> strippedFields = strippedInputFields(streams, joinFields);
            List<GroupedStream> grouped = groupedStreams(streams, joinFields);
            return multiReduce(strippedFields, grouped,
                    new PreservingFieldsOrderJoinerMultiReducer(mixed, joinFields.get(0).size(),
                            getAllOutputFields(streams), joinFields, strippedInputFields(streams, joinFields)),
                    outFields);
        }
        default:
            throw new IllegalArgumentException("Unsupported out-fields mode: " + mode);
    }
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
/**
 * Merges the given streams into one stream whose fields are named {@code outputFields}.
 *
 * @param outputFields field names of the merged stream
 * @param streams      the streams to merge
 * @return the merged stream
 */
public Stream merge(Fields outputFields, Stream... streams) {
    return merge(outputFields, Arrays.asList(streams));
}

/**
 * Merges the given streams, naming the result after the first stream's output fields.
 *
 * @param streams the streams to merge; at least one is required
 * @return the merged stream
 * @throws IllegalArgumentException if no streams are given
 */
public Stream merge(Stream... streams) {
    return merge(Arrays.asList(streams));
}

/**
 * Merges the given streams, naming the result after the first stream's output fields.
 *
 * @param streams the streams to merge; must not be empty
 * @return the merged stream
 * @throws IllegalArgumentException if {@code streams} is empty
 */
public Stream merge(List<Stream> streams) {
    // Fail with a clear message instead of an opaque IndexOutOfBoundsException from get(0).
    if (streams.isEmpty()) {
        throw new IllegalArgumentException("merge requires at least one stream");
    }
    return merge(streams.get(0).getOutputFields(), streams);
}

/**
 * Merges the given streams by passing every tuple through unchanged
 * ({@code IdentityMultiReducer}) into a single node with fields {@code outputFields}.
 *
 * @param outputFields field names of the merged stream
 * @param streams      the streams to merge
 * @return the merged stream
 */
public Stream merge(Fields outputFields, List<Stream> streams) {
    return multiReduce(streams, new IdentityMultiReducer(), outputFields);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
/** Reduces two streams together, using each stream's full output fields as input. */
public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
    List<Stream> pair = Arrays.asList(s1, s2);
    return multiReduce(pair, function, outputFields);
}

/** Reduces two streams together, reading {@code inputFields1} from {@code s1} and {@code inputFields2} from {@code s2}. */
public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
    List<Fields> inputs = Arrays.asList(inputFields1, inputFields2);
    List<Stream> pair = Arrays.asList(s1, s2);
    return multiReduce(inputs, pair, function, outputFields);
}

/** Reduces two grouped streams together, using each stream's full output fields as input. */
public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
    List<GroupedStream> pair = Arrays.asList(s1, s2);
    return multiReduce(pair, function, outputFields);
}

/** Reduces two grouped streams together with explicit per-stream input fields. */
public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
    List<Fields> inputs = Arrays.asList(inputFields1, inputFields2);
    List<GroupedStream> pair = Arrays.asList(s1, s2);
    return multiReduce(inputs, pair, function, outputFields);
}

/** Reduces several streams together, using each stream's full output fields as input. */
public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
    List<Fields> inputs = getAllOutputFields(streams);
    return multiReduce(inputs, streams, function, outputFields);
}

/** Reduces several grouped streams together, using each stream's full output fields as input. */
public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
    List<Fields> inputs = getAllOutputFields(streams);
    return multiReduce(inputs, streams, function, outputFields);
}
/**
 * Reduces several grouped streams together.
 *
 * <p>Each grouped stream is converted back to a plain stream and repartitioned on its group
 * fields. Its input fields are extended with those group fields. The reducer is wrapped in a
 * {@code GroupedMultiReducerExecutor} that knows every stream's group and input fields.
 *
 * @param inputFields    per-stream input fields, aligned by index with {@code groupedStreams}
 * @param groupedStreams the grouped streams to reduce
 * @param function       the grouped multi-reducer
 * @param outputFields   fields emitted by the reduction
 * @return the reduced stream
 */
public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
    List<Fields> unionedInputs = new ArrayList<>();
    List<Stream> repartitioned = new ArrayList<>();
    List<Fields> groupKeys = new ArrayList<>();
    for (int idx = 0; idx < groupedStreams.size(); idx++) {
        GroupedStream grouped = groupedStreams.get(idx);
        Fields keyFields = grouped.getGroupFields();
        groupKeys.add(keyFields);
        repartitioned.add(grouped.toStream().partitionBy(keyFields));
        unionedInputs.add(TridentUtils.fieldsUnion(keyFields, inputFields.get(idx)));
    }
    GroupedMultiReducerExecutor executor = new GroupedMultiReducerExecutor(function, groupKeys, inputFields);
    return multiReduce(unionedInputs, repartitioned, executor, outputFields);
}
/**
 * Core multi-reduce: adds one processor node, fed by all {@code streams}, that runs
 * {@code function} through a {@code MultiReducerProcessor}.
 *
 * <p>The node's name joins the names of the non-null-named source streams with {@code "-"}.
 *
 * @param inputFields  per-stream input fields
 * @param streams      the source streams
 * @param function     the multi-reducer to run
 * @param outputFields fields emitted by the node
 * @return the stream produced by the new node
 */
public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
    List<String> namedParts = new ArrayList<>();
    for (Stream source : streams) {
        String sourceName = source._name;
        if (sourceName != null) {
            namedParts.add(sourceName);
        }
    }
    String streamId = getUniqueStreamId();
    Node reducerNode = new ProcessorNode(streamId, Utils.join(namedParts, "-"), outputFields, outputFields,
            new MultiReducerProcessor(inputFields, function));
    return addSourcedNode(streams, reducerNode);
}
join 可以把多个 stream 按各自指定的 join 字段进行关联（各 stream 的 join 字段名可以不一样），并且可以选择 JoinType 是 INNER 还是 OUTER；不过 join 是针对 spout 的 small batch 来进行的。merge 则纯粹是把几个 stream 的 tuple 汇总到一起。