我有一个Java应用程序设置,其中有多个与spymemcached
客户机通信的memcached
服务器节点。
我想知道是否可以在运行时添加或删除服务器节点,而不干扰所有现有的缓存节点(我知道有些节点应该更改)。
以下是我所知道的(或理解的):
可以在DefaultConnectionFactory
中设置自定义散列算法,这有助于我们使用一致的散列,甚至使用内置的KetamaConnectionFactory。
因此,我们应该能够添加或删除只对一个或几个现有节点进行更改的节点。
可以使用spymemcached
吗?
如果是,那么是如何实现的呢?
有谁能给我指个方向吗?
发布于 2013-08-14 16:57:17
看起来应该由NodeLocator.updateLocator(List<MemcachedNode> newNodes)
来完成这项工作。
但是连接MemcachedNode有点困难。您必须覆盖MemcachedClient
、MemcachedConnection
和DefaultConnectionFactory
。
您希望在MemcachedClient
中添加或删除客户端是合理的,因此您添加了remove(MemcachedNode node)
和add(MemcachedNode node)
方法。
在删除的情况下,您应该断开节点的连接(参见MemcachedConnection.shutdown()
),并将其从NodeLocator.getAll()
中删除,然后调用NodeLocator.updateLocator(List<MemcachedNode> newNodes)
。
在add的情况下,您应该通过MemcachedConnection.createConnections(最终集合a)连接节点,将其与NodeLocator.getAll()
合并并调用NodeLocator.updateLocator(List<MemcachedNode> newNodes)
。
嗯,我从来没有试过,所以它可能不会起作用。所以祝你好运!
ExtMemCachedConnection.java公共类ExtMemCachedConnection扩展了MemcachedConnection {
protected final OperationFactory opFact;
/**
* Construct a memcached connection.
*
* @param bufSize the size of the buffer used for reading from the server
* @param f the factory that will provide an operation queue
* @param a the addresses of the servers to connect to
* @throws java.io.IOException if a connection attempt fails early
*/
public ExtendableMemcachedConnection(int bufSize, ConnectionFactory f,
List<InetSocketAddress> a,
Collection<ConnectionObserver> obs,
FailureMode fm, OperationFactory opfactory)
throws IOException {
super(bufSize, f, a, obs, fm, opfactory);
this.opFact = opfactory;
}
public void add(InetSocketAddress nodeAddress) throws IOException {
final List<InetSocketAddress> nodeToAdd = new ArrayList<InetSocketAddress>(1);
nodeToAdd.add(nodeAddress);
List<MemcachedNode> newNodesList = createConnections(nodeToAdd);
newNodesList.addAll(getLocator().getAll());
getLocator().updateLocator(newNodesList);
}
//The node should be obtain from locator to ensure currentNode.equals(node) will return true
public void remove(MemcachedNode node) throws IOException {
for(MemcachedNode currentNode : getLocator().getAll()) {
if(currentNode.equals(node)) {
Collection<Operation> notCompletedOperations = currentNode.destroyInputQueue();
if (currentNode.getChannel() != null) {
currentNode.getChannel().close();
currentNode.setSk(null);
if (currentNode.getBytesRemainingToWrite() > 0) {
getLogger().warn("Shut down with %d bytes remaining to write",
currentNode.getBytesRemainingToWrite());
}
getLogger().debug("Shut down channel %s", currentNode.getChannel());
}
//Unfortunatelly, redistributeOperations is private so it cannot be used or override. I put copy/paste the implementation
redistributeOperations(notCompletedOperations);
}
}
}
protected void redistributeOperations(Collection<Operation> ops) {
for (Operation op : ops) {
if (op.isCancelled() || op.isTimedOut()) {
continue;
}
if (op instanceof KeyedOperation) {
KeyedOperation ko = (KeyedOperation) op;
int added = 0;
for (String k : ko.getKeys()) {
for (Operation newop : opFact.clone(ko)) {
addOperation(k, newop);
added++;
}
}
assert added > 0 : "Didn't add any new operations when redistributing";
} else {
// Cancel things that don't have definite targets.
op.cancel();
}
}
}
}
ExtMemcachedClient.java
public void add(InetSocketAddress nodeAddress) {
if(mconn instanceof ExtMemcachedConnection) {
((ExtMemcachedConnection)mconn).add(nodeAddress);
}
}
public boolean remove(MemcachedNode node) {
if(mconn instanceof ExtMemcachedConnection) {
((ExtMemcachedConnection)mconn).remove(nodeAddress);
}
}
ExtMemcachedConnectionfactory.java
@Override
public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
return new ExtendableMemcachedConnection(getReadBufSize(), this, addrs,
getInitialObservers(), getFailureMode(), getOperationFactory());
}
发布于 2015-03-31 04:56:18
这个问题很久以前就被问过了,对于spymemcached来说,但我认为这个答案可能会对其他寻找替代方案的人有所帮助。还有另一个memcached客户端,叫做xmemcached。该客户端支持动态添加/删除memcached服务器。
https://code.google.com/p/xmemcached/
https://github.com/killme2008/xmemcached
memcachedClient.addServer(host, port);
memcachedClient.removeServer(host);
https://stackoverflow.com/questions/17448311
复制相似问题