A look at Storm's CustomStreamGrouping

This article takes a look at Storm's CustomStreamGrouping.

CustomStreamGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/CustomStreamGrouping.java

public interface CustomStreamGrouping extends Serializable {

    /**
     * Tells the stream grouping at runtime the tasks in the target bolt. This information should be used in chooseTasks to determine the
     * target tasks.
     *
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

    /**
     * This function implements a custom stream grouping. It takes in as input the number of tasks in the target bolt in prepare and returns
     * the tasks to send the tuples to.
     *
     * @param values the values to group on
     */
    List<Integer> chooseTasks(int taskId, List<Object> values);
}
  • The interface defines two methods, prepare and chooseTasks.
  • GrouperFactory defines FieldsGrouper, GlobalGrouper, NoneGrouper, AllGrouper and BasicLoadAwareCustomStreamGrouping.
  • The org.apache.storm.grouping package additionally defines ShuffleGrouping, PartialKeyGrouping and LoadAwareShuffleGrouping.
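To make the two-method contract concrete, here is a minimal sketch that runs without Storm on the classpath. It assumes a hypothetical stand-in interface, SimpleStreamGrouping, whose prepare keeps only the targetTasks argument (the real method also receives a WorkerTopologyContext and a GlobalStreamId), and a hypothetical SourcePinnedGrouping that routes by the taskId argument, assumed here to be the emitting task's id, rather than by tuple values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for CustomStreamGrouping: the real prepare() also takes
// a WorkerTopologyContext and a GlobalStreamId, which are omitted here.
interface SimpleStreamGrouping {
    void prepare(List<Integer> targetTasks);

    List<Integer> chooseTasks(int taskId, List<Object> values);
}

// Hypothetical grouping: each emitting task is pinned to one target task,
// chosen by the emitting taskId modulo the number of target tasks.
final class SourcePinnedGrouping implements SimpleStreamGrouping {
    private List<Integer> targetTasks;

    @Override
    public void prepare(List<Integer> targetTasks) {
        // Cache the target tasks once, as the Storm groupers do in prepare()
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        if (targetTasks.isEmpty()) {
            return Collections.emptyList();
        }
        // floorMod keeps the index non-negative for any taskId
        int index = Math.floorMod(taskId, targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }
}
```

In a real topology the class would implement CustomStreamGrouping directly and be attached through the customGrouping method of the bolt's InputDeclarer.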

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class FieldsGrouper implements CustomStreamGrouping {

        private Fields outFields;
        private List<List<Integer>> targetTasks;
        private Fields groupFields;
        private int numTasks;

        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
            this.outFields = outFields;
            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));

        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<List<Integer>>();
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
            return targetTasks.get(targetTaskIndex);
        }

    }
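  • The values of the grouping fields are passed to TupleUtils.chooseTaskIndex to pick a task index; chooseTaskIndex hashes them with Arrays.deepHashCode and then takes a floor modulo by numTasks, so the index stays non-negative even when the hash is negative.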
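The index computation that FieldsGrouper delegates to TupleUtils.chooseTaskIndex can be re-created with JDK calls only. The sketch below is not Storm's TupleUtils source; the class name FieldsIndexSketch is hypothetical, and it assumes the key list is non-null.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the fields-grouping index computation: hash the selected
// field values with Arrays.deepHashCode, then floor-mod by numTasks.
final class FieldsIndexSketch {
    private FieldsIndexSketch() {
    }

    static int chooseTaskIndex(List<?> keys, int numTasks) {
        int hash = Arrays.deepHashCode(keys.toArray());
        // floorMod keeps the result in [0, numTasks) even when hash is negative
        return Math.floorMod(hash, numTasks);
    }
}
```

For a single key -100, Arrays.deepHashCode gives 31 * 1 + (-100) = -69; Math.floorMod(-69, 4) is 3, whereas -69 % 4 would be -1, an invalid list index. Equal keys always produce the same index, which is what makes fields grouping sticky.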

GlobalGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class GlobalGrouper implements CustomStreamGrouping {

        private List<Integer> targetTasks;

        public GlobalGrouper() {
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            if (targetTasks.isEmpty()) {
                return null;
            }
            // It's possible for target to have multiple tasks if it reads multiple sources
            return Collections.singletonList(targetTasks.get(0));
        }
    }
  • It always picks the first task, i.e. targetTasks.get(0), and returns null when there are no target tasks.

NoneGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class NoneGrouper implements CustomStreamGrouping {

        private final Random random;
        private List<Integer> targetTasks;
        private int numTasks;

        public NoneGrouper() {
            random = new Random();
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int index = random.nextInt(numTasks);
            return Collections.singletonList(targetTasks.get(index));
        }
    }
  • It picks a target task at random via random.nextInt(numTasks).

AllGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class AllGrouper implements CustomStreamGrouping {

        private List<Integer> targetTasks;

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return targetTasks;
        }
    }
  • It simply returns all of the targetTasks, so every tuple is sent to every target task.

ShuffleGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java

public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
    private ArrayList<List<Integer>> choices;
    private AtomicInteger current;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer i : targetTasks) {
            choices.add(Arrays.asList(i));
        }
        current = new AtomicInteger(0);
        Collections.shuffle(choices, new Random());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int rightNow;
        int size = choices.size();
        while (true) {
            rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);
                return choices.get(0);
            }
        } // race condition with another thread, and we lost. try again
    }
}
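  • In prepare, each target task is wrapped in a single-element list and the resulting ArrayList<List<Integer>> choices is shuffled randomly.
  • chooseTasks uses current.incrementAndGet() to get a round-robin effect: when the incremented value is below size it returns choices.get(rightNow); when it equals size it resets current to 0 and returns choices.get(0); a value above size means this thread lost a race with another thread, so the loop retries.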
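The counter logic above can be replayed in isolation to see the order in which indices are handed out. This sketch copies the loop from ShuffleGrouping.chooseTasks but returns the index instead of choices.get(index) and omits the shuffle; the class RoundRobinCounter and its nextIndices helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Replays the counter logic of ShuffleGrouping.chooseTasks but returns the
// chosen index instead of choices.get(index); the shuffle step is omitted.
final class RoundRobinCounter {
    private final AtomicInteger current = new AtomicInteger(0);
    private final int size;

    RoundRobinCounter(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    int nextIndex() {
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return rightNow;
            } else if (rightNow == size) {
                // Wrap around: reset the counter and hand out index 0
                current.set(0);
                return 0;
            }
            // rightNow > size: another thread reached size first and resets the counter; retry
        }
    }

    // Demonstration helper: the next `calls` indices in order
    List<Integer> nextIndices(int calls) {
        List<Integer> picked = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            picked.add(nextIndex());
        }
        return picked;
    }
}
```

With size 3 the single-threaded sequence is 1, 2, 0, 1, 2, 0: because the counter starts at 0 and is incremented before use, the first call lands on index 1. Since choices was shuffled in prepare, this offset has no practical effect on fairness.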

PartialKeyGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;

    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);

            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);

            boltIds.add(selectedTask);
        }
        return boltIds;
    }
    //......
}
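  • By default, RandomTwoTaskAssignmentCreator selects two candidate taskIds from the key's bytes, and BalancedTargetSelector then picks whichever of the two has been used fewer times; tuples with no values are routed nowhere (an empty list is returned).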
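The default behaviour can be sketched without Storm as a "two choices" selector. TwoChoicesSketch and its methods are hypothetical names; the candidate computation follows this article's description (Random seeded with the key's hash, two indices drawn), while forcing the two candidates to differ is an assumption of this sketch, not something the excerpt above shows.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Sketch of partial key grouping: the key deterministically yields two
// candidate tasks, and each tuple goes to the candidate used fewer times so far.
final class TwoChoicesSketch {
    private final List<Integer> targetTasks;
    private final Map<Integer, Long> useCounts = new HashMap<>();

    TwoChoicesSketch(List<Integer> targetTasks) {
        if (targetTasks.size() < 2) {
            throw new IllegalArgumentException("need at least two target tasks");
        }
        this.targetTasks = targetTasks;
    }

    // Seeding Random with the key hash makes the candidates depend only on the key
    int[] candidates(byte[] key) {
        Random random = new Random(Arrays.hashCode(key));
        int first = random.nextInt(targetTasks.size());
        int second = random.nextInt(targetTasks.size());
        if (second == first) {
            // Assumption of this sketch: force two distinct candidates
            second = (second + 1) % targetTasks.size();
        }
        return new int[] {targetTasks.get(first), targetTasks.get(second)};
    }

    // Pick the less-used candidate (ties go to the first) and record the choice
    int chooseTask(byte[] key) {
        int[] pair = candidates(key);
        long c0 = useCounts.getOrDefault(pair[0], 0L);
        long c1 = useCounts.getOrDefault(pair[1], 0L);
        int chosen = (c1 < c0) ? pair[1] : pair[0];
        useCounts.merge(chosen, 1L, Long::sum);
        return chosen;
    }
}
```

Sending the same key repeatedly alternates between its two candidates, so a hot key is spread over two tasks instead of one; the trade-off is that a downstream aggregation generally has to combine two partial results per key.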

LoadAwareCustomStreamGrouping

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java

public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
}
  • It extends CustomStreamGrouping and adds a refreshLoad method for updating load information; the load here is mainly that of each executor's receiveQueue (qMetrics.population() / qMetrics.capacity()).
  • Its implementations include BasicLoadAwareCustomStreamGrouping and LoadAwareShuffleGrouping.
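As a hypothetical illustration of what refreshLoad enables, the sketch below has no Storm dependency: LeastLoadedSketch computes a queue's load as population / capacity using a JDK ArrayBlockingQueue, accepts a per-task load snapshot through a refreshLoad stand-in (a plain Map instead of LoadMapping), and routes each tuple to the least-loaded task. The least-loaded rule is a simplification chosen for illustration; it is not the algorithm used by BasicLoadAwareCustomStreamGrouping or LoadAwareShuffleGrouping.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical load-aware grouping, not Storm's implementation:
// load per task = queue population / queue capacity, route to the minimum.
final class LeastLoadedSketch {
    private final List<Integer> targetTasks;
    private Map<Integer, Double> loads = new HashMap<>();

    LeastLoadedSketch(List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    // Same ratio as population / capacity, computed on a JDK bounded queue
    static double queueLoad(ArrayBlockingQueue<?> queue) {
        int population = queue.size();
        int capacity = population + queue.remainingCapacity();
        return (double) population / capacity;
    }

    // Stand-in for refreshLoad(LoadMapping): replace the whole load snapshot
    void refreshLoad(Map<Integer, Double> newLoads) {
        this.loads = new HashMap<>(newLoads);
    }

    List<Integer> chooseTasks(int taskId, List<Object> values) {
        Integer best = null;
        double bestLoad = Double.POSITIVE_INFINITY;
        for (Integer task : targetTasks) {
            // Tasks with no reported load are treated as idle (0.0)
            double load = loads.getOrDefault(task, 0.0);
            if (load < bestLoad) {
                bestLoad = load;
                best = task;
            }
        }
        if (best == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(best);
    }
}
```

Because the loads are only a snapshot, routing decisions lag behind the real queue state until the next refreshLoad call.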

Summary

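  • Storm's CustomStreamGrouping interface defines the chooseTasks method, which selects the tasks that will process each tuple.
  • ShuffleGrouping behaves like round robin; FieldsGrouper hashes the selected field values with Arrays.deepHashCode and takes a floor modulo by numTasks; GlobalGrouper returns the taskId at index 0; NoneGrouper picks a task at random; AllGrouper applies no filtering and returns every taskId; PartialKeyGrouping seeds Random with the key's hash to compute the indices of two taskIds, then picks whichever of the two tasks has been used less.
  • The load-aware groupings, BasicLoadAwareCustomStreamGrouping and LoadAwareShuffleGrouping, both implement the LoadAwareCustomStreamGrouping interface, whose refreshLoad method refreshes load information dynamically; the load is mainly that of the executor's receiveQueue (qMetrics.population() / qMetrics.capacity()).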

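Original content notice: this article is published on Tencent Cloud+ Community with the author's authorization and may not be reproduced without permission.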
