前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊Elasticsearch的RoundRobinSupplier

聊聊Elasticsearch的RoundRobinSupplier

作者头像
code4it
发布2019-06-03 18:39:37
3500
发布2019-06-03 18:39:37
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下Elasticsearch的RoundRobinSupplier

RoundRobinSupplier

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java

代码语言:javascript
复制
final class RoundRobinSupplier<S> implements Supplier<S> {

    private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
    private volatile S[] selectors;
    private AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSupplier() {
        this.selectors = null;
    }

    RoundRobinSupplier(S[] selectors) {
        this.selectors = selectors;
        this.selectorsSet.set(true);
    }

    @Override
    public S get() {
        S[] selectors = this.selectors;
        return selectors[counter.getAndIncrement() % selectors.length];
    }

    void setSelectors(S[] selectors) {
        if (selectorsSet.compareAndSet(false, true)) {
            this.selectors = selectors;
        } else {
            throw new AssertionError("Selectors already set. Should only be set once.");
        }
    }

    int count() {
        return selectors.length;
    }
}
  • RoundRobinSupplier实现了Supplier接口,其get方法使用counter.getAndIncrement() % selectors.length来选择selectors数组的下标,然后返回该下标的值

NioSelectorGroup

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java

代码语言:javascript
复制
public class NioSelectorGroup implements NioGroup {

    private final List<NioSelector> dedicatedAcceptors;
    private final RoundRobinSupplier<NioSelector> acceptorSupplier;

    private final List<NioSelector> selectors;
    private final RoundRobinSupplier<NioSelector> selectorSupplier;

    private final AtomicBoolean isOpen = new AtomicBoolean(true);

//......

    public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory,
                            int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
        dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
        selectors = new ArrayList<>(selectorCount);

        try {
            List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
            for (int i = 0; i < selectorCount; ++i) {
                RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
                suppliersToSet.add(supplier);
                NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
                selectors.add(selector);
            }
            for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
                supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
                assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
            }

            for (int i = 0; i < dedicatedAcceptorCount; ++i) {
                RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
                dedicatedAcceptors.add(acceptor);
            }

            if (dedicatedAcceptorCount != 0) {
                acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
            } else {
                acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            }
            selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
            assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";

            startSelectors(selectors, selectorThreadFactory);
            startSelectors(dedicatedAcceptors, acceptorThreadFactory);
        } catch (Exception e) {
            try {
                close();
            } catch (Exception e1) {
                e.addSuppressed(e1);
            }
            throw e;
        }
    }

    public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
        throws IOException {
        ensureOpen();
        return factory.openNioServerSocketChannel(address, acceptorSupplier);
    }

    @Override
    public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
        ensureOpen();
        return factory.openNioChannel(address, selectorSupplier);
    }    

//......
}
  • NioSelectorGroup的构造器创建了两个RoundRobinSupplier,分别是acceptorSupplier及selectorSupplier;bindServerChannel方法执行的是factory.openNioServerSocketChannel(address, acceptorSupplier);openChannel方法执行的是factory.openNioChannel(address, selectorSupplier)

ChannelFactory

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java

代码语言:javascript
复制
public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> {
//......

    public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException {
        ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
        NioSelector selector = supplier.get();
        ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
        scheduleServerChannel(serverChannel, selector);
        return serverChannel;
    }

    public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException {
        SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
        NioSelector selector = supplier.get();
        Socket channel = internalCreateChannel(selector, rawChannel);
        scheduleChannel(channel, selector);
        return channel;
    }

//......
}
  • ChannelFactory的openNioServerSocketChannel及openNioChannel方法都接收Supplier<NioSelector>参数,通过该supplier来选取NioSelector

小结

  • RoundRobinSupplier实现了Supplier接口,其get方法使用counter.getAndIncrement() % selectors.length来选择selectors数组的下标,然后返回该下标的值
  • NioSelectorGroup的构造器创建了两个RoundRobinSupplier,分别是acceptorSupplier及selectorSupplier;bindServerChannel方法执行的是factory.openNioServerSocketChannel(address, acceptorSupplier);openChannel方法执行的是factory.openNioChannel(address, selectorSupplier)
  • ChannelFactory的openNioServerSocketChannel及openNioChannel方法都接收Supplier<NioSelector>参数,通过该supplier来选取NioSelector

doc

  • RoundRobinSupplier
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RoundRobinSupplier
  • NioSelectorGroup
  • ChannelFactory
  • 小结
  • doc
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档