首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用spymemcached动态添加memcached节点

如何使用spymemcached动态添加memcached节点
EN

Stack Overflow用户
提问于 2013-07-03 20:42:32
回答 2查看 2.2K关注 0票数 4

我有一个Java应用程序设置,其中有多个与spymemcached客户机通信的memcached服务器节点。

我想知道是否可以在运行时添加或删除服务器节点,而不干扰所有现有的缓存节点(我知道有些节点应该更改)。

以下是我所知道的(或理解的):

可以在DefaultConnectionFactory中设置自定义散列算法,这有助于我们使用一致的散列,甚至使用内置的KetamaConnectionFactory。

因此,我们应该能够添加或删除只对一个或几个现有节点进行更改的节点。

可以使用spymemcached吗?

如果是,那么是如何实现的呢?

有谁能给我指个方向吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2013-08-14 16:57:17

看起来应该由NodeLocator.updateLocator(List<MemcachedNode> newNodes)来完成这项工作。

但是连接MemcachedNode有点困难。您必须覆盖MemcachedClientMemcachedConnectionDefaultConnectionFactory

您希望在MemcachedClient中添加或删除客户端是合理的,因此您添加了remove(MemcachedNode node)add(MemcachedNode node)方法。

在删除的情况下,您应该断开节点的连接(参见MemcachedConnection.shutdown()),并将其从NodeLocator.getAll()中删除,然后调用NodeLocator.updateLocator(List<MemcachedNode> newNodes)

在add的情况下,您应该通过MemcachedConnection.createConnections(最终集合a)连接节点,将其与NodeLocator.getAll()合并并调用NodeLocator.updateLocator(List<MemcachedNode> newNodes)

嗯,我从来没有试过,所以它可能不会起作用。所以祝你好运!

ExtMemCachedConnection.java公共类ExtMemCachedConnection扩展了MemcachedConnection {

代码语言:javascript
运行
复制
  protected final OperationFactory opFact;

  /**
   * Construct a memcached connection.
   *
   * @param bufSize the size of the buffer used for reading from the server
   * @param f       the factory that will provide an operation queue
   * @param a       the addresses of the servers to connect to
   * @throws java.io.IOException if a connection attempt fails early
   */
  public ExtendableMemcachedConnection(int bufSize, ConnectionFactory f,
                                       List<InetSocketAddress> a,
                                       Collection<ConnectionObserver> obs,
                                       FailureMode fm, OperationFactory opfactory)
      throws IOException {
    super(bufSize, f, a, obs, fm, opfactory);
    this.opFact = opfactory;
  }

  public void add(InetSocketAddress nodeAddress) throws IOException {
    final List<InetSocketAddress> nodeToAdd = new ArrayList<InetSocketAddress>(1);
    nodeToAdd.add(nodeAddress);
    List<MemcachedNode> newNodesList = createConnections(nodeToAdd);
    newNodesList.addAll(getLocator().getAll());
    getLocator().updateLocator(newNodesList);
  }

  //The node should be obtain from locator to ensure currentNode.equals(node) will return true
  public void remove(MemcachedNode node) throws IOException {
    for(MemcachedNode currentNode : getLocator().getAll()) {
      if(currentNode.equals(node)) {
        Collection<Operation> notCompletedOperations = currentNode.destroyInputQueue();
        if (currentNode.getChannel() != null) {
          currentNode.getChannel().close();
          currentNode.setSk(null);
          if (currentNode.getBytesRemainingToWrite() > 0) {
            getLogger().warn("Shut down with %d bytes remaining to write",
                             currentNode.getBytesRemainingToWrite());
          }
          getLogger().debug("Shut down channel %s", currentNode.getChannel());
        }
        //Unfortunatelly,  redistributeOperations is private so it cannot be used or override. I put copy/paste the implementation
        redistributeOperations(notCompletedOperations);
      }
    }
  }

  protected void redistributeOperations(Collection<Operation> ops) {
    for (Operation op : ops) {
      if (op.isCancelled() || op.isTimedOut()) {
        continue;
      }
      if (op instanceof KeyedOperation) {
        KeyedOperation ko = (KeyedOperation) op;
        int added = 0;
        for (String k : ko.getKeys()) {
          for (Operation newop : opFact.clone(ko)) {
            addOperation(k, newop);
            added++;
          }
        }
        assert added > 0 : "Didn't add any new operations when redistributing";
      } else {
        // Cancel things that don't have definite targets.
        op.cancel();
      }
    }
  }


}

ExtMemcachedClient.java

代码语言:javascript
运行
复制
  public void add(InetSocketAddress nodeAddress) {
    if(mconn instanceof ExtMemcachedConnection) {
      ((ExtMemcachedConnection)mconn).add(nodeAddress);  
    }
  }

  public boolean remove(MemcachedNode node) {
    if(mconn instanceof ExtMemcachedConnection) {
      ((ExtMemcachedConnection)mconn).remove(nodeAddress);
    }
  }

ExtMemcachedConnectionfactory.java

代码语言:javascript
运行
复制
  @Override
  public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
    return new ExtendableMemcachedConnection(getReadBufSize(), this, addrs,
                                             getInitialObservers(), getFailureMode(), getOperationFactory());
  }
票数 5
EN

Stack Overflow用户

发布于 2015-03-31 04:56:18

这个问题很久以前就被问过了,对于spymemcached来说,但我认为这个答案可能会对其他寻找替代方案的人有所帮助。还有另一个memcached客户端,叫做xmemcached。该客户端支持动态添加/删除memcached服务器。

https://code.google.com/p/xmemcached/

https://github.com/killme2008/xmemcached

代码语言:javascript
运行
复制
memcachedClient.addServer(host, port);

memcachedClient.removeServer(host);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/17448311

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档