前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink源码实战(一)】给Flink增加一个REST API

【Flink源码实战(一)】给Flink增加一个REST API

原创
作者头像
皮皮熊
修改2021-06-10 13:26:20
3.2K0
修改2021-06-10 13:26:20
举报

本文参考了flink committer tison的文章,基于flink 1.13版本源码改动实现。

一、概述

https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/

Flink官方实现了大量的REST API接口,有用于Flink UI展示数据、也用于各自监控面板。这些REST API的webserver作为JobManager的一部分在运行。默认端口是8081,可以通过flink-conf.yamlrest.port参数进行配置。

在有多个JobManager的情况下(HA场景下),每个JobManager将运行自己的REST API实例,而由被选为leader的JobManager实例提供有关已完成和正在运行的作业的信息。

二、开发指南

REST API 位于flink-runtime项目下,核心实现org.apache.flink.runtime.webmonitor.WebMonitorEndpoint (因为Flink早期REST API都是用于监控,所以命名是WebMonitorEndpoint。现在其工作职能还包含一些任务启停等非监控场景),其主要是负责server实现和请求路由。

image.png
image.png

(主要:2个pierre package是笔者下面自定义REST API的地方)

当然Flink REST API实现是基于NettyNetty Router ,因为实现比较轻量,所以性能还是比较好的。

而完整的REST API则需要这四大模块:

image.png
image.png

三、开发自己的REST API!

0、设计与规划

1)需求

向http链接 http://${jobmaster-host}:8081/pierre/foo 发起get请求,返回一个json串{"response":"bar"}

2)实现规划

当我们要新增加一个REST API的时候,我们至少需要:

  • 实现一个MessageHeaders,作为新请求的接口
  • 实现一个ResponseBody,作为返回结果的Body
  • 实现一个AbstractRestHandler,根据添加的MessageHeaders类处理请求
  • 将handler注册到org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers()

1、实现MessageHeaders

代码语言:txt
复制
package org.apache.flink.runtime.rest.messages.pierre;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

public class FooHeaders
        implements MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {

    // 单实例模式
    private static final FooHeaders INSTANCE = new FooHeaders();

    public static FooHeaders getInstance() {
        return INSTANCE;
    }

    @Override
    public Class<BarResponseBody> getResponseClass() {
        return BarResponseBody.class;
    }

    @Override
    public HttpResponseStatus HttpResponseStatus() {
        return HttpResponseStatus.OK;
    }

    @Override
    public String getDescription() {
        return "pierre foobar service";
    }

    @Override
    public Class<EmptyRequestBody> getRequestClass() {
        return EmptyRequestBody.class;
    }

    // 解析url里面的参数
    @Override
    public EmptyMessageParameters getUnresolvedMessageParameters() {
        return EmptyMessageParameters.getInstance();
    }

    @Override
    public HttpMethodWrapper getHttpMethod() {
        return HttpMethodWrapper.GET;
    }

	// URL路由信息
    @Override
    public String getTargetRestEndpointURL() {
        return "/pierre/foo";
    }
}

这里注意:

  • 必须是单实例模式
  • HttpResponseStatusgetResponseClass等均不能return null,否则会有NullPointerException
    image.png
    image.png

2、实现ResponseBody

代码语言:txt
复制
package org.apache.flink.runtime.rest.messages.pierre;

import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

public class BarResponseBody implements ResponseBody {
    private static final String FIELD_BAR = "response";

    @JsonProperty(FIELD_BAR)
    public final String response = "bar";

    private static final BarResponseBody INSTANCE = new BarResponseBody();

    public static BarResponseBody getInstance() {
        return INSTANCE;
    }
}

这里使用到了 jackson注解,需要import FLINK shaded的版本,避免冲突。

3、实现`AbstractRestHandler

代码语言:txt
复制
package org.apache.flink.runtime.rest.handler.pierre;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.pierre.BarResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class FooHandler
        extends AbstractRestHandler<
                RestfulGateway, EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {

    public FooHandler(
            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            Time timeout,
            Map<String, String> responseHeaders,
            MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters>
                    messageHeaders) {
        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
    }

    @Override
    protected CompletableFuture<BarResponseBody> handleRequest(
            @Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
            @Nonnull RestfulGateway gateway)
            throws RestHandlerException {
        return CompletableFuture.completedFuture(BarResponseBody.getInstance());
    }
}

4、注册handler

代码语言:txt
复制
    // 自己的handler
    final FooHandler fooHandler =
                new FooHandler(leaderRetriever, timeout, responseHeaders, FooHeaders.getInstance());
    ……
	handlers.add(Tuple2.of(fooHandler.getMessageHeaders(), fooHandler));

5、编译打包

初次改造Flink代码,不是特别熟悉,列了一下步骤供大家参考:

  • maven-checkstyle-pluginfailOnViolation设置为false,因为我们的一些小改动不完全符合flink的代码工程规范。当然如果是要给Flink正式贡献代码,肯定还是要符合规范的。
  • mvn spotless:apply 会自动进行代码格式化的工作
  • mvn clean package -DskipTests 进入漫长的package中

预计十分钟:flink-dist/target 目录下即可生成最新的可执行文件

image.png
image.png

六、效果

  • 启动一个本地的loacl集群 ./bin/start-cluster.sh
  • 请求http://${jobmaster-host}:8081/pierre/foo
image.png
image.png

大功告成,完美第一步!

更多精彩:https://github.com/pierre94/flink-notes

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
  • 二、开发指南
  • 三、开发自己的REST API!
    • 0、设计与规划
      • 1)需求
      • 2)实现规划
    • 1、实现MessageHeaders
      • 2、实现ResponseBody
        • 3、实现`AbstractRestHandler
          • 4、注册handler
            • 5、编译打包
              • 六、效果
              相关产品与服务
              流计算 Oceanus
              流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档