本文主要研究一下flink JobManager的heap大小设置
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving
public class JobManagerOptions {
    //......
    /**
     * JVM heap size for the JobManager with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
        key("jobmanager.heap.size")
        .defaultValue("1024m")
        .withDescription("JVM heap size for the JobManager.");
    /**
     * JVM heap size (in megabytes) for the JobManager.
     * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
     */
    @Deprecated
    public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
        key("jobmanager.heap.mb")
        .defaultValue(1024)
        .withDescription("JVM heap size (in megabytes) for the JobManager.");
    //......
}
  flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java
public class ConfigurationUtils {
    private static final String[] EMPTY = new String[0];
    /**
     * Get job manager's heap memory. This method will check the new key
     * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
     * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
     *
     * @param configuration the configuration object
     * @return the memory size of job manager's heap memory.
     */
    public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
        if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
        } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
            return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
        } else {
            //use default value
            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
        }
    }
    //......
}
  flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java
@PublicEvolving
public class MemorySize implements java.io.Serializable {
    private static final long serialVersionUID = 1L;
    // ------------------------------------------------------------------------
    /** The memory size, in bytes. */
    private final long bytes;
    /**
     * Constructs a new MemorySize.
     *
     * @param bytes The size, in bytes. Must be zero or larger.
     */
    public MemorySize(long bytes) {
        checkArgument(bytes >= 0, "bytes must be >= 0");
        this.bytes = bytes;
    }
    // ------------------------------------------------------------------------
    /**
     * Gets the memory size in bytes.
     */
    public long getBytes() {
        return bytes;
    }
    /**
     * Gets the memory size in Kibibytes (= 1024 bytes).
     */
    public long getKibiBytes() {
        return bytes >> 10;
    }
    /**
     * Gets the memory size in Mebibytes (= 1024 Kibibytes).
     */
    public int getMebiBytes() {
        return (int) (bytes >> 20);
    }
    /**
     * Gets the memory size in Gibibytes (= 1024 Mebibytes).
     */
    public long getGibiBytes() {
        return bytes >> 30;
    }
    /**
     * Gets the memory size in Tebibytes (= 1024 Gibibytes).
     */
    public long getTebiBytes() {
        return bytes >> 40;
    }
    // ------------------------------------------------------------------------
    @Override
    public int hashCode() {
        return (int) (bytes ^ (bytes >>> 32));
    }
    @Override
    public boolean equals(Object obj) {
        return obj == this ||
                (obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);
    }
    @Override
    public String toString() {
        return bytes + " bytes";
    }
    // ------------------------------------------------------------------------
    //  Parsing
    // ------------------------------------------------------------------------
    /**
     * Parses the given string as as MemorySize.
     *
     * @param text The string to parse
     * @return The parsed MemorySize
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static MemorySize parse(String text) throws IllegalArgumentException {
        return new MemorySize(parseBytes(text));
    }
    /**
     * Parses the given string with a default unit.
     *
     * @param text The string to parse.
     * @param defaultUnit specify the default unit.
     * @return The parsed MemorySize.
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {
        if (!hasUnit(text)) {
            return parse(text + defaultUnit.getUnits()[0]);
        }
        return parse(text);
    }
    /**
     * Parses the given string as bytes.
     * The supported expressions are listed under {@link MemorySize}.
     *
     * @param text The string to parse
     * @return The parsed size, in bytes.
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static long parseBytes(String text) throws IllegalArgumentException {
        checkNotNull(text, "text");
        final String trimmed = text.trim();
        checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
        final int len = trimmed.length();
        int pos = 0;
        char current;
        while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
            pos++;
        }
        final String number = trimmed.substring(0, pos);
        final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
        if (number.isEmpty()) {
            throw new NumberFormatException("text does not start with a number");
        }
        final long value;
        try {
            value = Long.parseLong(number); // this throws a NumberFormatException on overflow
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("The value '" + number +
                    "' cannot be re represented as 64bit number (numeric overflow).");
        }
        final long multiplier;
        if (unit.isEmpty()) {
            multiplier = 1L;
        }
        else {
            if (matchesAny(unit, BYTES)) {
                multiplier = 1L;
            }
            else if (matchesAny(unit, KILO_BYTES)) {
                multiplier = 1024L;
            }
            else if (matchesAny(unit, MEGA_BYTES)) {
                multiplier = 1024L * 1024L;
            }
            else if (matchesAny(unit, GIGA_BYTES)) {
                multiplier = 1024L * 1024L * 1024L;
            }
            else if (matchesAny(unit, TERA_BYTES)) {
                multiplier = 1024L * 1024L * 1024L * 1024L;
            }
            else {
                throw new IllegalArgumentException("Memory size unit '" + unit +
                        "' does not match any of the recognized units: " + MemoryUnit.getAllUnits());
            }
        }
        final long result = value * multiplier;
        // check for overflow
        if (result / multiplier != value) {
            throw new IllegalArgumentException("The value '" + text +
                    "' cannot be re represented as 64bit number of bytes (numeric overflow).");
        }
        return result;
    }
    private static boolean matchesAny(String str, MemoryUnit unit) {
        for (String s : unit.getUnits()) {
            if (s.equals(str)) {
                return true;
            }
        }
        return false;
    }
    //......
}
  flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java
/**
     *  Enum which defines memory unit, mostly used to parse value from configuration file.
     *
     * <p>To make larger values more compact, the common size suffixes are supported:
     *
     * <ul>
     *     <li>q or 1b or 1bytes (bytes)
     *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
     *     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
     *     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
     *     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
     * </ul>
     *
     */
    public enum MemoryUnit {
        BYTES(new String[] { "b", "bytes" }),
        KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),
        MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),
        GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),
        TERA_BYTES(new String[] { "t", "tb", "tebibytes" });
        private String[] units;
        MemoryUnit(String[] units) {
            this.units = units;
        }
        public String[] getUnits() {
            return units;
        }
        public static String getAllUnits() {
            return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());
        }
        public static boolean hasUnit(String text) {
            checkNotNull(text, "text");
            final String trimmed = text.trim();
            checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
            final int len = trimmed.length();
            int pos = 0;
            char current;
            while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
                pos++;
            }
            final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
            return unit.length() > 0;
        }
        private static String concatenateUnits(final String[]... allUnits) {
            final StringBuilder builder = new StringBuilder(128);
            for (String[] units : allUnits) {
                builder.append('(');
                for (String unit : units) {
                    builder.append(unit);
                    builder.append(" | ");
                }
                builder.setLength(builder.length() - 3);
                builder.append(") / ");
            }
            builder.setLength(builder.length() - 3);
            return builder.toString();
        }
    }
  flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {
    //......
    private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
        if (cmd.hasOption(container.getOpt())) { // number of containers is required option!
            LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
        }
        // TODO: The number of task manager should be deprecated soon
        final int numberTaskManagers;
        if (cmd.hasOption(container.getOpt())) {
            numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));
        } else {
            numberTaskManagers = 1;
        }
        // JobManager Memory
        final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
        // Task Managers memory
        final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
        return new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(jobManagerMemoryMB)
            .setTaskManagerMemoryMB(taskManagerMemoryMB)
            .setNumberTaskManagers(numberTaskManagers)
            .setSlotsPerTaskManager(slotsPerTaskManager)
            .createClusterSpecification();
    }
    //......
}
  flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh
//......
DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary
//......
# Define FLINK_JM_HEAP if it is not already set
if [ -z "${FLINK_JM_HEAP}" ]; then
    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
fi
# Try read old config key, if new key not exists
if [ "${FLINK_JM_HEAP}" == 0 ]; then
    FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi
//......
if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi
if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi
//......
# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi
//......
  flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh
#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
ENTRYPOINT=standalonesession
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
        echo "used deprecated key /`${KEY_JOBM_MEM_MB}/`, please replace with key /`${KEY_JOBM_MEM_SIZE}/`"
    else
        flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
        FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
    fi
    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi
    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
    fi
    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    # Startup parameters
    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi
    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi
fi
if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
  FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS
)    依据env.java.opts.jobmanager配置
)追加到FLINK_ENV_JAVA_OPTS(      依据env.java.opts
)中    flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh
#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"
SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
case $SERVICE in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;
    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;
    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;
    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;
    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;
    (*)
        echo "Unknown service '${SERVICE}'. $USAGE."
        exit 1
    ;;
esac
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "/(.*/)/./(.*/)/..*"//1/2/; 1q')
# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi
echo "Starting $SERVICE as a console application on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
  FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS
);如果FLINK_JM_HEAP值大于0,则解析到FLINK_JM_HEAP_MB变量,如果FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中;它会将FLINK_ENV_JAVA_OPTS_JM(      依据env.java.opts.jobmanager配置
)追加到FLINK_ENV_JAVA_OPTS(      依据env.java.opts
)中;jobmanager.sh最后调用flink-console.sh来启动相关类    
由此可见最后的jvm参数取决于JVM_ARGS及FLINK_ENV_JAVA_OPTS;其中注意不要设置内存相关参数到JVM_ARGS,因为jobmanager.sh在FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中,而FLINK_JM_HEAP_MB则取决于FLINK_JM_HEAP或者jobmanager.heap.size配置;FLINK_ENV_JAVA_OPTS的配置则取决于env.java.opts以及env.java.opts.jobmanager;因而要配置jobmanager的heap大小的话,要么指定FLINK_JM_HEAP环境变量(    比如FLINK_JM_HEAP=512m
),要么在flink-conf.yaml中指定jobmanager.heap.size或者env.java.opts以及env.java.opts.jobmanager