A Look at Flink TaskManager's data.port and rpc.port

code4it
Published 2019-03-11 10:21:14
Collected in the column: 码匠的流水账

This article looks at how the Flink TaskManager uses taskmanager.data.port and taskmanager.rpc.port.

TaskManagerServices

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......
​
    public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            ResourceID resourceID,
            Executor taskIOExecutor,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {
​
        // pre-start checks
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
​
        final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
        network.start();
​
        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
            resourceID,
            taskManagerServicesConfiguration.getTaskManagerAddress(),
            network.getConnectionManager().getDataPort());
​
        // this call has to happen strictly after the network stack has been initialized
        final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);
​
        // start the I/O manager, it will create some temp directories.
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
​
        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
​
        final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
​
        for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
            resourceProfiles.add(ResourceProfile.ANY);
        }
​
        final TimerService<AllocationID> timerService = new TimerService<>(
            new ScheduledThreadPoolExecutor(1),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
​
        final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
​
        final JobManagerTable jobManagerTable = new JobManagerTable();
​
        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
​
        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
​
        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
​
        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }
​
        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            taskIOExecutor);
​
        return new TaskManagerServices(
            taskManagerLocation,
            memoryManager,
            ioManager,
            network,
            broadcastVariableManager,
            taskSlotTable,
            jobManagerTable,
            jobLeaderService,
            taskStateManager);
    }
​
    private static NetworkEnvironment createNetworkEnvironment(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long maxJvmHeapMemory) {
​
        NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();
​
        final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
        int segmentSize = networkEnvironmentConfiguration.networkBufferSize();
​
        // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
        final long numNetBuffersLong = networkBuf / segmentSize;
        if (numNetBuffersLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf
                + ") corresponds to more than MAX_INT pages.");
        }
​
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(
            (int) numNetBuffersLong,
            segmentSize);
​
        ConnectionManager connectionManager;
        boolean enableCreditBased = false;
        NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
        if (nettyConfig != null) {
            connectionManager = new NettyConnectionManager(nettyConfig);
            enableCreditBased = nettyConfig.isCreditBasedEnabled();
        } else {
            connectionManager = new LocalConnectionManager();
        }
​
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
​
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
​
        QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
​
        int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();
​
        int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();
​
        final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                qsConfig.getProxyPortRange(),
                numProxyServerNetworkThreads,
                numProxyServerQueryThreads,
                new DisabledKvStateRequestStats());
​
        int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();
​
        int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();
​
        final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                qsConfig.getStateServerPortRange(),
                numStateServerNetworkThreads,
                numStateServerQueryThreads,
                kvStateRegistry,
                new DisabledKvStateRequestStats());
​
        // we start the network first, to make sure it can allocate its buffers first
        return new NetworkEnvironment(
            networkBufferPool,
            connectionManager,
            resultPartitionManager,
            taskEventDispatcher,
            kvStateRegistry,
            kvStateServer,
            kvClientProxy,
            networkEnvironmentConfiguration.ioMode(),
            networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
            networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
            networkEnvironmentConfiguration.networkBuffersPerChannel(),
            networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(),
            enableCreditBased);
    }
​
    //......
}
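  • TaskManagerServices.fromConfiguration reads its settings from taskManagerServicesConfiguration, creates and starts the NetworkEnvironment, and then builds the TaskManagerLocation from network.getConnectionManager().getDataPort(). Because network.start() is called first, the port stored in TaskManagerLocation is the one reported by the running connection manager.
  • TaskExecutorToResourceManagerConnection and ConnectionID both obtain the dataPort from TaskManagerLocation.
  • createNetworkEnvironment obtains the NetworkEnvironmentConfiguration from taskManagerServicesConfiguration (this is where the taskmanager.data.port value read from the configuration file ends up). If its nettyConfig is not null, a NettyConnectionManager is created from it; otherwise a LocalConnectionManager is used.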

NettyConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

public class NettyConnectionManager implements ConnectionManager {
​
    private final NettyServer server;
​
    private final NettyClient client;
​
    private final NettyBufferPool bufferPool;
​
    private final PartitionRequestClientFactory partitionRequestClientFactory;
​
    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
​
        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }
​
    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
            partitionProvider,
            taskEventDispatcher,
            client.getConfig().isCreditBasedEnabled());
​
        client.init(partitionRequestProtocol, bufferPool);
        server.init(partitionRequestProtocol, bufferPool);
    }
​
    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }
​
    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }
​
    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }
​
    @Override
    public int getDataPort() {
        if (server != null && server.getLocalAddress() != null) {
            return server.getLocalAddress().getPort();
        } else {
            return -1;
        }
    }
​
    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }
​
    NettyClient getClient() {
        return client;
    }
​
    NettyServer getServer() {
        return server;
    }
​
    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}
  • The NettyConnectionManager constructor builds a NettyServer from the NettyConfig. getDataPort returns server.getLocalAddress().getPort(), or -1 when the server or its local address is null.
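
The reason getDataPort asks the server for its local address, rather than returning the configured value, is easiest to see when the configured port is 0, which asks the operating system for any free port. The following is a minimal sketch of that pattern using a plain java.net.ServerSocket. DataPortSketch and its methods are hypothetical names chosen for illustration; this is not Flink's NettyServer.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

/** Hypothetical stand-in for a data server; not Flink's NettyServer. */
public class DataPortSketch {

    // Stays null until start() is called, like a server that has not been initialized.
    private ServerSocket server;

    /** Binds on the loopback address; a configured port of 0 lets the OS pick a free port. */
    public void start(int configuredPort) throws IOException {
        server = new ServerSocket();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), configuredPort));
    }

    /** Same shape as getDataPort(): the bound port, or -1 if nothing is bound yet. */
    public int getDataPort() {
        if (server != null && server.isBound()) {
            return server.getLocalPort();
        }
        return -1;
    }

    public void shutdown() throws IOException {
        if (server != null) {
            server.close();
        }
    }

    public static void main(String[] args) throws IOException {
        DataPortSketch sketch = new DataPortSketch();
        System.out.println("before start: " + sketch.getDataPort());
        sketch.start(0);
        System.out.println("after start:  " + sketch.getDataPort());
        sketch.shutdown();
    }
}
```

Under these assumptions, the value seen before start is the -1 sentinel, and the value seen after start is whatever port the OS assigned, which is why anything that publishes the port to other components has to read it after the server is up.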

TaskManagerRunner

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
    //......
​
    public static RpcService createRpcService(
        final Configuration configuration,
        final HighAvailabilityServices haServices) throws Exception {
​
        checkNotNull(configuration);
        checkNotNull(haServices);
​
        String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST);
​
        if (taskManagerHostname != null) {
            LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
        } else {
            Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
​
            InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
                haServices.getResourceManagerLeaderRetriever(),
                lookupTimeout);
​
            taskManagerHostname = taskManagerAddress.getHostName();
​
            LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
                taskManagerHostname, taskManagerAddress.getHostAddress());
        }
​
        final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
        return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
    }
​
    //......
}
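  • TaskManagerRunner provides createRpcService, which reads taskmanager.rpc.port from the configuration as a port range definition (the portRangeDefinition string) and passes it, together with the TaskManager hostname, to AkkaRpcServiceUtils.createRpcService to create the RpcService.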
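
Since the value is treated as a port range definition rather than a single integer, it may help to see what expanding such a definition and picking a usable port could look like. The sketch below is an assumption-laden illustration only: PortRangeSketch, parsePorts and bindFirstFree are hypothetical helpers written for this article, the accepted syntax (single ports, "start-end" ranges, comma-separated lists) is assumed, and Flink's own parsing and binding logic lives in its own utilities.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not Flink's implementation. */
public class PortRangeSketch {

    /** Expands "0", "50100-50102" or "6123,50100-50102" into an ordered list of candidates. */
    public static List<Integer> parsePorts(String definition) {
        List<Integer> ports = new ArrayList<>();
        for (String part : definition.split(",")) {
            String trimmed = part.trim();
            int dash = trimmed.indexOf('-');
            if (dash < 0) {
                // A single port such as "6123" or "0".
                ports.add(checkPort(Integer.parseInt(trimmed)));
            } else {
                // A closed range such as "50100-50102".
                int start = checkPort(Integer.parseInt(trimmed.substring(0, dash).trim()));
                int end = checkPort(Integer.parseInt(trimmed.substring(dash + 1).trim()));
                if (start > end) {
                    throw new IllegalArgumentException("Range start exceeds end: " + trimmed);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    private static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port: " + port);
        }
        return port;
    }

    /** Tries the candidates in order and returns the first socket that binds. */
    public static ServerSocket bindFirstFree(List<Integer> candidates) throws IOException {
        for (int port : candidates) {
            try {
                return new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
            } catch (IOException e) {
                // Port is unavailable; move on to the next candidate.
            }
        }
        throw new IOException("No free port among " + candidates);
    }
}
```

A caller would combine the two, for example bindFirstFree(parsePorts("50100-50200")), and then read the port actually in use from the returned socket. Restricting the range this way is mainly useful when firewall rules only allow a known set of ports.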

Summary

  • TaskManagerServices.fromConfiguration reads its settings from taskManagerServicesConfiguration, creates and starts the NetworkEnvironment, and then builds the TaskManagerLocation from network.getConnectionManager().getDataPort(). TaskExecutorToResourceManagerConnection and ConnectionID both obtain the dataPort from TaskManagerLocation.
  • TaskManagerServices.createNetworkEnvironment obtains the NetworkEnvironmentConfiguration from taskManagerServicesConfiguration (this is where the taskmanager.data.port value read from the configuration file ends up). If its nettyConfig is not null, a NettyConnectionManager is created from it. The NettyConnectionManager constructor builds a NettyServer from the NettyConfig, and getDataPort returns server.getLocalAddress().getPort(), or -1 when the server or its local address is null.
  • TaskManagerRunner provides createRpcService, which reads taskmanager.rpc.port from the configuration as a port range definition and calls AkkaRpcServiceUtils.createRpcService to create the RpcService.
