欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)之Flink優(yōu)化

柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)之Flink優(yōu)化

http://yzkb.51969.com/

文章目錄

導(dǎo)言:Flink調(diào)優(yōu)概覽第1章 資源配置調(diào)優(yōu)1.1 內(nèi)存設(shè)置1.1.1 TaskManager 內(nèi)存模型1.1.2 生產(chǎn)資源配置示例

1.2 合理利用 cpu 資源1.2.1 使用 DefaultResourceCalculator 策略1.2.2 使用 DominantResourceCalculator 策略1.2.3 使用DominantResourceCalculator策略并指定容器

1.3 并行度設(shè)置1.3.1 全局并行度計(jì)算1.3.2 Source 端并行度的配置1.3.3 Transform 端并行度的配置1.3.4 Sink 端并行度的配置

第2章 狀態(tài)及 Checkpoint 調(diào)優(yōu)2.1 RocksDB 大狀態(tài)調(diào)優(yōu)2.1.1 開啟State 訪問性能監(jiān)控2.1.2 開啟增量檢查點(diǎn)和本地恢復(fù)2.1.4 增大block 緩存2.1.5 增大 writebuffer 和 level 閾值大小2.1.6 增大 writebuffer 數(shù)量2.1.7 增大后臺(tái)線程數(shù)和writebuffer 合并數(shù)2.1.8 開啟分區(qū)索引功能

2.2 Checkpoint 設(shè)置

第3章 反壓處理3.1 概述3.1.1 反壓的理解3.1.2 反壓的危害

3.2 定位反壓節(jié)點(diǎn)3.2.1 利用 FlinkWebUI 定位3.2.2 利用 Metrics 定位1)根據(jù)指標(biāo)分析反壓2)可以進(jìn)一步分析數(shù)據(jù)傳輸

3.3 反壓的原因及處理3.3.1 查看是否數(shù)據(jù)傾斜3.3.2 使用火焰圖分析1)開啟火焰圖功能2)WebUI 查看火焰圖3)分析火焰圖看頂層的哪個(gè)函數(shù)占據(jù)的寬度最大。只要有"平頂"(plateaus),就表示該函數(shù)可能

3.3.3 分析GC 情況3.3.4 外部組件交互

第4章 數(shù)據(jù)傾斜4.1 判斷是否存在數(shù)據(jù)傾斜4.2 數(shù)據(jù)傾斜的解決4.2.1 keyBy 后的聚合操作存在數(shù)據(jù)傾斜1)為什么不能直接用二次聚合來處理2)使用 LocalKeyBy 的思想實(shí)現(xiàn)方式:3)DataStreamAPI 自定義實(shí)現(xiàn)的案例

4.2.2 keyBy 之前發(fā)生數(shù)據(jù)傾斜4.2.3 keyBy 后的窗口聚合操作存在數(shù)據(jù)傾斜1)實(shí)現(xiàn)思路:2)提交原始案例3)提交兩階段聚合的案例

第5章 Job 優(yōu)化5.1 使用 DataGen 造數(shù)據(jù)5.1.1 DataStream 的DataGenerator

5.2 算子指定 UUID1)提交案例:未指定uid2)提交案例:指定uid

5.3 鏈路延遲測(cè)量5.4 開啟對(duì)象重用5.5 細(xì)粒度滑動(dòng)窗口優(yōu)化1)細(xì)粒度滑動(dòng)的影響2)解決思路3)細(xì)粒度的滑動(dòng)窗口案例4)時(shí)間分片案例

第6章 FlinkSQL 調(diào)優(yōu)6.1 設(shè)置空閑狀態(tài)保留時(shí)間6.2 開啟MiniBatch? MiniBatch 默認(rèn)關(guān)閉,開啟方式如下:

6.3 開啟LocalGlobal6.3.1 原理概述? LocalGlobal 開啟方式:

6.3.2 提交案例:統(tǒng)計(jì)每天每個(gè) mid 出現(xiàn)次數(shù)6.3.3 提交案例:開啟miniBatch 和LocalGlobal

6.4 開啟 SplitDistinct6.4.1 原理概述6.4.2 提交案例:count(distinct)存在熱點(diǎn)問題6.4.3 提交案例:開啟splitdistinct

6.5 多維 DISTINCT 使用Filter6.5.1 原理概述6.5.2 提交案例:多維 Distinct

6.6 設(shè)置參數(shù)總結(jié)

第7章 常見故障排除7.1 非法配置異常7.2 Java 堆空間異常7.3 直接緩沖存儲(chǔ)器異常7.4 元空間異常7.5 網(wǎng)絡(luò)緩沖區(qū)數(shù)量不足7.6 超出容器內(nèi)存異常7.7 Checkpoint 失敗7.7.1 CheckpointDecline7.7.2 CheckpointExpire

7.8 Checkpoint 慢1)SourceTriggerCheckpoint 慢2)使用增量 Checkpoint3)作業(yè)存在反壓或者數(shù)據(jù)傾斜4)Barrier 對(duì)齊慢5)主線程太忙,導(dǎo)致沒機(jī)會(huì)做 snapshot6)同步階段做的慢6)異步階段做的慢

7.9 Kafka 動(dòng)態(tài)發(fā)現(xiàn)分區(qū)7.10 Watermark 不更新7.11 依賴沖突7.12 超出文件描述符限制7.13 臟數(shù)據(jù)導(dǎo)致數(shù)據(jù)轉(zhuǎn)發(fā)失敗7.14 通訊超時(shí)7.15 FlinkonYarn 其他常見錯(cuò)誤

導(dǎo)言:Flink調(diào)優(yōu)概覽

第1章 資源配置調(diào)優(yōu)

Flink 性能調(diào)優(yōu)的第一步,就是為任務(wù)分配合適的資源,在一定范圍內(nèi),增加資源的分配與性能的提升是成正比的,實(shí)現(xiàn)了最優(yōu)的資源配置后,在此基礎(chǔ)上再考慮進(jìn)行后面論述的性能調(diào)優(yōu)策略。 提交方式主要是yarn-per-job,資源的分配在使用腳本提交 Flink 任務(wù)時(shí)進(jìn)行指定。 ? 標(biāo)準(zhǔn)的 Flink 任務(wù)提交腳本(GenericCLI 模式) 從 1.11 開始,增加了通用客戶端模式,參數(shù)使用-D 指定

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \ 指定并行度

-Dyarn.application.queue=test \ 指定 yarn 隊(duì)列

-Djobmanager.memory.process.size=1024mb \ 指定 JM 的總進(jìn)程大小

-Dtaskmanager.memory.process.size=1024mb \ 指定每個(gè) TM 的總進(jìn)程大小

-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個(gè) TM 的 slot 數(shù)

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

參數(shù)列表: https://ci.apache.org/projects/flink/flink-docs-release-1.13/deployment/config.html

1.1 內(nèi)存設(shè)置

1.1.1 TaskManager 內(nèi)存模型

Flink內(nèi)存模型

1、內(nèi)存模型詳解

? JVM 特定內(nèi)存:JVM 本身使用的內(nèi)存,包含 JVM 的 metaspace 和 over-head 1)JVM metaspace:JVM 元空間 taskmanager.memory.jvm-metaspace.size,默認(rèn) 256mb 2) JVM over-head 執(zhí)行開銷:JVM 執(zhí)行時(shí)自身所需要的內(nèi)容,包括線程堆棧、IO、編譯緩存等所使用的內(nèi)存。 taskmanager.memory.jvm-overhead.fraction,默認(rèn) 0.1 taskmanager.memory.jvm-overhead.min,默認(rèn) 192mb taskmanager.memory.jvm-overhead.max,默認(rèn) 1gb 總進(jìn)程內(nèi)存*fraction,如果小于配置的 min(或大于配置的 max)大小,則使用 min/max 大小

? 框架內(nèi)存:Flink 框架,即 TaskManager 本身所占用的內(nèi)存,不計(jì)入 Slot 的資源中。 堆內(nèi):taskmanager.memory.framework.heap.size,默認(rèn) 128MB 堆外:taskmanager.memory.framework.off-heap.size,默認(rèn) 128MB ? Task 內(nèi)存:Task 執(zhí)行用戶代碼時(shí)所使用的內(nèi)存 堆內(nèi):taskmanager.memory.task.heap.size,默認(rèn) none,由 Flink 內(nèi)存扣除掉其他部分的內(nèi)存得到。 堆外:taskmanager.memory.task.off-heap.size,默認(rèn) 0,表示不使用堆外內(nèi)存 ? 網(wǎng)絡(luò)內(nèi)存:網(wǎng)絡(luò)數(shù)據(jù)交換所使用的堆外內(nèi)存大小,如網(wǎng)絡(luò)數(shù)據(jù)交換緩沖區(qū)堆外:taskmanager.memory.network.fraction,默認(rèn) 0.1 taskmanager.memory.network.min,默認(rèn) 64mb taskmanager.memory.network.max,默認(rèn) 1gb Flink 內(nèi)存fraction,如果小于配置的 min(或大于配置的 max)大小,則使用 min/max 大小 ? 托管內(nèi)存:用于 RocksDB State Backend 的本地內(nèi)存和批的排序、哈希表、緩存中間結(jié)果。堆外: taskmanager.memory.managed.fraction,默認(rèn) 0.4 taskmanager.memory.managed.size,默認(rèn) none 如果 size 沒指定,則等于Flink 內(nèi)存fraction 2、案例分析 基于Yarn 模式,一般參數(shù)指定的是總進(jìn)程內(nèi)存,taskmanager.memory.process.size,比如指定為 4G,每一塊內(nèi)存得到大小如下: (1) 計(jì)算 Flink 內(nèi)存 JVM 元空間 256m JVM 執(zhí)行開銷: 4g0.1=409.6m,在[192m,1g]之間,最終結(jié)果 409.6m Flink 內(nèi)存=4g-256m-409.6m=3430.4m (2) 網(wǎng)絡(luò)內(nèi)存=3430.4m0.1=343.04m,在[64m,1g]之間,最終結(jié)果 343.04m (3) 托管內(nèi)存=3430.4m*0.4=1372.16m (4) 框架內(nèi)存,堆內(nèi)和堆外都是 128m (5) Task 堆內(nèi)內(nèi)存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m

所以進(jìn)程內(nèi)存給多大,每一部分內(nèi)存需不需要調(diào)整,可以看內(nèi)存的使用率來調(diào)整。

1.1.2 生產(chǎn)資源配置示例

bin/flinkrun\-tyarn-per-job \-d\-p5 \ 指定并行度-Dyarn.application.queue=test \ 指定 yarn 隊(duì)列-Djobmanager.memory.process.size=2048mb \ JM2~4G 足夠-Dtaskmanager.memory.process.size=4096mb \ 單個(gè) TM2~8G 足夠-Dtaskmanager.numberOfTaskSlots=2 \ 與容器核數(shù) 1core:1slot 或 2core:1slot-ccom.atguigu.flink.tuning.UvDemo\/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

Flink 是實(shí)時(shí)流處理,關(guān)鍵在于資源情況能不能抗住高峰時(shí)期每秒的數(shù)據(jù)量,通常用 QPS/TPS 來描述數(shù)據(jù)情況。

1.2 合理利用 cpu 資源

Yarn 的容量調(diào)度器默認(rèn)情況下是使用“DefaultResourceCalculator”分配策略,只根據(jù)內(nèi)存調(diào)度資源,所以在 Yarn 的資源管理頁(yè)面上看到每個(gè)容器的 vcore 個(gè)數(shù)還是 1。

yarn.scheduler.capacity.resource-calculator

org.apache.hadoop.yarn.util.resource.DominantResourceCalculator |

可以修改策略為 DominantResourceCalculator,該資源計(jì)算器在計(jì)算資源的時(shí)候會(huì)綜合考慮 cpu 和內(nèi)存的情況。在 capacity-scheduler.xml 中修改屬性:

1.2.1 使用 DefaultResourceCalculator 策略

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=4096mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

可以看到一個(gè)容器只有一個(gè) vcore: [外鏈圖片轉(zhuǎn)存中…(img-g4c42dno-1708782286117)]

1.2.2 使用 DominantResourceCalculator 策略

修改后 yarn 配置后,分發(fā)配置并重啟 yarn,再次提交 flink 作業(yè):

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=4096mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

看到容器的 vcore 數(shù)變了: [外鏈圖片轉(zhuǎn)存中…(img-fbYnp8b4-1708782286117)]

JobManager1 個(gè),占用 1 個(gè)容器,vcore=1 TaskManager3 個(gè),占用 3 個(gè)容器,每個(gè)容器 vcore=2,總 vcore=2*3=6,因?yàn)槟J(rèn)單個(gè)容器的 vcore 數(shù)=單 TM 的slot 數(shù)

1.2.3 使用DominantResourceCalculator策略并指定容器

vcore 數(shù)

指定yarn 容器的 vcore 數(shù),提交:

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Dyarn.containers.vcores=3 \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=4096mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

[外鏈圖片轉(zhuǎn)存中…(img-OPO8qSUg-1708782286117)]

JobManager1 個(gè),占用 1 個(gè)容器,vcore=1 TaskManager3 個(gè),占用 3 個(gè)容器,每個(gè)容器vcore =3,總 vcore=3*3=9

1.3 并行度設(shè)置

1.3.1 全局并行度計(jì)算

開發(fā)完成后,先進(jìn)行壓測(cè)。任務(wù)并行度給 10 以下,測(cè)試單個(gè)并行度的處理上限。然后總 QPS/單并行度的處理能力 = 并行度 開發(fā)完 Flink 作業(yè),壓測(cè)的方式很簡(jiǎn)單,先在 kafka 中積壓數(shù)據(jù),之后開啟 Flink 任務(wù),出現(xiàn)反壓,就是處理瓶頸。相當(dāng)于水庫(kù)先積水,一下子泄洪。 不能只從 QPS 去得出并行度,因?yàn)橛行┳侄紊佟⑦壿嫼?jiǎn)單的任務(wù),單并行度一秒處理幾萬條數(shù)據(jù)。而有些數(shù)據(jù)字段多,處理邏輯復(fù)雜,單并行度一秒只能處理 1000 條數(shù)據(jù)。 最好根據(jù)高峰期的QPS 壓測(cè),并行度*1.2 倍,富余一些資源。

1.3.2 Source 端并行度的配置

數(shù)據(jù)源端是 Kafka,Source 的并行度設(shè)置為Kafka 對(duì)應(yīng) Topic 的分區(qū)數(shù)。 如果已經(jīng)等于 Kafka 的分區(qū)數(shù),消費(fèi)速度仍跟不上數(shù)據(jù)生產(chǎn)速度,考慮下 Kafka 要擴(kuò)大分區(qū),同時(shí)調(diào)大并行度等于分區(qū)數(shù)。 Flink 的一個(gè)并行度可以處理一至多個(gè)分區(qū)的數(shù)據(jù),如果并行度多于 Kafka 的分區(qū)數(shù),那么就會(huì)造成有的并行度空閑,浪費(fèi)資源。

1.3.3 Transform 端并行度的配置

? Keyby 之前的算子 一般不會(huì)做太重的操作,都是比如 map、filter、flatmap 等處理較快的算子,并行度可以和 source 保持一致。 ? Keyby 之后的算子 如果并發(fā)較大,建議設(shè)置并行度為 2 的整數(shù)次冪,例如:128、256、512; 小并發(fā)任務(wù)的并行度不一定需要設(shè)置成 2 的整數(shù)次冪; 大并發(fā)任務(wù)如果沒有 KeyBy,并行度也無需設(shè)置為 2 的整數(shù)次冪;

1.3.4 Sink 端并行度的配置

Sink 端是數(shù)據(jù)流向下游的地方,可以根據(jù) Sink 端的數(shù)據(jù)量及下游的服務(wù)抗壓能力進(jìn)行評(píng)估。如果 Sink 端是 Kafka,可以設(shè)為 Kafka 對(duì)應(yīng) Topic 的分區(qū)數(shù)。 Sink 端的數(shù)據(jù)量小,比較常見的就是監(jiān)控告警的場(chǎng)景,并行度可以設(shè)置的小一些。 Source 端的數(shù)據(jù)量是最小的,拿到 Source 端流過來的數(shù)據(jù)后做了細(xì)粒度的拆分,數(shù)據(jù)量不斷的增加,到 Sink 端的數(shù)據(jù)量就非常大。那么在 Sink 到下游的存儲(chǔ)中間件的時(shí)候就需要提高并行度。 另外 Sink 端要與下游的服務(wù)進(jìn)行交互,并行度還得根據(jù)下游的服務(wù)抗壓能力來設(shè)置,如果在 Flink Sink 這端的數(shù)據(jù)量過大的話,且 Sink 處并行度也設(shè)置的很大,但下游的服務(wù)完全撐不住這么大的并發(fā)寫入,可能會(huì)造成下游服務(wù)直接被寫掛,所以最終還是要在 Sink 處的并行度做一定的權(quán)衡。

第2章 狀態(tài)及 Checkpoint 調(diào)優(yōu)

2.1 RocksDB 大狀態(tài)調(diào)優(yōu)

RocksDB 是基于 LSM Tree 實(shí)現(xiàn)的(類似 HBase),寫數(shù)據(jù)都是先緩存到內(nèi)存中,所以 RocksDB 的寫請(qǐng)求效率比較高。RocksDB 使用內(nèi)存結(jié)合磁盤的方式來存儲(chǔ)數(shù)據(jù),每次獲取數(shù)據(jù)時(shí),先從內(nèi)存中 blockcache 中查找,如果內(nèi)存中沒有再去磁盤中查詢。使用 RocksDB 時(shí),狀態(tài)大小僅受可用磁盤空間量的限制,性能瓶頸主要在于 RocksDB 對(duì)磁盤的讀請(qǐng)求,每次讀寫操作都必須對(duì)數(shù)據(jù)進(jìn)行反序列化或者序列化。當(dāng)處理性能不夠時(shí),僅需要橫向擴(kuò)展并行度即可提高整個(gè) Job 的吞吐量。 [外鏈圖片轉(zhuǎn)存中…(img-pTkt3CuR-1708782286117)]

從 Flink1.10 開始,F(xiàn)link 默認(rèn)將 RocksDB 的內(nèi)存大小配置為每個(gè) taskslot 的托管內(nèi)存。調(diào)試內(nèi)存性能的問題主要是通過調(diào)整配置項(xiàng) taskmanager.memory.managed.size或者 taskmanager.memory.managed.fraction 以增加 Flink 的托管內(nèi)存(即堆外的托管內(nèi)存)。進(jìn)一步可以調(diào)整一些參數(shù)進(jìn)行高級(jí)性能調(diào)優(yōu),這些參數(shù)也可以在應(yīng)用程序中通過RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)指定。下面介紹 提高資源利用率的幾個(gè)重要配置:

2.1.1 開啟State 訪問性能監(jiān)控

Flink1.13 中引入了 State 訪問的性能監(jiān)控,即 latencytrackigstate。此功能不局限于 State Backend 的類型,自定義實(shí)現(xiàn)的 State Backend 也可以復(fù)用此功能。 [外鏈圖片轉(zhuǎn)存中…(img-cmqp8Uys-1708782286117)]

State 訪問的性能監(jiān)控會(huì)產(chǎn)生一定的性能影響,所以,默認(rèn)每100 次做一次取樣 (sample),對(duì)不同的 StateBackend 性能損失影響不同: 對(duì)于 RocksDBStateBackend,性能損失大概在 1% 左右 對(duì)于 Heap State Backend,性能損失最多可達(dá) 10%

state.backend.latency-track.keyed-state-enabled:true#啟用訪問狀態(tài)的性能監(jiān)控 state.backend.latency-track.sample-interval: 100 #采樣間隔state.backend.latency-track.history-size: 128 #保留的采樣數(shù)據(jù)個(gè)數(shù),越大越精確 state.backend.latency-track.state-name-as-variable: true #將狀態(tài)名作為變量

正常開啟第一個(gè)參數(shù)即可。

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=4096mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-Dstate.backend.latency-track.keyed-state-enabled=true \

-c com.atguigu.flink.tuning.RocksdbTuning \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

2.1.2 開啟增量檢查點(diǎn)和本地恢復(fù)

1)開啟增量檢查點(diǎn) RocksDB 是目前唯一可用于支持有狀態(tài)流處理應(yīng)用程序增量檢查點(diǎn)的狀態(tài)后端,可以修改參數(shù)開啟增量檢查點(diǎn):

state.backend.incremental: true #默認(rèn) false,改為true?;虼a中指定newEmbeddedRocksDBStateBackend(true)

2)開啟本地恢復(fù) 當(dāng) Flink 任務(wù)失敗時(shí),可以基于本地的狀態(tài)信息進(jìn)行恢復(fù)任務(wù),可能不需要從 hdfs 拉取數(shù)據(jù)。本地恢復(fù)目前僅涵蓋鍵控類型的狀態(tài)后端(RocksDB),MemoryStateBackend不支持本地恢復(fù)并忽略此選項(xiàng)。

state.backend.local-recovery:true

3)設(shè)置多目錄 如果有多塊磁盤,也可以考慮指定本地多目錄

state.backend.rocksdb.localdir:/data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb

注意:不要配置單塊磁盤的多個(gè)目錄,務(wù)必將目錄配置到多塊不同的磁盤上,讓多塊磁盤來分擔(dān)壓力。

bin/flinkrun\-tyarn-per-job\-d\-p5\-Drest.flamegraph.enabled=true\-Dyarn.application.queue=test\-Djobmanager.memory.process.size=1024mb\-Dtaskmanager.memory.process.size=4096mb\-Dtaskmanager.numberOfTaskSlots=2\-Dstate.backend.incremental=true\-Dstate.backend.local-recovery=true\-Dstate.backend.latency-track.keyed-state-enabled=true\-ccom.atguigu.flink.tuning.RocksdbTuning\/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

注意:不要配置單塊磁盤的多個(gè)目錄,務(wù)必將目錄配置到多塊不同的磁盤上,讓多塊磁盤 到的一些參數(shù),如果調(diào)整預(yù)定義選項(xiàng)后還達(dá)不到預(yù)期,再去調(diào)整后面的 block、writebuffer等參數(shù)。 當(dāng) 前 支 持 的 預(yù) 定 義 選 項(xiàng) 有 DEFAULT 、 SPINNING_DISK_OPTIMIZED 、 SPINNING_DISK_OPTIMIZED_HIGH_MEM 或FLASH_SSD_OPTIMIZED。有條件上 SSD的,可以指定為 FLASH_SSD_OPTIMIZED

state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM #設(shè)置為機(jī)械硬盤+內(nèi)存模式

2.1.4 增大block 緩存

整個(gè) RocksDB 共享一個(gè) blockcache,讀數(shù)據(jù)時(shí)內(nèi)存的 cache 大小,該參數(shù)越大讀 數(shù)據(jù)時(shí)緩存命中率越高,默認(rèn)大小為8 MB,建議設(shè)置到64 ~ 256 MB。 state.backend.rocksdb.block.cache-size: 64m 默認(rèn)8m

2.1.5 增大 writebuffer 和 level 閾值大小

RocksDB 中,每個(gè) State 使用一個(gè) ColumnFamily,每個(gè) ColumnFamily 使用獨(dú)占的 write buffer,默認(rèn) 64MB,建議調(diào)大。 調(diào)整這個(gè)參數(shù)通常要適當(dāng)增加 L1 層的大小閾值 max-size-level-base,默認(rèn) 256m。該值太小會(huì)造成能存放的 SST 文件過少,層級(jí)變多造成查找困難,太大會(huì)造成文件過多,合并困難。建議設(shè)為 target_file_size_base(默認(rèn) 64MB) 的倍數(shù),且不能太小,例如 5~10倍,即 320~640MB。

state.backend.rocksdb.writebuffer.size: 128mstate.backend.rocksdb.compaction.level.max-size-level-base: 320m

2.1.6 增大 writebuffer 數(shù)量

每個(gè) ColumnFamily 對(duì)應(yīng)的 writebuffer 最大數(shù)量,這實(shí)際上是內(nèi)存中“只讀內(nèi)存表“的最大數(shù)量,默認(rèn)值是 2。對(duì)于機(jī)械磁盤來說,如果內(nèi)存足夠大,可以調(diào)大到 5 左右 state.backend.rocksdb.writebuffer.count:5

2.1.7 增大后臺(tái)線程數(shù)和writebuffer 合并數(shù)

1)增大線程數(shù) 用于后臺(tái) flush 和合并 sst 文件的線程數(shù),默認(rèn)為 1,建議調(diào)大,機(jī)械硬盤用戶可以改為 4 等更大的值 state.backend.rocksdb.thread.num: 4 2)增大writebuffer 最小合并數(shù) 將數(shù)據(jù)從 writebuffer 中 flush 到磁盤時(shí),需要合并的 writebuffer 最小數(shù)量,默認(rèn) 值為 1,可以調(diào)成 3。 state.backend.rocksdb.writebuffer.number-to-merge:3

2.1.8 開啟分區(qū)索引功能

Flink1.13 中對(duì) RocksDB 增加了分區(qū)索引功能,復(fù)用了 RocksDB 的 partitioned Index&filter 功能,簡(jiǎn)單來說就是對(duì) RocksDB 的 partitionedIndex 做了多級(jí)索引。 也就是將內(nèi)存中的最上層常駐,下層根據(jù)需要再 load 回來,這樣就大大降低了數(shù)據(jù) Swap競(jìng)爭(zhēng)。線上測(cè)試中,相對(duì)于內(nèi)存比較小的場(chǎng)景中,性能提升 10 倍左右。如果在內(nèi)存管控下 Rocksdb 性能不如預(yù)期的話,這也能成為一個(gè)性能優(yōu)化點(diǎn)。 state.backend.rocksdb.memory.partitioned-index-filters:true #默認(rèn)false

2.1.9 參數(shù)設(shè)定案例

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=4096mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-Dstate.backend.incremental=true \

-Dstate.backend.local-recovery=true \

-Dstate.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM \

-Dstate.backend.rocksdb.block.cache-size=64m \

-Dstate.backend.rocksdb.writebuffer.size=128m \

-Dstate.backend.rocksdb.compaction.level.max-size-level-base=320m \

-Dstate.backend.rocksdb.writebuffer.count=5 \

-Dstate.backend.rocksdb.thread.num=4 \

-Dstate.backend.rocksdb.writebuffer.number-to-merge=3 \

-Dstate.backend.rocksdb.memory.partitioned-index-filters=true \

-Dstate.backend.latency-track.keyed-state-enabled=true \

-c com.atguigu.flink.tuning.RocksdbTuning \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

2.2 Checkpoint 設(shè)置

一般需求,我們的 Checkpoint 時(shí)間間隔可以設(shè)置為分鐘級(jí)別(1~5 分鐘)。對(duì)于狀態(tài)很大的任務(wù)每次 Checkpoint 訪問 HDFS 比較耗時(shí),可以設(shè)置為 5~10 分鐘一次 Checkpoint,并且調(diào)大兩次 Checkpoint 之間的暫停間隔,例如設(shè)置兩次 Checkpoint 之間至少暫停 4 或 8 分鐘。同時(shí),也需要考慮時(shí)效性的要求,需要在時(shí)效性和性能之間做一個(gè)平衡,如果時(shí)效性要求高,結(jié)合 end- to-end 時(shí)長(zhǎng),設(shè)置秒級(jí)或毫秒級(jí)。如果 Checkpoint 語義配置為 EXACTLY_ONCE,那么在 Checkpoint 過程中還會(huì)存在 barrier 對(duì)齊的過程,可以通過 FlinkWebUI 的 Checkpoint 選項(xiàng)卡來查看 Checkpoint 過程中各階段的耗時(shí)情況,從而確定到底是哪個(gè)階段導(dǎo)致 Checkpoint 時(shí)間過長(zhǎng)然后針對(duì)性的解決問題。 RocksDB 相關(guān)參數(shù)在前面已說明,可以在 flink-conf.yaml 指定,也可以在 Job 的代碼中調(diào)用 API 單獨(dú)指定,這里不再列出。

// 使? RocksDBStateBackend 做為狀態(tài)后端,并開啟增量 Checkpoint

RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://hadoop1:8020/flink/checkpoints", true);

env.setStateBackend(rocksDBStateBackend);

// 開啟 Checkpoint,間隔為 3 分鐘

env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));

// 配置 Checkpoint

CheckpointConfig checkpointConf = env.getCheckpointConfig(); checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 最小間隔 4 分鐘

checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))

// 超時(shí)時(shí)間 10 分鐘

checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));

// 保存 checkpoint checkpointConf.enableExternalizedCheckpoints(

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

第3章 反壓處理

3.1 概述

Flink 網(wǎng)絡(luò)流控及反壓的介紹: https://flink-learning.org.cn/article/detail/138316d1556f8f9d34e517d04d670626

3.1.1 反壓的理解

簡(jiǎn)單來說,F(xiàn)link 拓?fù)渲忻總€(gè)節(jié)點(diǎn)(Task)間的數(shù)據(jù)都以阻塞隊(duì)列的方式傳輸,下游來不及消費(fèi)導(dǎo)致隊(duì)列被占滿后,上游的生產(chǎn)也會(huì)被阻塞,最終導(dǎo)致數(shù)據(jù)源的攝入被阻塞。 反壓(BackPressure)通常產(chǎn)生于這樣的場(chǎng)景:短時(shí)間的負(fù)載高峰導(dǎo)致系統(tǒng)接收數(shù)據(jù)的速率遠(yuǎn)高于它處理數(shù)據(jù)的速率。許多日常問題都會(huì)導(dǎo)致反壓,例如,垃圾回收停頓可能會(huì)導(dǎo)致流入的數(shù)據(jù)快速堆積,或遇到大促、秒殺活動(dòng)導(dǎo)致流量陡增。

3.1.2 反壓的危害

反壓如果不能得到正確的處理,可能會(huì)影響到 checkpoint 時(shí)長(zhǎng)和 state 大小,甚至可能會(huì)導(dǎo)致資源耗盡甚至系統(tǒng)崩潰。 1) 影響 checkpoint 時(shí)長(zhǎng):barrier 不會(huì)越過普通數(shù)據(jù),數(shù)據(jù)處理被阻塞也會(huì)導(dǎo)致 checkpointbarrier 流經(jīng)整個(gè)數(shù)據(jù)管道的時(shí)長(zhǎng)變長(zhǎng),導(dǎo)致 checkpoint 總體時(shí)間(Endto End Duration)變長(zhǎng)。 2)影響 state 大?。篵arrier 對(duì)齊時(shí),接受到較快的輸入管道的 barrier 后,它后面數(shù)據(jù)會(huì)被緩存起來但不處理,直到較慢的輸入管道的 barrier 也到達(dá),這些被緩存的數(shù)據(jù)會(huì)被放到 state 里面,導(dǎo)致 checkpoint 變大。 這兩個(gè)影響對(duì)于生產(chǎn)環(huán)境的作業(yè)來說是十分危險(xiǎn)的,因?yàn)?checkpoint 是保證數(shù)據(jù)一致性的關(guān)鍵,checkpoint 時(shí)間變長(zhǎng)有可能導(dǎo)致 checkpoint超時(shí)失敗,而 state 大小同樣可能拖慢 checkpoint 甚至導(dǎo)致 OOM(使用 Heap-basedStateBackend)或者物理內(nèi)存使用超出容器資源(使用 RocksDBStateBackend)的穩(wěn)定性問題。 因此,我們?cè)谏a(chǎn)中要盡量避免出現(xiàn)反壓的情況。

3.2 定位反壓節(jié)點(diǎn)

解決反壓首先要做的是定位到造成反壓的節(jié)點(diǎn),排查的時(shí)候,先把 operatorchain 禁用,方便定位到具體算子。 提交UvDemo:

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

3.2.1 利用 FlinkWebUI 定位

FlinkWebUI 的反壓監(jiān)控提供了 SubTask 級(jí)別的反壓監(jiān)控,1.13 版本以前是通過周 期性對(duì) Task 線程的棧信息采樣,得到線程被阻塞在請(qǐng)求 Buffe(r 意味著被下游隊(duì)列阻塞)的頻率來判斷該節(jié)點(diǎn)是否處于反壓狀態(tài)。默認(rèn)配置下,這個(gè)頻率在 0.1 以下則為 OK,0.1至 0.5 為 LOW,而超過 0.5 則為 HIGH。 Flink1.13 優(yōu)化了反壓檢測(cè)的邏輯(使用基于任務(wù) Mailbox 計(jì)時(shí),而不在再于堆棧采樣),并且重新實(shí)現(xiàn)了作業(yè)圖的 UI 展示:Flink 現(xiàn)在在 UI 上通過顏色和數(shù)值來展示繁忙和反壓的程度。

1)通過WebUI 看到Map 算子處于反壓:

3)分析瓶頸算子 如果處于反壓狀態(tài),那么有兩種可能性: (1) 該節(jié)點(diǎn)的發(fā)送速率跟不上它的產(chǎn)生數(shù)據(jù)速率。這一般會(huì)發(fā)生在一條輸入多條輸出的 Operator(比如 flatmap)。這種情況,該節(jié)點(diǎn)是反壓的根源節(jié)點(diǎn),它是從 SourceTask到 Sink Task 的第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn)。 (2) 下游的節(jié)點(diǎn)接受速率較慢,通過反壓機(jī)制限制了該節(jié)點(diǎn)的發(fā)送速率。這種情況,需要繼續(xù)排查下游節(jié)點(diǎn),一直找到第一個(gè)為OK 的一般就是根源節(jié)點(diǎn)。 總體來看,如果我們找到第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn),反壓根源要么是就這個(gè)節(jié)點(diǎn),要么是它緊接著的下游節(jié)點(diǎn)。 通常來講,第二種情況更常見。如果無法確定,還需要結(jié)合 Metrics 進(jìn)一步判斷。

3.2.2 利用 Metrics 定位

監(jiān)控反壓時(shí)會(huì)用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有關(guān),最為 有用的是以下幾個(gè) Metrics:

Metris描述outPoolUsage發(fā)送端 Buffer 的使用率inPoolUsage接收端 Buffer 的使用率floatingBuffersUsage(1.9 以上)接收端 FloatingBuffer 的使用率exclusiveBuffersUsage(1.9 以上)接收端 ExclusiveBuffer 的使用率

其中 inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage。

1)根據(jù)指標(biāo)分析反壓

分析反壓的大致思路是:如果一個(gè) Subtask 的發(fā)送端 Buffer 占用率很高,則表明它被下游反壓限速了;如果一個(gè) Subtask 的接受端 Buffer 占用很高,則表明它將反壓傳導(dǎo)至上游。反壓情況可以根據(jù)以下表格進(jìn)行對(duì)號(hào)入座(1.9 以上):

2)可以進(jìn)一步分析數(shù)據(jù)傳輸

Flink1.9 及以上版本,還可以根據(jù) floatingBuffersUsage/exclusiveBuffersUsage 以及其上游 Task 的 outPoolUsage 來進(jìn)行進(jìn)一步的分析一個(gè) Subtask 和其上游Subtask 的數(shù)據(jù)傳輸。 在流量較大時(shí),Channel 的 ExclusiveBuffer 可能會(huì)被寫滿,此時(shí) Flink 會(huì)向 BufferPool 申請(qǐng)剩余的 FloatingBuffer。這些 FloatingBuffer 屬于備用 Buffer。

總結(jié): 1)floatingBuffersUsage 為高,則表明反壓正在傳導(dǎo)至上游 2)同時(shí)exclusiveBuffersUsage 為低,則表明可能有傾斜 比如,floatingBuffersUsage 高、exclusiveBuffersUsage 低為有傾斜,因?yàn)樯贁?shù) channel 占用了大部分的 FloatingBuffer。

3.3 反壓的原因及處理

注意:反壓可能是暫時(shí)的,可能是由于負(fù)載高峰、CheckPoint 或作業(yè)重啟引起的數(shù)據(jù)積壓而導(dǎo)致反壓。如果反壓是暫時(shí)的,應(yīng)該忽略它。另外,請(qǐng)記住,斷斷續(xù)續(xù)的反壓會(huì)影響我們分析和解決問題。 定位到反壓節(jié)點(diǎn)后,分析造成原因的辦法主要是觀察 TaskThread。按照下面的順序,一步一步去排查。

3.3.1 查看是否數(shù)據(jù)傾斜

在實(shí)踐中,很多情況下的反壓是由于數(shù)據(jù)傾斜造成的,這點(diǎn)我們可以通過 WebUI 各 個(gè) SubTask 的 RecordsSent 和 RecordReceived 來確認(rèn),另外 Checkpointdetail 里不同 SubTask 的 Statesize 也是一個(gè)分析數(shù)據(jù)傾斜的有用指標(biāo)。

(關(guān)于數(shù)據(jù)傾斜的詳細(xì)解決方案,會(huì)在下一章節(jié)詳細(xì)討論)

3.3.2 使用火焰圖分析

如果不是數(shù)據(jù)傾斜,最常見的問題可能是用戶代碼的執(zhí)行效率問題(頻繁被阻塞或者性能問題),需要找到瓶頸算子中的哪部分計(jì)算邏輯消耗巨大。 最有用的辦法就是對(duì) TaskManager 進(jìn)行 CPU profile,從中我們可以分析到 Task Thread 是否跑滿一個(gè) CPU 核:如果是的話要分析 CPU 主要花費(fèi)在哪些函數(shù)里面;如果不是的話要看 Task Thread 阻塞在哪里,可能是用戶函數(shù)本身有些同步的調(diào)用,可能是 checkpoint 或者 GC 等系統(tǒng)活動(dòng)導(dǎo)致的暫時(shí)系統(tǒng)暫停。

1)開啟火焰圖功能

Flink1.13 直接在 WebUI 提供 JVM 的 CPU 火焰圖,這將大大簡(jiǎn)化性能瓶頸的分析,默認(rèn)是不開啟的,需要修改參數(shù): rest.flamegraph.enabled:true#默認(rèn)false 也可以在提交時(shí)指定:

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Drest.flamegraph.enabled=true \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

2)WebUI 查看火焰圖

火焰圖是通過對(duì)堆棧跟蹤進(jìn)行多次采樣來構(gòu)建的。每個(gè)方法調(diào)用都由一個(gè)條形表示,其中條形的長(zhǎng)度與其在樣本中出現(xiàn)的次數(shù)成正比。 ? On-CPU: 處于 [RUNNABLE, NEW]狀態(tài)的線程 ? Off-CPU: 處于 [TIMED_WAITING, WAITING, BLOCKED]的線程,用于查看在樣本中發(fā)現(xiàn)的阻塞調(diào)用。

3)分析火焰圖

顏色沒有特殊含義,具體查看: ? 縱向是調(diào)用鏈,從下往上,頂部就是正在執(zhí)行的函數(shù) ? 橫向是樣本出現(xiàn)次數(shù),可以理解為執(zhí)行時(shí)長(zhǎng)。

看頂層的哪個(gè)函數(shù)占據(jù)的寬度最大。只要有"平頂"(plateaus),就表示該函數(shù)可能

存在性能問題。 如果是 Flink1.13 以前的版本,可以手動(dòng)做火焰圖: 如何生成火焰圖:http://www.54tianzhisheng.cn/2020/10/05/flink-jvm-profiler/

3.3.3 分析GC 情況

TaskManager 的內(nèi)存以及 GC 問題也可能會(huì)導(dǎo)致反壓,包括 TaskManagerJVM 各區(qū)內(nèi)存不合理導(dǎo)致的頻繁 Full GC 甚至失聯(lián)。通常建議使用默認(rèn)的 G1 垃圾回收器。 可以通過打印 GC 日志(-XX:+PrintGCDetails),使用 GC 分析器(GCViewer 工具)來驗(yàn)證是否處于這種情況。 ? 在 Flink 提交腳本中,設(shè)置 JVM 參數(shù),打印 GC 日志:

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps" \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

? 下載 GC 日志的方式: 因?yàn)槭?on yarn 模式,運(yùn)行的節(jié)點(diǎn)一個(gè)一個(gè)找比較麻煩??梢源蜷_ WebUI,選擇 JobManager 或者 TaskManager,點(diǎn)擊 Stdout,即可看到 GC 日志,點(diǎn)擊下載按鈕即可將 GC 日志通過 HTTP 的方式下載下來。

? 分析 GC 日志: 通過 GC 日志分析出單個(gè) FlinkTaskmanager 堆總大小、年輕代、老年代分配的內(nèi)存空間、Full GC 后老年代剩余大小等,相關(guān)指標(biāo)定義可以去 Github 具體查看。 GCViewer 地址:https://github.com/chewiebug/GCViewer Linux 下分析: java -jar gcviewer_1.3.4.jar gc.log Windows 下分析: 直接雙擊gcviewer_1.3.4.jar,打開GUI界面,選擇gc的log打開 擴(kuò)展:最重要的指標(biāo)是FullGC 后,老年代剩余大小這個(gè)指標(biāo),按照《Java 性能優(yōu)化權(quán)威指南》這本書 Java 堆大小計(jì)算法則,設(shè) FullGC 后老年代剩余大小空間為 M,那么堆的大小建議 3~4 倍 M,新生代為 1~1.5 倍 M,老年代應(yīng)為 2~3 倍 M。

3.3.4 外部組件交互

如果發(fā)現(xiàn)我們的 Source 端數(shù)據(jù)讀取性能比較低或者 Sink 端寫入性能較差,需要檢查第三方組件是否遇到瓶頸,還有就是做維表join 時(shí)的性能問題。 例如: Kafka 集群是否需要擴(kuò)容,Kafka 連接器是否并行度較低 HBase 的 rowkey 是否遇到熱點(diǎn)問題,是否請(qǐng)求處理不過來 ClickHouse 并發(fā)能力較弱,是否達(dá)到瓶頸 …… 關(guān)于第三方組件的性能問題,需要結(jié)合具體的組件來分析,最常用的思路: 1)異步 io+熱緩存來優(yōu)化讀寫性能 2)先攢批再讀寫維表join 參考: https://flink-learning.org.cn/article/detail/b8df32fbc6542257a5b449114e137cc3

https://www.jianshu.com/p/a62fa483ff54

第4章 數(shù)據(jù)傾斜

4.1 判斷是否存在數(shù)據(jù)傾斜

相同 Task 的多個(gè) Subtask 中, 個(gè)別 Subtask 接收到的數(shù)據(jù)量明顯大于其他 Subtask 接收到的數(shù)據(jù)量,通過 FlinkWebUI 可以精確地看到每個(gè) Subtask 處理了多少數(shù)據(jù),即可判斷出 Flink 任務(wù)是否存在數(shù)據(jù)傾斜。通常,數(shù)據(jù)傾斜也會(huì)引起反壓。 [外鏈圖片轉(zhuǎn)存中…(img-JfOtB1YV-1708782286119)] 另外, 有時(shí) Checkpointdetail 里不同 SubTask 的 Statesize 也是一個(gè)分析數(shù)據(jù)傾斜的有用指標(biāo)。

4.2 數(shù)據(jù)傾斜的解決

4.2.1 keyBy 后的聚合操作存在數(shù)據(jù)傾斜

提交案例:

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SkewDemo1 \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--local-keyby false

查看webui: [外鏈圖片轉(zhuǎn)存中…(img-MbFj2rXE-1708782286119)]

1)為什么不能直接用二次聚合來處理

Flink 是實(shí)時(shí)流處理,如果keyby 之后的聚合操作存在數(shù)據(jù)傾斜,且沒有開窗口(沒攢批)的情況下,簡(jiǎn)單的認(rèn)為使用兩階段聚合,是不能解決問題的。因?yàn)檫@個(gè)時(shí)候Flink 是來一條處理一條,且向下游發(fā)送一條結(jié)果,對(duì)于原來 keyby 的維度(第二階段聚合)來講,數(shù)據(jù)量并沒有減少,且結(jié)果重復(fù)計(jì)算(非 FlinkSQL,未使用回撤流),如下圖所示:

2)使用 LocalKeyBy 的思想

在 keyBy 上游算子數(shù)據(jù)發(fā)送之前,首先在上游算子的本地對(duì)數(shù)據(jù)進(jìn)行聚合后,再發(fā)送到下游,使下游接收到的數(shù)據(jù)量大大減少,從而使得 keyBy 之后的聚合操作不再是任務(wù)的瓶頸。類似 MapReduce 中 Combiner 的思想,但是這要求聚合操作必須是多條數(shù)據(jù)或者一批數(shù)據(jù)才能聚合,單條數(shù)據(jù)沒有辦法通過聚合來減少數(shù)據(jù)量。從 FlinkLocalKeyBy 實(shí)現(xiàn)原理來講,必然會(huì)存在一個(gè)積攢批次的過程,在上游算子中必須攢夠一定的數(shù)據(jù)量,對(duì)這些數(shù)據(jù)聚合后再發(fā)送到下游。

實(shí)現(xiàn)方式:

? DataStreamAPI 需要自己寫代碼實(shí)現(xiàn) ? SQL 可以指定參數(shù),開啟miniBatch 和 LocalGlobal 功能(推薦,后續(xù)介紹)

3)DataStreamAPI 自定義實(shí)現(xiàn)的案例

以計(jì)算每個(gè) mid 出現(xiàn)的次數(shù)為例,keyby 之前,使用 flatMap 實(shí)現(xiàn) LocalKeyby 功能

import org.apache.flink.api.common.functions.RichFlatMapFunction;

import org.apache.flink.api.common.state.ListState;

import org.apache.flink.api.common.state.ListStateDescriptor;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.runtime.state.FunctionInitializationContext;

import org.apache.flink.runtime.state.FunctionSnapshotContext;

import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import org.apache.flink.util.Collector;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.atomic.AtomicInteger;

public class LocalKeyByFlatMapFunc extends RichFlatMapFunction, Tuple2> implements CheckpointedFunction {

//Checkpoint 時(shí)為了保證 Exactly Once,將 buffer 中的數(shù)據(jù)保存到該 ListState 中

private ListState> listState;

//本地 buffer,存放 local 端緩存的 mid 的 count 信息

private HashMap localBuffer;

//緩存的數(shù)據(jù)量大小,即:緩存多少數(shù)據(jù)再向下游發(fā)送 private int batchSize;

//計(jì)數(shù)器,獲取當(dāng)前批次接收的數(shù)據(jù)量 private AtomicInteger currentSize;

//構(gòu)造器,批次大小傳參

public LocalKeyByFlatMapFunc(int batchSize) {

this.batchSize = batchSize;

}

@Override

public void flatMap(Tuple2 value, Collector> out) throws Exception {

// 1、將新來的數(shù)據(jù)添加到 buffer 中

Long count = localBuffer.getOrDefault(value, 0L);

localBuffer.put(value.f0, count + 1);

// 2、如果到達(dá)設(shè)定的批次,則將 buffer 中的數(shù)據(jù)發(fā)送到下游

if (currentSize.incrementAndGet() >= batchSize) {

// 2.1 遍歷 Buffer 中數(shù)據(jù),發(fā)送到下游

for (Map.Entry midAndCount : localBuffer.entrySet()) {

out.collect(Tuple2.of(midAndCount.getKey(),

midAndCount.getValue()));

}

// 2.2 Buffer 清空,計(jì)數(shù)器清零 localBuffer.clear(); currentSize.set(0);

}

}

@Override

public void snapshotState(FunctionSnapshotContext context) throws Exception {

// 將 buffer 中的數(shù)據(jù)保存到狀態(tài)中,來保證 Exactly Once listState.clear();

for (Map.Entry midAndCount : localBuffer.entrySet()) {

listState.add(Tuple2.of(midAndCount.getKey(), midAndCount.getValue()));

}

}

@Override

public void initializeState(FunctionInitializationContext context) throws Exception {

// 從狀態(tài)中恢復(fù) buffer 中的數(shù)據(jù)

listState = context.getOperatorStateStore().getListState(new ListStateDescriptor>(

"localBufferState", Types.TUPLE(Types.STRING, Types.LONG)

)

);

localBuffer = new HashMap();

if (context.isRestored()) {

// 從狀態(tài)中恢復(fù)數(shù)據(jù)到 buffer 中

for (Tuple2 midAndCount : listState.get()) {

// 如果出現(xiàn) pv != 0,說明改變了并行度,ListState 中的數(shù)據(jù)會(huì)被均勻分發(fā)到新的 subtask 中

}

// 單個(gè) subtask 恢復(fù)的狀態(tài)中可能包含多個(gè)相同的 mid 的 count 數(shù)據(jù)

// 所以每次先取一下 buffer 的值,累加再 put

long count = localBuffer.getOrDefault(midAndCount.f0, 0L);

localBuffer.put(midAndCount.f0, count + midAndCount.f1);

// 從狀態(tài)恢復(fù)時(shí),默認(rèn)認(rèn)為 buffer 中數(shù)據(jù)量達(dá)到了 batchSize,需要向下游發(fā) currentSize = new AtomicInteger(batchSize);

} else {

currentSize = new AtomicInteger(0);

}

}

}

提交 localkeyby 案例:

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SkewDemo1 \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--local-keyby true

查看webui: [外鏈圖片轉(zhuǎn)存中…(img-u2NXOIBN-1708782286121)]

可以看到每個(gè)subtask 處理的數(shù)據(jù)量基本均衡,另外處理的數(shù)據(jù)量相比原先少了很多。

4.2.2 keyBy 之前發(fā)生數(shù)據(jù)傾斜

如果 keyBy 之前就存在數(shù)據(jù)傾斜,上游算子的某些實(shí)例可能處理的數(shù)據(jù)較多,某些實(shí)例可能處理的數(shù)據(jù)較少,產(chǎn)生該情況可能是因?yàn)閿?shù)據(jù)源的數(shù)據(jù)本身就不均勻,例如由于某些原因 Kafka 的 topic 中某些 partition 的數(shù)據(jù)量較大,某些 partition 的數(shù)據(jù)量較少。 對(duì)于不存在 keyBy 的 Flink 任務(wù)也會(huì)出現(xiàn)該情況。 這種情況,需要讓 Flink 任務(wù)強(qiáng)制進(jìn)行shuffle。使用 shuffle、rebalance 或 rescale 算子即可將數(shù)據(jù)均勻分配,從而解決數(shù)據(jù)傾斜的問題。

4.2.3 keyBy 后的窗口聚合操作存在數(shù)據(jù)傾斜

因?yàn)槭褂昧舜翱?,變成了有界?shù)據(jù)(攢批)的處理,窗口默認(rèn)是觸發(fā)時(shí)才會(huì)輸出一條結(jié)果發(fā)往下游,所以可以使用兩階段聚合的方式:

1)實(shí)現(xiàn)思路:

? 第一階段聚合:key 拼接隨機(jī)數(shù)前綴或后綴,進(jìn)行 keyby、開窗、聚合 注意:聚合完不再是 WindowedStream,要獲取 WindowEnd 作為窗口標(biāo)記作為第二階段分組依據(jù),避免不同窗口的結(jié)果聚合到一起) ? 第二階段聚合:按照原來的 key 及windowEnd 作keyby、聚合

2)提交原始案例

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SkewDemo2 \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--two-phase false

查看WebUI: [外鏈圖片轉(zhuǎn)存中…(img-F9tbscbQ-1708782286121)]

3)提交兩階段聚合的案例

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SkewDemo2 \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--two-phase true \

--random-num 16

查看WebUI:可以看到第一次打散的窗口聚合,比較均勻 [外鏈圖片轉(zhuǎn)存中…(img-KW7nLgYl-1708782286122)] 第二次聚合,也比較均勻: [外鏈圖片轉(zhuǎn)存中…(img-JmUUMuEJ-1708782286122)] 隨機(jī)數(shù)范圍,需要自己去測(cè),因?yàn)?keyby 的分區(qū)器是(兩次 hash*下游并行度/最大并行度) SQL 寫法參考:https://zhuanlan.zhihu.com/p/197299746

第5章 Job 優(yōu)化

5.1 使用 DataGen 造數(shù)據(jù)

開發(fā)完 Flink 作業(yè),壓測(cè)的方式很簡(jiǎn)單,先在 kafka 中積壓數(shù)據(jù),之后開啟 Flink 任務(wù),出現(xiàn)反壓,就是處理瓶頸。相當(dāng)于水庫(kù)先積水,一下子泄洪。 數(shù)據(jù)可以是自己造的模擬數(shù)據(jù),也可以是生產(chǎn)中的部分?jǐn)?shù)據(jù)。造測(cè)試數(shù)據(jù)的工具: DataFactory、datafaker 、DBMonster、Data-Processer 、Nexmark、Jmeter 等。 Flink 從 1.11 開始提供了一個(gè)內(nèi)置的 DataGen 連接器,主要是用于生成一些隨機(jī)數(shù),用于在沒有數(shù)據(jù)源的時(shí)候,進(jìn)行流任務(wù)的測(cè)試以及性能測(cè)試等。

5.1.1 DataStream 的DataGenerator

import com.atguigu.flink.tuning.bean.OrderInfo;

import com.atguigu.flink.tuning.bean.UserInfo;

import org.apache.commons.math3.random.RandomDataGenerator;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.configuration.RestOptions;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;

import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;

import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;

public class DataStreamDataGenDemo {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

conf.set(RestOptions.ENABLE_FLAMEGRAPH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.setParallelism(1);

env.disableOperatorChaining();

SingleOutputStreamOperator orderInfoDS = env

.addSource(new DataGeneratorSource<>(new RandomGenerator() {

@Override

public OrderInfo next() {

return new OrderInfo(

random.nextInt(1, 100000),

random.nextLong(1, 1000000),

random.nextUniform(1, 1000), System.currentTimeMillis());

}

}))

.returns(Types.POJO(OrderInfo.class));

SingleOutputStreamOperator userInfoDS = env

.addSource(new DataGeneratorSource(

new SequenceGenerator(1, 1000000) {

final RandomDataGenerator random = new RandomDataGenerator();

@Override

public UserInfo next() {

return new UserInfo(

valuesToEmit.peek().intValue(), valuesToEmit.poll().longValue(), random.nextInt(1, 100),

random.nextInt(0, 1));

}

}

))

.returns(Types.POJO(UserInfo.class));

orderInfoDS.print("order>>");

userInfoDS.print("user>>");

env.execute();

}

}

5.1.2 SQL 的DataGenerator

import org.apache.flink.configuration.Configuration;

import org.apache.flink.configuration.RestOptions;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SQLDataGenDemo {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

conf.set(RestOptions.ENABLE_FLAMEGRAPH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.setParallelism(1);

env.disableOperatorChaining();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String orderSql = "CREATE TABLE order_info (\n" +

" id INT,\n" +

" user_id BIGINT,\n" +

" total_amount DOUBLE,\n" +

" create_time AS localtimestamp,\n" +

" WATERMARK FOR create_time AS create_time\n" + ") WITH (\n" +

" 'connector' = 'datagen',\n" +

" 'rows-per-second'='20000',\n" + " 'fields.id.kind'='sequence',\n" + " 'fields.id.start'='1',\n" +

" 'fields.id.end'='100000000',\n" +

" 'fields.user_id.kind'='random',\n" + " 'fields.user_id.min'='1',\n" +

" 'fields.user_id.max'='1000000',\n" +

" 'fields.total_amount.kind'='random',\n" + " 'fields.total_amount.min'='1',\n" +

" 'fields.total_amount.max'='1000'\n" +

)";

String userSql = "CREATE TABLE user_info (\n" + " id INT,\n" +

" user_id BIGINT,\n" + " age INT,\n" +

" sex INT\n" + ") WITH (\n" +

" 'connector' = 'datagen',\n" +

" 'rows-per-second'='20000',\n" + " 'fields.id.kind'='sequence',\n" + " 'fields.id.start'='1',\n" +

" 'fields.id.end'='100000000',\n" +

" 'fields.user_id.kind'='sequence',\n" + " 'fields.user_id.start'='1',\n" +

" 'fields.user_id.end'='1000000',\n" + " 'fields.age.kind'='random',\n" +

" 'fields.age.min'='1',\n" +

" 'fields.age.max'='100',\n" +

" 'fields.sex.kind'='random',\n" + " 'fields.sex.min'='0',\n" +

" 'fields.sex.max'='1'\n" +

")";

tableEnv.executeSql(orderSql);

tableEnv.executeSql(userSql);

tableEnv.executeSql("select * from order_info").print();

//tableEnv.executeSql("select * from user_info").print();

}

}

5.2 算子指定 UUID

對(duì)于有狀態(tài)的 Flink 應(yīng)用,推薦給每個(gè)算子都指定唯一用戶 ID(UUID)。 嚴(yán)格地說,僅需要給有狀態(tài)的算子設(shè)置就足夠了。但是因?yàn)?Flink 的某些內(nèi)置算子(如 window)是有狀態(tài)的,而有些是無狀態(tài)的,可能用戶不是很清楚哪些內(nèi)置算子是有狀態(tài)的,哪些不是。 所以從實(shí)踐經(jīng)驗(yàn)上來說,我們建議每個(gè)算子都指定上 UUID。 默認(rèn)情況下,算子 UID 是根據(jù) JobGraph 自動(dòng)生成的,JobGraph 的更改可能會(huì)導(dǎo)致 UUID 改變。手動(dòng)指定算子 UUID ,可以讓 Flink 有效地將算子的狀態(tài)從 savepoint 映射到作業(yè)修改后(拓?fù)鋱D可能也有改變)的正確的算子上。比如替換原來的 Operator 實(shí)現(xiàn)、增加新的Operator、刪除Operator 等等,至少我們有可能與Savepoint 中存儲(chǔ)的Operator狀態(tài)對(duì)應(yīng)上。這是 savepoint 在 Flink 應(yīng)用中正常工作的一個(gè)基本要素。 Flink 算子的 UUID 可以通過 uid(Stringuid) 方法指定,通常也建議指定name。

#算子.uid("指定 uid")

.reduce((value1, value2) -> Tuple3.of("uv", value2.f1, value1.f2 + value2.f2))

.uid("uv-reduce").name("uv-reduce")

1)提交案例:未指定uid

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

觸發(fā)保存點(diǎn):

//直接觸發(fā)

flink savepoint [targetDirectory] [-yid yarnAppId] #on yarn 模式需要指定-yid 參數(shù)

//cancel 觸發(fā)

flink cancel -s [targetDirectory] [-yid yarnAppId] #on yarn 模式需要指定-yid 參數(shù)

bin/flink cancel -s hdfs://hadoop1:8020/flink-tuning/sp 98acff568e8f0827a67ff37648a29d7f -yid application_1640503677810_0017

修改代碼,從savepoint 恢復(fù):

bin/flink run \

-t yarn-per-job \

-s hdfs://hadoop1:8020/flink-tuning/sp/savepoint-066c90-6edf948686f6 \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UvDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

報(bào)錯(cuò)如下:

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://hadoop1:8020/flink-tuning/sp/savepo

int-066c90-6edf948686f6. Cannot map checkpoint/savepoint state for operator ddb598ad156ed281023ba4eebbe487e3 to the new program,

because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredSt

ate option on the CLI.

臨時(shí)處理:在提交命令中添加–allowNonRestoredState(short: -n)跳過無法恢復(fù)的算子。

2)提交案例:指定uid

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UidDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

觸發(fā)保存點(diǎn):

//cancel 觸發(fā) savepoint

bin/flink cancel -s hdfs://hadoop1:8020/flink-tuning/sp 272e5d3321c5c1481cc327f6abe8cf9c

-yid application_1640268344567_0033

修改代碼,從保存點(diǎn)恢復(fù):

bin/flink run \

-t yarn-per-job \

-s hdfs://hadoop1:8020/flink-tuning/sp/savepoint-272e5d-d0c1097d23e0 \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.UidDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

5.3 鏈路延遲測(cè)量

對(duì)于實(shí)時(shí)的流式處理系統(tǒng)來說,我們需要關(guān)注數(shù)據(jù)輸入、計(jì)算和輸出的及時(shí)性,所以處理延遲是一個(gè)比較重要的監(jiān)控指標(biāo),特別是在數(shù)據(jù)量大或者軟硬件條件不佳的環(huán)境下。Flink提供了開箱即用的LatencyMarker 機(jī)制來測(cè)量鏈路延遲。開啟如下參數(shù): metrics.latency.interval:30000 #默認(rèn)0,表示禁用,單位毫秒 監(jiān)控的粒度,分為以下 3 檔: ? single:每個(gè)算子單獨(dú)統(tǒng)計(jì)延遲; ? operator(默認(rèn)值):每個(gè)下游算子都統(tǒng)計(jì)自己與 Source 算子之間的延遲; ? subtask:每個(gè)下游算子的 sub-task 都統(tǒng)計(jì)自己與 Source 算子的 sub-task 之間的延遲。 metrics.latency.granularity:operator#默認(rèn)operator 一般情況下采用默認(rèn)的 operator 粒度即可,這樣在 Sink 端觀察到的 latencymetric就是我們最想要的全鏈路(端到端)延遲。subtask 粒度太細(xì),會(huì)增大所有并行度的負(fù)擔(dān),不建議使用。 LatencyMarker 不會(huì)參與到數(shù)據(jù)流的用戶邏輯中的,而是直接被各算子轉(zhuǎn)發(fā)并統(tǒng)計(jì)。為了讓它盡量精確,有兩點(diǎn)特別需要注意: ? 保證Flink 集群內(nèi)所有節(jié)點(diǎn)的時(shí)區(qū)、時(shí)間是同步的:ProcessingTimeService 產(chǎn)生時(shí)間戳最終是靠 System.currentTimeMillis()方法,可以用 ntp 等工具來配置。 ? metrics.latency.interval 的時(shí)間間隔宜大不宜小:一般配置成 30000(30 秒)左右。一是因?yàn)檠舆t監(jiān)控的頻率可以不用太頻繁,二是因?yàn)?LatencyMarker 的處理也要消耗一定性能。 提交案例:

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-Dmetrics.latency.interval=30000 \

-c com.atguigu.flink.tuning.UidDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

可以通過下面的metric 查看結(jié)果: flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency 端到端延遲的 tag 只有 murmurhash 過的算子 ID(用 uid()方法設(shè)定的),并沒有算子名稱,(https://issues.apache.org/jira/browse/FLINK-8592)并且官方暫時(shí)不打算解 決這個(gè)問題,所以我們要么用最大值來表示,要么將作業(yè)中 Sink 算子的 ID 統(tǒng)一化。比如使用了 Prometheus 和 Grafana 來監(jiān)控,效果如下: [外鏈圖片轉(zhuǎn)存中…(img-yW3RmqtQ-1708782286122)]

5.4 開啟對(duì)象重用

[外鏈圖片轉(zhuǎn)存中…(img-J2fLQc9s-1708782286122)] 當(dāng)調(diào)用了 enableObjectReuse 方法后,F(xiàn)link 會(huì)把中間深拷貝的步驟都省略掉, SourceFunction 產(chǎn)生的數(shù)據(jù)直接作為 MapFunction 的輸入,可以減少 gc 壓力。但需要特別注意的是,這個(gè)方法不能隨便調(diào)用,必須要確保下游 Function 只有一種,或者下游的 Function 均不會(huì)改變對(duì)象內(nèi)部的值。否則可能會(huì)有線程安全的問題。

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-Dpipeline.object-reuse=true \

-Dmetrics.latency.interval=30000 \

-c com.atguigu.flink.tuning.UidDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

5.5 細(xì)粒度滑動(dòng)窗口優(yōu)化

1)細(xì)粒度滑動(dòng)的影響

當(dāng)使用細(xì)粒度的滑動(dòng)窗口(窗口長(zhǎng)度遠(yuǎn)遠(yuǎn)大于滑動(dòng)步長(zhǎng))時(shí),重疊的窗口過多,一個(gè)數(shù)據(jù)會(huì)屬于多個(gè)窗口,性能會(huì)急劇下降。

[外鏈圖片轉(zhuǎn)存中…(img-vYsxJm7s-1708782286123)]

我們經(jīng)常會(huì)碰到這種需求:以 3 分鐘的頻率實(shí)時(shí)計(jì)算 App 內(nèi)各個(gè)子模塊近 24 小時(shí)的 PV 和 UV。我們需要用粒度為 1440 / 3 = 480 的滑動(dòng)窗口來實(shí)現(xiàn)它,但是細(xì)粒度的滑動(dòng)窗口會(huì)帶來性能問題,有兩點(diǎn): ? 狀態(tài) 對(duì)于一個(gè)元素,會(huì)將其寫入對(duì)應(yīng)的(key, window)二元組所圈定的 windowState 狀態(tài)中。如果粒度為 480,那么每個(gè)元素到來,更新 windowState 時(shí)都要遍歷 480 個(gè)窗口并寫入,開銷是非常大的。在采用 RocksDB 作為狀態(tài)后端時(shí),checkpoint 的瓶頸也尤其明顯。 ? 定時(shí)器 每一個(gè)(key, window)二元組都需要注冊(cè)兩個(gè)定時(shí)器:一是觸發(fā)器注冊(cè)的定時(shí)器,用于決定窗口數(shù)據(jù)何時(shí)輸出;二是 registerCleanupTimer()方法注冊(cè)的清理定時(shí)器,用于在窗口徹底過期(如 allowedLateness 過期)之后及時(shí)清理掉窗口的內(nèi)部狀態(tài)。細(xì)粒度滑動(dòng)窗口會(huì)造成維護(hù)的定時(shí)器增多,內(nèi)存負(fù)擔(dān)加重。

2)解決思路

DataStreamAPI 中,自己解決(https://issues.apache.org/jira/browse/FLINK-7001)。 我們一般使用滾動(dòng)窗口+在線存儲(chǔ)+讀時(shí)聚合的思路作為解決方案: (1) 從業(yè)務(wù)的視角來看,往往窗口的長(zhǎng)度是可以被步長(zhǎng)所整除的,可以找到窗口長(zhǎng)度和窗口步長(zhǎng)的最小公約數(shù)作為時(shí)間分片(一個(gè)滾動(dòng)窗口的長(zhǎng)度); (2) 每個(gè)滾動(dòng)窗口將其周期內(nèi)的數(shù)據(jù)做聚合,存到下游狀態(tài)或打入外部在線存儲(chǔ)(內(nèi)存數(shù)據(jù)庫(kù)如Redis,LSM-based NoSQL 存儲(chǔ)如 HBase); (3) 掃描在線存儲(chǔ)中對(duì)應(yīng)時(shí)間區(qū)間(可以靈活指定)的所有行,并將計(jì)算結(jié)果返回給前端展示。

3)細(xì)粒度的滑動(dòng)窗口案例

提交案例:統(tǒng)計(jì)最近 1 小時(shí)的 uv,1 秒更新一次(滑動(dòng)窗口)

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SlideWindowDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--sliding-split false

[外鏈圖片轉(zhuǎn)存中…(img-G4mC63Cm-1708782286123)]

4)時(shí)間分片案例

提交案例:統(tǒng)計(jì)最近 1 小時(shí)的 uv,1 秒更新一次(滾動(dòng)窗口+狀態(tài)存儲(chǔ))

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SlideWindowDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--sliding-split true

[外鏈圖片轉(zhuǎn)存中…(img-0nQaUNTk-1708782286123)]

Flink1.13 對(duì) SQL 模塊的 WindowTVF 進(jìn)行了一系列的性能優(yōu)化,可以自動(dòng)對(duì)滑動(dòng)窗口進(jìn)行切片解決細(xì)粒度滑動(dòng)問題。 [外鏈圖片轉(zhuǎn)存中…(img-WGCPslpr-1708782286123)] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queri es/window-tvf

第6章 FlinkSQL 調(diào)優(yōu)

FlinkSQL 官網(wǎng)配置參數(shù): https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html

6.1 設(shè)置空閑狀態(tài)保留時(shí)間

Flink SQL 新手有可能犯的錯(cuò)誤,其中之一就是忘記設(shè)置空閑狀態(tài)保留時(shí)間導(dǎo)致狀態(tài)爆炸。列舉兩個(gè)場(chǎng)景: ? FlinkSQL 的regularjoin(inner、left、right),左右表的數(shù)據(jù)都會(huì)一直保存在狀態(tài)里,不會(huì)清理!要么設(shè)置 TTL,要么使用 FlinkSQL 的 interval join。 ? 使用 Top-N 語法進(jìn)行去重,重復(fù)數(shù)據(jù)的出現(xiàn)一般都位于特定區(qū)間內(nèi)(例如一小時(shí)或一天內(nèi)),過了這段時(shí)間之后,對(duì)應(yīng)的狀態(tài)就不再需要了。 FlinkSQL 可以指定空閑狀態(tài)(即未更新的狀態(tài))被保留的最小時(shí)間,當(dāng)狀態(tài)中某個(gè) key 對(duì)應(yīng)的狀態(tài)未更新的時(shí)間達(dá)到閾值時(shí),該條狀態(tài)被自動(dòng)清理:

#API 指定 tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1)); #參數(shù)指定

Configuration configuration = tableEnv.getConfig().getConfiguration(); configuration.setString("table.exec.state.ttl", "1 h");

6.2 開啟MiniBatch

MiniBatch 是微批處理,原理是緩存一定的數(shù)據(jù)后再觸發(fā)處理,以減少對(duì) State 的訪問,從而提升吞吐并減少數(shù)據(jù)的輸出量。MiniBatch 主要依靠在每個(gè) Task 上注冊(cè)的 Timer 線程來觸發(fā)微批,需要消耗一定的線程調(diào)度性能。

? MiniBatch 默認(rèn)關(guān)閉,開啟方式如下:

// 初始化 tableenvironment TableEnvironment tEnv = …// 獲取 tableEnv 的配置對(duì)象Configurationconfiguration=tEnv.getConfig().getConfiguration();// 設(shè)置參數(shù):// 開啟 miniBatchconfiguration.setString(“table.exec.mini-batch.enabled”,“true”);// 批量輸出的間隔時(shí)間configuration.setString(“table.exec.mini-batch.allow-latency”,“5s”);// 防止 OOM 設(shè)置每個(gè)批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條configuration.setString(“table.exec.mini-batch.size”,“20000”);

適用場(chǎng)景 微批處理通過增加延遲換取高吞吐,如果有超低延遲的要求,不建議開啟微批處理。通常對(duì)于聚合的場(chǎng)景,微批處理可以顯著的提升系統(tǒng)性能,建議開啟。 [外鏈圖片轉(zhuǎn)存中…(img-GlLuNuVb-1708782286123)]

? 注意事項(xiàng): 1)目前,key-value 配置項(xiàng)僅被 Blinkplanner支持。 2)1.12 之前的版本有 bug,開啟 miniBatch,不會(huì)清理過期狀態(tài),也就是說如果設(shè)置狀態(tài)的 TTL,無法清理過期狀態(tài)。1.12 版本才修復(fù)這個(gè)問題。 參考 ISSUE:https://issues.apache.org/jira/browse/FLINK-17096

6.3 開啟LocalGlobal

6.3.1 原理概述

LocalGlobal 優(yōu)化將原先的 Aggregate 分成 Local+Global 兩階段聚合, 即 MapReduce 模型中的 Combine+Reduce 處理模式。第一階段在上游節(jié)點(diǎn)本地?cái)€一批數(shù)據(jù)進(jìn)行聚合(localAgg),并輸出這次微批的增量值(Accumulator)。第二階段再將收到的 Accumulator 合并(Merge),得到最終的結(jié)果(GlobalAgg)。 LocalGlobal 本質(zhì)上能夠靠 LocalAgg 的聚合篩除部分傾斜數(shù)據(jù),從而降低 GlobalAgg的熱點(diǎn),提升性能。結(jié)合下圖理解 LocalGlobal 如何解決數(shù)據(jù)傾斜的問題。 [外鏈圖片轉(zhuǎn)存中…(img-NweLLWUQ-1708782286123)]

由上圖可知: l 未開啟 LocalGlobal 優(yōu)化,由于流中的數(shù)據(jù)傾斜,Key 為紅色的聚合算子實(shí)例需要處理更多的記錄,這就導(dǎo)致了熱點(diǎn)問題。 l 開啟 LocalGlobal 優(yōu)化后,先進(jìn)行本地聚合,再進(jìn)行全局聚合。可大大減少 GlobalAgg 的熱點(diǎn),提高性能。

? LocalGlobal 開啟方式:

1) LocalGlobal 優(yōu)化需要先開啟 MiniBatch,依賴于 MiniBatch 的參數(shù)。 2) table.optimizer.agg-phase-strategy: 聚合策略。默認(rèn) AUTO,支持參數(shù) AUTO、 TWO_PHASE(使用 LocalGlobal 兩階段聚合)、ONE_PHASE(僅使用 Global 一階段聚合)。

// 初始化 tableenvironment TableEnvironment tEnv = …

// 獲取 tableEnv 的配置對(duì)象 Configurationconfiguration=tEnv.getConfig().getConfiguration();

// 設(shè)置參數(shù): // 開啟 miniBatch configuration.setString(“table.exec.mini-batch.enabled”,“true”); // 批量輸出的間隔時(shí)間 configuration.setString(“table.exec.mini-batch.allow-latency”,“5s”); // 防止 OOM 設(shè)置每個(gè)批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條 configuration.setString(“table.exec.mini-batch.size”,“20000”); // 開啟 LocalGlobal configuration.setString(“table.optimizer.agg-phase-strategy”,“TWO_PHASE”); |

? 注意事項(xiàng): 1) 需要先開啟 MiniBatch 2) 開啟 LocalGlobal 需要 UDAF 實(shí)現(xiàn) Merge 方法。

6.3.2 提交案例:統(tǒng)計(jì)每天每個(gè) mid 出現(xiàn)次數(shù)

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SqlDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--demo count

[外鏈圖片轉(zhuǎn)存中…(img-bZAuzWF7-1708782286123)] 可以看到存在數(shù)據(jù)傾斜。

6.3.3 提交案例:開啟miniBatch 和LocalGlobal

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SqlDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--demo count \

--minibatch true \

--local-global true

從 WebUI 可以看到分組聚合變成了 Local 和 Global 兩部分,數(shù)據(jù)相對(duì)均勻,且沒有數(shù)據(jù)傾斜。

6.4 開啟 SplitDistinct

LocalGlobal 優(yōu)化針對(duì)普通聚合(例如SUM、COUNT、MAX、MIN 和AVG)有較好的效果,對(duì)于 DISTINCT 的聚合(如 COUNT DISTINCT)收效不明顯,因?yàn)?COUNT DISTINCT 在 Local 聚合時(shí),對(duì)于 DISTINCT KEY 的去重率不高,導(dǎo)致在 Global 節(jié)點(diǎn)仍然存在熱點(diǎn)。

6.4.1 原理概述

之前,為了解決 COUNTDISTINCT 的熱點(diǎn)問題,通常需要手動(dòng)改寫為兩層聚合(增加按 Distinct Key 取模的打散層)。 從 Flink1.9.0 版本開始, 提供了 COUNT DISTINCT 自動(dòng)打散功能, 通過 HASH_CODE(distinct_key) % BUCKET_NUM 打散,不需要手動(dòng)重寫。Split Distinct 和 LocalGlobal 的原理對(duì)比參見下圖。 [外鏈圖片轉(zhuǎn)存中…(img-mJegihAP-1708782286123)] Distinct 舉例:

SELECT a, COUNT(DISTINCT b) FROM T

GROUP BY a

手動(dòng)打散舉例:

SELECT a, SUM(cnt) FROM (

SELECT a, COUNT(DISTINCT b) as cnt FROM T

GROUP BY a, MOD(HASH_CODE(b), 1024)

)

GROUP BY a

? SplitDistinct 開啟方式 默認(rèn)不開啟,使用參數(shù)顯式開啟: l table.optimizer.distinct-agg.split.enabled:true,默認(rèn)false。 l table.optimizer.distinct-agg.split.bucket-num: SplitDistinct 優(yōu)化在第一層聚 合中,被打散的 bucket 數(shù)目。默認(rèn) 1024。

// 初始化 table environment TableEnvironment tEnv = ...

// 獲取 tableEnv 的配置對(duì)象

Configuration configuration = tEnv.getConfig().getConfiguration();

// 設(shè)置參數(shù):(要結(jié)合 minibatch 一起使用)

// 開啟 Split Distinct

configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

// 第一層打散的 bucket 數(shù)目

configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

? 注意事項(xiàng): (1) 目前不能在包含UDAF 的 FlinkSQL 中使用SplitDistinct 優(yōu)化方法。 (2) 拆分出來的兩個(gè) GROUP 聚合還可參與LocalGlobal 優(yōu)化。 (3) 該功能在 Flink1.9.0 版本及以上版本才支持。

6.4.2 提交案例:count(distinct)存在熱點(diǎn)問題

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SqlDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--demo distinct

[外鏈圖片轉(zhuǎn)存中…(img-LiawI497-1708782286124)] 可以看到存在熱點(diǎn)問題。

6.4.3 提交案例:開啟splitdistinct

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SqlDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--demo distinct \

--minibatch true \

--split-distinct true

[外鏈圖片轉(zhuǎn)存中…(img-pUkyQyIv-1708782286124)] 從WebUI 可以看到有兩次聚合,而且有 partialFinal 字樣,第二次聚合時(shí)已經(jīng)均勻。

6.5 多維 DISTINCT 使用Filter

6.5.1 原理概述

在某些場(chǎng)景下,可能需要從不同維度來統(tǒng)計(jì) count(distinct)的結(jié)果(比如統(tǒng)計(jì) uv、 app 端的 uv、web 端的 uv),可能會(huì)使用如下 CASE WHEN 語法。

SELECT

a,

COUNT(DISTINCT b) AS total_b,

COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b, COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b FROM T

GROUP BY a

在這種情況下,建議使用 FILTER 語法, 目前的 FlinkSQL 優(yōu)化器可以識(shí)別同一唯一鍵上的不同 FILTER 參數(shù)。如,在上面的示例中,三個(gè) COUNTDISTINCT 都作用在 b 列上。此時(shí),經(jīng)過優(yōu)化器識(shí)別后,F(xiàn)link 可以只使用一個(gè)共享狀態(tài)實(shí)例,而不是三個(gè)狀態(tài)實(shí)例,可減少狀態(tài)的大小和對(duì)狀態(tài)的訪問。 將上邊的 CASEWHEN 替換成 FILTER 后,如下所示:

SELECT

a,

COUNT(DISTINCT b) AS total_b,

COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,

COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b FROM T

GROUP BY a

6.5.2 提交案例:多維 Distinct

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SqlDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--demo dim-difcount

[外鏈圖片轉(zhuǎn)存中…(img-uOR0nHKf-1708782286124)]

6.5.3 提交案例:使用Filter

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Drest.flamegraph.enabled=true \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=1024mb \

-Dtaskmanager.memory.process.size=2048mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-c com.atguigu.flink.tuning.SqlDemo \

/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \

--demo dim-difcount-filter

[外鏈圖片轉(zhuǎn)存中…(img-4mCKlT7N-1708782286124)] 通過WebUI 對(duì)比前 10 次 Checkpoint 的大小,可以看到狀態(tài)有所減小。

6.6 設(shè)置參數(shù)總結(jié)

總結(jié)以上的調(diào)優(yōu)參數(shù),代碼如下:

// 初始化 table environment TableEnvironment tEnv = ...

// 獲取 tableEnv 的配置對(duì)象

Configuration configuration = tEnv.getConfig().getConfiguration();

// 設(shè)置參數(shù):

// 開啟 miniBatch

configuration.setString("table.exec.mini-batch.enabled", "true");

// 批量輸出的間隔時(shí)間

configuration.setString("table.exec.mini-batch.allow-latency", "5 s");

// 防止 OOM 設(shè)置每個(gè)批次最多緩存數(shù)據(jù)的條數(shù),可以設(shè)為 2 萬條

configuration.setString("table.exec.mini-batch.size", "20000");

// 開啟 LocalGlobal

configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

// 開啟 Split Distinct

configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

// 第一層打散的 bucket 數(shù)目

configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

// 指定時(shí)區(qū)

configuration.setString("table.local-time-zone", "Asia/Shanghai");

第7章 常見故障排除

7.1 非法配置異常

如果您看到從 TaskExecutorProcessUtils 或 JobManagerProcessUtils 拋出的IllegalConfigurationException,通常表明存在無效的配置值(例如負(fù)內(nèi)存大小、大于 1 的分?jǐn)?shù)等)或配置沖突。請(qǐng)重新配置內(nèi)存參數(shù)。

7.2 Java 堆空間異常

如果報(bào) **OutOfMemoryError: Javaheapspace **異常,通常表示 JVMHeap 太小??梢試L試通過增加總內(nèi)存來增加 JVM 堆大小。也可以直接為 TaskManager 增加任務(wù)堆內(nèi)存或?yàn)?JobManager 增加 JVM 堆內(nèi)存。 還可以為 TaskManagers 增加框架堆內(nèi)存,但只有在確定 Flink 框架本身需要更多內(nèi)存時(shí)才應(yīng)該更改此選項(xiàng)。

7.3 直接緩沖存儲(chǔ)器異常

如果報(bào) **OutOfMemoryError: Directbuffermemory **異常,通常表示 JVM 直接內(nèi)存限制太小或存在直接內(nèi)存泄漏。檢查用戶代碼或其他外部依賴項(xiàng)是否使用了 JVM 直接內(nèi)存,以及它是否被正確考慮??梢試L試通過調(diào)整直接堆外內(nèi)存來增加其限制??梢詤⒖既绾螢?TaskManagers、 JobManagers 和 Flink 設(shè)置的 JVM 參數(shù)配置堆外內(nèi)存。

7.4 元空間異常

如果報(bào) OutOfMemoryError: Metaspace異常,通常表示 JVM 元空間限制配置得 太小。您可以嘗試加大 JVM 元空間 TaskManagers 或JobManagers 選項(xiàng)。

7.5 網(wǎng)絡(luò)緩沖區(qū)數(shù)量不足

如果報(bào) IOException:Insufficientnumberofnetworkbuffers異常,這僅與TaskManager 相關(guān)。通常表示配置的網(wǎng)絡(luò)內(nèi)存大小不夠大。您可以嘗試增加網(wǎng)絡(luò)內(nèi)存。

7.6 超出容器內(nèi)存異常

如果 Flink 容器嘗試分配超出其請(qǐng)求大?。╕arn 或 Kubernetes)的內(nèi)存,這通常表明 Flink 沒有預(yù)留足夠的本機(jī)內(nèi)存。當(dāng)容器被部署環(huán)境殺死時(shí),可以通過使用外部監(jiān)控系統(tǒng)或從錯(cuò)誤消息中觀察到這一點(diǎn)。 如果在 JobManager 進(jìn)程中遇到這個(gè)問題,還可以通過設(shè)置排除可能的 JVMDirect Memory 泄漏的選項(xiàng)來開啟 JVMDirectMemory 的限制: jobmanager.memory.enable-jvm-direct-memory-limit:true 如果想手動(dòng)多分一部分內(nèi)存給 RocksDB 來防止超用,預(yù)防在云原生的環(huán)境因 OOM 被 K8Skill,可將 JVMOverHead 內(nèi)存調(diào)大。 之所以不調(diào)大 Task Off-Heap,是由于目前 Task Off-Heap 是和 Direct Memeory混在一起的,即使調(diào)大整體,也并不一定會(huì)分給 RocksDB 來做 Buffer,所以我們推薦通過調(diào)整 JVM OverHead 來解決內(nèi)存超用的問題。

7.7 Checkpoint 失敗

Checkpoint 失敗大致分為兩種情況:CheckpointDecline 和 Checkpoint Expire。

7.7.1 CheckpointDecline

我們能從 jobmanager.log 中看到類似下面的日志:

Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

我們可以在 jobmanager.log 中查找 executionid,找到被調(diào)度到哪個(gè) taskmanager 上,類似如下所示:

2019-09-02 16:26:20,972 INFO [ jobmanager-future-thread-61]

org.apache.flink.runtime.executiongraph.ExecutionGraph - XXXXXXXXXXX

(100/289) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to DEPLOYING.

2019-09-02 16:26:20,972 INFO [ jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot

container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

從 上 面 的 日 志 我 們 知 道 該 execution 被 調(diào) 度到 hostnameABCDE container_e24_1566836790522_8088_04_013155_1slot上,接下來我們就可以到container container_e24_1566836790522_8088_04_013155 的 taskmanager.log中查找Checkpoint失敗的具體原因了。 另外對(duì)于 CheckpointDecline 的情況,有一種情況在這里單獨(dú)抽取出來進(jìn)行介紹: CheckpointCancel。 當(dāng)前Flink 中如果較小的Checkpoint 還沒有對(duì)齊的情況下, 收到了更大的 Checkpoint,則會(huì)把較小的 Checkpoint 給取消掉。我們可以看到類似下面的日志:

$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

這個(gè)日志表示,當(dāng)前 Checkpoint19 還在對(duì)齊階段,我們收到了 Checkpoint20 的 barrier。然后會(huì)逐級(jí)通知到下游的 task checkpoint 19 被取消了,同時(shí)也會(huì)通知 JM 當(dāng)前 Checkpoint 被 decline 掉了。 在下游 task 收到被 cancelBarrier 的時(shí)候,會(huì)打印類似如下的日志:

DEBUG

$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.

或者

DEBUG

$taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment.

或者

WARN

$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

上面三種日志都表示當(dāng)前 task 接收到上游發(fā)送過來的 barrierCancel 消息,從而取消了對(duì)應(yīng)的 Checkpoint。

7.7.2 CheckpointExpire

如果 Checkpoint 做的非常慢,超過了 timeout 還沒有完成,則整個(gè) Checkpoint也會(huì)失敗。當(dāng)一個(gè) Checkpoint 由于超時(shí)而失敗是,會(huì)在 jobmanager.log 中看到如下的日志:

Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178

completing.

表示 Chekpoint1 由于超時(shí)而失敗,這個(gè)時(shí)候可以可以看這個(gè)日志后面是否有類似下面的日志:

Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

可以按照 7.7.1 中的方法找到對(duì)應(yīng)的 taskmanager.log 查看具體信息。 我們按照下面的日志把 TM 端的 snapshot 分為三個(gè)階段:開始做 snapshot 前,同步階段,異步階段,需要開啟 DEBUG 才能看到: DEBUG Startingcheckpoint(6751)CHECKPOINTontasktaskNameWithSubtasks(4/4) 上面的日志表示 TM 端 barrier 對(duì)齊后,準(zhǔn)備開始做 Checkpoint。 [外鏈圖片轉(zhuǎn)存中…(img-jmh5pWT8-1708782286124)] 上面的日志表示當(dāng)前這個(gè) backend 的同步階段完成,共使用了 0ms。 [外鏈圖片轉(zhuǎn)存中…(img-EdNhfdtG-1708782286124)] 上面的日志表示異步階段完成,異步階段使用了 369ms 在現(xiàn)有的日志情況下,我們通過上面三個(gè)日志,定位 snapshot 是開始晚,同步階段做的慢,還是異步階段做的慢。然后再按照情況繼續(xù)進(jìn)一步排查問題。

7.8 Checkpoint 慢

Checkpoint 慢的情況如下:比如 Checkpoint interval 1 分鐘,超時(shí) 10 分鐘, Checkpoint 經(jīng)常需要做 9 分鐘(我們希望 1 分鐘左右就能夠做完),而且我們預(yù)期 state size 不是非常大。 對(duì)于 Checkpoint 慢的情況,我們可以按照下面的順序逐一檢查。

1)SourceTriggerCheckpoint 慢

這個(gè)一般發(fā)生較少,但是也有可能,因?yàn)?source 做 snapshot 并往下游發(fā)送 barrier 的時(shí)候,需要搶鎖(Flink1.10 開始,用 mailBox 的方式替代當(dāng)前搶鎖的方式,詳情參考 https://issues.apache.org/jira/browse/FLINK-12477)。如果一直搶不到鎖的話,則可能 導(dǎo)致 Checkpoint 一直得不到機(jī)會(huì)進(jìn)行。如果在 Source 所在的 taskmanager.log 中找不到開始做 Checkpoint 的 log,則可以考慮是否屬于這種情況,可以通過 jstack 進(jìn)行進(jìn)一步確認(rèn)鎖的持有情況。

2)使用增量 Checkpoint

現(xiàn)在 Flink 中 Checkpoint 有兩種模式,全量 Checkpoint 和 增量 Checkpoint,其中全量 Checkpoint 會(huì)把當(dāng)前的 state 全部備份一次到持久化存儲(chǔ), 而增量 Checkpoint,則只備份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上傳的內(nèi)容會(huì)相對(duì)更好,在速度上會(huì)有更大的優(yōu)勢(shì)。 現(xiàn)在 Flink 中僅在 RocksDBStateBackend 中支持增量 Checkpoint,如果你已經(jīng)使用 RocksDBStateBackend,可以通過開啟增量 Checkpoint 來加速。

3)作業(yè)存在反壓或者數(shù)據(jù)傾斜

task 僅在接受到所有的 barrier 之后才會(huì)進(jìn)行 snapshot,如果作業(yè)存在反壓,或者有數(shù)據(jù)傾斜,則會(huì)導(dǎo)致全部的 channel 或者某些 channel 的 barrier 發(fā)送慢,從而整體影響 Checkpoint 的時(shí)間。

4)Barrier 對(duì)齊慢

從前面我們知道 Checkpoint 在 task 端分為 barrier 對(duì)齊(收齊所有上游發(fā)送過來的 barrier),然后開始同步階段,再做異步階段。如果 barrier 一直對(duì)不齊的話,就不會(huì)開始做 snapshot。 barrier 對(duì)齊之后會(huì)有如下日志打印: [外鏈圖片轉(zhuǎn)存中…(img-tzW5e2bS-1708782286124)] 如果 taskmanager.log 中沒有這個(gè)日志,則表示 barrier 一直沒有對(duì)齊,接下來我們需要了解哪些上游的 barrier 沒有發(fā)送下來,如果你使用 AtLeastOnce 的話,可以觀察下面的日志: [外鏈圖片轉(zhuǎn)存中…(img-AfSJkxsb-1708782286124)] 表示該 task 收到了 channel5 來的 barrier,然后看對(duì)應(yīng) Checkpoint,再查看還剩哪些上游的 barrier 沒有接受到。

5)主線程太忙,導(dǎo)致沒機(jī)會(huì)做 snapshot

在 task 端,所有的處理都是單線程的,數(shù)據(jù)處理和 barrier 處理都由主線程處理,如果主線程在處理太慢(比如使用 RocksDBBackend,state 操作慢導(dǎo)致整體處理慢),導(dǎo)致 barrier 處理的慢,也會(huì)影響整體 Checkpoint 的進(jìn)度,可以通過火焰圖分析。

6)同步階段做的慢

同步階段一般不會(huì)太慢,但是如果我們通過日志發(fā)現(xiàn)同步階段比較慢的話,對(duì)于非 RocksDBBackend 我們可以考慮查看是否開啟了異步 snapshot , 如果開啟了異步 snapshot 還是慢,需要看整個(gè) JVM 在干嘛, 也可以使用火焰圖分析。對(duì)于 RocksDBBackend 來說,我們可以用 iostate 查看磁盤的壓力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的時(shí)間總共開銷多少。 RocksDB 開始 snapshot 的日志如下: [外鏈圖片轉(zhuǎn)存中…(img-udEbDqeA-1708782286124)] snapshot 結(jié)束的日志如下: [外鏈圖片轉(zhuǎn)存中…(img-Zp5ouTbK-1708782286125)]

6)異步階段做的慢

對(duì)于異步階段來說, tm 端主要將 state 備份到持久化存儲(chǔ)上, 對(duì)于非 RocksDBBackend 來說,主要瓶頸來自于網(wǎng)絡(luò),這個(gè)階段可以考慮觀察網(wǎng)絡(luò)的 metric,或者對(duì)應(yīng)機(jī)器上能夠觀察到網(wǎng)絡(luò)流量的情況(比如 iftop)。 對(duì)于 RocksDB 來說,則需要從本地讀取文件,寫入到遠(yuǎn)程的持久化存儲(chǔ)上,所以不僅需要考慮網(wǎng)絡(luò)的瓶頸,還需要考慮本地磁盤的性能。另外對(duì)于 RocksDBBackend 來說,如果覺得網(wǎng)絡(luò)流量不是瓶頸,但是上傳比較慢的話,還可以嘗試考慮開啟多線程上傳功能 (Flink1.13 開始,state.backend.rocksdb.checkpoint.transfer.thread.num 默認(rèn)值是 4)。

7.9 Kafka 動(dòng)態(tài)發(fā)現(xiàn)分區(qū)

當(dāng) FlinkKafkaConsumer 初始化時(shí),每個(gè) subtask 會(huì)訂閱一批 partition,但是當(dāng) Flink 任務(wù)運(yùn)行過程中,如果被訂閱的 topic 創(chuàng)建了新的 partition,F(xiàn)linkKafkaConsumer如何實(shí)現(xiàn)動(dòng)態(tài)發(fā)現(xiàn)新創(chuàng)建的 partition 并消費(fèi)呢? 在使用 FlinkKafkaConsumer 時(shí),可以開啟 partition 的動(dòng)態(tài)發(fā)現(xiàn)。通過 Properties 指定參數(shù)開啟(單位是毫秒): FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 該參數(shù)表示間隔多久檢測(cè)一次是否有新創(chuàng)建的 partition。默認(rèn)值是 Long 的最小值,表示不開啟,大于 0 表示開啟。開啟時(shí)會(huì)啟動(dòng)一個(gè)線程根據(jù)傳入的 interval 定期獲取Kafka最新的元數(shù)據(jù),新 partition 對(duì)應(yīng)的那一個(gè) subtask 會(huì)自動(dòng)發(fā)現(xiàn)并從earliest 位置開始消費(fèi),新創(chuàng)建的 partition 對(duì)其他 subtask 并不會(huì)產(chǎn)生影響。 代碼如下所示:

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVA L_MILLIS, 30 * 1000 + "");

7.10 Watermark 不更新

如果數(shù)據(jù)源中的某一個(gè)分區(qū)/ 分片在一段時(shí)間內(nèi)未發(fā)送事件數(shù)據(jù), 則意味著 WatermarkGenerator 也不會(huì)獲得任何新數(shù)據(jù)去生成 watermark。我們稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時(shí)候就會(huì)出現(xiàn)問題。比如Kafka 的 Topic 中,由于某些原因,造成個(gè)別Partition 一直沒有新的數(shù)據(jù)。由于下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會(huì)發(fā)生變化,導(dǎo)致窗口、定時(shí)器等不會(huì)被觸發(fā)。 為了解決這個(gè)問題,你可以使用 WatermarkStrategy 來檢測(cè)空閑輸入并將其標(biāo)記為空閑狀態(tài)。

ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/...StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties=new Properties();

properties.setProperty("bootstrap.servers","hadoop1:9092,hadoop2:9092,hadoop3:9092");

properties.setProperty("group.id","fffffffffff");

FlinkKafkaConsumer kafkaSourceFunction=new FlinkKafkaConsumer<>(

"flinktest",

new SimpleStringSchema(),properties

)

kafkaSourceFunction.assignTimestampsAndWatermarks(

WatermarkStrategy

.forBoundedOutOfOrderness(Duration.ofMinutes(2))

.withIdleness(Duration.ofMinutes(5))

);

env.addSource(kafkaSourceFunction

7.11 依賴沖突

ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/...

一般都是因?yàn)橛脩粢蕾嚨谌桨陌姹九c Flink 框架依賴的版本有沖突導(dǎo)致。根據(jù)報(bào)錯(cuò)信息中的類名,定位到?jīng)_突的jar 包,idea 可以借助mavenhelper 插件查找沖突的有哪些。打包插件建議使用maven-shade-plugin。

7.12 超出文件描述符限制

java.io.IOException: Too many open files

首先檢查 Linux 系統(tǒng) ulimit -n 的文件描述符限制,再注意檢查程序內(nèi)是否有資源(如各種連接池的連接)未及時(shí)釋放。值得注意的是,低版本 Flink 使用 RocksDB 狀態(tài)后端也有 可 能 會(huì) 拋 出 這 個(gè) 異 常 , 此 時(shí) 需 修 改 flink-conf.yaml 中 的state.backend.rocksdb.files.open 參數(shù),如果不限制,可以改為-1(1.13 默認(rèn)就是-1)。

7.13 臟數(shù)據(jù)導(dǎo)致數(shù)據(jù)轉(zhuǎn)發(fā)失敗

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

該異常幾乎都是由于程序業(yè)務(wù)邏輯有誤,或者數(shù)據(jù)流里存在未處理好的臟數(shù)據(jù)導(dǎo)致的,繼續(xù)向下追溯異常棧一般就可以看到具體的出錯(cuò)原因,比較常見的如 POJO 內(nèi)有空字段,或者抽取事件時(shí)間的時(shí)間戳為 null 等。

7.14 通訊超時(shí)

akka.pattern.AskTimeoutException:Asktimedouton[Actor[akka://…]]after[10000ms] Akka 超時(shí)導(dǎo)致,一般有兩種原因:一是集群負(fù)載比較大或者網(wǎng)絡(luò)比較擁塞,二是業(yè)務(wù) 邏輯同步調(diào)用耗時(shí)的外部服務(wù)。如果負(fù)載或網(wǎng)絡(luò)問題無法徹底緩解, 需考慮調(diào)大 akka.ask.timeout 參數(shù)的值(默認(rèn)只有 10 秒);另外,調(diào)用外部服務(wù)時(shí)盡量異步操作(Async I/O)。

7.15 FlinkonYarn 其他常見錯(cuò)誤

https://developer.aliyun.com/article/719703

備注:本文來自于尚硅谷,若侵權(quán),請(qǐng)聯(lián)系作者刪除~

柚子快報(bào)邀請(qǐng)碼778899分享:大數(shù)據(jù)之Flink優(yōu)化

http://yzkb.51969.com/

相關(guān)鏈接

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19244107.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄