本文主要研究一下elasticsearch的NodesFaultDetection
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
public class NodesFaultDetection extends AbstractComponent {
public static interface Listener {
void onNodeFailure(DiscoveryNode node, String reason);
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final boolean connectOnNetworkDisconnect;
private final TimeValue pingInterval;
private final TimeValue pingRetryTimeout;
private final int pingRetryCount;
// used mainly for testing, should always be true
private final boolean registerConnectionListener;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
private final FDConnectionListener connectionListener;
private volatile DiscoveryNodes latestNodes = EMPTY_NODES;
private volatile boolean running = false;
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler());
this.connectionListener = new FDConnectionListener();
if (registerConnectionListener) {
transportService.addConnectionListener(connectionListener);
}
}
public NodesFaultDetection start() {
if (running) {
return this;
}
running = true;
return this;
}
public NodesFaultDetection stop() {
if (!running) {
return this;
}
running = false;
return this;
}
public void close() {
stop();
transportService.removeHandler(PingRequestHandler.ACTION);
transportService.removeConnectionListener(connectionListener);
}
//......
}
默认true
)、ping_interval(默认1s
)、ping_timeout(默认30s
)、ping_retries(默认为3
)、register_connection_listener(默认true
)配置,然后给transportService注册了PingRequestHandler.ACTION的PingRequestHandler,添加了FDConnectionListenerelasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {
public static final String ACTION = "discovery/zen/fd/ping";
@Override
public PingRequest newInstance() {
return new PingRequest();
}
@Override
public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
// if we are not the node we are supposed to be pinged, send an exception
// this can happen when a kill -9 is sent, and another node is started using the same port
if (!latestNodes.localNodeId().equals(request.nodeId)) {
throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
}
channel.sendResponse(new PingResponse());
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
static class PingRequest extends TransportRequest {
// the (assumed) node id we are pinging
private String nodeId;
PingRequest() {
}
PingRequest(String nodeId) {
this.nodeId = nodeId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
}
}
private static class PingResponse extends TransportResponse {
private PingResponse() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
private void handleTransportDisconnect(DiscoveryNode node) {
if (!latestNodes.nodeExists(node.id())) {
return;
}
NodeFD nodeFD = nodesFD.remove(node);
if (nodeFD == null) {
return;
}
if (!running) {
return;
}
nodeFD.running = false;
if (connectOnNetworkDisconnect) {
try {
transportService.connectToNode(node);
nodesFD.put(node, new NodeFD());
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
} catch (Exception e) {
logger.trace("[node ] [{}] transport disconnected (with verified connect)", node);
notifyNodeFailure(node, "transport disconnected (with verified connect)");
}
} else {
logger.trace("[node ] [{}] transport disconnected", node);
notifyNodeFailure(node, "transport disconnected");
}
}
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Listener listener : listeners) {
listener.onNodeFailure(node, reason);
}
}
});
}
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
//......
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.pingService = pingService;
// also support direct discovery.zen settings, for cases when it gets extended
this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);
this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
this.masterElectionFilterDataNodes = settings.getAsBoolean("discovery.zen.master_election.filter_data", false);
logger.debug("using ping.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);
this.electMaster = new ElectMasterService(settings);
nodeSettingsService.addListener(new ApplySettings());
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
this.nodesFD.addListener(new NodeFailureListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
}
protected void doStart() throws ElasticSearchException {
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
String nodeId = UUID.randomBase64UUID();
localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
nodesFD.updateNodes(latestDiscoNodes);
pingService.start();
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
asyncJoinCluster();
}
public void publish(ClusterState clusterState) {
if (!master) {
throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");
}
latestDiscoNodes = clusterState.nodes();
nodesFD.updateNodes(clusterState.nodes());
publishClusterState.publish(clusterState);
}
private class NodeFailureListener implements NodesFaultDetection.Listener {
@Override
public void onNodeFailure(DiscoveryNode node, String reason) {
handleNodeFailure(node, reason);
}
}
private void handleNodeFailure(final DiscoveryNode node, String reason) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a node failure
return;
}
if (!master) {
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.putAll(currentState.nodes())
.remove(node.id());
latestDiscoNodes = builder.build();
currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
@Override
public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
}
//......
}
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
public class NodesFaultDetection extends AbstractComponent {
//......
public void updateNodes(DiscoveryNodes nodes) {
DiscoveryNodes prevNodes = latestNodes;
this.latestNodes = nodes;
if (!running) {
return;
}
DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
for (DiscoveryNode newNode : delta.addedNodes()) {
if (newNode.id().equals(nodes.localNodeId())) {
// no need to monitor the local node
continue;
}
if (!nodesFD.containsKey(newNode)) {
nodesFD.put(newNode, new NodeFD());
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
}
}
for (DiscoveryNode removedNode : delta.removedNodes()) {
nodesFD.remove(removedNode);
}
}
//......
}
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
private class SendPingRequest implements Runnable {
private final DiscoveryNode node;
private SendPingRequest(DiscoveryNode node) {
this.node = node;
}
@Override
public void run() {
if (!running) {
return;
}
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<PingResponse>() {
@Override
public PingResponse newInstance() {
return new PingResponse();
}
@Override
public void handleResponse(PingResponse response) {
if (!running) {
return;
}
NodeFD nodeFD = nodesFD.get(node);
if (nodeFD != null) {
if (!nodeFD.running) {
return;
}
nodeFD.retryCount = 0;
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
}
}
@Override
public void handleException(TransportException exp) {
// check if the master node did not get switched on us...
if (!running) {
return;
}
if (exp instanceof ConnectTransportException) {
// ignore this one, we already handle it by registering a connection listener
return;
}
NodeFD nodeFD = nodesFD.get(node);
if (nodeFD != null) {
if (!nodeFD.running) {
return;
}
int retryCount = ++nodeFD.retryCount;
logger.trace("[node ] failed to ping [{}], retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
if (retryCount >= pingRetryCount) {
logger.debug("[node ] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
// not good, failure
if (nodesFD.remove(node) != null) {
notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
}
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
options().withHighType().withTimeout(pingRetryTimeout), this);
}
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}
对该node进行connect,成功则放入nodesFD,并注册对该node进行SendPingRequest的延时任务,延时pingInterval执行
);如果connect异常或者connectOnNetworkDisconnect为false,否执行notifyNodeFailure方法;notifyNodeFailure方法则会触发NodesFaultDetection.Listener.onNodeFailure回调,这里回调ZenDiscovery的NodeFailureListener的onNodeFailure方法原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。