A look at the Flink TaskManager's managed memory

Original article by code4it
Last modified 2019-02-20 12:17:46
Published in the column 码匠的流水账

This article looks at how the Flink TaskManager sizes its managed memory.

TaskManagerOptions

flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
    //......
​
    /**
     * JVM heap size for the TaskManagers with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
            key("taskmanager.heap.size")
            .defaultValue("1024m")
            .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                    " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                    " YARN container, minus a certain tolerance value.");
​
    /**
     * Amount of memory to be allocated by the task manager's memory manager. If not
     * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     */
    public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
            key("taskmanager.memory.size")
            .defaultValue("0")
            .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                " If not set, a relative fraction will be allocated.");
​
    /**
     * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
     * not set.
     */
    public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
            key("taskmanager.memory.fraction")
            .defaultValue(0.7f)
            .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
                " is not set.");
​
    /**
     * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
     * as well as the network buffers.
     **/
    public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
            key("taskmanager.memory.off-heap")
            .defaultValue(false)
            .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                " TaskManager as well as the network buffers.");
​
    /**
     * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
     */
    public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
            key("taskmanager.memory.preallocate")
            .defaultValue(false)
            .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");
​
    //......
}
  • taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's total memory, heap plus off-heap

TaskManagerServices.calculateHeapSizeMB

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......
​
    /**
     * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *      overall available memory to use (heap and off-heap)
     * @param config
     *      configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
​
        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
​
        // split the available Java memory between heap and off-heap
​
        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
​
        final long heapSizeMB;
        if (useOffHeap) {
​
            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }
​
            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }
​
            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");
​
            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }
​
        return heapSizeMB;
    }
​
    //......
}
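  • If taskmanager.memory.size is less than or equal to 0, managed memory is sized according to taskmanager.memory.fraction, which defaults to 0.7
  • If taskmanager.memory.off-heap is enabled and taskmanager.memory.size is not set, the off-heap size handed to the memory manager (offHeapSize) is taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB)
  • If taskmanager.memory.off-heap is enabled, the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize; otherwise it is taskmanager.heap.size - networkBufMB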
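
The arithmetic in calculateHeapSizeMB can be tried out in isolation. The following is a minimal sketch, not Flink code: the class HeapSizeSketch and its parameters are hypothetical, the network buffer size is passed in directly instead of being derived by calculateNetworkBufferMemory, and the 100 MB used for it is only an illustrative figure.

```java
final class HeapSizeSketch {

    /**
     * Mirrors the arithmetic of calculateHeapSizeMB using plain values.
     * networkBufMB is supplied by the caller (an assumption of this sketch);
     * Flink computes it from the network buffer configuration.
     * managedMemorySizeMB <= 0 stands for "taskmanager.memory.size not set".
     */
    static long heapSizeMB(long totalJavaMemorySizeMB, long networkBufMB,
                           boolean useOffHeap, long managedMemorySizeMB, float fraction) {
        if (totalJavaMemorySizeMB <= 0) {
            throw new IllegalArgumentException("total memory must be positive");
        }
        // network buffers are always off-heap, so they are subtracted first
        long remainingMB = totalJavaMemorySizeMB - networkBufMB;
        if (!useOffHeap) {
            // managed memory lives on the heap, nothing else to subtract
            return remainingMB;
        }
        long offHeapMB = managedMemorySizeMB;
        if (offHeapMB <= 0) {
            // fall back to the fraction of what is left after network buffers
            offHeapMB = (long) ((double) fraction * remainingMB);
        }
        if (offHeapMB >= remainingMB) {
            throw new IllegalArgumentException("managed memory size too large");
        }
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        // 1024 MB total, 100 MB network buffers (illustrative), default fraction 0.7
        System.out.println(heapSizeMB(1024, 100, false, 0, 0.7f));  // 924
        System.out.println(heapSizeMB(1024, 100, true, 0, 0.7f));   // 278
        System.out.println(heapSizeMB(1024, 100, true, 512, 0.7f)); // 412
    }
}
```

With off-heap enabled and the default fraction, only 278 MB of the 1024 MB end up as JVM heap, which is why enabling taskmanager.memory.off-heap changes -Xmx so noticeably.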

TaskManagerServices.createMemoryManager

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......
​
    /**
     * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
     *
     * @param taskManagerServicesConfiguration to create the memory manager from
     * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
     * @param maxJvmHeapMemory the maximum JVM heap size
     * @return Memory manager
     * @throws Exception
     */
    private static MemoryManager createMemoryManager(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {
        // computing the amount of memory to use depends on how much memory is available
        // it strictly needs to happen AFTER the network stack has been initialized
​
        // check if a value has been configured
        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
​
        MemoryType memType = taskManagerServicesConfiguration.getMemoryType();
​
        final long memorySize;
​
        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();
​
        if (configuredMemory > 0) {
            if (preAllocateMemory) {
                LOG.info("Using {} MB for managed memory." , configuredMemory);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
            }
            memorySize = configuredMemory << 20; // megabytes to bytes
        } else {
            // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
​
            if (memType == MemoryType.HEAP) {
                // network buffers allocated off-heap -> use memoryFraction of the available heap:
                long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
                        memoryFraction , relativeMemSize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                        "memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
                }
                memorySize = relativeMemSize;
            } else if (memType == MemoryType.OFF_HEAP) {
                // The maximum heap memory has been adjusted according to the fraction (see
                // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                // directMemorySize = jvmTotalNoNet * memoryFraction
                long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
                        memoryFraction, directMemorySize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                        " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                }
                memorySize = directMemorySize;
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }
​
        // now start the memory manager
        final MemoryManager memoryManager;
        try {
            memoryManager = new MemoryManager(
                memorySize,
                taskManagerServicesConfiguration.getNumberOfSlots(),
                taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                memType,
                preAllocateMemory);
        } catch (OutOfMemoryError e) {
            if (memType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
            } else if (memType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager off-heap memory (" + memorySize +
                    " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            } else {
                throw e;
            }
        }
        return memoryManager;
    }
​
    //......
}
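  • TaskManagerServices provides the private static method createMemoryManager, which creates a MemoryManager from the configuration. If taskmanager.memory.size is set, that value is used directly; otherwise memorySize is computed according to the MemoryType and then passed to the MemoryManager constructor
  • When memType is MemoryType.HEAP, memorySize is relativeMemSize, where relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction)
  • When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize, where directMemorySize = jvmTotalNoNet * memoryFraction. Since calculateHeapSizeMB has already set maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that jvmTotalNoNet = maxJvmHeap / (1 - memoryFraction), and therefore directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction)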
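
The two fraction-based branches can be checked with a few numbers. This is a small sketch under assumptions: DirectMemorySketch and its method names are made up for illustration, and the 500 MB and 300 MB inputs are arbitrary. It only reproduces the two expressions from createMemoryManager, including their float and double arithmetic.

```java
final class DirectMemorySketch {

    /** HEAP case: a fraction of the heap that is currently free. */
    static long heapManagedBytes(long freeHeapBytes, float memoryFraction) {
        return (long) (freeHeapBytes * memoryFraction);
    }

    /**
     * OFF_HEAP case: the max heap is already jvmTotalNoNet * (1 - fraction),
     * so dividing by (1 - fraction) recovers jvmTotalNoNet, and multiplying
     * by the fraction yields the off-heap share.
     */
    static long offHeapManagedBytes(long maxJvmHeapBytes, float memoryFraction) {
        return (long) (maxJvmHeapBytes / (1.0 - memoryFraction) * memoryFraction);
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        // 500 MB of free heap with fraction 0.7: roughly 350 MB of managed heap memory
        System.out.println(heapManagedBytes(500 * mb, 0.7f) / (double) mb);
        // 300 MB max heap with fraction 0.7: roughly 700 MB of managed off-heap memory
        System.out.println(offHeapManagedBytes(300 * mb, 0.7f) / (double) mb);
    }
}
```

Because the fraction is a float and the results are truncated to long, the values land close to, but not necessarily exactly on, the round numbers; the same applies to the MB figures Flink logs via >> 20.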

TaskManagerServicesConfiguration

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {
    //......
​
    /**
     * Utility method to extract TaskManager config parameters from the configuration and to
     * sanity check them.
     *
     * @param configuration The configuration.
     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
     * @param localCommunication True, to skip initializing the network stack.
     *                                      Use only in cases where only one task manager runs.
     * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            InetAddress remoteAddress,
            boolean localCommunication) throws Exception {
​
        // we need this because many configs have been written with a "-1" entry
        int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }
​
        final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
        String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);
​
        if (localStateRootDir.length == 0) {
            // default to temp dirs.
            localStateRootDir = tmpDirs;
        }
​
        boolean localRecoveryMode = configuration.getBoolean(
            CheckpointingOptions.LOCAL_RECOVERY.key(),
            CheckpointingOptions.LOCAL_RECOVERY.defaultValue());
​
        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
            configuration,
            localCommunication,
            remoteAddress,
            slots);
​
        final QueryableStateConfiguration queryableStateConfig =
                parseQueryableStateConfiguration(configuration);
​
        // extract memory settings
        long configuredMemory;
        String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
        if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
            try {
                configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException(
                    "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
            }
        } else {
            configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
        }
​
        checkConfigParameter(
            configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                configuredMemory > 0, configuredMemory,
            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
            "MemoryManager needs at least one MB of memory. " +
                "If you leave this config parameter empty, the system automatically " +
                "pick a fraction of the available memory.");
​
        // check whether we use heap or off-heap memory
        final MemoryType memType;
        if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            memType = MemoryType.OFF_HEAP;
        } else {
            memType = MemoryType.HEAP;
        }
​
        boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);
​
        float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
            TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
            "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
​
        long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();
​
        return new TaskManagerServicesConfiguration(
            remoteAddress,
            tmpDirs,
            localStateRootDir,
            localRecoveryMode,
            networkConfig,
            queryableStateConfig,
            slots,
            configuredMemory,
            memType,
            preAllocateMemory,
            memoryFraction,
            timerServiceShutdownTimeout,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
    }
​
    //......
}
  • TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration; memType follows taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise
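
The memory-related checks in fromConfiguration boil down to three small rules. The sketch below restates them in standalone form; MemorySettingsSketch, its nested MemoryType enum, and the method names are stand-ins defined here so the example compiles on its own, and it throws IllegalArgumentException where Flink's checkConfigParameter reports a configuration error.

```java
final class MemorySettingsSketch {

    /** Local stand-in for Flink's MemoryType enum, defined here so the sketch is self-contained. */
    enum MemoryType { HEAP, OFF_HEAP }

    /** taskmanager.memory.off-heap = true selects OFF_HEAP, otherwise HEAP. */
    static MemoryType memoryType(boolean offHeapEnabled) {
        return offHeapEnabled ? MemoryType.OFF_HEAP : MemoryType.HEAP;
    }

    /** The fraction must lie strictly between 0.0 and 1.0. */
    static float checkFraction(float memoryFraction) {
        if (!(memoryFraction > 0.0f && memoryFraction < 1.0f)) {
            throw new IllegalArgumentException(
                "fraction must be between 0.0 and 1.0, got " + memoryFraction);
        }
        return memoryFraction;
    }

    /** A value left at its default is accepted; an explicitly set size needs at least 1 MB. */
    static long checkConfiguredMemoryMB(boolean leftAtDefault, long configuredMemoryMB) {
        if (!(leftAtDefault || configuredMemoryMB > 0)) {
            throw new IllegalArgumentException("MemoryManager needs at least one MB of memory");
        }
        return configuredMemoryMB;
    }

    public static void main(String[] args) {
        System.out.println(memoryType(true));                 // OFF_HEAP
        System.out.println(memoryType(false));                // HEAP
        System.out.println(checkFraction(0.7f));              // 0.7
        System.out.println(checkConfiguredMemoryMB(true, 0)); // 0
    }
}
```

Note that a fraction of exactly 0.0 or 1.0 is rejected, which also keeps the division by (1.0 - memoryFraction) in createMemoryManager well defined.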

Summary

  • The TaskManager's managed memory comes in two types, heap and off-heap. taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's total memory, heap plus off-heap. If taskmanager.memory.size is less than or equal to 0, managed memory is sized according to taskmanager.memory.fraction, which defaults to 0.7. If taskmanager.memory.off-heap is enabled and taskmanager.memory.size is not set, offHeapSize is taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB); with off-heap enabled, the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize
  • TaskManagerServices provides the private static method createMemoryManager, which creates a MemoryManager from the configuration. If taskmanager.memory.size is set, that value is used directly; otherwise memorySize is computed according to the MemoryType and passed to the MemoryManager constructor. When memType is MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction). When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction; since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), this gives directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction)
  • TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration; memType follows taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, MemoryType.HEAP otherwise

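Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.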
