
A look at Flink's log.file configuration

Original
code4it
Modified 2018-11-23 14:57:17
This article is included in the column 码匠的流水账.

This article examines Flink's log.file configuration.

log4j.properties

flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.properties

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file
​
# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO
​
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
​
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
​
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  • log4j.appender.file.file is set to ${log.file}. The file appender therefore writes to whatever path the log.file system property holds.
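As far as we understand log4j 1.x, `${...}` placeholders in log4j.properties are substituted when the file is loaded. JVM system properties are consulted before the file's own entries, so a `-Dlog.file=...` flag (which Flink's launch scripts typically pass) determines where the file appender writes. The following is a simplified, hypothetical re-implementation of that lookup order. `PlaceholderResolver` and `substVars` are illustrative names, not log4j's API. The sketch handles only plain `${key}` placeholders, with no nesting or default values.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, NOT log4j's own code: resolves ${key} placeholders
// by checking system properties first, then the given Properties.
final class PlaceholderResolver {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    private PlaceholderResolver() {}

    static String substVars(String value, Properties props) {
        Matcher m = VAR.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            // A -Dkey=... system property wins over the properties file.
            String replacement = System.getProperty(key);
            if (replacement == null) {
                // In this sketch an undefined key becomes the empty string.
                replacement = props.getProperty(key, "");
            }
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

For example, with `-Dlog.file=/opt/flink/log/flink-jobmanager.log`, calling `substVars("${log.file}", props)` yields that path. Without the flag, the result depends on the fallback Properties.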

MiniCluster

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java

    /**
     * Starts the mini cluster, based on the configured properties.
     *
     * @throws Exception This method passes on any exception that occurs during the startup of
     *                   the mini cluster.
     */
    public void start() throws Exception {
        synchronized (lock) {
            checkState(!running, "FlinkMiniCluster is already running");
​
            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", miniClusterConfiguration);
​
            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
            final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
            final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
​
            try {
                initializeIOFormatClasses(configuration);
​
                LOG.info("Starting Metrics Registry");
                metricRegistry = createMetricRegistry(configuration);
                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                    metricRegistry,
                    "localhost");
​
                final RpcService jobManagerRpcService;
                final RpcService resourceManagerRpcService;
                final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
​
                // bring up all the RPC services
                LOG.info("Starting RPC Service(s)");
​
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
​
                // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
                final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
                metricRegistry.startQueryService(actorSystem, null);
​
                if (useSingleRpcService) {
                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcServices[i] = commonRpcService;
                    }
​
                    jobManagerRpcService = commonRpcService;
                    resourceManagerRpcService = commonRpcService;
​
                    this.resourceManagerRpcService = null;
                    this.jobManagerRpcService = null;
                    this.taskManagerRpcServices = null;
                }
                else {
                    // start a new service per component, possibly with custom bind addresses
                    final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                    final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                    final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
​
                    jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                    resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
​
                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcServices[i] = createRpcService(
                                configuration, rpcTimeout, true, taskManagerBindAddress);
                    }
​
                    this.jobManagerRpcService = jobManagerRpcService;
                    this.taskManagerRpcServices = taskManagerRpcServices;
                    this.resourceManagerRpcService = resourceManagerRpcService;
                }
​
                // create the high-availability services
                LOG.info("Starting high-availability services");
                haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                    configuration,
                    commonRpcService.getExecutor());
​
                blobServer = new BlobServer(configuration, haServices.createBlobStore());
                blobServer.start();
​
                heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
​
                // bring up the ResourceManager(s)
                LOG.info("Starting ResourceManger");
                resourceManagerRunner = startResourceManager(
                    configuration,
                    haServices,
                    heartbeatServices,
                    metricRegistry,
                    resourceManagerRpcService,
                    new ClusterInformation("localhost", blobServer.getPort()),
                    jobManagerMetricGroup);
​
                blobCacheService = new BlobCacheService(
                    configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
                );
​
                // bring up the TaskManager(s) for the mini cluster
                LOG.info("Starting {} TaskManger(s)", numTaskManagers);
                taskManagers = startTaskManagers(
                    configuration,
                    haServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    numTaskManagers,
                    taskManagerRpcServices);
​
                // starting the dispatcher rest endpoint
                LOG.info("Starting dispatcher rest endpoint.");
​
                dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                    jobManagerRpcService,
                    DispatcherGateway.class,
                    DispatcherId::fromUuid,
                    20,
                    Time.milliseconds(20L));
                final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                    jobManagerRpcService,
                    ResourceManagerGateway.class,
                    ResourceManagerId::fromUuid,
                    20,
                    Time.milliseconds(20L));
​
                this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
                    RestServerEndpointConfiguration.fromConfiguration(configuration),
                    dispatcherGatewayRetriever,
                    configuration,
                    RestHandlerConfiguration.fromConfiguration(configuration),
                    resourceManagerGatewayRetriever,
                    blobServer.getTransientBlobService(),
                    WebMonitorEndpoint.createExecutorService(
                        configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                        configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                        "DispatcherRestEndpoint"),
                    new AkkaQueryServiceRetriever(
                        actorSystem,
                        Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                    haServices.getWebMonitorLeaderElectionService(),
                    new ShutDownFatalErrorHandler());
​
                dispatcherRestEndpoint.start();
​
                restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());
​
                // bring up the dispatcher that launches JobManagers when jobs submitted
                LOG.info("Starting job dispatcher(s) for JobManger");
​
                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
​
                final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
​
                dispatcher = new StandaloneDispatcher(
                    jobManagerRpcService,
                    Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
                    configuration,
                    haServices,
                    resourceManagerRunner.getResourceManageGateway(),
                    blobServer,
                    heartbeatServices,
                    jobManagerMetricGroup,
                    metricRegistry.getMetricQueryServicePath(),
                    new MemoryArchivedExecutionGraphStore(),
                    Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                    new ShutDownFatalErrorHandler(),
                    dispatcherRestEndpoint.getRestBaseUrl(),
                    historyServerArchivist);
​
                dispatcher.start();
​
                resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
                dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
​
                resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
                dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            }
            catch (Exception e) {
                // cleanup everything
                try {
                    close();
                } catch (Exception ee) {
                    e.addSuppressed(ee);
                }
                throw e;
            }
​
            // create a new termination future
            terminationFuture = new CompletableFuture<>();
​
            // now officially mark this as running
            running = true;
​
            LOG.info("Flink Mini Cluster started successfully");
        }
    }
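  • This creates, in order, metricRegistry, commonRpcService, jobManagerRpcService, resourceManagerRpcService (both of which simply reuse commonRpcService when RPC services are shared), haServices, blobServer, heartbeatServices, resourceManagerRunner, blobCacheService, taskManagers, dispatcherGatewayRetriever, dispatcherRestEndpoint, dispatcher and dispatcherLeaderRetriever. The call to dispatcherRestEndpoint.start() is where the REST handlers, including the ones that serve log files, get registered. If any step throws, close() is called and the exception is rethrown.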
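MiniCluster.start() follows a common pattern: bring components up one after another, and if any step fails, clean up and rethrow the original exception, attaching cleanup failures via addSuppressed. Here is a generic sketch of that pattern. `StartableComponent` and `StartupSketch` are hypothetical names, not Flink classes. Closing the already-started components in reverse order is a choice made for this sketch; MiniCluster itself just calls its own close().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical component abstraction, not a Flink API.
interface StartableComponent extends AutoCloseable {
    void start() throws Exception;
}

final class StartupSketch {
    private final List<StartableComponent> started = new ArrayList<>();

    // Starts components in order. On failure, closes the ones already started
    // (in reverse order) and rethrows the original exception, with any close()
    // failures attached as suppressed exceptions.
    void startAll(List<StartableComponent> components) throws Exception {
        try {
            for (StartableComponent c : components) {
                c.start();
                started.add(c);
            }
        } catch (Exception e) {
            for (int i = started.size() - 1; i >= 0; i--) {
                try {
                    started.get(i).close();
                } catch (Exception ee) {
                    e.addSuppressed(ee);
                }
            }
            started.clear();
            throw e;
        }
    }
}
```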

RestServerEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/RestServerEndpoint.java

    /**
     * Starts this REST server endpoint.
     *
     * @throws Exception if we cannot start the RestServerEndpoint
     */
    public final void start() throws Exception {
        synchronized (lock) {
            Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
​
            log.info("Starting rest endpoint.");
​
            final Router router = new Router();
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
​
            List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);
​
            /* sort the handlers such that they are ordered the following:
             * /jobs
             * /jobs/overview
             * /jobs/:jobid
             * /jobs/:jobid/config
             * /:*
             */
            Collections.sort(
                handlers,
                RestHandlerUrlComparator.INSTANCE);
​
            handlers.forEach(handler -> {
                log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
                registerHandler(router, handler);
            });
​
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
​
                @Override
                protected void initChannel(SocketChannel ch) {
                    RouterHandler handler = new RouterHandler(router, responseHeaders);
​
                    // SSL should be the first handler in the pipeline
                    if (sslEngineFactory != null) {
                        ch.pipeline().addLast("ssl",
                            new RedirectingSslHandler(restAddress, restAddressFuture, sslEngineFactory));
                    }
​
                    ch.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new FileUploadHandler(uploadDir))
                        .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                        .addLast(new ChunkedWriteHandler())
                        .addLast(handler.getName(), handler)
                        .addLast(new PipelineErrorHandler(log, responseHeaders));
                }
            };
​
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("flink-rest-server-netty-worker"));
​
            bootstrap = new ServerBootstrap();
            bootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(initializer);
​
            log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
            final ChannelFuture channel;
            if (restBindAddress == null) {
                channel = bootstrap.bind(restBindPort);
            } else {
                channel = bootstrap.bind(restBindAddress, restBindPort);
            }
            serverChannel = channel.syncUninterruptibly().channel();
​
            final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
            final String advertisedAddress;
            if (bindAddress.getAddress().isAnyLocalAddress()) {
                advertisedAddress = this.restAddress;
            } else {
                advertisedAddress = bindAddress.getAddress().getHostAddress();
            }
            final int port = bindAddress.getPort();
​
            log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
​
            final String protocol;
​
            if (sslEngineFactory != null) {
                protocol = "https://";
            } else {
                protocol = "http://";
            }
​
            restBaseUrl = protocol + advertisedAddress + ':' + port;
​
            restAddressFuture.complete(restBaseUrl);
​
            state = State.RUNNING;
​
            startInternal();
        }
    }
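  • This calls initializeHandlers to obtain the ChannelInboundHandlers. initializeHandlers is implemented in the subclass DispatcherRestEndpoint. The handlers are then sorted by URL and registered with the Router before the Netty server is bound.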
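The sort comment in start() lists the intended order: literal paths first, then parameterized paths such as `/jobs/:jobid`, with the catch-all `/:*` last. This is so that more specific routes win. The comparator below is a hypothetical sketch that produces that order by comparing segment by segment. It is not Flink's RestHandlerUrlComparator, whose exact rules may differ. The ranking (literal < `:param` < wildcard, with a shorter prefix first) is an assumption chosen to reproduce the listed example.

```java
import java.util.Comparator;

// Hypothetical ordering sketch, not Flink's RestHandlerUrlComparator.
final class UrlOrder implements Comparator<String> {
    static final UrlOrder INSTANCE = new UrlOrder();

    // 0 = literal segment, 1 = path parameter (":jobid"), 2 = wildcard (":*")
    private static int rank(String segment) {
        if (segment.contains("*")) return 2;
        if (segment.startsWith(":")) return 1;
        return 0;
    }

    @Override
    public int compare(String a, String b) {
        String[] sa = a.split("/");
        String[] sb = b.split("/");
        int n = Math.min(sa.length, sb.length);
        for (int i = 0; i < n; i++) {
            int byRank = Integer.compare(rank(sa[i]), rank(sb[i]));
            if (byRank != 0) return byRank;
            int byName = sa[i].compareToIgnoreCase(sb[i]);
            if (byName != 0) return byName;
        }
        // A URL that is a prefix of another sorts first.
        return Integer.compare(sa.length, sb.length);
    }
}
```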

DispatcherRestEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java

    @Override
    protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
        List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture);
​
        // Add the Dispatcher specific handlers
​
        final Time timeout = restConfiguration.getTimeout();
​
        JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
            restAddressFuture,
            leaderRetriever,
            timeout,
            responseHeaders,
            executor,
            clusterConfiguration);
​
        if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
            try {
                webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                    leaderRetriever,
                    restAddressFuture,
                    timeout,
                    responseHeaders,
                    uploadDir,
                    executor,
                    clusterConfiguration);
​
                // register extension handlers
                handlers.addAll(webSubmissionExtension.getHandlers());
            } catch (FlinkException e) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to load web based job submission extension.", e);
                } else {
                    log.info("Failed to load web based job submission extension. " +
                        "Probable reason: flink-runtime-web is not in the classpath.");
                }
            }
        } else {
            log.info("Web-based job submission is not enabled.");
        }
​
        handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
​
        return handlers;
    }
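  • This first calls the parent's initializeHandlers. The parent here is WebMonitorEndpoint, a direct subclass of RestServerEndpoint, and DispatcherRestEndpoint in turn extends WebMonitorEndpoint. It then appends the web submission extension's handlers (only if WebOptions.SUBMIT_ENABLE is set and the extension can be loaded) and the JobSubmitHandler.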
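This is the template method pattern. RestServerEndpoint.start() is final and fixes the overall flow. Subclasses only contribute handlers, and each subclass extends the list returned by super.initializeHandlers(). Below is a stripped-down sketch with hypothetical class names and plain strings standing in for the handler tuples.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes mirroring the shape of the Flink hierarchy.
abstract class BaseEndpoint {
    private List<String> registered;

    // final, like RestServerEndpoint.start(): subclasses cannot change the
    // flow, they only contribute handlers via initializeHandlers().
    final void start() {
        registered = initializeHandlers();
    }

    List<String> registeredHandlers() {
        return registered;
    }

    protected abstract List<String> initializeHandlers();
}

// Plays the role of WebMonitorEndpoint.
class MonitorEndpoint extends BaseEndpoint {
    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = new ArrayList<>();
        handlers.add("logFileHandler");
        handlers.add("stdoutFileHandler");
        return handlers;
    }
}

// Plays the role of DispatcherRestEndpoint.
class DispatcherEndpoint extends MonitorEndpoint {
    @Override
    protected List<String> initializeHandlers() {
        // Start from the parent's handlers, then append dispatcher-specific ones.
        List<String> handlers = super.initializeHandlers();
        handlers.add("jobSubmitHandler");
        return handlers;
    }
}
```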

WebMonitorEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java

    @Override
    protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
        ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
​
        final Time timeout = restConfiguration.getTimeout();
​
        //......
​
        // TODO: Remove once the Yarn proxy can forward all REST verbs
        handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler));
        handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler));
​
        handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), shutdownHandler));
​
        //......
​
        // load the log and stdout file handler for the main cluster component
        final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration);
​
        final ChannelInboundHandler logFileHandler = createStaticFileHandler(
            restAddressFuture,
            timeout,
            logFileLocation.logFile);
​
        final ChannelInboundHandler stdoutFileHandler = createStaticFileHandler(
            restAddressFuture,
            timeout,
            logFileLocation.stdOutFile);
​
        handlers.add(Tuple2.of(LogFileHandlerSpecification.getInstance(), logFileHandler));
        handlers.add(Tuple2.of(StdoutFileHandlerSpecification.getInstance(), stdoutFileHandler));
​
        // TaskManager log and stdout file handler
​
        final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval());
​
        final TaskManagerLogFileHandler taskManagerLogFileHandler = new TaskManagerLogFileHandler(
            restAddressFuture,
            leaderRetriever,
            timeout,
            responseHeaders,
            TaskManagerLogFileHeaders.getInstance(),
            resourceManagerRetriever,
            transientBlobService,
            cacheEntryDuration);
​
        final TaskManagerStdoutFileHandler taskManagerStdoutFileHandler = new TaskManagerStdoutFileHandler(
            restAddressFuture,
            leaderRetriever,
            timeout,
            responseHeaders,
            TaskManagerStdoutFileHeaders.getInstance(),
            resourceManagerRetriever,
            transientBlobService,
            cacheEntryDuration);
​
        handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
        handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));
​
        //......
​
    }
​
    @Nonnull
    private ChannelInboundHandler createStaticFileHandler(
            CompletableFuture<String> restAddressFuture,
            Time timeout,
            File fileToServe) {
​
        if (fileToServe == null) {
            return new ConstantTextHandler("(file unavailable)");
        } else {
            try {
                return new StaticFileServerHandler<>(
                    leaderRetriever,
                    restAddressFuture,
                    timeout,
                    fileToServe);
            } catch (IOException e) {
                log.info("Cannot load log file handler.", e);
                return new ConstantTextHandler("(log file unavailable)");
            }
        }
    }
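  • It initializes a series of ChannelInboundHandlers and adds them to handlers.
  • For the JobManager file handlers, it first calls WebMonitorUtils.LogFileLocation.find(clusterConfiguration) to build logFileLocation. It then uses logFileLocation.logFile and logFileLocation.stdOutFile to build logFileHandler and stdoutFileHandler, which serve downloads of the log and stdout files respectively. If a file is null, createStaticFileHandler falls back to a ConstantTextHandler that returns "(file unavailable)".
  • For the TaskManager file handlers, it builds a TaskManagerLogFileHandler and a TaskManagerStdoutFileHandler to serve downloads of the log and stdout files.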
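createStaticFileHandler never fails. A missing file yields a placeholder text instead of an error. Below is a minimal sketch of that fallback that returns response text instead of a Netty handler. `StaticFileFallback` is hypothetical. One simplification: in Flink the IOException comes from resolving the canonical path when the StaticFileServerHandler is constructed, whereas this sketch catches it while reading the file.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical: mirrors the null-check / IOException fallback of
// createStaticFileHandler, but returns response text instead of a handler.
final class StaticFileFallback {
    private StaticFileFallback() {}

    static String respond(File fileToServe) {
        if (fileToServe == null) {
            // LogFileLocation could not resolve the file.
            return "(file unavailable)";
        }
        try {
            return new String(Files.readAllBytes(fileToServe.toPath()), StandardCharsets.UTF_8);
        } catch (IOException e) {
            return "(log file unavailable)";
        }
    }
}
```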

JobManager FileHandler

WebMonitorUtils.LogFileLocation.find

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java

    /**
     * Singleton to hold the log and stdout file.
     */
    public static class LogFileLocation {
​
        public final File logFile;
        public final File stdOutFile;
​
        private LogFileLocation(File logFile, File stdOutFile) {
            this.logFile = logFile;
            this.stdOutFile = stdOutFile;
        }
​
        /**
         * Finds the Flink log directory using log.file Java property that is set during startup.
         */
        public static LogFileLocation find(Configuration config) {
            final String logEnv = "log.file";
            String logFilePath = System.getProperty(logEnv);
​
            if (logFilePath == null) {
                LOG.warn("Log file environment variable '{}' is not set.", logEnv);
                logFilePath = config.getString(WebOptions.LOG_PATH);
            }
​
            // not configured, cannot serve log files
            if (logFilePath == null || logFilePath.length() < 4) {
                LOG.warn("JobManager log files are unavailable in the web dashboard. " +
                    "Log file location not found in environment variable '{}' or configuration key '{}'.",
                    logEnv, WebOptions.LOG_PATH);
                return new LogFileLocation(null, null);
            }
​
            String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");
​
            LOG.info("Determined location of main cluster component log file: {}", logFilePath);
            LOG.info("Determined location of main cluster component stdout file: {}", outFilePath);
​
            return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath));
        }
​
        /**
         * Verify log file location.
         *
         * @param logFilePath Path to log file
         * @return File or null if not a valid log file
         */
        private static File resolveFileLocation(String logFilePath) {
            File logFile = new File(logFilePath);
            return (logFile.exists() && logFile.canRead()) ? logFile : null;
        }
    }
  • It first reads the log.file system property. If it is not set, it logs a warning (Log file environment variable 'log.file' is not set.).
  • If log.file is not set, it reads WebOptions.LOG_PATH (web.log.path) from Flink's Configuration instead. If that is also missing, or logFilePath.length() is less than 4, it logs the warning (JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'.) and returns a LogFileLocation with both files null.
  • logFilePath must be at least 4 characters long because outFilePath is later built with logFilePath.substring(0, logFilePath.length() - 3).concat("out"). resolveFileLocation then validates logFilePath and outFilePath, mapping any path that does not exist or is not readable to null, and the resulting LogFileLocation is returned.
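The path logic in find() can be isolated into two pure functions: pick the system property over the config value, then derive the stdout path by swapping the last three characters for "out". In the sketch below the two lookups are passed in as parameters so the logic can be tested without touching System properties. `LogPaths` and its methods are hypothetical names.

```java
import java.util.Optional;

// Hypothetical helper reproducing the lookup order of LogFileLocation.find:
// the "log.file" system property first, then the web.log.path config value.
final class LogPaths {
    private LogPaths() {}

    // Returns the log path to use, or empty if neither source gives a usable one.
    static Optional<String> resolveLogPath(String systemProperty, String configValue) {
        String path = systemProperty != null ? systemProperty : configValue;
        // Same guard as Flink: require at least 4 characters, since the
        // last 3 characters are replaced when deriving the stdout path.
        if (path == null || path.length() < 4) {
            return Optional.empty();
        }
        return Optional.of(path);
    }

    // Drops the last three characters and appends "out", e.g. x.log -> x.out.
    static String stdoutPathFor(String logPath) {
        return logPath.substring(0, logPath.length() - 3).concat("out");
    }
}
```

Because the replacement is purely positional, the .out file is only found if the log file name ends in a three-character extension and the stdout file sits next to it. For example, `jobmanager.txt` would map to `jobmanager.out`.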

StaticFileServerHandler

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java

/**
 * Simple file server handler that serves requests to web frontend's static files, such as
 * HTML, CSS, or JS files.
 *
 * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
 * example.</p>
 */
@ChannelHandler.Sharable
public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {
​
    /** Timezone in which this server answers its "if-modified" requests. */
    private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");
​
    /** Date format for HTTP. */
    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
​
    /** Be default, we allow files to be cached for 5 minutes. */
    private static final int HTTP_CACHE_SECONDS = 300;
​
    // ------------------------------------------------------------------------
​
    /** The path in which the static documents are. */
    private final File rootPath;
​
    public StaticFileServerHandler(
            GatewayRetriever<? extends T> retriever,
            CompletableFuture<String> localJobManagerAddressFuture,
            Time timeout,
            File rootPath) throws IOException {
​
        super(localJobManagerAddressFuture, retriever, timeout, Collections.emptyMap());
​
        this.rootPath = checkNotNull(rootPath).getCanonicalFile();
    }
​
    // ------------------------------------------------------------------------
    //  Responses to requests
    // ------------------------------------------------------------------------
​
    @Override
    protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception {
        final HttpRequest request = routedRequest.getRequest();
        final String requestPath;
​
        // make sure we request the "index.html" in case there is a directory request
        if (routedRequest.getPath().endsWith("/")) {
            requestPath = routedRequest.getPath() + "index.html";
        }
        // in case the files being accessed are logs or stdout files, find appropriate paths.
        else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) {
            requestPath = "";
        } else {
            requestPath = routedRequest.getPath();
        }
​
        respondToRequest(channelHandlerContext, request, requestPath);
    }
​
    //......
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (ctx.channel().isActive()) {
            logger.error("Caught exception", cause);
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }
}
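  • For /jobmanager/log and /jobmanager/stdout it resets requestPath to an empty string and then calls respondToRequest, which serves the file relative to rootPath. For these two handlers, rootPath is the canonical log or stdout file itself.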
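The path rewrite in respondAsLeader is small enough to extract. The sketch below reproduces the three branches. It also writes out explicitly the assumption that an empty requestPath makes the handler serve rootPath itself. `RequestPathMapper` and `fileToServe` are hypothetical names, not the body of respondToRequest.

```java
import java.io.File;

// Hypothetical sketch of the path mapping in respondAsLeader.
final class RequestPathMapper {
    private RequestPathMapper() {}

    static String requestPath(String routedPath) {
        if (routedPath.endsWith("/")) {
            // Directory request: serve index.html.
            return routedPath + "index.html";
        }
        if (routedPath.equals("/jobmanager/log") || routedPath.equals("/jobmanager/stdout")) {
            // Log/stdout handlers: rootPath already points at the file.
            return "";
        }
        return routedPath;
    }

    // Assumption: an empty request path resolves to rootPath itself.
    static File fileToServe(File rootPath, String requestPath) {
        return requestPath.isEmpty() ? rootPath : new File(rootPath, requestPath);
    }
}
```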

TaskManager FileHandler

TaskManagerLogFileHandler

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogFileHandler.java

/**
 * Rest handler which serves the log files from {@link TaskExecutor}.
 */
public class TaskManagerLogFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {
​
    public TaskManagerLogFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
    }
​
    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
        return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.LOG, timeout);
    }
}
  • Its requestFileUpload calls ResourceManager.requestTaskManagerFileUpload (through the ResourceManagerGateway) with FileType.LOG.

TaskManagerStdoutFileHandler.requestFileUpload

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerStdoutFileHandler.java

/**
 * Rest handler which serves the stdout file of the {@link TaskExecutor}.
 */
public class TaskManagerStdoutFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {
​
    public TaskManagerStdoutFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
    }
​
    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
        return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.STDOUT, timeout);
    }
}
  • Its requestFileUpload likewise calls ResourceManager.requestTaskManagerFileUpload, but with FileType.STDOUT. The two handlers differ only in this FileType.
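TaskManagerLogFileHandler and TaskManagerStdoutFileHandler share all their logic through AbstractTaskManagerFileHandler and override only the hook that picks the file type. Below is a hypothetical, dependency-free sketch of that shape. `FileKind`, `UploadGateway` and the handler classes stand in for FileType, ResourceManagerGateway and the real handlers. A String stands in for the TransientBlobKey.

```java
import java.util.concurrent.CompletableFuture;

enum FileKind { LOG, STDOUT }

// Hypothetical stand-in for ResourceManagerGateway.requestTaskManagerFileUpload.
interface UploadGateway {
    CompletableFuture<String> requestFileUpload(String taskManagerId, FileKind kind);
}

abstract class TaskManagerFileHandlerSketch {
    // The only thing subclasses decide.
    protected abstract FileKind fileKind();

    final CompletableFuture<String> handle(UploadGateway gateway, String taskManagerId) {
        return gateway.requestFileUpload(taskManagerId, fileKind());
    }
}

final class LogHandlerSketch extends TaskManagerFileHandlerSketch {
    @Override
    protected FileKind fileKind() {
        return FileKind.LOG;
    }
}

final class StdoutHandlerSketch extends TaskManagerFileHandlerSketch {
    @Override
    protected FileKind fileKind() {
        return FileKind.STDOUT;
    }
}
```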

ResourceManager.requestTaskManagerFileUpload

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/resourcemanager/ResourceManager.java

    @Override
    public CompletableFuture<TransientBlobKey> requestTaskManagerFileUpload(ResourceID taskManagerId, FileType fileType, Time timeout) {
        log.debug("Request file {} upload from TaskExecutor {}.", fileType, taskManagerId);
​
        final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId);
​
        if (taskExecutor == null) {
            log.debug("Requested file {} upload from unregistered TaskExecutor {}.", fileType, taskManagerId);
            return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
        } else {
            return taskExecutor.getTaskExecutorGateway().requestFileUpload(fileType, timeout);
        }
    }
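  • ResourceManager.requestTaskManagerFileUpload looks up the registered TaskExecutor and delegates to TaskExecutor.requestFileUpload through its gateway. For an unregistered TaskExecutor it returns a future completed exceptionally with UnknownTaskExecutorException.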
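The ResourceManager acts as a router here. It resolves the TaskExecutor by ID and forwards the request. An unknown ID becomes a failed future rather than a thrown exception, so the REST handler can treat both cases asynchronously. Here is a plain-Java sketch of that lookup-or-fail step. `UploadRouter` is hypothetical. A Function stands in for the TaskExecutorGateway, and IllegalArgumentException stands in for UnknownTaskExecutorException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class UploadRouter {
    // taskManagerId -> stand-in for a TaskExecutorGateway (fileType -> upload result)
    private final Map<String, Function<String, CompletableFuture<String>>> taskExecutors = new HashMap<>();

    void register(String taskManagerId, Function<String, CompletableFuture<String>> gateway) {
        taskExecutors.put(taskManagerId, gateway);
    }

    CompletableFuture<String> requestTaskManagerFileUpload(String taskManagerId, String fileType) {
        Function<String, CompletableFuture<String>> gateway = taskExecutors.get(taskManagerId);
        if (gateway == null) {
            // Unregistered TaskExecutor: return a failed future instead of throwing.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("Unknown TaskExecutor " + taskManagerId));
            return failed;
        }
        return gateway.apply(fileType);
    }
}
```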

TaskExecutor.requestFileUpload

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

    @Override
    public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time timeout) {
        log.debug("Request file {} upload.", fileType);
​
        final String filePath;
​
        switch (fileType) {
            case LOG:
                filePath = taskManagerConfiguration.getTaskManagerLogPath();
                break;
            case STDOUT:
                filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
                break;
            default:
                filePath = null;
        }
​
        if (filePath != null && !filePath.isEmpty()) {
            final File file = new File(filePath);
​
            if (file.exists()) {
                final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
                final TransientBlobKey transientBlobKey;
                try (FileInputStream fileInputStream = new FileInputStream(file)) {
                    transientBlobKey = transientBlobService.putTransient(fileInputStream);
                } catch (IOException e) {
                    log.debug("Could not upload file {}.", fileType, e);
                    return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
                }
​
                return CompletableFuture.completedFuture(transientBlobKey);
            } else {
                log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
                return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
            }
        } else {
            log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID());
            return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor."));
        }
    }
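  • TaskExecutor.requestFileUpload picks filePath according to fileType. For LOG it uses taskManagerConfiguration.getTaskManagerLogPath(), and for STDOUT it uses taskManagerConfiguration.getTaskManagerStdoutPath(). If the path is non-empty and the file exists, it uploads the file to the TransientBlobCache with putTransient and returns the resulting TransientBlobKey. Otherwise the returned future fails with a FlinkException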
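The control flow above is: choose a path by type, reject a missing or empty path, reject a non-existent file, then read and upload. The sketch below covers those steps and is a simplified, hypothetical version. It returns the file's bytes in place of a TransientBlobKey because there is no blob service here, and IllegalStateException stands in for FlinkException.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the branching in TaskExecutor.requestFileUpload.
class FileUploadSketch {
    enum FileType { LOG, STDOUT }

    private final String logPath;     // may be null, like taskManagerLogPath
    private final String stdoutPath;  // may be null, like taskManagerStdoutPath

    FileUploadSketch(String logPath, String stdoutPath) {
        this.logPath = logPath;
        this.stdoutPath = stdoutPath;
    }

    // The byte[] result stands in for the TransientBlobKey returned by the blob service.
    CompletableFuture<byte[]> requestFileUpload(FileType fileType) {
        final String filePath;
        switch (fileType) {
            case LOG:
                filePath = logPath;
                break;
            case STDOUT:
                filePath = stdoutPath;
                break;
            default:
                filePath = null;
        }

        if (filePath == null || filePath.isEmpty()) {
            return failed(new IllegalStateException("The file " + fileType + " is not available."));
        }
        Path file = Paths.get(filePath);
        if (!Files.exists(file)) {
            return failed(new IllegalStateException("The file " + fileType + " does not exist."));
        }
        try {
            return CompletableFuture.completedFuture(Files.readAllBytes(file));
        } catch (IOException e) {
            return failed(new IllegalStateException("Could not upload file " + fileType + '.', e));
        }
    }

    // Equivalent of an already-failed future (Java 8 compatible).
    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }
}
```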

TaskManagerRunner.startTaskManager

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

    public static TaskExecutor startTaskManager(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            FatalErrorHandler fatalErrorHandler) throws Exception {
​
        checkNotNull(configuration);
        checkNotNull(resourceID);
        checkNotNull(rpcService);
        checkNotNull(highAvailabilityServices);
​
        LOG.info("Starting TaskManager with ResourceID: {}", resourceID);
​
        InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());
​
        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
            TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                remoteAddress,
                localCommunicationOnly);
​
        TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
            taskManagerServicesConfiguration,
            resourceID,
            rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
            EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
            EnvironmentInformation.getMaxJvmHeapMemory());
​
        TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
            metricRegistry,
            taskManagerServices.getTaskManagerLocation(),
            taskManagerServices.getNetworkEnvironment());
​
        TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
​
        return new TaskExecutor(
            rpcService,
            taskManagerConfiguration,
            highAvailabilityServices,
            taskManagerServices,
            heartbeatServices,
            taskManagerMetricGroup,
            blobCacheService,
            fatalErrorHandler);
    }
  • TaskManagerRunner.startTaskManager builds taskManagerConfiguration with TaskManagerConfiguration.fromConfiguration(configuration) and passes it to the TaskExecutor constructor

TaskManagerConfiguration.fromConfiguration

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

    public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
        int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
​
        if (numberSlots == -1) {
            numberSlots = 1;
        }
​
        //......
​
        final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
        final String taskManagerStdoutPath;
​
        if (taskManagerLogPath != null) {
            final int extension = taskManagerLogPath.lastIndexOf('.');
​
            if (extension > 0) {
                taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
            } else {
                taskManagerStdoutPath = null;
            }
        } else {
            taskManagerStdoutPath = null;
        }
​
        return new TaskManagerConfiguration(
            numberSlots,
            tmpDirPaths,
            timeout,
            finiteRegistrationDuration,
            initialRegistrationPause,
            maxRegistrationPause,
            refusedRegistrationPause,
            configuration,
            exitOnOom,
            FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
            alwaysParentFirstLoaderPatterns,
            taskManagerLogPath,
            taskManagerStdoutPath);
    }
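  • TaskManagerConfiguration.fromConfiguration first reads taskManagerLogPath from Flink's Configuration using ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (taskmanager.log.path). If that key is not set, it falls back to the log.file system property. If taskManagerLogPath is non-null, taskManagerStdoutPath is derived from it by replacing everything from the last '.' onward with ".out". If no '.' is found at an index greater than 0, taskManagerStdoutPath is null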
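The two derivation steps above are isolated in the minimal sketch below. The helper names resolveLogPath and deriveStdoutPath are hypothetical, and configuredLogPath stands for whatever taskmanager.log.path resolves to in Flink's Configuration (null when unset).

```java
// Hypothetical helpers mirroring the logic of TaskManagerConfiguration.fromConfiguration.
class TaskManagerLogPathSketch {

    // taskmanager.log.path wins; otherwise fall back to the log.file system property.
    static String resolveLogPath(String configuredLogPath) {
        return configuredLogPath != null ? configuredLogPath : System.getProperty("log.file");
    }

    // Replace everything from the last '.' with ".out"; null if there is no usable '.'.
    static String deriveStdoutPath(String logPath) {
        if (logPath == null) {
            return null;
        }
        final int extension = logPath.lastIndexOf('.');
        return extension > 0 ? logPath.substring(0, extension) + ".out" : null;
    }
}
```

For example, deriveStdoutPath("/opt/flink/log/flink-taskexecutor-0.log") gives "/opt/flink/log/flink-taskexecutor-0.out". Read literally, lastIndexOf('.') scans the whole path and not just the file name. An extension-less log path inside a dotted directory such as "/opt/flink-1.6.2/log/taskmanager" would therefore produce "/opt/flink-1.6.out".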

Summary

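  • Flink's log4j.properties configures a file appender whose output file comes from the log.file system property
  • When Flink's MiniCluster starts, it creates a DispatcherRestEndpoint whose start method calls initializeHandlers to set up a series of handlers. For the JobManager's file handlers, it gets logFileLocation from WebMonitorUtils.LogFileLocation.find(clusterConfiguration). That method first reads the log.file system property and, if it is not set, falls back to WebOptions.LOG_PATH (web.log.path) in Flink's Configuration. Two StaticFileServerHandlers are then created, one for logFileLocation.logFile and one for logFileLocation.stdOutFile
  • For the TaskManager's file handlers, a TaskManagerLogFileHandler and a TaskManagerStdoutFileHandler serve downloads of the log and stdout files. Both call ResourceManager.requestTaskManagerFileUpload and differ only in fileType (LOG vs. STDOUT). ResourceManager.requestTaskManagerFileUpload in turn performs the transfer through TaskExecutor.requestFileUpload. When TaskManagerRunner.startTaskManager creates the TaskExecutor, it builds a TaskManagerConfiguration that reads ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (taskmanager.log.path) from Flink's Configuration first and falls back to the log.file system property. The precedence is therefore reversed between the two sides: the JobManager prefers the system property, while the TaskManager prefers the Flink configuration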
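The two lookup orders from the summary are contrasted in a simplified, hypothetical sketch. A plain Map<String, String> stands in for Flink's Configuration, and the method names are illustrative only. Only the key names web.log.path and taskmanager.log.path and the log.file property come from the article.

```java
import java.util.Map;

// Hypothetical sketch contrasting the JobManager and TaskManager lookup orders.
class LogPathPrecedenceSketch {

    // JobManager side (WebMonitorUtils.LogFileLocation.find):
    // log.file system property first, then web.log.path.
    static String jobManagerLogPath(Map<String, String> flinkConf) {
        String fromProperty = System.getProperty("log.file");
        return fromProperty != null ? fromProperty : flinkConf.get("web.log.path");
    }

    // TaskManager side (TaskManagerConfiguration.fromConfiguration):
    // taskmanager.log.path first, then the log.file system property.
    static String taskManagerLogPath(Map<String, String> flinkConf) {
        String fromConf = flinkConf.get("taskmanager.log.path");
        return fromConf != null ? fromConf : System.getProperty("log.file");
    }
}
```

So when log.file is set (as the log4j.properties setup expects), web.log.path has no effect on the JobManager side, whereas taskmanager.log.path still overrides log.file on the TaskManager side.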

doc

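Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization. Reproduction without permission is prohibited.

In case of infringement, please contact cloudcommunity@tencent.com for removal.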
