本文参考了flink committer tison的文章,基于flink 1.13版本源码改动实现。
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/
Flink官方实现了大量的REST API接口,有用于Flink UI展示数据、也用于各自监控面板。这些REST API的webserver作为JobManager
的一部分在运行。默认端口是8081,可以通过flink-conf.yaml
的rest.port
参数进行配置。
在有多个JobManager
的情况下(HA场景下),每个JobManager
将运行自己的REST API实例,而由被选为leader的JobManager
实例提供有关已完成和正在运行的作业的信息。
REST API 位于flink-runtime
项目下,核心实现org.apache.flink.runtime.webmonitor.WebMonitorEndpoint
(因为Flink早期REST API都是用于监控,所以命名是WebMonitorEndpoint。现在其工作职能还包含一些任务启停等非监控场景),其主要是负责server实现和请求路由。
(主要:2个pierre package是笔者下面自定义REST API的地方)
当然Flink REST API实现是基于Netty
和Netty Router
,因为实现比较轻量,所以性能还是比较好的。
而完整的REST API则需要这四大模块:
向http链接 http://${jobmaster-host}:8081/pierre/foo
发起get请求,返回一个json串{"response":"bar"}
当我们要新增加一个REST API的时候,我们至少需要:
MessageHeaders
,作为新请求的接口ResponseBody
,作为返回结果的BodyAbstractRestHandler
,根据添加的MessageHeaders类处理请求org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers()
MessageHeaders
package org.apache.flink.runtime.rest.messages.pierre;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
public class FooHeaders
implements MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {
// 单实例模式
private static final FooHeaders INSTANCE = new FooHeaders();
public static FooHeaders getInstance() {
return INSTANCE;
}
@Override
public Class<BarResponseBody> getResponseClass() {
return BarResponseBody.class;
}
@Override
public HttpResponseStatus HttpResponseStatus() {
return HttpResponseStatus.OK;
}
@Override
public String getDescription() {
return "pierre foobar service";
}
@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}
// 解析url里面的参数
@Override
public EmptyMessageParameters getUnresolvedMessageParameters() {
return EmptyMessageParameters.getInstance();
}
@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}
// URL路由信息
@Override
public String getTargetRestEndpointURL() {
return "/pierre/foo";
}
}
这里注意:
HttpResponseStatus
、getResponseClass
等均不能return null,否则会有NullPointerException
ResponseBody
package org.apache.flink.runtime.rest.messages.pierre;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
public class BarResponseBody implements ResponseBody {
private static final String FIELD_BAR = "response";
@JsonProperty(FIELD_BAR)
public final String response = "bar";
private static final BarResponseBody INSTANCE = new BarResponseBody();
public static BarResponseBody getInstance() {
return INSTANCE;
}
}
这里使用到了 jackson注解,需要import FLINK shaded的版本,避免冲突。
package org.apache.flink.runtime.rest.handler.pierre;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.pierre.BarResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class FooHandler
extends AbstractRestHandler<
RestfulGateway, EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {
public FooHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters>
messageHeaders) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
}
@Override
protected CompletableFuture<BarResponseBody> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
@Nonnull RestfulGateway gateway)
throws RestHandlerException {
return CompletableFuture.completedFuture(BarResponseBody.getInstance());
}
}
// 自己的handler
final FooHandler fooHandler =
new FooHandler(leaderRetriever, timeout, responseHeaders, FooHeaders.getInstance());
……
handlers.add(Tuple2.of(fooHandler.getMessageHeaders(), fooHandler));
初次改造Flink代码,不是特别熟悉,列了一下步骤供大家参考:
maven-checkstyle-plugin
的failOnViolation
设置为false
,因为我们的一些小改动不完全符合flink的代码工程规范。当然如果是要给Flink正式贡献代码,肯定还是要符合规范的。mvn spotless:apply
会自动进行代码格式化的工作mvn clean package -DskipTests
进入漫长的package中预计十分钟:flink-dist/target
目录下即可生成最新的可执行文件
./bin/start-cluster.sh
http://${jobmaster-host}:8081/pierre/foo
大功告成,完美第一步!
更多精彩:https://github.com/pierre94/flink-notes
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。