专栏首页码匠的流水账聊聊elasticsearch的RoutingService
原创

聊聊elasticsearch的RoutingService

本文主要研究一下elasticsearch的RoutingService

RoutingService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

public class RoutingService extends AbstractLifecycleComponent {
    private static final Logger logger = LogManager.getLogger(RoutingService.class);
​
    private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
​
    private final ClusterService clusterService;
    private final AllocationService allocationService;
​
    private AtomicBoolean rerouting = new AtomicBoolean();
​
    @Inject
    public RoutingService(ClusterService clusterService, AllocationService allocationService) {
        this.clusterService = clusterService;
        this.allocationService = allocationService;
    }
​
    @Override
    protected void doStart() {
    }
​
    @Override
    protected void doStop() {
    }
​
    @Override
    protected void doClose() {
    }
​
    /**
     * Initiates a reroute.
     */
    public final void reroute(String reason) {
        try {
            if (lifecycle.stopped()) {
                return;
            }
            if (rerouting.compareAndSet(false, true) == false) {
                logger.trace("already has pending reroute, ignoring {}", reason);
                return;
            }
            logger.trace("rerouting {}", reason);
            clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
                new ClusterStateUpdateTask(Priority.HIGH) {
                    @Override
                    public ClusterState execute(ClusterState currentState) {
                        rerouting.set(false);
                        return allocationService.reroute(currentState, reason);
                    }
​
                    @Override
                    public void onNoLongerMaster(String source) {
                        rerouting.set(false);
                        // no biggie
                    }
​
                    @Override
                    public void onFailure(String source, Exception e) {
                        rerouting.set(false);
                        ClusterState state = clusterService.state();
                        if (logger.isTraceEnabled()) {
                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
                                source, state), e);
                        } else {
                            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
                                source, state.version()), e);
                        }
                    }
                });
        } catch (Exception e) {
            rerouting.set(false);
            ClusterState state = clusterService.state();
            logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
        }
    }
}
  • RoutingService的构造器要求输入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托给allocationService.reroute

AllocationService.reroute

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

public class AllocationService {
    //......
​
    /**
     * Reroutes the routing table based on the live nodes.
     * <p>
     * If the same instance of ClusterState is returned, then no change has been made.
     */
    public ClusterState reroute(ClusterState clusterState, String reason) {
        return reroute(clusterState, reason, false);
    }
​
    /**
     * Reroutes the routing table based on the live nodes.
     * <p>
     * If the same instance of ClusterState is returned, then no change has been made.
     */
    protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {
        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
​
        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
        // shuffle the unassigned nodes, just so we won't have things like poison failed shards
        routingNodes.unassigned().shuffle();
        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
            clusterInfoService.getClusterInfo(), currentNanoTime());
        allocation.debugDecision(debug);
        reroute(allocation);
        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
            return clusterState;
        }
        return buildResultAndLogHealthChange(clusterState, allocation, reason);
    }
​
    private void reroute(RoutingAllocation allocation) {
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :
            "auto-expand replicas out of sync with number of nodes in the cluster";
​
        // now allocate all the unassigned to available nodes
        if (allocation.routingNodes().unassigned().size() > 0) {
            removeDelayMarkers(allocation);
            gatewayAllocator.allocateUnassigned(allocation);
        }
​
        shardsAllocator.allocate(allocation);
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }
​
    //......
}
  • AllocationService的reroute方法主要是构建RoutingAllocation,然后在进行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)

BalancedShardsAllocator.allocate

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

public class BalancedShardsAllocator implements ShardsAllocator {
    //......
​
    public void allocate(RoutingAllocation allocation) {
        if (allocation.routingNodes().size() == 0) {
            /* with no nodes this is pointless */
            return;
        }
        final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
        balancer.allocateUnassigned();
        balancer.moveShards();
        balancer.balance();
    }
​
    //......
}
  • BalancedShardsAllocator的allocate方法,则创建Balancer,然后执行Balancer的allocateUnassigned()、moveShards()、balance()方法

小结

  • RoutingService的构造器要求输入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托给allocationService.reroute
  • AllocationService的reroute方法主要是构建RoutingAllocation,然后在进行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)
  • BalancedShardsAllocator的allocate方法,则创建Balancer,然后执行Balancer的allocateUnassigned()、moveShards()、balance()方法

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊dubbo的ExecuteLimitFilter

    dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/Ex...

    codecraft
  • 聊聊dubbo的ExecuteLimitFilter

    dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/Ex...

    codecraft
  • 聊聊Elasticsearch的Iterables

    elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/iterable/...

    codecraft
  • 【学习笔记】0309_仿豆瓣电影app学习笔记

    <!-- --> 讲课过程中会用来一些思维导图, 我这里用的是 xMind, http://www.xmindchina.net/ <!-- --> 今天的...

    web前端教室
  • 设计模式-委托模式

    在常用的23种设计模式中其实面没有委派模式(delegate)的影子,但是在 Spring 中委派模式确实用的比较多的一种模式,Spring MVC 框架中的D...

    MageByte
  • ASP.NET集群内容缓存工具NWebCache

    Alachisoft 发布的NWebCache,这是一个ASP.NET集群内容缓存工具。NWebCache缓存动态页面和根据数据库依赖,当数据库修改时保持同步。...

    张善友
  • 利用php url转发 - 解决空间不提供子目录绑定功能的问题

    由于很多新手都是使用的虚拟空间都是最便宜的那种,这空间一般不支持子目录绑定。但是很多朋友又想设置几个不同的二级域名访问不同的网站程序。于是大家找到了域名url转...

    用户6891928
  • Android 多线程误区,我不信你们都懂!

    前段时间在组内做了一下现有的代码分析,发现很多以前的legacy code多线程的使用都不算是最佳实践,而且坏事的地方在于,刚毕业的学生,因为没有别的参照物,往...

    Android技术干货分享
  • 结合 MultiType 实现加载更多

    MultiType 是一个分发管理类,帮助我们轻松实现复杂布局.建议大家阅读源码,作者的思路并不复杂但很巧妙.

    夏洛克的猫
  • 利用php url转发 - 解决空间不提供子目录绑定功能的问题

    由于很多新手都是使用的虚拟空间都是最便宜的那种,这空间一般不支持子目录绑定。但是很多朋友又想设置几个不同的二级域名访问不同的网站程序。于是大家找到了域名url转...

    ZhangXianSheng

扫码关注云+社区

领取腾讯云代金券

玩转腾讯云 有奖征文活动