天津建設(shè)工程信息網(wǎng)投標(biāo)信息系統(tǒng)登錄北京seo公司wyhseo
文章目錄
- 1、配置總內(nèi)存
- 2、JobManager 內(nèi)存模型
- 3、TaskManager 內(nèi)存模型
- 4、WebUI 展示內(nèi)存
- 5、Flink On YARN 模式下內(nèi)存分配
- 6、Flink On Yarn 集群消耗資源估算
- 6.1、資源分配
- 6.2、Flink 提交 Yarn 集群的相關(guān)命令
- 6.3、Flink On Yarn 集群的資源計(jì)算公式
1、配置總內(nèi)存
Flink JVM 進(jìn)程的進(jìn)程總內(nèi)存(Total Process Memory)包含了由 Flink 應(yīng)用使用的內(nèi)存(Flink 總內(nèi)存)以及由運(yùn)行 Flink 的 JVM 使用的內(nèi)存。 Flink 總內(nèi)存(Total Flink Memory)包括 JVM 堆內(nèi)存(Heap Memory)和堆外內(nèi)存(Off-Heap Memory)。 其中堆外內(nèi)存包括直接內(nèi)存(Direct Memory)和本地內(nèi)存(Native Memory)。詳細(xì)的配置參數(shù):https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/deployment/config.html
配置 Flink 進(jìn)程內(nèi)存最簡單的方法是指定以下兩個(gè)配置項(xiàng)中的任意一個(gè):
配置項(xiàng) | TaskManager 配置參數(shù) | JobManager 配置參數(shù) |
---|---|---|
Flink 總內(nèi)存 | taskmanager.memory.flink.size | jobmanager.memory.flink.size |
進(jìn)程總內(nèi)存 | taskmanager.memory.process.size | jobmanager.memory.process.size |
Flink 啟動需要明確配置:
TaskManager | JobManager |
---|---|
taskmanager.memory.flink.size | jobmanager.memory.flink.size |
taskmanager.memory.process.size | jobmanager.memory.process.size |
taskmanager.memory.task.heap.size 和 taskmanager.memory.managed.size | jobmanager.memory.heap.size |
不建議同時(shí)設(shè)置進(jìn)程總內(nèi)存和 Flink 總內(nèi)存。 這可能會造成內(nèi)存配置沖突,從而導(dǎo)致部署失敗。 額外配置其他內(nèi)存部分時(shí),同樣需要注意可能產(chǎn)生的配置沖突。
2、JobManager 內(nèi)存模型
如上圖所示,下表中列出了 Flink JobManager 內(nèi)存模型的所有組成部分,以及影響其大小的相關(guān)配置參數(shù)。
組成部分 | 配置參數(shù) | 描述 |
---|---|---|
JVM 堆內(nèi)存 | jobmanager.memory.heap.size | JobManager 的 JVM 堆內(nèi)存。 |
堆外內(nèi)存 | jobmanager.memory.off-heap.size | JobManager 的堆外內(nèi)存(直接內(nèi)存或本地內(nèi)存)。 |
JVM Metaspace | jobmanager.memory.jvm-metaspace.size | Flink JVM 進(jìn)程的 Metaspace。 |
JVM 開銷 | jobmanager.memory.jvm-overhead.min、jobmanager.memory.jvm-overhead.max、jobmanager.memory.jvm-overhead.fraction | 用于其他 JVM 開銷的本地內(nèi)存,例如??臻g、垃圾回收空間等。該內(nèi)存部分為基于進(jìn)程總內(nèi)存的受限的等比內(nèi)存部分。 |
如配置總內(nèi)存中所述,另一種配置 JobManager 內(nèi)存的方式是明確指定 JVM 堆內(nèi)存的大小(jobmanager.memory.heap.size)。 通過這種方式,用戶可以更好地掌控用于以下用途的 JVM 堆內(nèi)存大小。
3、TaskManager 內(nèi)存模型
如上圖所示,下表中列出了 Flink TaskManager 內(nèi)存模型的所有組成部分,以及影響其大小的相關(guān)配置參數(shù)。
組成部分 | 配置參數(shù) | 描述 |
---|---|---|
框架堆內(nèi)存(Framework Heap Memory) | taskmanager.memory.framework.heap.size | 用于 Flink 框架的 JVM 堆內(nèi)存(進(jìn)階配置)。 |
任務(wù)堆內(nèi)存(Task Heap Memory) | taskmanager.memory.task.heap.size | 用于 Flink 應(yīng)用的算子及用戶代碼的 JVM 堆內(nèi)存。 |
托管內(nèi)存(Managed memory) | taskmanager.memory.managed.size、taskmanager.memory.managed.fraction | 由 Flink 管理的用于排序、哈希表、緩存中間結(jié)果及 RocksDB State Backend 的本地內(nèi)存。 |
框架堆外內(nèi)存(Framework Off-heap Memory) | taskmanager.memory.framework.off-heap.size | 用于 Flink 框架的堆外內(nèi)存(直接內(nèi)存或本地內(nèi)存)(進(jìn)階配置)。 |
任務(wù)堆外內(nèi)存(Task Off-heap Memory) | taskmanager.memory.task.off-heap.size | 用于 Flink 應(yīng)用的算子及用戶代碼的堆外內(nèi)存(直接內(nèi)存或本地內(nèi)存)。 |
網(wǎng)絡(luò)內(nèi)存(Network Memory) | taskmanager.memory.network.min、taskmanager.memory.network.max、taskmanager.memory.network.fraction | 用于任務(wù)之間數(shù)據(jù)傳輸?shù)闹苯觾?nèi)存(例如網(wǎng)絡(luò)傳輸緩沖)。該內(nèi)存部分為基于 Flink 總內(nèi)存的受限的等比內(nèi)存部分。 |
JVM Metaspace | taskmanager.memory.jvm-metaspace.size | Flink JVM 進(jìn)程的 Metaspace。 |
JVM 開銷 | taskmanager.memory.jvm-overhead.min、taskmanager.memory.jvm-overhead.max、taskmanager.memory.jvm-overhead.fraction | 用于其他 JVM 開銷的本地內(nèi)存,例如??臻g、垃圾回收空間等。該內(nèi)存部分為基于進(jìn)程總內(nèi)存的受限的等比內(nèi)存部分。 |
我們可以看到,有些內(nèi)存部分的大小可以直接通過一個(gè)配置參數(shù)進(jìn)行設(shè)置,有些則需要根據(jù)多個(gè)參數(shù)進(jìn)行調(diào)整。通常情況下,不建議對框架堆內(nèi)存和框架堆外內(nèi)存進(jìn)行調(diào)整。 除非你非常肯定 Flink 的內(nèi)部數(shù)據(jù)結(jié)構(gòu)及操作需要更多的內(nèi)存。 這可能與具體的部署環(huán)境及作業(yè)結(jié)構(gòu)有關(guān),例如非常高的并發(fā)度。 此外,Flink 的部分依賴(例如 Hadoop)在某些特定的情況下也可能會需要更多的直接內(nèi)存或本地內(nèi)存。
4、WebUI 展示內(nèi)存
JobManager 內(nèi)存直觀展示
TaskManager 內(nèi)存直觀展示
樹狀圖表示:
5、Flink On YARN 模式下內(nèi)存分配
如果是 Flink On YARN 模式下:
taskmanager.memory.process.size = 4096 MB = 4G
taskmanager.memory.network.fraction = 0.15
taskmanager.memory.managed.fraction = 0.45
然后根據(jù)以上參數(shù),就可以計(jì)算得到各部分的內(nèi)存大小:
taskmanager.memory.jvm-overhead = 4096 * 0.1 = 409.6 MB
taskmanager.memory.flink.size = 4096 - 409.6 - 256 = 3430.4 MB
taskmanager.memory.network = 3430.4 * 0.15 = 514.56 MB
taskmanager.memory.managed = 3430.4 * 0.45 = 1543.68 MB
taskmanager.memory.task.heap.size = 3430.4 - 128 * 2 - 1543.68 - 514.56 = 1116.16 MB
6、Flink On Yarn 集群消耗資源估算
6.1、資源分配
- 每一個(gè) Flink Application 都包含 至少一個(gè) JobManager (若 HA 配置則可包含多個(gè) JobManagers)。若有多個(gè) JobManagers ,則 有且僅有一個(gè) JobManager 處于 Running 狀態(tài),其他的 JobManager 則處于 Standby 狀態(tài);
- 每一個(gè)處于 Running 狀態(tài)的 JobManager 管理著 一個(gè)或多個(gè) TaskManager。TaskManager 的本質(zhì)是一個(gè) JVM 進(jìn)程,可以執(zhí)行一個(gè)或多個(gè)線程。TaskManager 可以用于對 Memory 進(jìn)行隔離;
- 每一個(gè) TaskManager 可以執(zhí)行 一個(gè)或多個(gè) Slot。Slot 的本質(zhì)是由 JVM 進(jìn)程所生成的線程。每個(gè) Slot 可以將 TaskManager 管理的的 Total Memory 進(jìn)行平均分配,但不會對 CPU 進(jìn)行隔離。在同一個(gè) TaskManager 中的 Slots 共享 TCP 連接 (through multiplexing) 、心跳信息、數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu);
- 每一個(gè) Slot 內(nèi)部可以執(zhí)行 零個(gè)或一個(gè) Pipeline。 每一個(gè) Pipeline 中又可以包含 任意數(shù)量的 有前后關(guān)聯(lián)關(guān)系的 Tasks。注意一個(gè) Flink Cluster 所能達(dá)到的最大并行度數(shù)量等于所有 TaskManager 中全部 Slot 的數(shù)量的總和。
6.2、Flink 提交 Yarn 集群的相關(guān)命令
在使用 Yarn 作為集群資源管理器時(shí),時(shí)常會使用如下命令對 Flink Application 進(jìn)行提交,主要參數(shù)如下:
flink run -m yarn-cluster -ys 2 -p 1 -yjm 1G -ytm 2G
參數(shù) | 解釋 | 說明 |
---|---|---|
-yjm,–yarnjobManagerMemory | Memory for JobManager Container with optional unit (default: MB) | JobManager 內(nèi)存容量 (在一個(gè) Flink Application 中處于 Running 狀態(tài)的 JobManager 只有一個(gè)) |
-ytm,–yarntaskManagerMemory | Memory per TaskManager Container with optional unit (default: MB) | 每一個(gè) TaskManager 的內(nèi)存容量 |
-ys,–yarnslots | Number of slots per TaskManager | 每一個(gè) TaskManager 中的 Slot 數(shù)量 |
-p,–parallelism | The parallelism with which to run the program. Optional flag to override the default value specified in the configuration. | 任務(wù)執(zhí)行的并行度 |
該命令的各個(gè)參數(shù)表示的含義如下 (使用 flink --help 命令即可閱讀)。
Flink 啟動參考配置參數(shù)(帶有 kerberos 認(rèn)證可根據(jù)實(shí)際情況需要?jiǎng)h減):
/home/dev/soft/flink/bin/flink run \-m yarn-cluster \-yD akka.ask.timeout='360 s' \-yD akka.framesize=20485760b \-yD blob.fetch.backlog=1000 \-yD blob.fetch.num-concurrent=500 \-yD blob.fetch.retries=50 \-yD blob.storage.directory=/data1/flinkdir \-yD env.java.opts.jobmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=512m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -XX:G1HeapWastePercent=5 -XX:G1ReservePercent=25 -Dfile.encoding=UTF-8' \-yD env.java.opts.taskmanager='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=50 -XX:+ExplicitGCInvokesConcurrent -XX:+AlwaysPreTouch -XX:AutoBoxCacheMax=20000 -Dsun.security.krb5.debug=false -Dfile.encoding=UTF-8' \-yD env.java.opts='-XX:ErrorFile=/tmp/java_error_%p.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=1024m -Dfile.encoding=UTF-8' \-yD execution.attached=false \-yD execution.buffer-timeout='1000 ms' \-yD execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \-yD execution.checkpointing.interval='30 min' \-yD execution.checkpointing.max-concurrent-checkpoints=1 \-yD execution.checkpointing.min-pause='2 min' \-yD execution.checkpointing.mode=EXACTLY_ONCE \-yD execution.checkpointing.timeout='28 min' \-yD execution.checkpointing.tolerable-failed-checkpoints=8 \-yD execution.checkpointing.unaligned=true \-yD execution.checkpointing.unaligned.forced=true \-yD heartbeat.interval=60000 \-yD heartbeat.rpc-failure-threshold=5 \-yD heartbeat.timeout=340000 \-yD io.tmp.dirs=/data1/flinkdir \-yD jobmanager.heap.size=1024m \-yD jobmanager.memory.jvm-metaspace.size=268435456b \-yD jobmanager.memory.jvm-overhead.max=1073741824b \-yD jobmanager.memory.jvm-overhead.min=1073741824b \-yD jobmanager.memory.network.fraction=0.2 \-yD jobmanager.memory.network.max=6GB \-yD jobmanager.memory.off-heap.size=134217728b \-yD jobmanager.memory.process.size='18360 mb' \-yD metrics.reporter.promgateway.deleteOnShutdown=true \-yD metrics.reporter.promgateway.factory.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory \-yD metrics.reporter.promgateway.filter.includes=\*:dqc\*,uptime,taskSlotsTotal,numRegisteredTaskManagers,taskSlotsAvailable,numberOfFailedCheckpoints,numRestarts,lastCheckpointDuration,Used,Max,Total,Count,Time:gauge,meter,counter,histogram \-yD metrics.reporter.promgateway.groupingKey="yarn=${yarn};hdfs=${hdfs};job_name=TEST-broadcast-${jobName//./-}-${provId}" \-yD metrics.reporter.promgateway.host=172.17.xxxx.xxxx \-yD metrics.reporter.promgateway.interval='60 SECONDS' \-yD metrics.reporter.promgateway.jobName="TEST-broadcast-${jobName//./-}-${provId}" \-yD metrics.reporter.promgateway.port=10080 \-yD metrics.reporter.promgateway.randomJobNameSuffix=true \-yD pipeline.name="TEST-broadcast-${jobName//./-}-${provId}" \-yD pipeline.object-reuse=true \-yD rest.flamegraph.enabled=true \-yD rest.server.numThreads=20 \-yD restart-strategy.failure-rate.delay='60 s' \-yD restart-strategy.failure-rate.failure-rate-interval='3 min' \-yD restart-strategy.failure-rate.max-failures-per-interval=3 \-yD restart-strategy=failure-rate \-yD security.kerberos.krb5-conf.path=/home/dev/kerberos/krb5.conf \-yD security.kerberos.login.contexts=Client,KafkaClient \-yD security.kerberos.login.keytab=/home/dev/kerberos/xxxx.keytab \-yD security.kerberos.login.principal=xxxx \-yD security.kerberos.login.use-ticket-cache=false \-yD state.backend.async=true \-yD state.backend=hashmap \-yD state.checkpoints.dir=hdfs://xxxx/flink/checkpoint/${jobName//.//}/$provId \-yD state.checkpoint-storage=filesystem \-yD state.checkpoints.num-retained=3 \-yD state.savepoints.dir=hdfs://xxxx/flink/savepoint/${jobName//.//}/$provId \-yD table.exec.hive.fallback-mapred-writer=false \-yD task.manager.memory.segment-size=4mb \-yD taskmanager.memory.framework.off-heap.size=1GB \-yD taskmanager.memory.managed.fraction=0.2 \-yD taskmanager.memory.network.fraction=0.075 \-yD taskmanager.memory.network.max=16GB \-yD taskmanager.memory.process.size='50 gb' \-yD taskmanager.network.netty.client.connectTimeoutSec=600 \-yD taskmanager.network.request-backoff.max=120000 \-yD taskmanager.network.retries=100 \-yD taskmanager.numberOfTaskSlots=10 \-yD web.timeout=900000 \-yD web.upload.dir=/data1/flinkdir \-yD yarn.application.name="TEST-broadcast-${jobName//./-}-${provId}" \-yD yarn.application.queue=$yarnQueue \-yD yarn.application-attempts=10 \
6.3、Flink On Yarn 集群的資源計(jì)算公式
-
JobManager 的內(nèi)存計(jì)算
JobManager 的數(shù)量 = 1 (固定,由于一個(gè) Flink Application 只能有一個(gè) JobManager)
JobManager 的內(nèi)存總量 = 1 * JobManager 的內(nèi)存大小 = 1 * yjm -
TaskManager 的內(nèi)存計(jì)算
TaskManager 的數(shù)量 = (設(shè)置的并行度總數(shù) / 每個(gè) TaskManager 的 Slot 數(shù)量) = (p / ys) (Ps: p / ys 有可能為非整數(shù),故需要向下取整)
TaskManager 的內(nèi)存總量 = TaskManager 的數(shù)量 * 每個(gè) TaskManager 的內(nèi)存容量 = TaskManager 的數(shù)量 * ytm -
Slot 所占用的內(nèi)存計(jì)算
每個(gè) Slot 的內(nèi)存容量 = 每個(gè) TaskManager 的內(nèi)存容量 / 每一個(gè) TaskManager 中的 Slot 數(shù)量 = ytm / ys
Slot 的總數(shù)量 = 最大并行度數(shù)量 = p
Slot 所占用的總內(nèi)存容量 = TaskManager 的內(nèi)存總量 = (p / ys) * ytm -
yarn vcore 總數(shù)量計(jì)算
yarn vcore 總數(shù)量 = Slot 的總數(shù)量 + JobManager 占用的 vcore 數(shù)量 (與 Yarn 的 minimum Allocation 有關(guān)) = p + m (不足則取 Yarn 的最小 vcore 分配數(shù)量) -
yarn container 的總數(shù)量計(jì)算
yarn container 的總數(shù)量 = TaskManager 的數(shù)量 + JobManager 的數(shù)量 = 1 + (p / ys) = (p / ys) + 1
yarn 的內(nèi)存總量 = JobManager 的數(shù)量 * yjm (與 Yarn 的 minimum Allocation 有關(guān)) + TaskManager 的數(shù)量 * ytm = 1 * yjm (不足則取 Yarn 的最小 Memory 分配數(shù)量) + (p / ys) * ytm
Ps: 根據(jù)實(shí)際應(yīng)用經(jīng)驗(yàn),一般 Yarn 的一個(gè) vcore 搭配 2G 內(nèi)存是最為有效率的配置方法。
實(shí)戰(zhàn)應(yīng)用:
flink-conf.yaml 配置項(xiàng):
# Total size of the JobManager (JobMaster / ResourceManager / Dispatcher) process.
jobmanager.memory.process.size: 2048m
# Total size of the TaskManager process.
taskmanager.memory.process.size: 20480m
# Managed Memory size for TaskExecutors. This is the size of off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory.
taskmanager.memory.managed.size: 4096m
# The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
taskmanager.numberOfTaskSlots: 10
Yarn 集群相關(guān)配置項(xiàng):
Minimum Allocation <memory:4096, vCores:2>
Maximum Allocation <memory:163840, vCores:96>
假設(shè) Flink 流任務(wù)在 FlinkSQL 中設(shè)置的并行度為 10 (parallelism = 10)。根據(jù)計(jì)算公式:
JobManager 的數(shù)量 = 1
TaskManager 的數(shù)量 = (p / ys) = 1 (注:ys 配置是 taskmanager.numberOfTaskSlots = 10)
Slot 的總數(shù)量 = p = 10
yarn vcore 的總數(shù)量 = Slot 的總數(shù)量 + 1 = p + 1 = p + 2 (向上取至 Yarn 最小分配 vcore 數(shù)) = 12
yarn container 的總數(shù)量 = TaskManager 的數(shù)量 + JobManager 的數(shù)量 = 2
yarn 的內(nèi)存總量 = JobManager 的數(shù)量 * yjm + TaskManager 的數(shù)量 * ytm = 1 * 2048m + 1 * 20480m = 1 * 4096m (向上取至 Yarn 最小分配內(nèi)存數(shù)) + 1 * 20480m = 24576m