ambari-server开发自定义API

本文篇幅较长,但都是满满的干货。主要从Ambari-server详解、如何debug ambari-server源码、开发流程分析图、开发流程自定义示例四大部分入手,教读者如何玩转ambari-server

一、Ambari-server详解

1. 简介

Ambari-Server是一个WEB Server,提供统一的REST API接口,同时向web和agent开放了两个不同的端口(默认前者是8080, 后者是8440或者8441)。它是由Jetty Server容器构建,通过Spring Framework构建出来的WEB服务器,其中大量采用了google提供的Guice注解完成spring框架所需要的注入功能。 REST框架由JAX-RS标准来构建。

2. 目录

目录

描述

org.apache.ambari.server.api.services

对web接口的入口方法,处理/api/v1/* 的请求

org.apache.ambari.server.controller

对Ambari中cluster的管理处理,如新增host,更service、删除component等

org.apache.ambari.server.controller.internal

主要存放ResourceProvider和PropertyProvider;

org.apache.ambari.service.orm.*

对数据库的操作

org.apache.ambari.server.agent.rest

处理与Agent的接口的入口方法

org.apache.ambari.security

使用Spring Security来做权限管理

3. Resource

其中,每一种Resource都对应一个ResourceProvider,对应关系如下:

Resource.Type

ResourceProvider

Workflow

WorkflowResourceProvider

Job

JobResourceProvider

TaskAttempt

TaskAttemptResourceProvider

View

ViewResourceProvider

ViewInstance

ViewInstanceResourceProvider

Blueprint

BlueprintResourceProvider

Cluster

ClusterResourceProvider

Service

ServiceResourceProvider

Component

ComponentResourceProvider

Host

HostResourceProvider

HostComponent

HostComponentResourceProvider

Configuration

ConfigurationResourceProvider

Action

ActionResourceProvider

Request

RequestResourceProvider

Task

TaskResourceProvider

User

UserResourceProvider

Stack

StackResourceProvider

StackVersion

StackVersionResourceProvider

StackService

StackServiceResourceProvider

StackServiceComponent

StackServiceComponentResourceProvider

StackConfiguration

StackConfigurationResourceProvider

OperatingSystem

OperatingSystemResourceProvider

Repository

RepositoryResourceProvider

RootService

RootServiceResourceProvider

RootServiceComponent

RootServiceComponentResourceProvider

RootServiceHostComponent

RootServiceHostComponentResourceProvider

ConfigGroup

ConfigGroupResourceProvider

RequestSchedule

RequestScheduleResourceProvider

我们对数据的处理就是在xxxResourceProvider.java内实现。

4. 获取数据流程

(1) jersy接口接收到请求,创建一个ResourceInstance实例;

(2) 解析http请求构造一个Request对象,然后交给reques的process()方法来处理;

(3) reques解析url或http_body得到一个Predicate对象;

(4) 根据http类型获取handler,GET请求对应ReadHandler;

(5) handler向Query对象中添加分页、Render、Predicate等属性后,然后让query.execute();

(6) 根据Resource.Type获得对应的ResourceProvider对象,调用其getResources方法得到Set\;

(7) 调用对应的PropertyProvider填充Resource;

(8) 处理结果,返回json结果;

二、Ambari-server Debug模式

1. 停止ambari-server服务

ambari-server stop

2. 以debug的方式来启动ambari-server

java -server -Xdebug  -Xrunjdwp:transport=dt_socket,suspend=n,server=y,address=5005 -XX:NewRatio=3 -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -XX:CMSInitiatingOccupancyFraction=60 -XX:+CMSClassUnloadingEnabled -Dsun.zip.disableMemoryMapping=true -Xms1012m -Xmx3048m -XX:MaxPermSize=256m -Djava.security.auth.login.config=/etc/ambari-server/conf/krb5JAASLogin.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=false -cp /etc/ambari-server/conf:/usr/lib/ambari-server/*:/usr/share/java/mysql-connector-java-5.1.45-bin.jar     org.apache.ambari.server.controller.AmbariServer

如图所示:

3. 在IDEA中连接Ambari-Server

在你要运行的代码上打上断点,比如,我要看 http://172.16.0.142:8080/api/v1/users,就在user的代码某流程处打上断点:

点击 debug按钮,在XShell内输入: curl-u admin:admin http://172.16.0.142:8080/api/v1/users

debug模式下,这几个按钮比较常用。从上往下,从左往右描述,分别为:一键断点处、断点概览、取消全部断点、平行执行、跳入执行、跳出执行等。

当一个流程走通时,如果要关闭debug模式,只需要 Ctrl+c终止debug进程即可。这样ambari-server也就停掉了。

三、开发流程分析

以 GET /api/v1/users 为例进行。该接口用于获取所有用户。 资源请求类,一通百通。

四、开发流程示例

1. 编写实体类(Entity)

路径: src/main/java/org/apache/ambari/server/orm/entities/AuditlogEntity.java

对应的数据表为 auditlog_table,添加注解@Table、@Entity、@Id、@Column;添加属性:id和note;添加getter/setter方法。

这步的最后,还需要在 /src/main/resources/META-INF/persistence.xml里面添加 AuditlogEntity的信息。

2. 编写DAO层

路径: src/main/java/org/apache/ambari/server/orm/dao/AuditlogDAO.java

sql语句要遵循JPA规范。

createQuery()方法里的sql语句,为 SELECT auditlog FROMAuditlogEntityauditlog,其中AuditlogEntity要和实体Entity类名保持一致,前后的auditlog也要保持一致,这样最后程序会把sql语句翻译为 SELECT*FROM auditlog_table。如果要指定表的某一个属性,比如id,则id的表现形式为 auditlog.id

3. 编写service层

路径: src/main/java/org/apache/ambari/server/api/services/AuditlogService.java

3. 声明Resource类型

1) Resource.java

路径: org.apache.ambari.server.controller.spi.Resource.java

enum InternalType {
    ...,
    Auditlog,
    ...
}
final class Type implements Comparable<Type>{
    ...,
    public static final Type Auditlog = InternalType.Auditlog.getType();
    ...
}
2) AuditlogResourceDefinition.java

路径: org.apache.ambari.server.api.resources.AuditlogResourceDefinition.java

public class AuditlogResourceDefinition extends BaseResourceDefinition {

    /**
     * Constructor.
     *
     */
    public AuditlogResourceDefinition() {
        super(Resource.Type.Auditlog);
    }

    @Override
    public String getPluralName() {
        return "auditlogs";
    }

    @Override
    public String getSingularName() {
        return "auditlog";
    }
}
3) ResourceInstanceFactoryImpl.java

路径: org.apache.ambari.server.api.resources.ResourceInstanceFactoryImpl.java

case Auditlog:
    resourceDefinition = new AuditlogResourceDefinition();
    break;
4) AbstractControllerResourceProvider.java

路径: org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider.java

case Auditlog:
    return new AuditlogResourceProvider(propertyIds, keyPropertyIds, managementController);

4.在json文件内添加属性值

1) key_properties.json
# 路径 src/main/resources/key_properties.json
"Auditlog": {
    "Auditlog": "auditlog/id",
    "Stack": "auditlog/time",
    "User": "auditlog/user",
    "Host": "auditlog/status",
    "Task": "auditlog/operation",
    "Group": "auditlog/remoteIp",
    "Cluster": "auditlog/note"
},
# 说明:key值必须为Resource.type的值;value值为你想要的json的key值。
2) properties.json
# 路径 src/main/resources/properties.json
"Auditlog":[
    "auditlog/id",
    "auditlog/time",
    "auditlog/user",
    "auditlog/status",
    "auditlog/operation",
    "auditlog/remoteIp",
    "auditlog/note",
    "_"
],
# 说明:该数组的内容必须要少于等于key_properties.json的内容。

说明:key_properties.json的内容必须要包含properties.json的全部内容,在 ClusterControllerImpl.checkProperties()会做相应的判断。

5. 编写resourceProvider

路径: org.apache.ambari.server.controller.internal.AuditlogResourceProvider.java

1) 继承AbstractControllerResourceProvider类
public class AuditlogResourceProvider extends AbstractControllerResourceProvider {
    ...
}
2) 定义常量
private static final String AUDIT_ID_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "id");
private static final String AUDIT_TIME_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "time");
private static final String AUDIT_USER_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "user");
private static final String AUDIT_STATUS_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "status");
private static final String AUDIT_OPERATION_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "operation");
private static final String AUDIT_REMOTEIP_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "remoteIp");
private static final String AUDIT_NOTE_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "note");
private static Set<String> pkPropertyIds =
    new HashSet<String>(Arrays.asList(new String[]{AUDIT_ID_PROPERTY_ID}));
3) 创建构造器
AuditlogResourceProvider(Set<String> propertyIds,
                             Map<Resource.Type, String> keyPropertyIds,
                             AmbariManagementController managementController) {
    super(propertyIds, keyPropertyIds, managementController);

    setRequiredCreateAuthorizations(EnumSet.of(RoleAuthorization.AMBARI_MANAGE_USERS));
    setRequiredDeleteAuthorizations(EnumSet.of(RoleAuthorization.AMBARI_MANAGE_USERS));
}
4) getResource()
@Override
public Set<Resource> getResources(Request request, Predicate predicate)
    throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException {

    final Set<AuditlogRequest> requests = new HashSet<AuditlogRequest>();

    if (predicate == null) {
        requests.add(getRequest(null));
    } else {
        for (Map<String, Object> propertyMap : getPropertyMaps(predicate)) {
            requests.add(getRequest(propertyMap));
        }
    }

    Set<AuditlogResponse> responses = getResources(new Command<Set<AuditlogResponse>>() {
        @Override
        public Set<AuditlogResponse> invoke() throws AmbariException, AuthorizationException {
            return getManagementController().getAuditlog(requests);
        }
    });

    if (LOG.isDebugEnabled()) {
        LOG.debug("Found clusters matching getClusters request"
                  + ", clusterResponseCount=" + responses.size());
    }

    Set<String>   requestedIds = getRequestPropertyIds(request, predicate);
    Set<Resource> resources    = new HashSet<Resource>();

    for (AuditlogResponse auditlogResponse : responses) {

        ResourceImpl resource = new ResourceImpl(Resource.Type.Auditlog);

        String note = auditlogResponse.getNote();
        String[] sourceNoteArray = note.split(", ");
        String time = sourceNoteArray[0].replace("T", " ").replaceAll("\\..*", "");
        String user = sourceNoteArray[1].replace("User(", "").replace(")", "");
        String status = note.split("Status\\(")[1].split("\\)")[0];

        boolean isOperation = note.contains("Operation");
        String operation = null;
        if (isOperation) {
            operation = note.split("Operation\\(")[1].split("\\),")[0];
        } else {
            operation = "";
        }

        boolean isRemoteIp = note.contains("RemoteIp");
        String remoteIp = null;
        if (isRemoteIp) {
            remoteIp = note.split("RemoteIp\\(")[1].split("\\)")[0];
        } else {
            remoteIp = "";
        }

        // 该方法是判断requestedIds是否包含AUDIT_ID_PROPERTY_ID,如果包含则返回true,将值存入resource中;反之则不存入。
        setResourceProperty(resource, AUDIT_ID_PROPERTY_ID,
                            auditlogResponse.getId(), requestedIds);
        setResourceProperty(resource, AUDIT_TIME_PROPERTY_ID,
                            time, requestedIds);
        setResourceProperty(resource, AUDIT_USER_PROPERTY_ID,
                            user, requestedIds);
        setResourceProperty(resource, AUDIT_STATUS_PROPERTY_ID,
                            status, requestedIds);
        setResourceProperty(resource, AUDIT_OPERATION_PROPERTY_ID,
                            operation, requestedIds);
        setResourceProperty(resource, AUDIT_REMOTEIP_PROPERTY_ID,
                            remoteIp, requestedIds);
        setResourceProperty(resource, AUDIT_NOTE_PROPERTY_ID,
                            note, requestedIds);

        resources.add(resource);
    }
    return resources;
}
5) 类内其他方法
/**
  * ----- ResourceProvider ------------------------------------------------
  */

@Override
protected Set<String> getPKPropertyIds() {
    return pkPropertyIds;
}

private AuditlogRequest getRequest(Map<String, Object> properties) {
    if (properties == null) {
        return new AuditlogRequest(null);
    }

    AuditlogRequest request = new AuditlogRequest (
        (String)properties.get(AUDIT_NOTE_PROPERTY_ID)
    );

    request.setId((Long) properties.get(AUDIT_ID_PROPERTY_ID));
    request.setNote((String) properties.get(AUDIT_NOTE_PROPERTY_ID));

    return request;
}

6. 新建AuditlogRequest类

路径: src/main/java/org/apache/ambari/server/controller/AuditlogRequest.java

7. 新建AuditlogResponse类

路径: src/main/java/org/apache/ambari/server/controller/AuditlogResponse.java

8. 添加getAuditlog()

# 在AmbariManagementController类内添加getAuditlog()
# 路径:org.apache.ambari.server.controller.AmbariManagementController.java
Set<AuditlogResponse> getAuditlog(Set<AuditlogRequest> requests)
    throws AmbariException, AuthorizationException;
# 在AmbariManagementControllerImpl类内实现getAuditlog()
# 路径:org.apache.ambari.server.controller.AmbariManagementControllerImpl.java
@Override
public Set<AuditlogResponse> getAuditlog(Set<AuditlogRequest> requests)
    throws AmbariException, AuthorizationException {
    Set<AuditlogResponse> responses = new HashSet<>();

    for (AuditlogRequest r : requests) {

        if (LOG.isDebugEnabled()) {
            LOG.debug("Received a getAuditlog request"
                      + ", auditlogRequest=" + r.toString());
        }

        String requestedNote = r.getNote();
        for (AuditlogEntity u : auditlogDAO.findAll()) {
            AuditlogResponse resp = new AuditlogResponse(u.getId(), u.getNote());
            responses.add(resp);
        }
    }
    return responses;
}

9. REST API展示形式

{
    href: "http://172.16.0.142:8080/api/v1/log/getAuditLog",
    items: [
        {
            href: "http://172.16.0.142:8080/api/v1/log/getAuditLog/2",
            auditlog: {
                id: 2,
                note: "2018-06-22T17:57:06.894-0700, User(admin), RemoteIp(172.16.0.167), Operation(User login), Roles( Ambari: 管理员 ), Status(Success)",
                operation: "User login",
                remoteIp: "172.16.0.167",
                status: "Success",
                time: "2018-06-22 17:57:06",
                user: "admin"
            }
        },
        {
            href: "http://172.16.0.142:8080/api/v1/log/getAuditLog/1855",
            auditlog: {
                id: 1855,
                note: "2018-07-12T18:51:00.846+0800, User(admin), RemoteIp(172.16.0.142), Operation(User login), Roles( Ambari: 管理员 ), Status(Success)",
                operation: "User login",
                remoteIp: "172.16.0.142",
                status: "Success",
                time: "2018-07-12 18:51:00",
                user: "admin"
            }
        }
    ]
}

本文分享自微信公众号 - 大数据实战演练(gh_f942bfc92d26)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-11-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励