AgentResource类位于org.apache.ambari.server.agent.rest包下,如下图:
AgentResource类位于Ambari-Server下,它为Ambari-Agent提供API(REST API),以获取集群配置更改,以及报告在集群节点上运行的服务的节点属性和状态。
源代码如下(添加了部分注释):
package org.apache.ambari.server.agent.rest;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.ComponentsResponse;
import org.apache.ambari.server.agent.HeartBeat;
import org.apache.ambari.server.agent.HeartBeatHandler;
import org.apache.ambari.server.agent.HeartBeatResponse;
import org.apache.ambari.server.agent.Register;
import org.apache.ambari.server.agent.RegistrationResponse;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.inject.Inject;
import org.apache.ambari.server.agent.RegistrationStatus;
/**
 * Agent Resource represents the Ambari agent controller.
 *
 * <p>It provides the REST API that Ambari agents use to get cluster
 * configuration changes and to report the node attributes and the state of
 * the services running on the cluster nodes.
 *
 * <p>Every endpoint delegates to a single, statically held
 * {@link HeartBeatHandler}; see {@link #init(HeartBeatHandler)}.
 */
@Path("/")
public class AgentResource {
  /** Handler all endpoints delegate to; assigned through {@link #init(HeartBeatHandler)}. */
  private static HeartBeatHandler hh;
  private static final Log LOG = LogFactory.getLog(AgentResource.class);

  /**
   * Stores the heartbeat handler used by all endpoints of this resource.
   *
   * <p>The handler is only stored here, not started; starting it is a
   * separate, explicit step (see {@link #startHeartBeatHandler()}).
   * NOTE(review): as a static {@code @Inject} method this presumably relies on
   * static injection being requested in the injector configuration - confirm.
   *
   * @param instance the handler to delegate to
   */
  @Inject
  public static void init(HeartBeatHandler instance) {
    hh = instance;
  }

  /**
   * Explicitly starts the heartbeat handler.
   *
   * @deprecated the name is a misspelling kept for existing callers;
   *             use {@link #startHeartBeatHandler()} instead.
   */
  @Deprecated
  public static void statHeartBeatHandler() {
    startHeartBeatHandler();
  }

  /**
   * Explicitly starts the heartbeat handler previously supplied through
   * {@link #init(HeartBeatHandler)}.
   */
  public static void startHeartBeatHandler() {
    hh.start();
  }

  /**
   * Register information about the host (Internal API to be used for
   * Ambari Agent).
   *
   * <p>If the handler fails with an {@link AmbariException}, the failure is
   * logged and reported to the agent as a regular response with response id
   * {@code -1}, status {@link RegistrationStatus#FAILED}, exit status
   * {@code 1} and the exception message as log text, rather than as an HTTP
   * error.
   *
   * @response.representation.200.doc This API is invoked by Ambari agent running
   *  on a cluster to register with the server.
   * @response.representation.200.mediaType application/json
   * @response.representation.406.doc Error in register message format
   * @response.representation.408.doc Request Timed out
   * @param message Register message
   * @param req the servlet request of this call; currently not used
   * @return the handler's registration response, or a failure response as
   *         described above
   * @throws WebApplicationException declared for callers; not thrown directly
   *         by this method
   * @throws InvalidStateTransitionException propagated from the heartbeat handler
   */
  @Path("register/{hostName}")
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces({MediaType.APPLICATION_JSON})
  public RegistrationResponse register(Register message,
      @Context HttpServletRequest req)
      throws WebApplicationException, InvalidStateTransitionException {
    try {
      // Call into the heartbeat handler to register the host.
      RegistrationResponse response = hh.handleRegistration(message);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Sending registration response " + response);
      }
      return response;
    } catch (AmbariException ex) {
      // Keep the stack trace on the server; the agent only receives the message text.
      LOG.warn("Error in Registration", ex);
      RegistrationResponse failure = new RegistrationResponse();
      failure.setResponseId(-1);
      failure.setResponseStatus(RegistrationStatus.FAILED);
      failure.setExitstatus(1);
      failure.setLog(ex.getMessage());
      return failure;
    }
  }

  /**
   * Update state of the node (Internal API to be used by Ambari agent).
   *
   * @response.representation.200.doc This API is invoked by Ambari agent running
   *  on a cluster to update the state of various services running on the node.
   * @response.representation.200.mediaType application/json
   * @response.representation.406.doc Error in heartbeat message format
   * @response.representation.408.doc Request Timed out
   * @param message Heartbeat message
   * @return the response produced by the heartbeat handler
   * @throws WebApplicationException with status 500 (and the original failure
   *         as its cause) if the handler throws any exception
   */
  @Path("heartbeat/{hostName}")
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces({MediaType.APPLICATION_JSON})
  public HeartBeatResponse heartbeat(HeartBeat message)
      throws WebApplicationException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Received Heartbeat message " + message);
    }
    try {
      // Let the handler process the heartbeat and build the response.
      HeartBeatResponse heartBeatResponse = hh.handleHeartBeat(message);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Sending heartbeat response with response id " + heartBeatResponse.getResponseId());
        LOG.debug("Response details " + heartBeatResponse);
      }
      return heartBeatResponse;
    } catch (Exception e) {
      LOG.warn("Error in HeartBeat", e);
      // Preserve the cause instead of discarding it.
      throw new WebApplicationException(e, 500);
    }
  }

  /**
   * Retrieves the components category map for stack used on cluster
   * (Internal API to be used by Ambari agent).
   *
   * @response.representation.200.doc This API is invoked by Ambari agent running
   *  on a cluster to update the components category map of stack used by this cluster
   * @response.representation.200.mediaType application/json
   * @response.representation.408.doc Request Timed out
   * @param clusterName of cluster
   * @return the response produced by the heartbeat handler
   * @throws WebApplicationException with status 500 (and the original failure
   *         as its cause) if the handler throws any exception
   */
  @Path("components/{clusterName}")
  @GET
  @Produces({MediaType.APPLICATION_JSON})
  public ComponentsResponse components(
      @PathParam("clusterName") String clusterName) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Received Components request for cluster " + clusterName);
    }
    try {
      ComponentsResponse componentsResponse = hh.handleComponents(clusterName);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Sending components response");
        LOG.debug("Response details " + componentsResponse);
      }
      return componentsResponse;
    } catch (Exception e) {
      LOG.warn("Error in Components", e);
      // Preserve the cause instead of discarding it.
      throw new WebApplicationException(e, 500);
    }
  }
}