欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:【大數(shù)據(jù)專題】Flink題庫

柚子快報激活碼778899分享:【大數(shù)據(jù)專題】Flink題庫

http://yzkb.51969.com/

1 . 簡述什么是Apache Flink ?

Apache Flink 是一個開源的基于流的有狀態(tài)計算框架。它是分布式地執(zhí)行的,具備低延遲、高吞吐的優(yōu)秀性能,并且非常擅長處理有狀態(tài)的復(fù)雜計算邏輯場景

2 . 簡述Flink 的核心概念 ?

Flink 的核心概念主要有四個:Event Streams、State、Time 和 Snapshots。

(1)Event Streams:即事件流,事件流可以是實時的也可以是歷史的。Flink 是基于流的,但它不止能處理流,也能處理批,而流和批的輸入都是事件流,差別在于實時與批量。 (2)State:Flink 擅長處理有狀態(tài)的計算。通常的復(fù)雜業(yè)務(wù)邏輯都是有狀態(tài)的,它不僅要處理單一的事件,而且需要記錄一系列歷史的信息,然后進行計算或者判斷。 (3)Time:最主要處理的問題是數(shù)據(jù)亂序的時候,一致性如何保證。 (4)Snapshots:實現(xiàn)了數(shù)據(jù)的快照、故障的恢復(fù),保證數(shù)據(jù)一致性和作業(yè)的升級遷移等。

3 . 簡述Flink運行時的架構(gòu)組件 ?

Flink運行時架構(gòu)主要包括四個不同的組件,它們會在運行流處理應(yīng)用程序時協(xié)同工作:作業(yè)管理器 (JobManager)、資源管理器(ResourceManager)、任務(wù)管理器(TaskManager),以及分發(fā)器 (Dispatcher)。因為Flink是用Java和Scala實現(xiàn)的,所以所有組件都會運行在Java虛擬機上。每個組件的職責(zé)如下: 1)作業(yè)管理器(JobManager) 控制一個應(yīng)用程序執(zhí)行的主進程,也就是說,每個應(yīng)用程序都會被一個不同的JobManager所控制執(zhí)行。 JobManager會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括:作業(yè)圖(JobGraph)、邏輯數(shù)據(jù)流圖 (logical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。JobManager會把JobGraph轉(zhuǎn)換成一個物理層面的數(shù)據(jù)流圖,這個圖被叫做“執(zhí)行圖”(ExecutionGraph),包含了所有可以并發(fā)執(zhí)行的任 務(wù)。JobManager會向資源管理器(ResourceManager)請求執(zhí)行任務(wù)必要的資源,也就是任務(wù)管理器 (TaskManager)上的插槽(slot)。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運行它們的 TaskManager上。而在運行過程中,JobManager會負責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點 (checkpoints)的協(xié)調(diào)。 2)資源管理器(ResourceManager) 主要負責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定義的處理資源單 元。Flink為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARN、Mesos、K8s,以及 standalone部署。當(dāng)JobManager申請插槽資源時,ResourceManager會將有空閑插槽的TaskManager分配 給JobManager。如果ResourceManager沒有足夠的插槽來滿足JobManager的請求,它還可以向資源提供 平臺發(fā)起會話,以提供啟動TaskManager進程的容器。另外,ResourceManager還負責(zé)終止空閑的TaskManager,釋放計算資源。 3)任務(wù)管理器(TaskManager) Flink中的工作進程。通常在Flink中會有多個TaskManager運行,每一個TaskManager都包含了一定數(shù)量的 插槽(slots)。插槽的數(shù)量限制了TaskManager能夠執(zhí)行的任務(wù)數(shù)量。啟動之后,TaskManager會向資源 管理器注冊它的插槽;收到資源管理器的指令后,TaskManager就會將一個或者多個插槽提供給 JobManager調(diào)用。JobManager就可以向插槽分配任務(wù)(tasks)來執(zhí)行了。在執(zhí)行過程中,一個 TaskManager可以跟其它運行同一應(yīng)用程序的TaskManager交換數(shù)據(jù)。 4)分發(fā)器(Dispatcher) 可以跨作業(yè)運行,它為應(yīng)用提交提供了REST接口。當(dāng)一個應(yīng)用被提交執(zhí)行時,分發(fā)器就會啟動并將應(yīng)用 移交給一個JobManager。由于是REST接口,所以Dispatcher可以作為集群的一個HTTP接入點,這樣就能 夠不受防火墻阻擋。Dispatcher也會啟動一個Web UI,用來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息。 Dispatcher在架構(gòu)中可能并不是必需的,這取決于應(yīng)用提交運行的方式。

4 . 簡述Flink任務(wù)提交流程 ?

作業(yè)提交流程 (1) 一般情況下,由客戶端(App)通過分發(fā)器提供的 REST 接口,將作業(yè)提交給JobManager。 (2)由分發(fā)器啟動 JobMaster,并將作業(yè)(包含 JobGraph)提交給 JobMaster。 (3)JobMaster 將 JobGraph 解析為可執(zhí)行的 ExecutionGraph,得到所需的資源數(shù)量,然后向資源管理器請求資源(slots) (4)資源管理器判斷當(dāng)前是否有足夠的可用資源;如果沒有,啟動新的 TaskManager。 (5)TaskManager 啟動之后,向 ResourceManager 注冊自己的可用任務(wù)槽(slots)。 (6)資源管理器通知 TaskManager 為新的作業(yè)提供 slots。 (7)TaskManager 連接到對應(yīng)的 JobMaster,提供 slots。 (8)JobMaster 將需要執(zhí)行的任務(wù)分發(fā)給 TaskManager。 (9)TaskManager 執(zhí)行任務(wù),互相之間可以交換數(shù)據(jù)。 獨立模式 獨立模式下,由于TaskManager 是手動啟動的,所以當(dāng) ResourceManager 收到 JobMaster 的請求時,會直接要求 TaskManager 提供資源,因此第(4)步與提交流程不同,不會啟動新的TaskManager YARN模式 會話模式 在會話模式下,YARN session創(chuàng)建Flink集群 作業(yè)提交流程如下: (1)客戶端通過 REST 接口,將作業(yè)提交給分發(fā)器。 (2)分發(fā)器啟動 JobMaster,并將作業(yè)(包含 JobGraph)提交給 JobMaster。 (3)JobMaster 向資源管理器請求資源(slots)。 (4)資源管理器向 YARN 的資源管理器請求 container 資源。 (5)YARN 啟動新的 TaskManager 容器。 (6)TaskManager 啟動之后,向 Flink 的資源管理器注冊自己的可用任務(wù)槽。 (7)資源管理器通知 TaskManager 為新的作業(yè)提供 slots。 (8)TaskManager 連接到對應(yīng)的 JobMaster,提供 slots。 (9)JobMaster 將需要執(zhí)行的任務(wù)分發(fā)給 TaskManager,執(zhí)行任務(wù)。 可以看到在YARN的session模式下,請求資源時要“上報”YARN 的資源管理器 單作業(yè)模式 (1)客戶端將作業(yè)提交給 YARN 的資源管理器,這一步中會同時將 Flink 的 Jar 包和配置上傳到 HDFS,以便后續(xù)啟動 Flink 相關(guān)組件的容器。 (2)YARN 的資源管理器分配 Container 資源,啟動 Flink JobManager,并將作業(yè)提交給JobMaster。這里省略了 Dispatcher 組件。 (3)JobMaster 向資源管理器請求資源(slots)。 (4)資源管理器向 YARN 的資源管理器請求 container 資源。 (5)YARN 啟動新的 TaskManager 容器。 (6)TaskManager 啟動之后,向 Flink 的資源管理器注冊自己的可用任務(wù)槽。 (7)資源管理器通知 TaskManager 為新的作業(yè)提供 slots。 (8)TaskManager 連接到對應(yīng)的 JobMaster,提供 slots。 (9)JobMaster 將需要執(zhí)行的任務(wù)分發(fā)給 TaskManager,執(zhí)行任務(wù)。 可見,區(qū)別只在于 JobManager 的啟動方式,以及省去了分發(fā)器。當(dāng)?shù)?2 步作業(yè)提交給JobMaster,之后的流程就與會話模式完全一樣了 應(yīng)用模式 應(yīng)用模式與單作業(yè)模式的區(qū)別在于:初始時提交給YARN資源管理器的不是一個作業(yè),而是一個應(yīng)用,應(yīng)用中可以包含多個作業(yè),每個作業(yè)都會啟動相應(yīng)的JobMaster;

5 . 簡述Flink的窗口了解哪些,都有什么區(qū)別,有哪幾種?如何定義 ?

1、Window概述 streaming流式計算是一種被設(shè)計用于處理無限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無限數(shù)據(jù)集是指一種不斷增長 的本質(zhì)上無限的數(shù)據(jù)集,而window是一種切割無限數(shù)據(jù)為有限塊進行處理的手段。 Window是無限數(shù)據(jù)流處理的核心,Window將一個無限的stream拆分成有限大小的“buckets”桶,我們可 以在這些桶上做計算操作。

2、Window類型 Window可以分成兩類: CountWindow:按照指定的數(shù)據(jù)條數(shù)生成一個Window,與時間無關(guān)。 TimeWindow:按照時間生成Window。 對于TimeWindow,可以根據(jù)窗口實現(xiàn)原理的不同分成三類:滾動窗口(Tumbling Window)、滑動窗口 (Sliding Window)和會話窗口(Session Window)。 1)滾動窗口(Tumbling Windows) 將數(shù)據(jù)依據(jù)固定的窗口長度對數(shù)據(jù)進行切片。 特點:時間對齊,窗口長度固定,沒有重疊。 滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,并且不會 出現(xiàn)重疊。 例如:如果你指定了一個5分鐘大小的滾動窗口,窗口的創(chuàng)建如下圖所示:

適用場景:適合做BI統(tǒng)計等(做每個時間段的聚合計算)。 2)滑動窗口(Sliding Windows) 滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。 特點:時間對齊,窗口長度固定,可以有重疊。 滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數(shù)來配 置,另一個窗口滑動參數(shù)控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數(shù)小于窗口大小的話, 窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。 例如,你有10分鐘的窗口和5分鐘的滑動,那么每個窗口中5分鐘的窗口里包含著上個10分鐘產(chǎn)生的數(shù)據(jù)

適用場景:對最近一個時間段內(nèi)的統(tǒng)計(求某接口最近5min的失敗率來決定是否要報警)。 3)會話窗口(Session Window) 由一系列事件組合一個指定時間長度的timeout間隙組成,類似于web應(yīng)用的session,也就是一段時間沒 有接收到新數(shù)據(jù)就會生成新的窗口。 特點:時間無對齊。 session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有 重疊和固定的開始時間和結(jié)束時間的情況,相反,當(dāng)它在一個固定的時間周期內(nèi)不再收到元素,即非活 動間隔產(chǎn)生,那個這個窗口就會關(guān)閉。一個session窗口通過一個session間隔來配置,這個session間隔定 義了非活躍周期的長度,當(dāng)這個非活躍周期產(chǎn)生,那么當(dāng)前的session將關(guān)閉并且后續(xù)的元素將被分配到 新的session窗口中去。

3、Window API 1)TimeWindow TimeWindow是將指定時間范圍內(nèi)的所有數(shù)據(jù)組成一個window,一次對一個window里面的所有數(shù)據(jù)進行 計算。 (1)滾動窗口 Flink 默認的時間窗口根據(jù) Processing Time 進行窗口的劃分,將 Flink 獲取到的數(shù)據(jù)根據(jù)進入 Flink 的時間劃分到不同的窗口中。

時間間隔可以通過 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。 (2)滑動窗口 滑動窗口和滾動窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時需要傳入兩個參數(shù),一個是 window_size,一個是 sliding_size。 下面代碼中的 sliding_size 設(shè)置為了 5s,也就是說,窗口每 5s 就計算一次,每一次計算的 window 范圍是 15s 內(nèi)的所有元素。

時間間隔可以通過 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。2)CountWindow CountWindow根據(jù)窗口中相同 key 元素的數(shù)量來觸發(fā)執(zhí)行,執(zhí)行時只計算元素數(shù)量達到窗口大小的 key 對應(yīng)的結(jié)果。 注意:CountWindow 的 window_size 指的是相同 Key 的元素的個數(shù),不是輸入的所有元素的總數(shù)。 (1)滾動窗口 默認的 CountWindow 是一個滾動窗口,只需要指定窗口大小即可,當(dāng)元素數(shù)量達到窗口大小時,就會觸發(fā)窗口的執(zhí)行。

(2)滑動窗口 滑動窗口和滾動窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時需要傳入兩個參數(shù),一個是 window_size,一個是 sliding_size。 下面代碼中的 sliding_size 設(shè)置為了 2,也就是說,每收到兩個相同 key 的數(shù)據(jù)就計算一次,每一次計算的 window 范圍是 5 個元素。

4、窗口函數(shù) window function 定義了要對窗口中收集的數(shù)據(jù)做的計算操作,主要可以分為兩類: 增量聚合函數(shù)(incremental aggregation functions) 每條數(shù)據(jù)到來就進行計算,保持一個簡單的狀態(tài)。典型的增量聚合函數(shù)有ReduceFunction, AggregateFunction。 全窗口函數(shù)(full window functions) 先把窗口所有數(shù)據(jù)收集起來,等到計算的時候會遍歷所有數(shù)據(jù)。ProcessWindowFunction 就是一個全窗口函數(shù)。 5、其它可選API trigger() —— 觸發(fā)器 定義 window 什么時候關(guān)閉,觸發(fā)計算并輸出結(jié)果 evitor() —— 移除器 定義移除某些數(shù)據(jù)的邏輯 allowedLateness() —— 允許處理遲到的數(shù)據(jù)sideOutputLateData() —— 將遲到的數(shù)據(jù)放入側(cè)輸出流getSideOutput() —— 獲取側(cè)輸出流

6 . 簡述Flink 的容錯機制(checkpoint) ?

Checkpoint容錯機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現(xiàn)故障時,能夠?qū)⒄麄€應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保證應(yīng)用流圖狀態(tài)的一致性。Flink的Checkpoint機制原理來自“Chandy-Lamport algorithm”算法。

每個需要Checkpoint的應(yīng)用在啟動時,F(xiàn)link的JobManager為其創(chuàng)建一個 CheckpointCoordinator(檢查點協(xié)調(diào)器),CheckpointCoordinator全權(quán)負責(zé)本應(yīng)用的快照制作。

CheckpointCoordinator(檢查點協(xié)調(diào)器),CheckpointCoordinator全權(quán)負責(zé)本應(yīng)用的快照制作。

CheckpointCoordinator(檢查點協(xié)調(diào)器) 周期性的向該流應(yīng)用的所有source算子發(fā)送 barrier(屏障)。 當(dāng)某個source算子收到一個barrier時,便暫停數(shù)據(jù)處理過程,然后將自己的當(dāng)前狀態(tài)制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自己快照制作情況,同時向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理 下游算子收到barrier之后,會暫停自己的數(shù)據(jù)處理過程,然后將自身的相關(guān)狀態(tài)制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自身快照情況,同時向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理。 每個算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。 當(dāng)CheckpointCoordinator收到所有算子的報告之后,認為該周期的快照制作成功; 否則,如果在規(guī)定的時間內(nèi)沒有收到所有算子的報告,則認為本周期快照制作失敗

7 . 簡述checkpoint機制詳細 ?

1、窗口函數(shù)(window function) window function 定義了要對窗口中收集的數(shù)據(jù)做的計算操作,主要可以分為兩類: 增量聚合函數(shù)(incremental aggregation functions) 每條數(shù)據(jù)到來就進行計算,保持一個簡單的狀態(tài)。典型的增量聚合函數(shù)有ReduceFunction, AggregateFunction。 全窗口函數(shù)(full window functions) 先把窗口所有數(shù)據(jù)收集起來,等到計算的時候會遍歷所有數(shù)據(jù)。ProcessWindowFunction 就是一個全窗口函數(shù)。 2、時間語義

在Flink的流式處理中,會涉及到時間的不同概念 Event Time:是事件創(chuàng)建的時間。它通常由事件中的時間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會記錄自己的生成時間,F(xiàn)link通過時間戳分配器訪問事件時間戳。 Ingestion Time:是數(shù)據(jù)進入Flink的時間。 Processing Time:是每一個執(zhí)行基于時間操作的算子的本地系統(tǒng)時間,與機器相關(guān),默認的時間屬性就是Processing Time。 一個例子——電影《星球大戰(zhàn)》:

例如,一條日志進入Flink的時間為2017-11-12 10:00:00.123,到達Window的系統(tǒng)時間為2017-11-12 10:00:01.234,日志的內(nèi)容如下: 2017-11-02 18:37:15.624 INFO Fail over to rm2 對于業(yè)務(wù)來說,要統(tǒng)計1min內(nèi)的故障日志個數(shù),哪個時間是最有意義的?—— eventTime,因為我們要根據(jù)日志的生成時間進行統(tǒng)計。 1)EventTime的引入 在Flink的流式處理中,絕大部分的業(yè)務(wù)都會使用eventTime,一般只在eventTime無法使用時,才會被迫 使用ProcessingTime或者IngestionTime。 如果要使用EventTime,那么需要引入EventTime的時間屬性,引入方式如下所示 1 val env = StreamExecutionEnvironment.getExecutionEnvironment 2 // 從調(diào)用時刻開始給env創(chuàng)建的每一個stream追加時間特征 3 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

9 . 簡述介紹下Flink的watermark(水位線),watermark需要實現(xiàn)哪個實現(xiàn)類, 在何處定義?有什么作用 ?

1、Watermark介紹及作用 我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的,雖然大部 分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等 原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。

那么此時出現(xiàn)一個問題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運行,我們不能明確數(shù)據(jù) 是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發(fā) window去進行計算了,這個特別的機制,就是Watermark。 Watermark是一種衡量Event Time進展的機制。 Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結(jié)合window來 實現(xiàn)。 數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達了,因此,window 的執(zhí)行也是由Watermark觸發(fā)的。 Watermark可以理解成一個延遲觸發(fā)機制,我們可以設(shè)置Watermark的延時時長t,每次系統(tǒng)會校驗 已經(jīng)到達的數(shù)據(jù)中最大的maxEventTime,然后認定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達,如果有窗口的停止時間等于maxEventTime – t,那么這個窗口被觸發(fā)執(zhí)行。

當(dāng)Flink接收到數(shù)據(jù)時,會按照一定的規(guī)則去生成Watermark,這條Watermark就等于當(dāng)前所有到達數(shù)據(jù)中 的maxEventTime - 延遲時長,也就是說,Watermark是基于數(shù)據(jù)攜帶的時間戳生成的,一旦Watermark比當(dāng)前未觸發(fā)的窗口的停止時間要晚,那么就會觸發(fā)相應(yīng)窗口的執(zhí)行。由于event time是由數(shù)據(jù)攜帶的, 因此,如果運行過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠都不被觸發(fā)。 上圖中,我們設(shè)置的允許最大延遲到達時間為2s,所以時間戳為7s的事件對應(yīng)的Watermark是5s,時間 戳為12s的事件的Watermark是10s,如果我們的窗口1是1s5s,窗口2是6s10s,那么時間戳為7s的事件到達 時的Watermarker恰好觸發(fā)窗口1,時間戳為12s的事件到達時的Watermark恰好觸發(fā)窗口2。 Watermark 就是觸發(fā)前一窗口的“關(guān)窗時間”,一旦觸發(fā)關(guān)門那么以當(dāng)前時刻為準(zhǔn)在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會收入窗中。 只要沒有達到水位那么不管現(xiàn)實中的時間推進了多久都不會觸發(fā)關(guān)窗。 2、Watermark的使用 Watermark的兩種生成方式 1) SourceFunction中產(chǎn)生,將Timestamp的分配(也就是上文提到的離散化)和watermark的生成放在上 游,同時sourceFunction中也有兩個方法生成watermark 通過collectwithTimestamp方法發(fā)送數(shù)據(jù),和調(diào)用emitWatermark產(chǎn)生watermark,我們可以看到,調(diào)用 collectwithTimestamp需要傳入兩個參數(shù),第一個參數(shù)就是數(shù)據(jù),第二次參數(shù)就是數(shù)據(jù)對應(yīng)的時間戳,這 樣就完成了timestamp的分配,調(diào)用emitWatermark生成watermark。 override def run(ctx: SourceContext[MyType]): Unit = { while (/* condition */) { val next: MyType = getNext() ctx.collectWithTimestamp(next if (next.hasWatermarkTime) { ctx.emitWatermark(new Watermark(next.getWatermarkTime)) } } }

2) DataStream API指定,調(diào)用assignTimestampsAndWatermarks方法,用于某些sourceFunction不支持的情況,它能夠接收不同的timestamp和watermark生成器,說白了就是函數(shù)里面參數(shù)不同。 定期生成:

val resultData = logData.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long)] { val maxOutOfOrderness = 10000L var currentMaxTimestamp: Long = _

override def getCurrentWatermark: Watermark = { new Watermark(currentMaxTimestamp - maxOutOfOrderness) } // 根據(jù)數(shù)據(jù)本身的 Event time 來獲取 override def extractTimestamp(element: (Long, String, Long), previousElementTimestamp: Long): Long = { val timestamp = element._1 currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp) timestamp } })

標(biāo)記生成: class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] { // 1 min in ms val bound: Long = 60 * 1000 class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] { // 1 min in ms val bound: Long = 60 * 1000 } 區(qū)別:定期指的是定時調(diào)用邏輯生成watermark,而標(biāo)記不是根據(jù)時間,而是看到特殊記錄表示接下來 的數(shù)據(jù)可能發(fā)不過來了,分配timestamp 調(diào)用用戶實現(xiàn)的watermark方法。 建議:越靠近源端處理更容易進行判斷。

10 . 簡述Flink的窗口(實現(xiàn))機制 ?

1.窗口概述 在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個消息就處理一次,但有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。在這種情況下,我們必須定義一個時間窗口,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對這個窗口內(nèi)的數(shù)據(jù)進行計算。所以窗口就算將無限數(shù)據(jù)切割成有限的“數(shù)據(jù)塊”進行處理。

流式計算是一種被設(shè)計用于處理無限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無限數(shù)據(jù)集是指一種不斷增長的本質(zhì)上無限的數(shù)據(jù)集,而Window窗口是一種切割無限數(shù)據(jù)為有限塊進行處理的手段。

在Flink中, 窗口(window)是處理無界流的核心,窗口把流切割成有限大小的多個"存儲桶"(bucket), 我們在這些桶上進行計算

2.窗口分類 窗口分為兩大類:

基于時間的窗口 時間窗口以時間點到來定義窗口的開始(start)和結(jié)束(end),所以截取出的就是某一時間段的數(shù)據(jù)。到達時間時,窗口不再收集數(shù)據(jù),觸發(fā)計算輸出結(jié)果,并將窗口關(guān)閉銷毀 窗口大小 = 結(jié)束時間 - 開始時間 基于元素個數(shù) 基于元素的個數(shù)來截取數(shù)據(jù),到達固定的個數(shù)時就觸發(fā)計算并關(guān)閉窗口 只需指定窗口大小,就可以把數(shù)據(jù)分配到對應(yīng)的窗口中

2-1.基于時間的窗口(時間驅(qū)動) 時間窗口包含一個開始時間戳和結(jié)束時間戳(前閉后開), 這兩個時間戳一起限制了窗口的尺寸。

在代碼中, Flink使用TimeWindow這個類來表示基于時間的窗口。這個類提供了key查詢開始時間戳和結(jié)束時間戳的方法,還提供了針對給定的窗口獲取它允許的最大時間戳的方法maxTimestamp() 時間窗口有分為滾動窗口,滑動窗口,會話窗口。

2-1-1.滾動窗口(Tumbling Windows) 滾動窗口有固定的大小, 窗口與窗口之間不會重疊也沒有縫隙。例如指定一個長度為5分鐘的滾動窗口,當(dāng)前窗口開始計算,每5分鐘啟動一個新的窗口。 滾動窗口能將數(shù)據(jù)流切分成不重疊的窗口,每一個事件只能屬于一個窗口。

tumbling-window:滾動窗口:size=slide,如:每隔10s統(tǒng)計最近10s的數(shù)據(jù)

代碼示例:實驗使用工具類BigdataUtil

package com.zenitera.bigdata.util; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List;

public class BigdataUtil { public static List toList(Iterable it) { List list = new ArrayList<>(); for (T t : it) { list.add(t); } return list; }

public static String toDateTime(long ts) { return new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”).format(ts); } } 代碼示例:Time - Tumbling Windows

package com.zenitera.bigdata.window; import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;

import java.util.List;

/** ?Time - Tumbling Windows */ public class Flink01_Window_Time_01 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);

env .socketTextStream(“l(fā)ocalhost”, 6666) .map(line -> { String[] data = line.split(“,”); return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) );

}) .keyBy(WaterSensor::getId) // 定義一個長度為5的滾動窗口 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction() { //ProcessWindowFunction

@Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {

List list = BigdataUtil.toList(elements);

String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd());

out.collect("窗口: " + stt + " " + edt + “, key:” + key + " " + list);

} }) .print();

try { env.execute(); } catch (Exception e) { e.printStackTrace(); }

} }

/* D:\netcat-win32-1.12>nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 p1,3,10 w1,5,20 w1,5,20 w1,5,20 w1,5,20

窗口: 2023-03-22 14:52:05 2023-03-22 14:52:10, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)] 窗口: 2023-03-22 14:52:20 2023-03-22 14:52:25, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)] 窗口: 2023-03-22 14:52:25 2023-03-22 14:52:30, key:p1 [WaterSensor(id=p1, ts=3, vc=10)] 窗口: 2023-03-22 14:52:55 2023-03-22 14:53:00, key:w1 [WaterSensor(id=w1, ts=5, vc=20)] 窗口: 2023-03-22 14:53:00 2023-03-22 14:53:05, key:w1 [WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20)] */ 2-1-2.滑動窗口(Sliding Windows) 與滾動窗口一樣, 滑動窗口也是有固定的長度。另外一個參數(shù)我們叫滑動步長,用來控制滑動窗口啟動的頻率。

如果滑動步長小于窗口長度,滑動窗口會重疊, 這種情況下,一個元素可能會被分配到多個窗口中。

例如滑動窗口長度10分鐘,滑動步長5分鐘, 則每5分鐘會得到一個包含最近10分鐘的數(shù)據(jù)。 sliding-window:滑動窗口:size>slide,如:每隔5s統(tǒng)計最近10s的數(shù)據(jù) 代碼示例:Time - Sliding Windows package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;

import java.util.List;

/** ?Time - Sliding Windows */ public class Flink01_Window_Time_02 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);

env .socketTextStream(“l(fā)ocalhost”, 6666) .map(line -> { String[] data = line.split(“,”); return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) );

}) .keyBy(WaterSensor::getId) //定義一個滑動窗口: 長度是5s, 滑動是2秒 .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))) .process(new ProcessWindowFunction() { //ProcessWindowFunction

@Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {

List list = BigdataUtil.toList(elements);

String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd());

out.collect("窗口: " + stt + " " + edt + “, key:” + key + " " + list);

} }) .print();

try { env.execute(); } catch (Exception e) { e.printStackTrace(); }

} } /* D:\netcat-win32-1.12>nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10

窗口: 2023-03-22 14:59:26 2023-03-22 14:59:31, key:a1 [WaterSensor(id=a1, ts=1, vc=3)] 窗口: 2023-03-22 14:59:28 2023-03-22 14:59:33, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)] 窗口: 2023-03-22 14:59:30 2023-03-22 14:59:35, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)] 窗口: 2023-03-22 14:59:32 2023-03-22 14:59:37, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)] 窗口: 2023-03-22 14:59:38 2023-03-22 14:59:43, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)] 窗口: 2023-03-22 14:59:40 2023-03-22 14:59:45, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)] 窗口: 2023-03-22 14:59:42 2023-03-22 14:59:47, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)] 窗口: 2023-03-22 14:59:52 2023-03-22 14:59:57, key:p1 [WaterSensor(id=p1, ts=3, vc=10)] 窗口: 2023-03-22 14:59:54 2023-03-22 14:59:59, key:p1 [WaterSensor(id=p1, ts=3, vc=10)] 窗口: 2023-03-22 15:00:04 2023-03-22 15:00:09, key:p1 [WaterSensor(id=p1, ts=3, vc=10)] 窗口: 2023-03-22 15:00:06 2023-03-22 15:00:11, key:p1 [WaterSensor(id=p1, ts=3, vc=10)] 窗口: 2023-03-22 15:00:08 2023-03-22 15:00:13, key:p1 [WaterSensor(id=p1, ts=3, vc=10)] */ 2-1-3.會話窗口(Session Windows) 會話窗口分配器會根據(jù)活動的元素進行分組。會話窗口不會有重疊,與滾動窗口和滑動窗口相比,會話窗口也沒有固定的開啟和關(guān)閉時間。

如果會話窗口有一段時間沒有收到數(shù)據(jù),會話窗口會自動關(guān)閉,這段沒有收到數(shù)據(jù)的時間就是會話窗口的gap(間隔)。

我們可以配置靜態(tài)的gap,也可以通過一個gap extractor 函數(shù)來定義gap的長度。當(dāng)時間超過了這個gap,當(dāng)前的會話窗口就會關(guān)閉,后序的元素會被分配到一個新的會話窗口。

創(chuàng)建原理: 因為會話窗口沒有固定的開啟和關(guān)閉時間,所以會話窗口的創(chuàng)建和關(guān)閉與滾動,滑動窗口不同。在Flink內(nèi)部,每到達一個新的元素都會創(chuàng)建一個新的會話窗口,如果這些窗口彼此相距比較定義的gap小,則會對他們進行合并。為了能夠合并,會話窗口算子需要合并觸發(fā)器和合并窗口函數(shù): ReduceFunction, AggregateFunction, or ProcessWindowFunction

代碼示例:Time - Session Windows package com.zenitera.bigdata.window; import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;

import java.util.List;

/** ?Time - Session Windows */ public class Flink01_Window_Time_03 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);

env .socketTextStream(“l(fā)ocalhost”, 6666) .map(line -> { String[] data = line.split(“,”); return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) );

}) .keyBy(WaterSensor::getId) // 定義一個session窗口: gap是3s .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))) .process(new ProcessWindowFunction() {

@Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {

List list = BigdataUtil.toList(elements);

String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd());

out.collect("窗口: " + stt + " " + edt + “, key:” + key + " " + list);

} }) .print();

try { env.execute(); } catch (Exception e) { e.printStackTrace(); }

} } /* D:\netcat-win32-1.12>nc64.exe -lp 6666 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10 p1,3,10

窗口: 2023-03-22 15:04:59 2023-03-22 15:05:04, key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)] 窗口: 2023-03-22 15:05:07 2023-03-22 15:05:12, key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)] 窗口: 2023-03-22 15:05:16 2023-03-22 15:05:22, key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)] 窗口: 2023-03-22 15:05:23 2023-03-22 15:05:26, key:p1 [WaterSensor(id=p1, ts=3, vc=10)]

Process finished with exit code -1 */

2-2.基于元素個數(shù)的窗口(數(shù)據(jù)驅(qū)動) 按照指定的數(shù)據(jù)條數(shù)生成一個Window,與時間無關(guān) 2-2-1.滾動窗口 默認的CountWindow是一個滾動窗口,只需要指定窗口大小即可,當(dāng)元素數(shù)量達到窗口大小時,就會觸發(fā)窗口的執(zhí)行。

代碼示例:

package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector;

import java.util.List; /** ?基于元素個數(shù) - 滾動窗口 */ public class Flink02_Window_Count_01 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env .socketTextStream(“l(fā)ocalhost”, 6666) .map(line -> { String[] data = line.split(“,”);

return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) // 定義長度為3的基于個數(shù)的滾動窗口 .countWindow(3) .process(new ProcessWindowFunction() { @Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {

List list = BigdataUtil.toList(elements); out.collect(" key:" + key + " " + list);

} }) .print();

try { env.execute(); } catch (Exception e) { e.printStackTrace(); }

} }

/* D:\netcat-win32-1.12>nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10 p1,3,10 p1,3,10 p1,3,10 w1,5,20 w1,5,20

key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)] key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)] key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)] key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)] */

2-2-2.滑動窗口 滑動窗口和滾動窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時需要傳入兩個參數(shù),一個是window_size,一個是sliding_size。下面代碼中的sliding_size設(shè)置為了2,也就是說,每收到兩個相同key的數(shù)據(jù)就計算一次,每一次計算的window范圍最多是3個元素

代碼示例:

package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector;

import java.util.List;

/** ?基于元素個數(shù) - 滑動窗口 */ public class Flink02_Window_Count_02 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);

env .socketTextStream(“l(fā)ocalhost”, 6666) .map(line -> { String[] data = line.split(“,”);

return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) // 定義長度為3(窗口內(nèi)元素的最大個數(shù)), 滑動步長為2的的基于個數(shù)的滑動窗口 .countWindow(3, 2) .process(new ProcessWindowFunction() { @Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {

List list = BigdataUtil.toList(elements); out.collect(" key:" + key + " " + list);

} }) .print();

try { env.execute(); } catch (Exception e) { e.printStackTrace(); }

} }

/* D:\netcat-win32-1.12>nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10 p1,3,10 w1,5,20 w1,5,20 w2,6,22

key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)] key:a1 [WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3), WaterSensor(id=a1, ts=1, vc=3)] key:u1 [WaterSensor(id=u1, ts=2, vc=4), WaterSensor(id=u1, ts=2, vc=4)] key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)] key:p1 [WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10), WaterSensor(id=p1, ts=3, vc=10)] key:w1 [WaterSensor(id=w1, ts=5, vc=20), WaterSensor(id=w1, ts=5, vc=20)] */

2-3.全局窗口(Global Windows)(自定義觸發(fā)器) 全局窗口分配器會分配相同key的所有元素進入同一個 Global window。這種窗口機制只有指定自定義的觸發(fā)器時才有用。否則不會做任何計算,因為這種窗口沒有能夠處理聚集在一起元素的結(jié)束點。

3.窗口函數(shù) 前面指定了窗口的分配器,接著我們需要來指定如何計算,這事由window function來負責(zé)。一旦窗口關(guān)閉,window function 去計算處理窗口中的每個元素。 window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一種。 ReduceFunction,AggregateFunction更加高效,原因就是Flink可以對到來的元素進行增量聚合。ProcessWindowFunction 可以得到一個包含這個窗口中所有元素的迭代器,以及這些元素所屬窗口的一些元數(shù)據(jù)信息。 ProcessWindowFunction不能被高效執(zhí)行的原因是Flink在執(zhí)行這個函數(shù)之前,需要在內(nèi)部緩存這個窗口上所有的元素。

3-1ProcessWindowFunction 代碼示例:

package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;

/** ?ProcessWindowFunction */ public class Flink03_Window_ProcessFunction { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);

env .socketTextStream(“l(fā)ocalhost”, 6666) .map(line -> { String[] data = line.split(“,”);

return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce( (ReduceFunction) (value1, value2) -> { value1.setVc(value1.getVc() + value2.getVc()); return value1; }, new ProcessWindowFunction() { @Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception { WaterSensor result = elements.iterator().next();

String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd());

out.collect(stt + " " + edt + " " + result); } } ) .print();

try { env.execute(); } catch (Exception e) { e.printStackTrace(); }

} } /* D:\netcat-win32-1.12>nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10

2023-03-22 16:05:20 2023-03-22 16:05:25 WaterSensor(id=a1, ts=1, vc=6) 2023-03-22 16:05:25 2023-03-22 16:05:30 WaterSensor(id=a1, ts=1, vc=3) 2023-03-22 16:05:30 2023-03-22 16:05:35 WaterSensor(id=u1, ts=2, vc=12) 2023-03-22 16:05:40 2023-03-22 16:05:45 WaterSensor(id=p1, ts=3, vc=10) 2023-03-22 16:05:45 2023-03-22 16:05:50 WaterSensor(id=p1, ts=3, vc=20) */

3-2.ReduceFunction 代碼示例:

package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;

/** ?ReduceFunction */ public class Flink03_Window_ReduceFunction { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);

env .socketTextStream(“l(fā)ocalhost”, 6666) .map(line -> { String[] data = line.split(“,”);

return new WaterSensor( data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce( (ReduceFunction) (value1, value2) -> { value1.setVc(value1.getVc() + value2.getVc()); return value1; }, new ProcessWindowFunction() { @Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception { WaterSensor result = elements.iterator().next();

String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd());

out.collect(stt + " " + edt + " " + result); } } ) .print();

try { env.execute(); } catch (Exception e) { e.printStackTrace(); }

} } /* D:\netcat-win32-1.12>nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10

2023-03-22 16:13:05 2023-03-22 16:13:10 WaterSensor(id=a1, ts=1, vc=3) 2023-03-22 16:13:10 2023-03-22 16:13:15 WaterSensor(id=a1, ts=1, vc=6) 2023-03-22 16:13:15 2023-03-22 16:13:20 WaterSensor(id=u1, ts=2, vc=4) 2023-03-22 16:13:20 2023-03-22 16:13:25 WaterSensor(id=u1, ts=2, vc=8) 2023-03-22 16:13:25 2023-03-22 16:13:30 WaterSensor(id=p1, ts=3, vc=30) */

3-3.AggregateFunction 代碼示例:

package com.zenitera.bigdata.window;

import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;

public class Flink03_Window_AggregateFunction { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);

env .socketTextStream(“l(fā)ocalhost”, 6666) .map(line -> { String[] data = line.split(“,”);

return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate( new AggregateFunction() { @Override public Avg createAccumulator() { return new Avg(); }

@Override public Avg add(WaterSensor value, Avg acc) { acc.sum += value.getVc(); acc.count++; return acc; }

@Override public Double getResult(Avg acc) { return acc.sum * 1.0 / acc.count; }

@Override public Avg merge(Avg a, Avg b) { return null; } }, new ProcessWindowFunction() { @Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception { Double result = elements.iterator().next();

String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd());

out.collect(key + " " + stt + " " + edt + " " + result); } } ) .print();

try { env.execute(); } catch (Exception e) { e.printStackTrace(); }

}

public static class Avg { public Integer sum = 0; public Long count = 0L; } }

/* D:\netcat-win32-1.12>nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10

a1 2023-03-22 16:19:45 2023-03-22 16:19:50 3.0 a1 2023-03-22 16:19:50 2023-03-22 16:19:55 3.0 u1 2023-03-22 16:19:55 2023-03-22 16:20:00 4.0 u1 2023-03-22 16:20:00 2023-03-22 16:20:05 4.0 p1 2023-03-22 16:20:05 2023-03-22 16:20:10 10.0 p1 2023-03-22 16:20:10 2023-03-22 16:20:15 10.0 */

11 . 一個 Flink 任務(wù)中可以既有事件時間窗口,又有處理時間窗口嗎? ?

結(jié)論:一個 Flink 任務(wù)可以同時有事件時間窗口,又有處理時間窗口。 那么有些小伙伴們問了,為什么我們常見的 Flink 任務(wù)要么設(shè)置為事件時間語義,要么設(shè)置為處理時間語義? 確實,在生產(chǎn)環(huán)境中,我們的 Flink 任務(wù)一般不會同時擁有兩種時間語義的窗口。 那么怎么解釋開頭所說的結(jié)論呢? 這里從兩個角度進行說明: 1.? 我們其實沒有必要把一個 Flink 任務(wù)和某種特定的時間語義進行綁定。對于事件時間窗口來說,我們只要給它 watermark,能讓 watermark 一直往前推進,讓事件時間窗口能夠持續(xù)觸發(fā)計算就行。對于處理時間來說更簡單,只要窗口算子按照本地時間按照固定的時間間隔進行觸發(fā)就行。無論哪種時間窗口,主要滿足時間窗口的觸發(fā)條件就行。 2.? Flink 的實現(xiàn)上來說也是支持的。Flink 是使用一個叫做 TimerService 的組件來管理 timer 的,我們可以同時注冊事件時間和處理時間的 timer,F(xiàn)link 會自行判斷 timer 是否滿足觸發(fā)條件,如果是,則回調(diào)窗口處理函數(shù)進行計算。需求:數(shù)據(jù)源:用戶心跳日志(uid,time,type)。計算分 Android,iOS 的 DAU,最晚一分鐘輸出一次當(dāng)日零點累計到當(dāng)前的結(jié)果。 3.? 實現(xiàn)方式 1:cumulate 窗口 優(yōu)點:如果是曲線圖的需求,可以完美回溯曲線圖。 缺點:大窗口之間如果有數(shù)據(jù)亂序,有丟數(shù)風(fēng)險;并且由于是 watermark 推動產(chǎn)出,所以數(shù)據(jù)產(chǎn)出會有延遲。 1.? 實現(xiàn)方式 2:Deduplicate 優(yōu)點:計算快。 缺點:任務(wù)發(fā)生 failover,曲線圖不能很好回溯。沒法支持 cube 計算。 1.? 實現(xiàn)方式 3:group agg 優(yōu)點:計算快,支持 cube 計算。 缺點:任務(wù)發(fā)生 failover,曲線圖不能很好回溯

12 . 簡述作業(yè)在很多情況下有可能會失敗。失敗之后重新去運行時,我們?nèi)绾伪WC數(shù)據(jù)的一致性 ?

Fink 基于 Chandv-Lampot 算法,會把分布式的每一個節(jié)點的狀態(tài)保存到分布式文件系統(tǒng)里面作為 Checkpoint(檢點),過程大致如下。首先,從數(shù)據(jù)源端開始注入 Checkpoint Barrier,它是一種比較特殊的消息。 然后它會跟普通的事件一樣隨著數(shù)據(jù)流去流動,當(dāng) Barrier 到達算子之后,這個算子會把它當(dāng)前的本地狀態(tài)進行快照保存,當(dāng) Barrier流動到 Sink,所有的狀態(tài)都保存完整了之后,它就形成一個全局的快照。 這樣當(dāng)作業(yè)失敗之后,就可以通過遠程文件系統(tǒng)里面保存的 Checkpoint 來進行回滾:先把 Source 回滾到 Checkpoint 記錄的ofset,然后把有狀態(tài)節(jié)點當(dāng)時的狀態(tài)回滾到對應(yīng)的時間點,進行重新計算。這樣既可以不用從頭開始計算,又能保證數(shù)據(jù)語義的一致性。

13 . 簡述Flink的CEP ?

CEP的概念: ?復(fù)雜事件處理(Complex Event Processing),用于識別輸入流中符合指定規(guī)則的事件,并按照指定方式輸出。 ?起床—>洗漱—>吃飯—>上班一系列串聯(lián)起來的事件流形成的模式 ?瀏覽商品—>加入購物車—>創(chuàng)建訂單—>支付完成—>發(fā)貨—>收貨事件流形成的模式。 通過概念可以了解,CEP主要是識別輸入流中用戶指定的一些基本規(guī)則的事件,然后將這些事件再通過指定方式輸出。 如下圖所示: 我們指定“方塊、圓”為基本規(guī)則的事件,在輸入的原始流中,將這些事件作為一個結(jié)果流輸出來。 CEP的使用場景: 像用戶異常檢測:我們指定異常操作事件為要輸出的結(jié)果流;策略營銷:指定符合要求的事件為結(jié)果流;運維監(jiān)控:指定一定范圍的指標(biāo)為結(jié)果流;銀行卡盜刷:指定同一時刻在兩個地方被刷兩次為異常結(jié)果流。 在Flink CEP API中,主要通過Pattern 類來進行實現(xiàn)。CEP模式主要分為三種模式: 1、個體模式 ?單例模式:只接收一個事件 p 觸發(fā)條件 (.where()/.or()/.until()) ?循環(huán)模式:可以接收一個或多個事件 p單例 +量詞(在個體模式后追加量詞,指定循環(huán)次數(shù)) 個體模式分為單例模式和循環(huán)模式: 單例模式只接收一個事件,主要通過(.where()/.or()/.until())等條件觸發(fā),使用規(guī)則如下:start為定義的變量。 循環(huán)模式可以接收一個或多個事件,使用規(guī)則為 單例+量詞,如下圖所示: 2、組合模式(多個個體模式的組合) ?嚴(yán)格連續(xù)(next) ?中間沒有任何不匹配的事件 ?寬松連續(xù)(followBy) ?忽略匹配的事件之間的不匹配的事件 ?不確定的寬松連續(xù)(followByAny) ?一個匹配的事件能夠再次使用 組合模式主要分為三種使用規(guī)則:但是在使用組合模式之前,必須以初始模式開始,使用begin() 控制,如下圖: (1)嚴(yán)格連續(xù),通過next()方法控制,這句話是指用戶定義的基本事件,如最上圖中的方塊,必須是連續(xù)都是方塊,不能出圓圈。可以通過下圖理解。 (2)寬松連續(xù),通過followBy()控制,中間可以有不匹配的事件 (3)不確定的寬松連續(xù),通過followByAny()方法控制,表示一個匹配的事件可以多次被使用 組合模式還包含:“不希望出現(xiàn)某種連續(xù)關(guān)系”: .notNext()—— 不想讓某個事件嚴(yán)格近鄰前一個事件發(fā)生 .notFollowedBy()——不想讓某個事件在兩個事件之間發(fā)生 組合模式注意事項: 所有組合模式必須以.begin()開始;組合模式不能以.notFollowedBy()結(jié)束;"not"類型的模式不能被optional所修飾;此外,還可以為模式指定時間約束,用來要求在多長時間內(nèi)匹配有效。 3、模式組 ?一個組合模式作為條件嵌套在個體模式里pPattern((ab)c) 為例幫助大家更好的理解CEP的API使用,接下里通過兩個案例,對CEP的使用進行講解。 (a)電商案例——條件創(chuàng)建訂單之后15分鐘之內(nèi)一定要付款,否則取消訂單 案例介紹:在電商系統(tǒng)當(dāng)中,經(jīng)常會發(fā)現(xiàn)有些訂單下單之后沒有支付,就會有一個倒計時的時間值,提示你在15分鐘內(nèi)完成支付,如果沒有完成支付,那么該訂單就會被取消,主要是因為拍下訂單就會減庫存,但是如果一直沒有支付,那么就會造成庫存沒有了,后面購買的時候買不到。 在CEP概念介紹時,引入過一條鏈路: ?瀏覽商品—>加入購物車—>創(chuàng)建訂單—>支付完成—>發(fā)貨—>收貨事件流形成的模式。 上述模式是標(biāo)準(zhǔn)的電商事件流,我們案例中定義的CEP規(guī)則是 創(chuàng)建訂單—>支付完成 這兩個環(huán)節(jié),要求是,判斷15分鐘內(nèi)是否付款,否則訂單取消。 代碼設(shè)計: ?(1)定義實體類:訂單編號,訂單狀態(tài)(1 創(chuàng)建訂單,等待支付,2支付訂單完成,3取消訂單,申請退款,4已發(fā)貨,5確認收貨,已經(jīng)完成)訂單創(chuàng)建時間,訂單金額 ?(2)創(chuàng)建運行環(huán)境,設(shè)置流時間特性為事件時間,設(shè)置并行度 ?(3)設(shè)置Source源 ?(4)定義Pattern模式,指定條件 在Pattern模式中,我們使用到了個體模式,組合模式 首先,使用begin()方法以模式作為開始; 其次,由于創(chuàng)建的CEP規(guī)則是 創(chuàng)建訂單—>支付完成 ,則在單例模式下,通過where條 件觸發(fā)訂單的狀態(tài)已將處于 1 創(chuàng)建訂單,等待支付狀態(tài) 接著,通過寬松連續(xù)followBy()方法忽略15分鐘內(nèi)的產(chǎn)生的其他訂單,所以使用寬松連續(xù)。 然后,通過where條件觸發(fā)訂單的狀態(tài)是否已將處于 2 支付完成。 最后,通過within()方法判斷第二次觸發(fā)的訂單是否在支付的15分鐘內(nèi)完成。 ?(5)訂單超時檢測 ?(6)運行結(jié)果對比 查看最終的運行結(jié)果,可以得知201608041140… 這個訂單的狀態(tài)為1,且沒有其他狀態(tài),最終被系統(tǒng)判斷為超時訂單。 (b)系統(tǒng)登錄案例——當(dāng)2秒內(nèi)出現(xiàn)兩次登錄失敗(“fail”)時,輸出異常報警信息 同樣,根據(jù)上述信息,我們做以下代碼設(shè)計: ?(1)設(shè)置實體類:用戶ID,登錄IP,事件類型,事件時間 ?(2)設(shè)置環(huán)境,指定source源,傳入實體類 ?(3)定義pattern匹配模式(規(guī)則),指定條件 begin: 第一次;where: 出現(xiàn) fail;next: 緊接著第二次出現(xiàn) fail;times: 出現(xiàn)一次 ;within:表示在2秒內(nèi)。 ?(4)結(jié)果對比 查看最終的運行結(jié)果,可以得知服務(wù)器編號為1,連續(xù)2秒在兩個服務(wù)器登錄,登錄失??;服務(wù)器編號為3,連續(xù)2秒在一臺服務(wù)器登錄,登錄失?。?/p>

14 . 簡述作業(yè)在很多情況下有可能會失敗。失敗之后重新去運行時,我們?nèi)绾伪WC數(shù)據(jù)的一致性 ?

Flink 基于 Chandy-Lamport 算法,會把分布式的每一個節(jié)點的狀態(tài)保存到分布式文件系統(tǒng)里面作為 Checkpoint(檢查點),過程大致如下。首先,從數(shù)據(jù)源端開始注入 Checkpoint Barrier,它是一種比較特殊的消息。

然后它會跟普通的事件一樣隨著數(shù)據(jù)流去流動,當(dāng) Barrier 到達算子之后,這個算子會把它當(dāng)前的本地狀態(tài)進行快照保存,當(dāng) Barrier 流動到 Sink,所有的狀態(tài)都保存完整了之后,它就形成一個全局的快照。

這樣當(dāng)作業(yè)失敗之后,就可以通過遠程文件系統(tǒng)里面保存的 Checkpoint 來進行回滾:先把 Source 回滾到 Checkpoint 記錄的 offset,然后把有狀態(tài)節(jié)點當(dāng)時的狀態(tài)回滾到對應(yīng)的時間點,進行重新計算。這樣既可以不用從頭開始計算,又能保證數(shù)據(jù)語義的一致性。

15 . 簡述Flink Checkpoint與 Spark 的相比,F(xiàn)link 有什么區(qū)別或優(yōu)勢嗎 ?

Spark Streaming 的 Checkpoint 僅僅是針對 Driver 的故障恢復(fù)做了數(shù)據(jù)和元數(shù)據(jù)的 Checkpoint。而 Flink 的 Checkpoint 機制要復(fù)雜了很多,它采用的是輕量級的分布式快照,實現(xiàn)了每個算子的快照,及流動中的數(shù)據(jù)的快照

16 . 簡述Flink 中的 Time 有哪幾種 ?

Flink中的時間有三種類型:

Event Time:是事件創(chuàng)建的時間。它通常由事件中的時間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會記錄自己的生成時間,F(xiàn)link通過時間戳分配器訪問事件時間戳。 Ingestion Time:是數(shù)據(jù)進入Flink的時間。 Processing Time:是每一個執(zhí)行基于時間操作的算子的本地系統(tǒng)時間,與機器相關(guān),默認的時間屬性就是Processing Time。 例如,一條日志進入Flink的時間為2021-01-22 10:00:00.123,到達Window的系統(tǒng)時間為2021-01-22 10:00:01.234,日志的內(nèi)容如下: 2021-01-06 18:37:15.624 INFO Fail over to rm2

對于業(yè)務(wù)來說,要統(tǒng)計1min內(nèi)的故障日志個數(shù),哪個時間是最有意義的?—— eventTime,因為我們要根據(jù)日志的生成時間進行統(tǒng)計。

17 . 簡述Flink對于遲到數(shù)據(jù)是怎么處理的 ?

Flink中 WaterMark 和 Window 機制解決了流式數(shù)據(jù)的亂序問題,對于因為延遲而順序有誤的數(shù)據(jù),可以根據(jù)eventTime進行業(yè)務(wù)處理,對于延遲的數(shù)據(jù)Flink也有自己的解決辦法,主要的辦法是給定一個允許延遲的時間,在該時間范圍內(nèi)仍可以接受處理延遲數(shù)據(jù):

設(shè)置允許延遲的時間是通過allowedLateness(lateness: Time)設(shè)置 保存延遲數(shù)據(jù)則是通過sideOutputLateData(outputTag: OutputTag[T])保存 獲取延遲數(shù)據(jù)是通過DataStream.getSideOutput(tag: OutputTag[X])獲取

18 . 簡述Flink 的運行必須依賴 Hadoop 組件嗎 ?

Flink可以完全獨立于Hadoop,在不依賴Hadoop組件下運行。但是做為大數(shù)據(jù)的基礎(chǔ)設(shè)施,Hadoop體系是任何大數(shù)據(jù)框架都繞不過去的。Flink可以集成眾多Hadooop 組件,例如Yarn、Hbase、HDFS等等。例如,F(xiàn)link可以和Yarn集成做資源調(diào)度,也可以讀寫HDFS,或者利用HDFS做檢查點

19 . 簡述Flink集群有哪些角色?各自有什么作用 ?

有以下三個角色:

JobManager處理器:

也稱之為Master,用于協(xié)調(diào)分布式執(zhí)行,它們用來調(diào)度task,協(xié)調(diào)檢查點,協(xié)調(diào)失敗時恢復(fù)等。Flink運行時至少存在一個master處理器,如果配置高可用模式則會存在多個master處理器,它們其中有一個是leader,而其他的都是standby。

TaskManager處理器:

也稱之為Worker,用于執(zhí)行一個dataflow的task(或者特殊的subtask)、數(shù)據(jù)緩沖和data stream的交換,F(xiàn)link運行時至少會存在一個worker處理器。

Clint客戶端:

Client是Flink程序提交的客戶端,當(dāng)用戶提交一個Flink程序時,會首先創(chuàng)建一個Client,該Client首先會對用戶提交的Flink程序進行預(yù)處理,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,并建立到JobManager的連接,將Flink Job提交給JobManager

20 . 簡述Flink 資源管理中 Task Slot 的概念 ?

在Flink中每個TaskManager是一個JVM的進程, 可以在不同的線程中執(zhí)行一個或多個子任務(wù)。為了控制一個worker能接收多少個task。worker通過task slot(任務(wù)槽)來進行控制(一個worker至少有一個task slot)

21 . 簡述Flink的重啟策略了解嗎 ?

Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟:

1:固定延遲重啟策略 固定延遲重啟策略會嘗試一個給定的次數(shù)來重啟Job,如果超過了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會等待一個固定的時間。

2:失敗率重啟策略 失敗率重啟策略在Job失敗后會重啟,但是超過失敗率后,Job會最終被認定失敗。在兩個連續(xù)的重啟嘗試之間,重啟策略會等待一個固定的時間。

3:無重啟策略 Job直接失敗,不會嘗試進行重啟。

22 . 簡述Flink 是如何保證 Exactly-once 語義的 ?

Flink通過實現(xiàn)兩階段提交和狀態(tài)保存來實現(xiàn)端到端的一致性語義。分為以下幾個步驟:

開始事務(wù)(beginTransaction)創(chuàng)建一個臨時文件夾,來寫把數(shù)據(jù)寫入到這個文件夾里面 預(yù)提交(preCommit)將內(nèi)存中緩存的數(shù)據(jù)寫入文件并關(guān)閉 正式提交(commit)將之前寫完的臨時文件放入目標(biāo)目錄下。這代表著最終的數(shù)據(jù)會有一些延遲丟棄(abort)丟棄臨時文件 若失敗發(fā)生在預(yù)提交成功后,正式提交前??梢愿鶕?jù)狀態(tài)來提交預(yù)提交的數(shù)據(jù),也可刪除預(yù)提交的數(shù)據(jù)。

23 . 簡述Flink如何保證精確一次性消費 ?

Flink 保證精確一次性消費主要依賴于兩種Flink機制

1、Checkpoint機制

2、二階段提交機制

Checkpoint機制 主要是當(dāng)Flink開啟Checkpoint的時候,會往Source端插入一條barrir,然后這個barrir隨著數(shù)據(jù)流向一直流動,當(dāng)流入到一個算子的時候,這個算子就開始制作checkpoint,制作的是從barrir來到之前的時候當(dāng)前算子的狀態(tài),將狀態(tài)寫入狀態(tài)后端當(dāng)中。然后將barrir往下流動,當(dāng)流動到keyby 或者shuffle算子的時候,例如當(dāng)一個算子的數(shù)據(jù),依賴于多個流的時候,這個時候會有barrir對齊,也就是當(dāng)所有的barrir都來到這個算子的時候進行制作checkpoint,依次進行流動,當(dāng)流動到sink算子的時候,并且sink算子也制作完成checkpoint會向jobmanager 報告 checkpoint n 制作完成。

二階段提交機制 Flink 提供了CheckpointedFunction與CheckpointListener這樣兩個接口,CheckpointedFunction中有snapshotState方法,每次checkpoint觸發(fā)執(zhí)行方法,通常會將緩存數(shù)據(jù)放入狀態(tài)中,可以理解為一個hook,這個方法里面可以實現(xiàn)預(yù)提交,CheckpointListyener中有notifyCheckpointComplete方法,checkpoint完成之后的通知方法,這里可以做一些額外的操作。例如FLinkKafkaConumerBase使用這個來完成Kafka offset的提交,在這個方法里面可以實現(xiàn)提交操作。在2PC中提到如果對應(yīng)流程例如某個checkpoint失敗的話,那么checkpoint就會回滾,不會影響數(shù)據(jù)一致性,那么如果在通知checkpoint成功的之后失敗了,那么就會在initalizeSate方法中完成事務(wù)的提交,這樣可以保證數(shù)據(jù)的一致性。最主要是根據(jù)checkpoint的狀態(tài)文件來判斷的。

24 . 簡述Flink的狀態(tài)可以用來做什么 ?

Flink狀態(tài)主要有兩種使用方式:

1:checkpoint的數(shù)據(jù)恢復(fù) 2:邏輯計算

25 . 簡述Flink的Checkpoint底層如何實現(xiàn)的?savepoint和checkpoint有什么區(qū)別 ?

1、Checkpoint Flink中基于異步輕量級的分布式快照技術(shù)提供了Checkpoints容錯機制,分布式快照可以將同一時間點 Task/Operator的狀態(tài)數(shù)據(jù)全局統(tǒng)一快照處理,包括Keyed State和Operator State(State后面也接著介紹了)。如下圖所示,F(xiàn)link會在輸入的數(shù)據(jù)集上間隔性地生成checkpoint barrier,通過柵欄(barrier)將間隔時間段內(nèi)的數(shù)據(jù)劃分到相應(yīng)的checkpoint中。當(dāng)應(yīng)用出現(xiàn)異常時,Operator就能夠從上一次快照中 恢復(fù)所有算子之前的狀態(tài),從而保證數(shù)據(jù)的一致性。例如在Kafka Consumer算子中維護Ouset狀態(tài),當(dāng)系統(tǒng)出現(xiàn)問題無法從Kafka中消費數(shù)據(jù)時,可以將Ouset記錄在狀態(tài)中,當(dāng)任務(wù)重新恢復(fù)時就能夠從指定 的偏移量開始消費數(shù)據(jù)。對于狀態(tài)占用空間比較小的應(yīng)用,快照產(chǎn)生過程非常輕量,高頻率創(chuàng)建且對 Flink任務(wù)性能影響相對較小。checkpoint過程中狀態(tài)數(shù)據(jù)一般被保存在一個可配置的環(huán)境中,通常是在 JobManager節(jié)點或HDFS上。

checkpoint原理就是連續(xù)繪制分布式的快照,而且非常輕量級,可以連續(xù)繪制,并且不會對性能產(chǎn)生太 大影響。

默認情況下Flink不開啟檢查點的,用戶需要在程序中通過調(diào)用enable-Checkpointing(n)方法配置和開啟 檢查點,其中n為檢查點執(zhí)行的時間間隔,單位為毫秒。 2、savepoint和checkpoint區(qū)別 Savepoints是檢查點的一種特殊實現(xiàn),底層其實也是使用Checkpoints的機制。Savepoints是用戶以手工命令的方式觸發(fā)Checkpoint,并將結(jié)果持久化到指定的存儲路徑中,其主要目的是幫助用戶在升級和維 護集群過程中保存系統(tǒng)中的狀態(tài)數(shù)據(jù),避免因為停機運維或者升級應(yīng)用等正常終止應(yīng)用的操作而導(dǎo)致系 統(tǒng)無法恢復(fù)到原有的計算狀態(tài)的情況,從而無法實現(xiàn)端到端的Excatly-Once語義保證。 下面這張來自Flink 1.1版本文檔的圖示出了checkpoint和savepoint的關(guān)系。

總結(jié)如下所示: checkpoint的側(cè)重點是“容錯”,即Flink作業(yè)意外失敗并重啟之后,能夠直接從早先打下的 checkpoint恢復(fù)運行,且不影響作業(yè)邏輯的準(zhǔn)確性。而savepoint的側(cè)重點是“維護”,即Flink作業(yè)需 要在人工干預(yù)下手動重啟、升級、遷移或A/B測試時,先將狀態(tài)整體寫入可靠存儲,維護完畢之后 再從savepoint恢復(fù)現(xiàn)場。 savepoint是“通過checkpoint機制”創(chuàng)建的,所以savepoint本質(zhì)上是特殊的checkpoint。 checkpoint面向Flink Runtime本身,由Flink的各個TaskManager定時觸發(fā)快照并自動清理,一般不需要用戶干預(yù);savepoint面向用戶,完全根據(jù)用戶的需要觸發(fā)與清理。 checkpoint的頻率往往比較高(因為需要盡可能保證作業(yè)恢復(fù)的準(zhǔn)確度),所以checkpoint的存儲 格式非常輕量級,但作為trade-ou犧牲了一切可移植(portable)的東西,比如不保證改變并行度 和升級的兼容性。savepoint則以二進制形式存儲所有狀態(tài)數(shù)據(jù)和元數(shù)據(jù),執(zhí)行起來比較慢而且 “貴”,但是能夠保證portability,如并行度改變或代碼升級之后,仍然能正?;謴?fù)。 checkpoint是支持增量的(通過RocksDB),特別是對于超大狀態(tài)的作業(yè)而言可以降低寫入成本。 savepoint并不會連續(xù)自動觸發(fā),所以savepoint沒有必要支持增量

26 . 簡述Flink的Checkpoint流程 ?

Checkpoint由JM的Checkpoint Coordinator發(fā)起 第一步,Checkpoint Coordinator 向所有 source 節(jié)點 trigger Checkpoint;

第二步,source 節(jié)點向下游廣播 barrier,這個 barrier 就是實現(xiàn) Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才會執(zhí)行相應(yīng)的 Checkpoint。

第三步,當(dāng) task 完成 state 備份后,會將備份數(shù)據(jù)的地址(state handle)通知給 Checkpoint coordinator。

這里分為同步和異步(如果開啟的話)兩個階段: 1)同步階段:task執(zhí)行狀態(tài)快照,并寫入外部存儲系統(tǒng)(根據(jù)狀態(tài)后端的選擇不同有所區(qū)別) 執(zhí)行快照的過程: 對state做深拷貝 將寫操作封裝在異步的FutureTask中 FutureTask的作用包括: 打開輸入流 寫入狀態(tài)的元數(shù)據(jù)信息 寫入狀態(tài)關(guān)閉輸入流 2)異步階段 執(zhí)行同步階段創(chuàng)建的FutureTask 向Checkpoint Coordinator發(fā)送ACK響應(yīng)

第四步,下游的 sink 節(jié)點收集齊上游兩個 input 的 barrier 之后,會執(zhí)行本地快照,這里特地展示了RocksDB incremental Checkpoint 的流程,首先 RocksDB 會全量刷數(shù)據(jù)到磁盤上(紅色大三角表示), 然后 Flink 框架會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)。 同樣的,sink 節(jié)點在完成自己的 Checkpoint 之后,會將 state handle 返回通知 Coordinator。

最后,當(dāng) Checkpoint coordinator 收集齊所有 task 的 state handle,就認為這一次的 Checkpoint 全局完成了,向持久化存儲中再備份一個 Checkpoint meta 文件。

27 . 簡述Flink Checkpoint的作用 ?

Checkpoint :某一時刻,F(xiàn)link中所有的Operator的當(dāng)前State的全局快照,一般存在磁盤上。 CheckPoint是通過快照(SnapShot)的方式使得將歷史某些時刻的狀態(tài)保存下來,當(dāng)Flink任務(wù)意外掛掉 之后,重新運行程序后默認從最近一次保存的完整快照處進行恢復(fù)任務(wù)。 Flink中的Checkpoint底層使用了 Chandy-Lamport algorithm 分布式快照算法,可以保證數(shù)據(jù)的在分布式環(huán)境下的一致性。

28 . 簡述Flink中Checkpoint超時原因 ?

1)計算量大,CPU密集性,導(dǎo)致TM內(nèi)線程一直在processElement,而沒有時間做Checkpoint 解決方案:過濾掉部分?jǐn)?shù)據(jù),增大并行度 修改實現(xiàn)邏輯,進行批流分開計算,比如離線數(shù)據(jù)每半個小時進行一次計算,而實時計算只需要計算最 近半小時內(nèi)的數(shù)據(jù)即可??傊畠蓚€方法,一、減少源數(shù)據(jù)量,過濾黑名單或者非法ID,window聚合; 二、簡化處理邏輯,特別是減少遍歷。 2)數(shù)據(jù)傾斜 解決方案: 第一,兩階段聚合;第二,重新設(shè)置并行度,改變KeyGroup的分布 3)頻繁FULL GC(‘Checkpoint Duration (Async)’時間長) 當(dāng)StateSize達到200M以上,Async的時間會超過1min。 這種情況比較少見。 4)出現(xiàn)反壓 包含barrier的event buuer一直不到 ,subTaskCheckpointCoordinator做不了Checkpoint,就會超時

29 . 簡述Flink的Exactly Once語義怎么保證 ?

下級存儲支持事務(wù):Flink可以通過實現(xiàn)兩階段提交和狀態(tài)保存來實現(xiàn)端到端的一致性語義。 分為以下幾個步驟: 1)開始事務(wù)(beginTransaction)創(chuàng)建一個臨時文件夾,來寫把數(shù)據(jù)寫入到這個文件夾里面 2) 預(yù)提交(preCommit)將內(nèi)存中緩存的數(shù)據(jù)寫入文件并關(guān)閉 3)正式提交(commit)將之前寫完的臨時文件放入目標(biāo)目錄下。這代表著最終的數(shù)據(jù)會有一些延遲 4)丟棄(abort)丟棄臨時文件 5)若失敗發(fā)生在預(yù)提交成功后,正式提交前。可以根據(jù)狀態(tài)來提交預(yù)提交的數(shù)據(jù),也可刪除預(yù)提交的 數(shù)據(jù)。 下級存儲不支持事務(wù): 具體實現(xiàn)是冪等寫入,需要下級存儲具有冪等性寫入特性

30 . 簡述Flink的端到端Exactly Once ?

1、Flink的Exactly-Once語義 對于Exactly-Once語義,指的是每一個到來的事件僅會影響最終結(jié)果一次。就算機器宕機或者軟件崩 潰,即沒有數(shù)據(jù)重復(fù),也沒有數(shù)據(jù)丟失。 這里有一個關(guān)于checkpoint算法的簡要介紹,這對于了解更廣的主題來說是十分必要的。 一個checkpoint是Flink的一致性快照,它包括: 程序當(dāng)前的狀態(tài)輸入流的位置 Flink通過一個可配置的時間,周期性的生成checkpoint,將它寫入到存儲中,例如S3或者HDFS。寫入到 存儲的過程是異步的,意味著Flink程序在checkpoint運行的同時還可以處理數(shù)據(jù)。 在機器或者程序遇到錯誤重啟的時候,F(xiàn)link程序會使用最新的checkpoint進行恢復(fù)。Flink會恢復(fù)程序的 狀態(tài),將輸入流回滾到checkpoint保存的位置,然后重新開始運行。這意味著Flink可以像沒有發(fā)生錯誤 一樣計算結(jié)果。 在Flink 1.4.0版本之前,F(xiàn)link僅保證Flink程序內(nèi)部的Exactly-Once語義,沒有擴展到在Flink數(shù)據(jù)處理完成后存儲的外部系統(tǒng)。 Flink程序可以和不同的接收器(sink)交互,開發(fā)者需要有能力在一個組件的上下文中維持Exactly-Once 語義。 為了提供端到端Exactly-Once語義,除了Flink應(yīng)用程序本身的狀態(tài),F(xiàn)link寫入的外部存儲也需要滿足這 個語義。也就是說,這些外部系統(tǒng)必須提供提交或者回滾的方法,然后通過Flink的checkpoint來協(xié)調(diào)。 在分布式系統(tǒng)中,協(xié)調(diào)提交和回滾的通用做法是兩階段提交。Flink的TwoPhaseCommitSinkFunction使 用兩階段提交協(xié)議來保證端到端的Exactly-Once語義。 2、Flink程序端到端的Exactly-Once語義

Kafka是一個流行的消息中間件,經(jīng)常被拿來和Flink一起使用,Kafka 在最近的0.11版本中添加了對事務(wù)的支持。這意味著現(xiàn)在Flink讀寫Kafka有了必要的支持,使之能提供端到端的Exactly-Once語義。 Flink對端到端的Exactly-Once語義不僅僅局限在Kafka,你可以使用任一輸入輸出源(source、sink),只 要他們提供了必要的協(xié)調(diào)機制。例如Pravega ,來自DELL/EMC的流數(shù)據(jù)存儲系統(tǒng),通過Flink的 TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once語義。 在這個示例程序中,有: 從Kafka讀取數(shù)據(jù)的data source(KafkaConsumer,在Flink中) 窗口聚合 將數(shù)據(jù)寫回到Kafka的data sink(KafkaProducer,在Flink中) 在data sink中要保證Exactly-Once語義,它必須將所有的寫入數(shù)據(jù)通過一個事務(wù)提交到Kafka。在兩個 checkpoint之間,一個提交綁定了所有要寫入的數(shù)據(jù)。 這保證了當(dāng)出錯的時候,寫入的數(shù)據(jù)可以被回滾。 然而在分布式系統(tǒng)中,通常擁有多個并行執(zhí)行的寫入任務(wù),簡單的提交和回滾是效率低下的。為了保證 一致性,所有的組件必須先達成一致,才能進行提交或者回滾。Flink使用了兩階段提交協(xié)議以及預(yù)提交 階段來解決這個問題。 在checkpoint開始的時候,即兩階段提交中的預(yù)提交階段。首先,F(xiàn)link的JobManager在數(shù)據(jù)流中注入一 個checkpoint屏障(它將數(shù)據(jù)流中的記錄分割開,一些進入到當(dāng)前的checkpoint,另一些進入下一個checkpoint)。 屏障通過operator傳遞。對于每一個operator,它將觸發(fā)operator的狀態(tài)快照寫入到state backend data source保存了Kafka的ouset,之后把checkpoint屏障傳遞到后續(xù)的operator。

這種方式僅適用于operator有他的內(nèi)部狀態(tài)。內(nèi)部狀態(tài)是指,F(xiàn)link state backends保存和管理的內(nèi)容-舉例來說,第二個operator中window聚合算出來的sum。當(dāng)一個進程有它的內(nèi)部狀態(tài)的時候,除了在 checkpoint之前將需要將數(shù)據(jù)更改寫入到state backend,不需要在預(yù)提交階段做其他的動作。在 checkpoint成功的時候,F(xiàn)link會正確的提交這些寫入,在checkpoint失敗的時候會終止提交 然而,當(dāng)一個進程有外部狀態(tài)的時候,需要用一種不同的方式來處理。外部狀態(tài)通常由需要寫入的外部 系統(tǒng)引入,例如Kafka。因此,為了提供Exactly-Once保證,外部系統(tǒng)必須提供事務(wù)支持,借此和兩階段 提交協(xié)議交互。 在這個例子中,由于需要將數(shù)據(jù)寫到Kafka,data sink有外部的狀態(tài)。因此,在預(yù)提交階段,除了將狀態(tài)寫入到state backend之外,data sink必須預(yù)提交自己的外部事務(wù)。

當(dāng)checkpoint屏障在所有operator中都傳遞了一遍,以及它觸發(fā)的快照寫入完成,預(yù)提交階段結(jié)束。這個 時候,快照成功結(jié)束,整個程序的狀態(tài),包括預(yù)提交的外部狀態(tài)是一致的。萬一出錯的時候,我們可以 通過checkpoint重新初始化。 下一步是通知所有operator,checkpoint已經(jīng)成功了。這時兩階段提交中的提交階段,Jobmanager為程 序中的每一個operator發(fā)起checkpoint已經(jīng)完成的回調(diào)。data source和window operator沒有外部的狀態(tài),在提交階段中,這些operator不會執(zhí)行任何動作。data sink擁有外部狀態(tài),所以通過事務(wù)提交外部寫入。

對上述的知識點匯總一下: 一旦所有的operator完成預(yù)提交,就提交一個commit。 如果至少有一個預(yù)提交失敗,其他的都會失敗,這時回滾到上一個checkpoint保存的位置。 預(yù)提交成功后,提交的commit也需要保障最終成功-operator和外部系統(tǒng)需要提供這個保障。如果 commit失敗了(比如網(wǎng)絡(luò)中斷引起的故障),整個flink程序也因此失敗,它會根據(jù)用戶的重啟策 略重啟,可能還會有一個嘗試性的提交。這個過程非常嚴(yán)苛,因為如果提交沒有最終生效,會導(dǎo)致 數(shù)據(jù)丟失。 因此,我們可以確定所有的operator同意checkpoint的最終結(jié)果:要么都同意提交數(shù)據(jù),要么提交被終止 然后回滾。

31 . 簡述Flink的水?。╓atermark),有哪幾種 ?

水?。╓atermark)用于處理亂序事件,而正確地處理亂序事件,通常用Watermark機制結(jié)合窗口來實 現(xiàn)。 從流處理原始設(shè)備產(chǎn)生事件,到Flink讀取到數(shù)據(jù),再到Flink多個算子處理數(shù)據(jù),在這個過程中,會受到 網(wǎng)絡(luò)延遲、數(shù)據(jù)亂序、背壓、Failover等多種情況的影響,導(dǎo)致數(shù)據(jù)是亂序的。雖然大部分情況下沒有問 題,但是不得不在設(shè)計上考慮此類異常情況,為了保證計算結(jié)果的正確性,需要等待數(shù)據(jù),這帶來了計 算的延遲。對于延遲太久的數(shù)據(jù),不能無限期地等下去,所以必須有一個機制,來保證特定的時間后一 定會觸發(fā)窗口進行計算,這個觸發(fā)機制就是Watermark。 在DataStream和Flink Table & SQL模塊中,使用了各自的Watermark生成體系。 1、DataStream Watermark生成 通常Watermark在Source Function中生成,如果是并行計算的任務(wù),在多個并行執(zhí)行的Source Function 中,相互獨立產(chǎn)生各自的Watermark。而Flink提供了額外的機制,允許在調(diào)用DataStream API操作(如 map、filter等)之后,根據(jù)業(yè)務(wù)邏輯的需要,使用時間戳和Watermark生成器修改數(shù)據(jù)記錄的時間戳和 Watermark。 1) Source Function中生成Watermark Source Function可以直接為數(shù)據(jù)元素分配時間戳,同時也會向下游發(fā)送Watermark。在Source Function中為數(shù)據(jù)分配了時間戳 和Watermark就不必在DataStream API中使用了。需要注意的是:如果一個timestamp分配器被使用的話, 由源提供的任何Timestamp和Watermark都會被重寫。

為了通過SourceFunction直接為一個元素分配一個時間戳,SourceFunction需要調(diào)用SourceContext中的collectWithTimestamp(…)方法。為了生成Watermark,源需要調(diào)用emitWatermark(Watermark)方 法,如以下代碼所示。

2) DataStream API中生成Watermark DataStream API中使用的TimestampAssigner 接口定義了時間戳的提取行為,其有兩個不同接口 AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks,分別代表了不同的Watermark生 成策略。TimestampAssigner接口體系如下圖所示。

AssignerWithPeriodicWatermarks 是周期性生成Watermark策略的頂層抽象接口,該接口的實現(xiàn)類周期性地生成Watermark,而不會針對每一個事件都生成。 AssignerWithPunctuatedWatermarks 對每一個事件都會嘗試進行Watermark的生成,但是如果生成的Watermark是null或者Watermark小于之前的Watermark,則該Watermark不會發(fā)往下游,因為發(fā)往下游 也不會有任何效果,不會觸發(fā)任何窗口的執(zhí)行。 2、Flink SQL Watermark生成 Flink SQL沒有DataStram API開發(fā)那么靈活,其Watermark的生成主要是在TableSource中完成的,其定義了3類Watermark生成策略。其Watermark生成策略體系如下圖所示。

Watermark的生成機制分為如下3類。 1)周期性Watermark策略

周期性Watermark策略在Flink中叫作 PeriodicWatermarkAssigner ,周期性(一定時間間隔或者達到一定的記錄條數(shù))地產(chǎn)生一個Watermark。在實際的生產(chǎn)中使用周期性Watermark策略的時候,必須注意 時間和數(shù)據(jù)量,結(jié)合時間和積累條數(shù)兩個維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會有很大 的延時。 (1) AscendingTimestamps:遞增Watermark,作用在Flink SQL中的Rowtime屬性上,Watermark=當(dāng)前收到的數(shù)據(jù)元素的最大時間戳-1,此處減1的目的是確保有最大時間戳的事件不會被當(dāng)做遲到數(shù)據(jù)丟棄。 (2) BoundedOutOfOrderTimestamps:固定延遲Watermark,作用在Flink SQL的Rowtime屬性上, Watermark=當(dāng)前收到的數(shù)據(jù)元素的最大時間戳-固定延遲。 2)每事件Watermark策略 每事件Watermark策略在Flink中叫作 PuntuatedWatamarkAssigner ,數(shù)據(jù)流中每一個遞增的 EventTime都會產(chǎn)生一個Watermark。在實際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark,在一定程度上會對下游算子造成壓力,所以只有在實時性要求非常高的場景下才會選擇Punctuated的方式進行Watermark的生成。 3)無為策略 無為策略在Flink中叫作 PreserveWatermark 。在Flink中可以使用DataStream API和Table & SQL混合編程,所以Flink SQL中不設(shè)定Watermark策略,使用底層DataStream中的Watermark策略也是可以的,這時Flink SQL的Table Source中不做處理。 3、多流的Watermark 在實際的流計算中一個作業(yè)中往往會處理多個Source的數(shù)據(jù),對Source的數(shù)據(jù)進行GroupBy分組,那么 來自不同Source的相同key值會shuule到同一個處理節(jié)點,并攜帶各自的Watermark,F(xiàn)link內(nèi)部要保證Watermark保持單調(diào)遞增,多個Source的Watermark匯聚到一起時可能不是單調(diào)自增的,對于這樣的情況

Flink內(nèi)部實現(xiàn)每一個邊上只能有一個遞增的Watermark,當(dāng)出現(xiàn)多流攜帶EventTime匯聚到一起 (GroupBy或Union)時,Apache Flink會選擇所有流入的EventTime中最小的一個向下游流出,從而保證 Watermark的單調(diào)遞增和數(shù)據(jù)的完整性。

Watermark是在Source Function中生成或者在后續(xù)的DataStream API中生成的。Flink作業(yè)一般是并行執(zhí)行的,作業(yè)包含多個Task,每個Task運行一個或一組算子(OperatorChain)實例,Task在生成Watermark 的時候是相互獨立的,也就是說在作業(yè)中存在多個并行的Watermark。 Watermark在作業(yè)的DAG從上游向下游傳遞,算子收到上游Watermark后會更新其Watermark。如果新的 Watermark大于算子的當(dāng)前Watermark,則更新算子的Watermark為新Watermark,并發(fā)送給下游算子。 某些算子會有多個上游輸入,如Union或keyBy、partition之后的算子。在Flink的底層執(zhí)行模型上,多流 輸入會被分解為多個雙流輸入,所以對于多流Watermark的處理也就是雙流Watermark的處理,無論是哪 一個流的Watermark進入算子,都需要跟另一個流的當(dāng)前算子進行比較,選擇較小的Watermark,即 Min(input1Watermark,intput2Watermark),與算子當(dāng)前的Watermark比較,如果大于算子當(dāng)前的 Watermark,則更新算子的Watermark為新的Watermark,并發(fā)送給下游,如以下代碼所示。 //AbstractStreamOperator.java public voidprocessWatermark1(Watermark mark)throws Exception { inputlWatermark =mark.getTimestamp(); longnewMin=Math.min(input1Watermark,input2Watermark); if(newMin >combinedWatermark){ combinedWatermark=newMin; processWatermark(new Watermark(combinedwatermark)); } } public voidprocessWatermark2(Watermark mark)throws Exception { input2Watermark=mark.getTimestamp(); longnewMin=Math.min(inputlWatermark,input2Watermark); if(newMin>combinedWatermark){ combinedwatermark=newMin; processWatermark(new Watermark(combinedwatermark)); } }

多流Watermark中使用了事件時間。

在上圖中,Source算子產(chǎn)生各自的Watermark,并隨著數(shù)據(jù)流流向下游的map算子,map算子是無狀態(tài)計 算,所以會將Watermark向下透傳。window算子收到上游兩個輸入的Watermark后,選擇其中較小的一 個發(fā)送給下游,window(1)算子比較Watermark 29和Watermark 14,選擇Watermark 14作為算子當(dāng)前Watermark,并將Watermark 14發(fā)往下游,window(2)算子也采用相同的邏輯

32 . 簡述什么是Flink的時間語義 ?

在流處理中,時間是一個非常核心的概念,是整個系統(tǒng)的基石。比如,我們經(jīng)常會遇到這樣的需求:給定一個時間窗口,比如一個小## 時,統(tǒng)計時間窗口的內(nèi)數(shù)據(jù)指標(biāo)。那如何界定哪些數(shù)據(jù)將進入這個窗口呢?在窗口的定義之前,首先需要確定一個應(yīng)用使用什么樣的時間語義。本文將介紹Flink的Event Time、Processing Time和Ingestion Time三種時間語義。 一、處理時間(process time) 處理時間是指的執(zhí)行操作的各個設(shè)備的時間。 對于運行在處理時間上的流程序, 所有的基于時間的操作(比如時間窗口)都是使用的設(shè)備時鐘.比如, 一個長度為1個小時的窗口將會包含設(shè)備時鐘表示的1個小時內(nèi)所有的數(shù)據(jù)。 假設(shè)應(yīng)用程序在 9:15am分啟動, 第1個小時窗口將會包含9:15am到10:00am所有的數(shù)據(jù), 然后下個窗口是10:00am-11:00am, 等等。 處理時間是最簡單時間語義, 數(shù)據(jù)流和設(shè)備之間不需要做任何的協(xié)調(diào). 他提供了最好的性能和最低的延遲。 但是, 在分布式和異步的環(huán)境下, 處理時間沒有辦法保證確定性, 容易受到數(shù)據(jù)傳遞速度的影響: 事件的延遲和亂序。 在使用窗口的時候, 如果使用處理時間, 就指定時間分配器為處理時間分配器。 二、攝取時間(ingestion time) 攝取時間是指Flink 讀取事件時記錄的時間。 Ingestion Time是事件到達Flink Souce的時間。從Source到下游各個算子中間可能有很多計算環(huán)節(jié),任何一個算子的處理速度快慢可能影響到下游算子的Processing Time。而Ingestion Time定義的是數(shù)據(jù)流最早進入Flink的時間,因此不會被算子處理速度影響。 Ingestion Time通常是Event Time和Processing Time之間的一個折中方案。 比起Event Time,Ingestion Time可以不需要設(shè)置復(fù)雜的Watermark,因此也不需要太多緩存,延遲較低。 比起Processing Time,Ingestion Time的時間是Souce賦值的,一個事件在整個處理過程從頭至尾都使用這個時間,而且后續(xù)算子不受前序算子處理速度的影響,計算結(jié)果相對準(zhǔn)確一些,但計算成本稍高。 三、事件時間(event time) 事件時間是指的這個事件發(fā)生的時間。 在event進入Flink之前, 通常被嵌入到了event中, 一般作為這個event的時間戳存在。 在事件時間體系中, 時間的進度依賴于數(shù)據(jù)本身, 和任何設(shè)備的時間無關(guān). 事件時間程序必須制定如何產(chǎn)生Event Time Watermarks(水印) . 在事件時間體系中, 水印是表示時間進度的標(biāo)志(作用就相當(dāng)于現(xiàn)實時間的時鐘)。 在理想情況下,不管事件時間何時到達或者他們的到達的順序如何, 事件時間處理將產(chǎn)生完全一致且確定的結(jié)果. 事件時間處理會在等待無序事件(遲到事件)時產(chǎn)生一定的延遲。由于只能等待有限的時間,因此這限制了確定性事件時間應(yīng)用程序的可使用性。 假設(shè)所有數(shù)據(jù)都已到達,事件時間操作將按預(yù)期方式運行,即使在處理無序或遲到的事件或重新處理歷史數(shù)據(jù)時,也會產(chǎn)生正確且一致的結(jié)果。例如,每小時事件時間窗口將包含帶有事件時間戳的所有記錄,該記錄落入該小時,無論它們到達的順序或處理時間。 注意: 在1.12之前默認的時間語義是處理時間, 從1.12開始, Flink內(nèi)部已經(jīng)把默認的語義改成了事件時間。 四、總結(jié) Event Time的優(yōu)勢是結(jié)果的可預(yù)測性,缺點是緩存較大,增加了延遲,且調(diào)試和定位問題更復(fù)雜。 Processing Time只依賴當(dāng)前執(zhí)行機器的系統(tǒng)時鐘,不需要依賴Watermark,無需緩存。Processing Time是實現(xiàn)起來非常簡單,也是延遲最小的一種時間語義;但是,在分布式和異步的環(huán)境下,Processing Time 不能提供確定性,因為它容易受到事件到達系統(tǒng)的速度(例如從消息隊列)、事件在系統(tǒng)內(nèi)操作流動的速度以及中斷的影響。 Ingestion Time通常是Event Time和Processing Time之間的一個折中方案。 在 Flink 流處理真實場景中,大部分的業(yè)務(wù)需求都會使用事件時間語義,但還是以具體的業(yè)務(wù)需求擇選不同的時間語義。

33 . 簡述Flink相比于其它流式處理框架的優(yōu)點 ?

1、同時支持高吞吐、低延遲、高性能 Flink是目前開源社區(qū)中唯一一套集高吞吐、低延遲、高性能三者于一身的分布式流式數(shù)據(jù)處理框架。Apache Spark也只能兼顧高吞吐和高性能特點,主要是因為Spark Streaming流式計算中無法做到低延遲保障;而流式計算框架Apache Storm只能支持低延遲和高性能特性,但是無法滿足高吞吐的要求。而滿足高吞吐、低延遲、高性能這三個目標(biāo)對分布式流式計算框架來說是非常重要的。 2、支持事件時間(Event Time) 在流式計算領(lǐng)域中,窗口計算的地位舉足輕重,但目前大多數(shù)框架窗口計算采用的都是系統(tǒng)時間 (Process Time),也是事件傳輸?shù)接嬎憧蚣芴幚頃r,系統(tǒng)主機的當(dāng)前時間。Flink能夠支持基于事件時間(Event Time)語義進行窗口計算,也就是使用事件產(chǎn)生的時間,這種基于事件驅(qū)動的機制使得事件即使亂序到達,流系統(tǒng)也能夠計算出精確的結(jié)果,保證了事件原本產(chǎn)生時的時序性,盡可能避免網(wǎng)絡(luò)傳輸 或硬件系統(tǒng)的影響。 3、支持有狀態(tài)計算 Flink在1.4版本中實現(xiàn)了狀態(tài)管理,所謂狀態(tài)就是在流式計算過程中將算子的中間結(jié)果數(shù)據(jù)保存在內(nèi)存或 文件系統(tǒng)中,等下一個事件進入算子后可以從之前的狀態(tài)中獲取中間結(jié)果,計算當(dāng)前的結(jié)果,從而無需 每次都基于全部的原始數(shù)據(jù)來統(tǒng)計結(jié)果,這種方式極大地提升了系統(tǒng)的性能,并降低了數(shù)據(jù)計算過程的 資源消耗。對于數(shù)據(jù)量大且運算邏輯非常復(fù)雜的流式計算場景,有狀態(tài)計算發(fā)揮了非常重要的作用。 4、支持高度靈活的窗口(Window)操作 在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,需要通過窗口的方式對流數(shù)據(jù)進行一定范圍的聚合計算,例如統(tǒng) 計在過去1分鐘內(nèi)有多少用戶點擊某一網(wǎng)頁,在這種情況下,我們必須定義一個窗口,用來收集最近一 分鐘內(nèi)的數(shù)據(jù),并對這個窗口的數(shù)據(jù)進行再計算。 Flink將窗口劃分為基于Time、Count、Session,以及Data-Driven等類型的窗口操作,窗口可以用靈活的 出發(fā)條件定制化來達到對復(fù)雜的流傳輸模式的支持,用戶可以定義不同的窗口出發(fā)機制來滿足不同的需 求。 5、基于輕量級分布式快照(CheckPoint)實現(xiàn)的容錯

Flink能夠分布式運行在上千個節(jié)點上,將一個大型計算任務(wù)的流程拆解成曉得計算過程,然后將task分 布到并行節(jié)點上處理。在任務(wù)執(zhí)行過程中,能夠自動發(fā)現(xiàn)事件處理過程中的錯誤而導(dǎo)致數(shù)據(jù)不一致的問 題,比如:節(jié)點宕機、網(wǎng)絡(luò)傳輸問題,或是由于用戶升級或修復(fù)問題而導(dǎo)致計算服務(wù)重啟等。在這些情 況下,通過基于分布式快照技術(shù)的Checkpoints,將執(zhí)行過程中的狀態(tài)信息進行持久化恢復(fù),以確保數(shù)據(jù) 在處理過程中的一致性(Exactly-Once)。 6、基于JVM實現(xiàn)獨立的內(nèi)存管理 內(nèi)存管理是所有計算框架需要重點考慮的部分,尤其對于計算量比較大的計算場景,數(shù)據(jù)在內(nèi)存中該如 何進行管理顯得至關(guān)重要。針對內(nèi)存管理,F(xiàn)Link實現(xiàn)了自身管理內(nèi)存的機制,盡可能減少JVM GC對系統(tǒng)的影響。另外,F(xiàn)Link通過序列化/反序列化方法將所有的數(shù)據(jù)對象轉(zhuǎn)換成二進制在內(nèi)存中存儲,降低 數(shù)據(jù)存儲的大小的同事,能夠更加有效地對內(nèi)存空間進行利用,降低GC帶來的性能下降或任務(wù)異常的風(fēng) 險,因此Flink較其他分布式處理的框架會顯得更加穩(wěn)定,不會因為JVM GC等問題而影響整個應(yīng)用的運行 7、Savepoints(保存點) 對于7*24小時運行的流式應(yīng)用,數(shù)據(jù)源源不斷的接入,在一段時間內(nèi)應(yīng)用的終止有可能導(dǎo)致數(shù)據(jù)的丟失 或者極端結(jié)果的不準(zhǔn)確,例如進行集群版本的升級、停機運維操作等操作。值得一提的是,F(xiàn)Link通過 Save Points技術(shù)將任務(wù)執(zhí)行的快照保存在存儲介質(zhì)上,當(dāng)任務(wù)重啟的時候可以直接從事先保存的Save Points恢復(fù)原有的計算狀態(tài),是的任務(wù)繼續(xù)按照停機之前的狀態(tài)運行,Save Points技術(shù)可以燙用戶更好地管理和運維實時流式應(yīng)用。

34 . 簡述Flink和Spark的區(qū)別?什么情況下使用Flink?有什么優(yōu)點 ?

1、Flink和Spark的區(qū)別 數(shù)據(jù)模型 Flink基本數(shù)據(jù)模型是數(shù)據(jù)流,以及事件序列。 Spark采用RDD模型,Spark Streaming的DStream實際上也就是一組組小批數(shù)據(jù)RDD的集合。運行時架構(gòu) Flink是標(biāo)準(zhǔn)的流執(zhí)行模式,一個事件在一個節(jié)點處理完后可以直接發(fā)往下一個節(jié)點進行處理。 Spark是批計算,將DAG劃分為不同的Stage,一個完成后才可以計算下一個。 2、Flink應(yīng)用場景 在實際生產(chǎn)的過程中,大量數(shù)據(jù)在不斷地產(chǎn)生,例如金融交易數(shù)據(jù)、互聯(lián)網(wǎng)訂單數(shù)據(jù)、GPS定位數(shù)據(jù)、 傳感器信號、移動終端產(chǎn)生的數(shù)據(jù)、通信信號數(shù)據(jù)等,以及我們熟悉的網(wǎng)絡(luò)流量監(jiān)控、服務(wù)器產(chǎn)生的日 志數(shù)據(jù),這些數(shù)據(jù)最大的共同點就是實時從不同的數(shù)據(jù)源中產(chǎn)生,然后再傳輸?shù)较掠蔚姆治鱿到y(tǒng)。針對 這些數(shù)據(jù)類型主要包括實時智能推薦、復(fù)雜事件處理、實時欺詐檢測、實時數(shù)倉與ETL類型、流數(shù)據(jù)分 析類型、實時報表類型等實時業(yè)務(wù)場景,而Flink對于這些類型的場景都有著非常好的支持。 1)實時智能推薦 智能推薦會根據(jù)用戶歷史的購買行為,通過推薦算法訓(xùn)練模型,預(yù)測用戶未來可能會購買的物品。對個 人來說,推薦系統(tǒng)起著信息過濾的作用,對Web/App服務(wù)端來說,推薦系統(tǒng)起著滿足用戶個性化需求, 提升用戶滿意度的作用。推薦系統(tǒng)本身也在飛速發(fā)展,除了算法越來越完善,對時延的要求也越來越苛 刻和實時化。利用Flink流計算幫助用戶構(gòu)建更加實時的智能推薦系統(tǒng),對用戶行為指標(biāo)進行實時計算, 對模型進行實時更新,對用戶指標(biāo)進行實時預(yù)測,并將預(yù)測的信息推送給Wep/App端,幫助用戶獲取想 要的商品信息,另一方面也幫助企業(yè)提升銷售額,創(chuàng)造更大的商業(yè)價值。 2)復(fù)雜事件處理

對于復(fù)雜事件處理,比較常見的案例主要集中于工業(yè)領(lǐng)域,例如對車載傳感器、機械設(shè)備等實時故障檢 測,這些業(yè)務(wù)類型通常數(shù)據(jù)量都非常大,且對數(shù)據(jù)處理的時效性要求非常高。通過利用Flink提供的 CEP(復(fù)雜事件處理)進行事件模式的抽取,同時應(yīng)用Flink的Sql進行事件數(shù)據(jù)的轉(zhuǎn)換,在流式系統(tǒng)中構(gòu) 建實時規(guī)則引擎,一旦事件觸發(fā)報警規(guī)則,便立即將告警結(jié)果傳輸至下游通知系統(tǒng),從而實現(xiàn)對設(shè)備故 障快速預(yù)警監(jiān)測,車輛狀態(tài)監(jiān)控等目的。 3)實時欺詐檢測 在金融領(lǐng)域的業(yè)務(wù)中,常常出現(xiàn)各種類型的欺詐行為,例如信用卡欺詐、信貸申請欺詐等,而如何保證 用戶和公司的資金安全,是來近年來許多金融公司及銀行共同面對的挑戰(zhàn)。隨著不法分子欺詐手段的不 斷升級,傳統(tǒng)的反欺詐手段已經(jīng)不足以解決目前所面臨的問題。以往可能需要幾個小時才能通過交易數(shù) 據(jù)計算出用戶的行為指標(biāo),然后通過規(guī)則判別出具有欺詐行為嫌疑的用戶,再進行案件調(diào)查處理,在這 種情況下資金可能早已被不法分子轉(zhuǎn)移,從而給企業(yè)和用戶造成大量的經(jīng)濟損失。而運用Flink流式計算 技術(shù)能夠在毫秒內(nèi)就完成對欺詐判斷行為指標(biāo)的計算,然后實時對交易流水進行規(guī)則判斷或者模型預(yù) 測,這樣一旦檢測出交易中存在欺詐嫌疑,則直接對交易進行實時攔截,避免因為處理不及時而導(dǎo)致的 經(jīng)濟損失。 4)實時數(shù)倉與ETL 結(jié)合離線數(shù)倉,通過利用流計算諸多優(yōu)勢和SQL靈活的加工能力,對流式數(shù)據(jù)進行實時清洗、歸并、結(jié) 構(gòu)化處理,為離線數(shù)倉進行補充和優(yōu)化。另一方面結(jié)合實時數(shù)據(jù)ETL處理能力,利用有狀態(tài)流式計算技 術(shù),可以盡可能降低企業(yè)由于在離線數(shù)據(jù)計算過程中調(diào)度邏輯的復(fù)雜度,高效快速地處理企業(yè)需要的統(tǒng) 計結(jié)果,幫助企業(yè)更好地應(yīng)用實時數(shù)據(jù)所分析出來的結(jié)果。 5)流數(shù)據(jù)分析 實時計算各類數(shù)據(jù)指標(biāo),并利用實時結(jié)果及時調(diào)整在線系統(tǒng)相關(guān)策略,在各類內(nèi)容投放、無線智能推送 領(lǐng)域有大量的應(yīng)用。流式計算技術(shù)將數(shù)據(jù)分析場景實時化,幫助企業(yè)做到實時化分析Web應(yīng)用或者App 應(yīng)用的各項指標(biāo),包括App版本分布情況、Crash檢測和分布等,同時提供多維度用戶行為分析,支持日 志自主分析,助力開發(fā)者實現(xiàn)基于大數(shù)據(jù)技術(shù)的精細化運營、提升產(chǎn)品質(zhì)量和體驗、增強用戶黏性。 6)實時報表分析 實時報表分析是近年來很多公司采用的報表統(tǒng)計方案之一,其中最主要的應(yīng)用便是實時大屏展示。利用 流式計算實時得出的結(jié)果直接被推送到前端應(yīng)用,實時顯示出重要指標(biāo)的變換情況。最典型的案例便是 淘寶的雙十一活動,每年雙十一購物節(jié),除瘋狂購物外,最引人注目的就是天貓雙十一大屏不停跳躍的 成交總額。在整個計算鏈路中包括從天貓交易下單購買到數(shù)據(jù)采集、數(shù)據(jù)計算、數(shù)據(jù)校驗,最終落到雙 十一大屏上展現(xiàn)的全鏈路時間壓縮在5秒以內(nèi),頂峰計算性能高達數(shù)三十萬筆訂單/秒,通過多條鏈路流 計算備份確保萬無一失。而在其他行業(yè),企業(yè)也在構(gòu)建自己的

35 . 簡述Flink backPressure反壓機制,指標(biāo)監(jiān)控你是怎么做的 ?

背壓是指系統(tǒng)在一個臨時負載峰值期間接收數(shù)據(jù)的速率大于其處理速率的一種場景(備注:就是處理速 度慢,接收速度快,系統(tǒng)處理不了接收的數(shù)據(jù))。許多日常情況都會導(dǎo)致背壓。例如,垃圾回收卡頓可 能導(dǎo)致流入的數(shù)據(jù)堆積起來,或者數(shù)據(jù)源可能出現(xiàn)發(fā)送數(shù)據(jù)過快的峰值。如果處理不當(dāng),背壓會導(dǎo)致資 源耗盡,甚至導(dǎo)致數(shù)據(jù)丟失。 1、Flink反壓

每個子任務(wù)都有自己的本地緩存池,收到的數(shù)據(jù)以及發(fā)出的數(shù)據(jù),都會序列化之后,放入到緩沖池里。 然后,兩個TaskManager之間,只會建立一條物理鏈路(底層使用Netty通訊),所有子任務(wù)之間的通訊,都由這條鏈路承擔(dān)。

當(dāng)任何一個子任務(wù)的發(fā)送緩存(不管是子任務(wù)自己的本地緩存,還是底層傳輸時Netty的發(fā)送緩存)耗 盡時,發(fā)送方就會被阻塞,產(chǎn)生背壓;同樣,任何任務(wù)接收數(shù)據(jù)時,如果本地緩存用完了,都會停止從底層Netty那里讀取數(shù)據(jù),這樣很快上游的數(shù)據(jù)很快就會占滿下游的底層接收緩存,從而背壓到發(fā)送 端,形成對上游所有的任務(wù)的背壓。 很顯然,這種思路有個明顯的問題,任何一個下游子任務(wù)的產(chǎn)生背壓,都會影響整條TaskManager之間 的鏈路,導(dǎo)致全鏈路所有子任務(wù)背壓。

為了解決上節(jié)的單任務(wù)背壓影響全鏈路的問題,在Flink 1.5之后,引入了Credit-based Flow Control,基于信用點的流量控制。 這種方法,首先把每個子任務(wù)的本地緩存分為兩個部分,獨占緩存(Exclusive Buuers)和浮動緩存 (Floating Buuers); 然后,獨占緩存的大小作為信用點發(fā)給數(shù)據(jù)發(fā)送方,發(fā)送方會按照不同的子任務(wù)分別記錄信用點,并發(fā) 送盡可能多數(shù)據(jù)給接收方,發(fā)送后則降低對應(yīng)信用點的大??; 當(dāng)信用點為0時,則不再發(fā)送,起到背壓的作用。在發(fā)送數(shù)據(jù)的同時,發(fā)送方還會把隊列中暫存排隊的 數(shù)據(jù)量發(fā)給接收方,接收方收到后,根據(jù)本地緩存的大小,決定是否去浮動緩存里請求更多的緩存來加 速隊列的處理,起到動態(tài)控制流量的作用。整個過程參考上圖。

通過這樣的設(shè)計,就實現(xiàn)了任務(wù)級別的背壓:任意一個任務(wù)產(chǎn)生背壓,只會影響這個任務(wù),并不會對 TaskManger上的其它任務(wù)造成影響。 2、Flink監(jiān)控指標(biāo) Flink任務(wù)提交到集群后,接下來就是對任務(wù)進行有效的監(jiān)控。Flink將任務(wù)監(jiān)控指標(biāo)主要分為系統(tǒng)指標(biāo)和 用戶指標(biāo)兩種:系統(tǒng)指標(biāo)主要包括Flink集群層面的指標(biāo),例如CPU負載,各組件內(nèi)存使用情況等;用戶 指標(biāo)主要包括用戶在任務(wù)中自定義注冊的監(jiān)控指標(biāo),用于獲取用戶的業(yè)務(wù)狀況等信息。Flink中的監(jiān)控指 標(biāo)可以通過多種方式獲取,例如可以從Flink UI中直接查看,也可以通過Rest Api或Reporter獲取

36 . 簡述Flink如何保證一致性 ?

Flink的檢查點和恢復(fù)機制定期的會保存應(yīng)用程序狀態(tài)的一致性檢查點。在故障的情況下,應(yīng)用程序的狀 態(tài)將會從最近一次完成的檢查點恢復(fù),并繼續(xù)處理。盡管如此,可以使用檢查點來重置應(yīng)用程序的狀態(tài) 無法完全達到令人滿意的一致性保證。相反,source和sink的連接器需要和Flink的檢查點和恢復(fù)機制進 行集成才能提供有意義的一致性保證。 為了給應(yīng)用程序提供恰好處理一次語義的狀態(tài)一致性保證,應(yīng)用程序的source連接器需要能夠?qū)ource 的讀位置重置到之前保存的檢查點位置。當(dāng)處理一次檢查點時,source操作符將會把source的讀位置持 久化,并在恢復(fù)的時候從這些讀位置開始重新讀取。支持讀位置的檢查點的source連接器一般來說是基 于文件的存儲系統(tǒng),如:文件流或者Kafka source(檢查點會持久化某個正在消費的topic的讀偏移 量)。如果一個應(yīng)用程序從一個無法存儲和重置讀位置的source連接器攝入數(shù)據(jù),那么當(dāng)任務(wù)出現(xiàn)故障 的時候,數(shù)據(jù)就會丟失。也就是說我們只能提供at-most-once)的一致性保證。 Fink的檢查點和恢復(fù)機制和可以重置讀位置的source連接器結(jié)合使用,可以保證應(yīng)用程序不會丟失任何 數(shù)據(jù)。盡管如此,應(yīng)用程序可能會發(fā)出兩次計算結(jié)果,因為從上一次檢查點恢復(fù)的應(yīng)用程序所計算的結(jié) 果將會被重新發(fā)送一次(一些結(jié)果已經(jīng)發(fā)送出去了,這時任務(wù)故障,然后從上一次檢查點恢復(fù),這些結(jié) 果將被重新計算一次然后發(fā)送出去)。所以,可重置讀位置的source和Flink的恢復(fù)機制不足以提供端到 端的恰好處理一次語義,即使應(yīng)用程序的狀態(tài)是恰好處理一次一致性級別。 Flink 中的一個大的特性就是exactly-once的特性,我們在一般的流處理程序中,會有三種處理語義 AT-MOST-ONCE(最多一次):當(dāng)故障發(fā)生的時候,什么都不干。就是說每條消息就只消費一次。 AT-LEAST-ONCE(至少一次):為了確保數(shù)據(jù)不丟失,確保每個時間都得到處理,一些時間可能會被 處理多次。 EXACTLY-ONCE(精確一次):每個時間都精確處理一次端到端的保證: 內(nèi)部保證— checkpoint source端—可重設(shè)數(shù)據(jù)的讀取位置 sink端—從故障恢復(fù)時,數(shù)據(jù)不會重復(fù)寫入外部系統(tǒng) Flink(checkpoint)和source端(Kafka)可以保證不出問題。但一個志在提供端到端恰好處理一次語義一致性 的應(yīng)用程序需要特殊的sink連接器。sink連接器可以在不同的情況下使用兩種技術(shù)來達到恰好處理一次 一致性語義:冪等性寫入和事務(wù)性寫入。

1、冪等性寫入 一個冪等操作無論執(zhí)行多少次都會返回同樣的結(jié)果。例如,重復(fù)的向hashmap中插入同樣的key-value對 就是冪等操作,因為頭一次插入操作之后所有的插入操作都不會改變這個hashmap,因為hashmap已經(jīng) 包含這個key-value對了。另一方面,append操作就不是冪等操作了,因為多次append同一個元素將會 導(dǎo)致列表每次都會添加一個元素。在流處理程序中,冪等寫入操作是很有意思的,因為冪等寫入操作可 以執(zhí)行多次但不改變結(jié)果。所以它們可以在某種程度上緩和Flink檢查點機制帶來的重播計算結(jié)果的效 應(yīng)。 需要注意的是,依賴于冪等性sink來達到exactly-once語義的應(yīng)用程序,必須保證在從檢查點恢復(fù)以后, 它將會覆蓋之前已經(jīng)寫入的結(jié)果。例如,一個包含有sink操作的應(yīng)用在sink到一個key-value存儲時必須 保證它能夠確定的計算出將要更新的key值。同時,從Flink程序sink到的key-value存儲中讀取數(shù)據(jù)的應(yīng) 用,在Flink從檢查點恢復(fù)的過程中,可能會看到不想看到的結(jié)果。當(dāng)重播開始時,之前已經(jīng)發(fā)出的計算 結(jié)果可能會被更早的結(jié)果所覆蓋(因為在恢復(fù)過程中)。所以,一個消費Flink程序輸出數(shù)據(jù)的應(yīng)用,可 能會觀察到時間回退,例如讀到了比之前小的計數(shù)。也就是說,當(dāng)流處理程序處于恢復(fù)過程中時,流處 理程序的結(jié)果將處于不穩(wěn)定的狀態(tài),因為一些結(jié)果被覆蓋掉,而另一些結(jié)果還沒有被覆蓋。一旦重播完 成,也就是說應(yīng)用程序已經(jīng)通過了之前出故障的點,結(jié)果將會繼續(xù)保持一致性。

2、事務(wù)性寫入 實現(xiàn)端到端的恰好處理一次一致性語義的方法基于事務(wù)性寫入。其思想是只將最近一次成功保存的檢查 點之前的計算結(jié)果寫入到外部系統(tǒng)中去。這樣就保證了在任務(wù)故障的情況下,端到端恰好處理一次語 義。應(yīng)用將被重置到最近一次的檢查點,而在這個檢查點之后并沒有向外部系統(tǒng)發(fā)出任何計算結(jié)果。通 過只有當(dāng)檢查點保存完成以后再寫入數(shù)據(jù)這種方法,事務(wù)性的方法將不會遭受冪等性寫入所遭受的重播 不一致的問題。盡管如此,事務(wù)性寫入?yún)s帶來了延遲,因為只有在檢查點完成以后,我們才能看到計算 結(jié)果。 Flink提供了兩種構(gòu)建模塊來實現(xiàn)事務(wù)性sink連接器:write-ahead-log(WAL,預(yù)寫式日志)sink和兩階段 提交sink。WAL式sink將會把所有計算結(jié)果寫入到應(yīng)用程序的狀態(tài)中,等接到檢查點完成的通知,才會將 計算結(jié)果發(fā)送到sink系統(tǒng)。因為sink操作會把數(shù)據(jù)都緩存在狀態(tài)后段,所以WAL可以使用在任何外部sink 系統(tǒng)上。盡管如此,WAL還是無法提供刀槍不入的恰好處理一次語義的保證,再加上由于要緩存數(shù)據(jù)帶 來的狀態(tài)后段的狀態(tài)大小的問題,WAL模型并不十分完美。 與之形成對比的,2PC sink需要sink系統(tǒng)提供事務(wù)的支持或者可以模擬出事務(wù)特性的模塊。對于每一個檢查點,sink開始一個事務(wù),然后將所有的接收到的數(shù)據(jù)都添加到事務(wù)中,并將這些數(shù)據(jù)寫入到sink系統(tǒng),但并沒有提交(commit)它們。當(dāng)事務(wù)接收到檢查點完成的通知時,事務(wù)將被commit,數(shù)據(jù)將被 真正的寫入sink系統(tǒng)。這項機制主要依賴于一次sink可以在檢查點完成之前開始事務(wù),并在應(yīng)用程序從 一次故障中恢復(fù)以后再commit的能力。

2PC協(xié)議依賴于Flink的檢查點機制。檢查點屏障是開始一個新的事務(wù)的通知,所有操作符自己的檢查點 成功的通知是它們可以commit的投票,而作業(yè)管理器通知一個檢查點成功的消息是commit事務(wù)的指令。于WAL sink形成對比的是,2PC sinks依賴于sink系統(tǒng)和sink本身的實現(xiàn)可以實現(xiàn)恰好處理一次語義。更多的,2PC sink不斷的將數(shù)據(jù)寫入到sink系統(tǒng)中,而WAL寫模型就會有之前所述的問題。 事務(wù)寫的方式能提供端到端的Exactly-Once一致性,它的代價也是非常明顯的,就是犧牲了延遲。輸出 數(shù)據(jù)不再是實時寫入到外部系統(tǒng),而是分批次地提交。目前來說,沒有完美的故障恢復(fù)和Exactly-Once 保障機制,對于開發(fā)者來說,需要在不同需求之間權(quán)衡

37 . 簡述Flink支持JobMaster的HA???原理是怎么樣的?

1、JobManager 高可用(HA) jobManager協(xié)調(diào)每個flink任務(wù)部署。它負責(zé)任務(wù)調(diào)度和資源管理。 默認情況下,每個flink集群只有一個JobManager,這將導(dǎo)致一個單點故障(SPOF):如果JobManager掛 了,則不能提交新的任務(wù),并且運行中的程序也會失敗。 使用JobManager HA,集群可以從JobManager故障中恢復(fù),從而避免SPOF(單點故障) 。 用戶可以在 standalone或 YARN集群 模式下,配置集群高可用。 Standalone集群的高可用 Standalone模式(獨立模式)下JobManager的高可用性的基本思想是,任何時候都有一個 Master JobManager ,并且多個Standby JobManagers 。 Standby JobManagers可以在Master JobManager 掛掉的情況下接管集群成為Master JobManager。 這樣保證了沒有單點故障,一旦某一個Standby JobManager 接管集群,程序就可以繼續(xù)運行。 Standby JobManager和Master JobManager實例之間沒有明確區(qū)別。每個JobManager都可以成為Master或Standby節(jié)點 2、Yarn 集群高可用 flink on yarn的HA 其實主要是利用yarn自己的job恢復(fù)機制

38 . 簡述如何確定Flink任務(wù)的合理并行度 ?

task的parallelism可以在Flink的不同級別上指定。 四種級別是:算子級別、執(zhí)行環(huán)境(ExecutionEnvironment)級別、客戶端(命令行)級別、配置文件 (flink-conf.yaml)級別。 每個operator、data source或者data sink都可以通過調(diào)用setParallelism()方法來指定 運行環(huán)境的默認并發(fā)數(shù)可以通過調(diào)用setParallelism()方法來指定。env.setParallelism(3);運行環(huán)境的 并發(fā)數(shù)可以被每個算子確切的并發(fā)數(shù)配置所覆蓋。 對于CLI客戶端,并發(fā)參數(shù)可以通過-p來指定 影響所有運行環(huán)境的系統(tǒng)級別的默認并發(fā)度可以在./conf/flink-conf.yaml的parallelism.defaul項中指 定。不建議。 當(dāng)然,也可以設(shè)置最大的并行度,通過調(diào)用setMaxParallelism()方法來設(shè)置最大并發(fā)度。Flink如何確定TaskManager個數(shù):Job的最大并行度除以每個TaskManager分配的任務(wù)槽數(shù)。Flink on YARN時,TaskManager的數(shù)量就是:max(parallelism) / yarnslots(向上取整)。 例如,一個最大并行度為10,每個TaskManager有兩個任務(wù)槽的作業(yè),就會啟動5個TaskManager

39 . 簡述link任務(wù)如何實現(xiàn)端到端一致 ?

Source端:數(shù)據(jù)從上游進入Flink,必須保證消息嚴(yán)格一次消費。同時Source 端必須滿足可重放 (replay)。否則 Flink 計算層收到消息后未計算,卻發(fā)生 failure 而重啟,消息就會丟失。 Flink計算層:利用 Checkpoint 機制,把狀態(tài)數(shù)據(jù)定期持久化存儲下來,F(xiàn)link程序一旦發(fā)生故障的時候,可以選擇狀態(tài)點恢復(fù),避免數(shù)據(jù)的丟失、重復(fù)。

Sink端:Flink將處理完的數(shù)據(jù)發(fā)送到Sink端時,通過 兩階段提交協(xié)議 ,即 TwoPhaseCommitSinkFunction 函數(shù)。該 SinkFunction 提取并封裝了兩階段提交協(xié)議中的公共邏輯,保證Flink 發(fā)送Sink端時實現(xiàn)嚴(yán)格一次處理語義。 同時:Sink端必須支持事務(wù)機制,能夠進行數(shù)據(jù)回滾或者滿足冪等性。 回滾機制:即當(dāng)作業(yè)失敗后,能夠?qū)⒉糠謱懭氲慕Y(jié)果回滾到之前寫入的狀態(tài)。 冪等性:就是一個相同的操作,無論重復(fù)多少次,造成的結(jié)果和只操作一次相等。即當(dāng)作業(yè)失敗后,寫 入部分結(jié)果,但是當(dāng)重新寫入全部結(jié)果時,不會帶來負面結(jié)果,重復(fù)寫入不會帶來錯誤結(jié)果

40 . 簡述Flink如何處理背(反)壓 ?

1、什么原因?qū)е卤硥海?流系統(tǒng)中消息的處理速度跟不上消息的發(fā)送速度,導(dǎo)致消息的堆積。如果系統(tǒng)能感知消息堆積,并調(diào)整 消息發(fā)送的速度,使消息的處理速度和發(fā)送速度相協(xié)調(diào)就是有背壓感知的系統(tǒng)。 背壓如果不能得到正確地處理,可能會導(dǎo)致資源被耗盡或者甚至出現(xiàn)更糟的情況導(dǎo)致數(shù)據(jù)丟失。flink就 是一個有背壓感知的基于流的分布式消息處理系統(tǒng)。 消息發(fā)送的太快,消息接受的太慢,產(chǎn)生消息擁堵。發(fā)生消息擁堵后,系統(tǒng)會自動降低消息發(fā)送的速度。

舉例說明: 正常情況下:消息處理速度>=消息的發(fā)送速度,不會發(fā)送消息擁堵,系統(tǒng)運行流暢 異常情況下:消息處理速度<消息的發(fā)送速度,發(fā)生消息堵塞,系統(tǒng)運行不流暢 消息擁堵可以采用兩種方案: 將擁堵的消息直接刪除,將導(dǎo)致數(shù)據(jù)丟失,在精確度要求高的場景非常不合適。

將擁堵的消息緩存起來,并告知消息發(fā)送者減緩消息發(fā)送的速度。 將消息緩存起來,并將緩沖區(qū)持久化,以方便在處理失敗的情況下進行數(shù)據(jù)重復(fù)。有些source本身提供 持久化機制,可以優(yōu)先考慮。例如Kafka就是一個很不錯的選擇,可以背壓從sink到source的整個pipeline,同時對source進行限流來適配整合pipeline中最慢組件的速度,從而獲得系統(tǒng)的穩(wěn)定狀態(tài)。

2、Flink中的背壓 Flink 沒有使用任何復(fù)雜的機制來解決背壓問題,因為根本不需要那樣的方案!它利用自身作為純數(shù)據(jù)流引擎的優(yōu)勢來優(yōu)雅地響應(yīng)背壓問題。 Flink 在運行時主要由 operators 和 streams 兩大組件構(gòu)成。每個 operator 會消費中間態(tài)的流,并在流上進行轉(zhuǎn)換,然后生成新的流。對于 Flink 的網(wǎng)絡(luò)機制一種形象的類比是,F(xiàn)link 使用了高效有界的分布式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣。使用 BlockingQueue 的話,一個較慢的接受者會降低發(fā)送者的發(fā)送速率,因為一旦隊列滿了(有界隊列)發(fā)送者會被阻塞。Flink 解決背壓的方案就是這種感覺。 在 Flink 中,這些分布式阻塞隊列就是這些邏輯流,而隊列容量是通過緩沖池來(LocalBuuerPool)實現(xiàn)的。每個被生產(chǎn)和被消費的流都會被分配一個緩沖池。緩沖池管理著一組緩沖(Buuer),緩沖在被消費后 可以被回收循環(huán)利用。這很好理解:你從池子中拿走一個緩沖,填上數(shù)據(jù),在數(shù)據(jù)消費完之后,又把緩 沖還給池子,之后你可以再次使用它。 3、網(wǎng)絡(luò)傳輸中的內(nèi)存管理 在解釋 Flink 的反壓原理之前,我們必須先對 Flink 中網(wǎng)絡(luò)傳輸?shù)膬?nèi)存管理有個了解。 如下圖所示展示了 Flink 在網(wǎng)絡(luò)傳輸場景下的內(nèi)存管理。網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)會寫到 Task 的InputGate(IG) 中,經(jīng)過 Task 的處理后,再由 Task 寫到 ResultPartition(RS) 中。每個 Task 都包括了輸入和輸入,輸入和輸出的數(shù)據(jù)存在 Buuer 中(都是字節(jié)數(shù)據(jù))。Buuer 是 MemorySegment 的包裝 類。 1) TaskManager(TM)在啟動時,會先初始化NetworkEnvironment對象,TM 中所有與網(wǎng)絡(luò)相關(guān)的東西都由該類來管理(如 Netty 連接),其中就包括NetworkBuuerPool。根據(jù)配置,F(xiàn)link 會在 NetworkBuuerPool 中生成一定數(shù)量(默認2048)的內(nèi)存塊 MemorySegment,內(nèi)存塊的總數(shù)量就代表了網(wǎng)絡(luò)傳輸中所有可用的內(nèi)存。NetworkEnvironment 和 NetworkBuuerPool 是 Task 之間共享的,每個 TM 只會實例化一個。 2) Task 線程啟動時,會向 NetworkEnvironment 注冊,NetworkEnvironment 會為 Task 的 InputGate(IG)和 ResultPartition(RP) 分別創(chuàng)建一個 LocalBuuerPool(緩沖池)并設(shè)置可申請的 MemorySegment(內(nèi)存塊)數(shù)量。IG 對應(yīng)的緩沖池初始的內(nèi)存塊數(shù)量與 IG 中 InputChannel 數(shù)量一致, RP 對應(yīng)的緩沖池初始的內(nèi)存塊數(shù)量與 RP 中的 ResultSubpartition 數(shù)量一致。不過,每當(dāng)創(chuàng)建或銷毀緩沖池時,NetworkBuuerPool 會計算剩余空閑的內(nèi)存塊數(shù)量,并平均分配給已創(chuàng)建的緩沖池。注意,這個過程只是指定了緩沖池所能使用的內(nèi)存塊數(shù)量,并沒有真正分配內(nèi)存塊,只有當(dāng)需要時才分配。為什么 ## 要動態(tài)地為緩沖池擴容呢?因為內(nèi)存越多,意味著系統(tǒng)可以更輕松地應(yīng)對瞬時壓力(如GC),不會頻繁 地進入反壓狀態(tài),所以我們要利用起那部分閑置的內(nèi)存塊。 3)在 Task 線程執(zhí)行過程中,當(dāng) Netty 接收端收到數(shù)據(jù)時,為了將 Netty 中的數(shù)據(jù)拷貝到 Task 中, InputChannel(實際是 RemoteInputChannel)會向其對應(yīng)的緩沖池申請內(nèi)存塊(上圖中的①)。如果緩沖池中也沒有可用的內(nèi)存塊且已申請的數(shù)量還沒到池子上限,則會向 NetworkBuuerPool 申請內(nèi)存塊 (上圖中的②)并交給 InputChannel 填上數(shù)據(jù)(上圖中的③和④)。如果緩沖池已申請的數(shù)量達到上限了呢?或者 ## NetworkBuuerPool 也沒有可用內(nèi)存塊了呢?這時候,Task 的 Netty Channel 會暫停讀取,上游的發(fā)送端會立即響應(yīng)停止發(fā)送,拓撲會進入反壓狀態(tài)。當(dāng) Task 線程寫數(shù)據(jù)到 ResultPartition 時,也會向緩沖池請求內(nèi)存塊,如果沒有可用內(nèi)存塊時,會阻塞在請求內(nèi)存塊的地方,達到暫停寫入的目的。 4)當(dāng)一個內(nèi)存塊被消費完成之后(在輸入端是指內(nèi)存塊中的字節(jié)被反序列化成對象了,在輸出端是指 內(nèi)存塊中的字節(jié)寫入到 Netty Channel 了),會調(diào)用 Buuer.recycle() 方法,會將內(nèi)存塊還給LocalBuuerPool (上圖中的⑤)。如果LocalBuuerPool中當(dāng)前申請的數(shù)量超過了池子容量(由于上文提到的動態(tài)容量,由于新注冊的 Task 導(dǎo)致該池子容量變?。?,則LocalBuuerPool會將該內(nèi)存塊回收給NetworkBuuerPool(上圖中的⑥)。如果沒超過池子容量,則會繼續(xù)留在池子中,減少反復(fù)申請的開 銷。 4、背壓過程 舉例說明Flink背壓的過程: 下圖有一個簡單的flow,它由兩個task組成

1)記錄A進入Flink,然后Task1處理 2) Task1處理后的結(jié)果被序列化進緩存區(qū) 3) Task2從緩存區(qū)內(nèi)讀取一些數(shù)據(jù),緩存區(qū)內(nèi)將有更多的空間 4)如果Task2處理的較慢,Task1的緩存區(qū)很快填滿,發(fā)送速度隨之下降。 不要忘記記錄能被Flink處理的前提:必須有空閑可用的緩存區(qū)(Buuer)。 結(jié)合上面兩張圖看:Task 1 在輸出端有一個相關(guān)聯(lián)的 LocalBuuerPool(稱緩沖池1),Task 2 在輸入端也有一個相關(guān)聯(lián)的 LocalBuuerPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buuer 來序列化記錄“A”,我們就序列化并發(fā)送該 buuer。 這里我們需要注意兩個場景:

本地傳輸:如果 Task 1 和 Task 2 運行在同一個 worker 節(jié)點(TaskManager),該 buuer 可以直接交給下一個 Task。一旦 Task 2 消費了該 buuer,則該 buuer 會被緩沖池1回收。如果 Task 2 的速度比 1 慢, 那么 buuer 回收的速度就會趕不上 Task 1 取 buuer 的速度,導(dǎo)致緩沖池1無可用的 buuer,Task 1 等待在可用的 buuer 上。最終形成 Task 1 的降速。 遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節(jié)點上,那么 buuer 會在發(fā)送到網(wǎng)絡(luò)(TCP Channel)后被回收。在接收端,會從 LocalBuuerPool 中申請 buuer,然后拷貝網(wǎng)絡(luò)中的數(shù)據(jù)到 buuer 中。如果沒有可用的 buuer,會停止從 TCP 連接中讀取數(shù)據(jù)。在輸出端,通過 Netty 的水位值機制來保證不往網(wǎng)絡(luò)中寫入太多數(shù)據(jù)(后面會說)。如果網(wǎng)絡(luò)中的數(shù)據(jù)(Netty輸出緩沖中的字節(jié)數(shù))超過了高水 位值,我們會等到其降到低水位值以下才繼續(xù)寫入數(shù)據(jù)。這保證了網(wǎng)絡(luò)中不會有太多的數(shù)據(jù)。如果接收 端停止消費網(wǎng)絡(luò)中的數(shù)據(jù)(由于接收端緩沖池沒有可用 buuer),網(wǎng)絡(luò)中的緩沖數(shù)據(jù)就會堆積,那么發(fā)送端也會暫停發(fā)送。另外,這會使得發(fā)送端的緩沖池得不到回收,writer 阻塞在向 LocalBuuerPool 請求buuer,阻塞了 writer 往 ResultSubPartition 寫數(shù)據(jù)。 這種固定大小緩沖池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得 Task 生產(chǎn)數(shù)據(jù)的速度不會快于消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數(shù)據(jù)傳輸自然地擴展到更復(fù)雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。

41 . 簡述Flink解決數(shù)據(jù)延遲的問題 ?

Flink數(shù)據(jù)延遲的原因有很多,可能是程序自身存在問題,也可能是外部因素造成的,下面列舉一些可能的原因和相應(yīng)的處理方案: 數(shù)據(jù)輸入環(huán)節(jié)問題:可能是數(shù)據(jù)來源的數(shù)據(jù)增長速度過快,導(dǎo)致flink消費者處理數(shù)據(jù)的速度跟不上數(shù)據(jù)生成的速度。解決方案:增加flink消費者的并發(fā)度,使用分區(qū)和并行流的方式來處理數(shù)據(jù),以保證消費者可以快速地處理大量的數(shù)據(jù)。 數(shù)據(jù)輸出環(huán)節(jié)問題:可能是flink消費者完成數(shù)據(jù)計算之后,輸出數(shù)據(jù)的過程速度過慢,導(dǎo)致數(shù)據(jù)延遲。解決方案:優(yōu)化輸出數(shù)據(jù)的方式,可以使用緩存和批處理的方式輸出數(shù)據(jù),以提高輸出速度。 中間處理環(huán)節(jié)問題:可能是flink計算模塊自身出現(xiàn)問題,例如程序過度消耗資源、任務(wù)堆積、程序過于復(fù)雜等。解決方案:優(yōu)化flink程序自身,去除重復(fù)代碼,盡量避免程序出現(xiàn)任務(wù)堆積、大循環(huán)等問題,并使用合適的檢測工具來監(jiān)測程序性能和運行狀態(tài)。 外部因素問題:可能是計算集群內(nèi)存不足、網(wǎng)絡(luò)問題、硬件故障等因素造成的。解決方案:根據(jù)具體情況進行調(diào)整,例如增加計算集群內(nèi)存、優(yōu)化網(wǎng)絡(luò)連接、處理硬件故障等。 總結(jié)來說,在處理flink數(shù)據(jù)延遲時,需要針對不同的具體場景確定問題所在,并進行相應(yīng)的優(yōu)化和解決方案。通過不斷優(yōu)化、調(diào)整和監(jiān)測整個flink系統(tǒng)的運行環(huán)境,可以保證flink系統(tǒng)運行的效率和準(zhǔn)確性。

使用代碼舉例 下面是使用flink Stream API實現(xiàn)基于水印(watermark)的數(shù)據(jù)延遲處理的代碼示例:

public class DataDelayAnalysisJob {

public static void main(String[] args) throws Exception {

// 創(chuàng)建 Flink 執(zhí)行環(huán)境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 從 Kafka 中讀取數(shù)據(jù) Properties properties = new Properties(); properties.setProperty(“bootstrap.servers”, “l(fā)ocalhost:9092”); properties.setProperty(“group.id”, “test”); FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(“topic-name”, new SimpleStringSchema(), properties); DataStream input = env .addSource(kafkaConsumer) .assignTimestampsAndWatermarks(new WatermarkStrategy() { @Override public WatermarkGenerator createWatermarkGenerator( WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator() { private long maxTimestamp; @Override public void onEvent(String event, long eventTimestamp, WatermarkOutput output) { maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { long maxOutOfOrderness = 5000; // 5 seconds output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness)); } }; } });

// 處理數(shù)據(jù)和計算 DataStream delayed = input .filter(new FilterFunction() { @Override public boolean filter(String value) { // 過濾出延遲時間超過 5s 的數(shù)據(jù) long eventTime = Long.parseLong(value.split(“\t”)[0]); long now = System.currentTimeMillis(); return now - eventTime > 5000; // 5 seconds } });

// 將延遲數(shù)據(jù)輸出到外部存儲 delayed.writeToSocket(“l(fā)ocalhost”, 9999, new SimpleStringSchema());

// 啟動 Flink 執(zhí)行環(huán)境 env.execute(“Data Delay Analysis Job”); } }

在上述代碼中,對數(shù)據(jù)進行了流式處理,并使用基于水?。╳atermark)的方式判斷數(shù)據(jù)是否存在延遲,若延遲時間超過 5s,則將該數(shù)據(jù)輸出到外部存儲并保存,以后進行分析和處理。這樣,便通過代碼實現(xiàn)了對flink數(shù)據(jù)延遲的處理方案。

42 . 簡述使用flink-client消費kafka數(shù)據(jù)還是使用flink-connector消費 ?

Flink 是通過Connector與具體的source 和 sink進行通信的

43 . 簡述如何動態(tài)修改Flink的配置,前提是Flink不能重啟 ?

1、Flink/Spark 如何實現(xiàn)動態(tài)更新作業(yè)配置 由于實時場景對可用性十分敏感,實時作業(yè)通常需要避免頻繁重啟,因此動態(tài)加載作業(yè)配置(變量)是 實時計算里十分常見的需求,比如通常復(fù)雜事件處理 (CEP) 的規(guī)則或者在線機器學(xué)習(xí)的模型。盡管常見,實現(xiàn)起來卻并沒有那么簡單,其中最難點在于如何確保節(jié)點狀態(tài)在變更期間的一致性。目前來說一 般有兩種實現(xiàn)方式: 輪詢拉取方式,即作業(yè)算子定時檢測在外部系統(tǒng)的配置是否有變更,若有則同步配置。 控制流方式,即作業(yè)除了用于計算的一個或多個普通數(shù)據(jù)流以外,還有提供一個用于改變作業(yè)算子 狀態(tài)的元數(shù)據(jù)流,也就是控制流。 輪詢拉取方式基于 pull 模式,一般實現(xiàn)是用戶在 Stateful 算子(比如 RichMap)里實現(xiàn)后臺線程定時從外部系統(tǒng)同步變量。這種方式對于一般作業(yè)或許足夠,但存在兩個缺點分別限制了作業(yè)的實時性和準(zhǔn)確性的 進一步提高:首先,輪詢總是有一定的延遲,因此變量的變更不能第一時間生效;其次,這種方式依賴 于節(jié)點本地時間來進行校準(zhǔn)。如果在同一時間有的節(jié)點已經(jīng)檢測到變更并更新狀態(tài),而有的節(jié)點還沒有 檢測到或者還未更新,就會造成短時間內(nèi)的不一致。 控制流方式基于 push 模式,變更的檢測和節(jié)點更新的一致性都由計算框架負責(zé),從用戶視角看只需要定義如何更新算子狀態(tài)并負責(zé)將控制事件丟入控制流,后續(xù)工作計算框架會自動處理??刂屏鞑煌谄?他普通數(shù)據(jù)流的地方在于控制流是以廣播形式流動的,否則在有 Keyby 或者 rebalance 等提高并行度分流的算子的情況下就無法將控制事件傳達給所有的算子。 以目前最流行的兩個實時計算框架 Spark Streaming 和 Flink 來說,前者是以類似輪詢的方式來實現(xiàn)實時作業(yè)的更新,而后者則是基于控制流的方式。

2、Spark Streaming Broadcast Variable Spark Streaming 為用戶提供了 Broadcast Varialbe,可以用于節(jié)點算子狀態(tài)的初始化和后續(xù)更新。Broacast Variable 是一組只讀的變量,它在作業(yè)初始化時由 Spark Driver 生成并廣播到每個 Executor 節(jié)點,隨后該節(jié)點的 Task 可以復(fù)用同一份變量。 Broadcast Variable 的設(shè)計初衷是為了避免大文件,比如 NLP 常用的分詞詞典,隨序列化后的作業(yè)對象一起分發(fā),造成重復(fù)分發(fā)的網(wǎng)絡(luò)資源浪費和啟動時間延長。這類文件的更新頻率是相對低的,扮演的角色 類似于只讀緩存,通過設(shè)置 TTL 來定時更新,緩存過期之后 Executor 節(jié)點會重新向 Driver 請求最新的變量。 Broadcast Variable 并不是從設(shè)計理念上就支持低延遲的作業(yè)狀態(tài)更新,因此用戶想出了不少 Hack 的方法,其中最為常見的方式是:一方面在 Driver 實現(xiàn)后臺線程不斷更新 Broadcast Variavle,另一方面在作業(yè)運行時通過顯式地刪除 Broadcast Variable 來迫使 Executor 重新從 Driver 拉取最新的 Broadcast Variable。這個過程會發(fā)生在兩個 micro batch 計算之間,以確保每個 micro batch 計算過程中狀態(tài)是一致的。 比起用戶在算子內(nèi)訪問外部系統(tǒng)實現(xiàn)更新變量,這種方式的優(yōu)點在于一致性更有保證。因為 Broadcast Variable 是統(tǒng)一由 Driver 更新并推到 Executor 的,這就保證不同節(jié)點的更新時間是一致的。然而相對地,缺點是會給 Driver 帶來比較大的負擔(dān),因為需要不斷分發(fā)全量的 Broadcast Variable (試想下一個巨大的 Map,每次只會更新少數(shù) Entry,卻要整個 Map 重新分發(fā))。在 Spark 2.0 版本以后,Broadcast Variable 的分發(fā)已經(jīng)從 Driver 單點改為基于 BitTorrent 的 P2P 分發(fā),這一定程度上緩解了隨著集群規(guī)模提升 Driver 分發(fā)變量的壓力,但我個人對這種方式能支持到多大規(guī)模的部署還是持懷疑態(tài)度。另外一點是重新分發(fā) Broadcast Variable 需要阻塞作業(yè)進行,這也會使作業(yè)的吞吐量和延遲受到比較大的影響。

3、Flink Broadcast State & Stream Broadcast Stream 是 Flink 1.5.0 發(fā)布的新特性,基于控制流的方式實現(xiàn)了實時作業(yè)的狀態(tài)更新。Broadcast Stream 的創(chuàng)建方式與普通數(shù)據(jù)流相同,例如從 Kafka Topic 讀取,特別之處在于它承載的是控制事件流,會以廣播形式將數(shù)據(jù)發(fā)給下游算子的每個實例。Broadcast Stream 需要在作業(yè)拓撲的某個節(jié)點和普通數(shù)據(jù)流 (Main Stream) join 到一起。 該節(jié)點的算子需要同時處理普通數(shù)據(jù)流和控制流:一方面它需要讀取控制流以更新本地狀態(tài) (Broadcast State),另外一方面需要讀取 Main Stream 并根據(jù) Broadcast State 來進行數(shù)據(jù)轉(zhuǎn)換。由于每個算子實例讀到的控制流都是相同的,它們生成的 Broadcast State 也是相同的,從而達到通過控制消息來更新所有算子實例的效果。 目前 Flink 的 Broadcast Stream 從效果上實現(xiàn)了控制流的作業(yè)狀態(tài)更新,不過在編程模型上有點和一般直覺不同。原因主要在于 Flink 對控制流的處理方式和普通數(shù)據(jù)流保持了一致,最為明顯的一點是控制流除了改變本地 State 還可以產(chǎn)生 output,這很大程度上影響了 Broadcast Stream 的使用方式。Broadcast Stream 的使用方式與普通的 DataStream 差別比較大,即需要和 DataStream 連接成為BroadcastConnectedStream 后,再通過特殊的 BroadcastProcessFunction 來處理,而 BroadcastProcessFunction 目前只支持 類似于 RichCoFlatMap 效果的操作。RichCoFlatMap 可以間接實現(xiàn)對 Main Stream 的 Map 轉(zhuǎn)換(返回一只有一個元素的集合)和 Filter 轉(zhuǎn)換(返回空集合),但無法實現(xiàn) Window 類計算。這意味著如果用戶希望改變 Window 算子的狀態(tài),那么需要將狀態(tài)管理提前到上游的BroadcastProcessFunction,然后再通過 BroadcastProcessFunction 的輸出來將影響下游 Window 算子的行為。

4、總結(jié) 實時作業(yè)運行時動態(tài)加載變量可以令大大提升實時作業(yè)的靈活性和適應(yīng)更多應(yīng)用場景,目前無論是 Flink 還是 Spark Streaming 對動態(tài)加載變量的支持都不是特別完美。Spark Streaming 受限于 Micro Batch 的計算模型(雖然現(xiàn)在 2.3 版本引入 Continuous Streaming 來支持流式處理,但離成熟還需要一定時間), 將作業(yè)變量作為一致性和實時性要求相對低的節(jié)點本地緩存,并不支持低延遲地、低成本地更新作業(yè)變 量。Flink 將變量更新視為特殊的控制事件流,符合 Even Driven 的流式計算框架定位,目前在業(yè)界已有比較成熟的應(yīng)用。不過美中不足的是編程模型的易用性上有提高空間:控制流目前只能用于和數(shù)據(jù)流的join,這意味著下游節(jié)點無法繼續(xù)訪問控制流或者需要把控制流數(shù)據(jù)插入到數(shù)據(jù)流中(這種方式并不優(yōu) 雅),從而降低了編程模型的靈活性。最好的情況是大部分的算子都可以被拓展為具有BroadcastOperator,就像 RichFunction 一樣,它們可以接收一個數(shù)據(jù)流和一個至多個控制流,并維護對應(yīng)的 BroadcastState,這樣控制流的接入成本將顯著下降。

44 . 簡述什么是Flink流批一體 ?

在大數(shù)據(jù)處理計算領(lǐng)域,有離線計算和實時計算兩種模式。一般都是用mapreduce / hive / sparkSQL來處理離線場景,用 sparkStreaming / flink處理實時場景,但是這種lambda架構(gòu)會導(dǎo)致一個問題:進行更改時要同時更改兩套代碼,進行同步。 flink流批一體橫空處理,為大數(shù)據(jù)處理帶來了一套新的解決方案。 雙11中Flink流批一體開始在阿里最核心的數(shù)據(jù)業(yè)務(wù)場景嶄露頭角,并扛住了40億/秒的實時計算峰值。

其實流批一體的技術(shù)里面最早提出于2015年,它的初衷是讓大數(shù)據(jù)開發(fā)人員能夠用同一套接口實現(xiàn)大數(shù) 據(jù)的流計算和批計算,進而保證處理過程與結(jié)果的一致性。spark、flink都陸續(xù)提出了自己的解決方案。 雖然spark是最早提出流批一體理念的計算引擎之一,但其本質(zhì)還是用批來實現(xiàn)流,用的是微批次的思 想,有秒級的延遲,而且無法正確處理時間語義(數(shù)據(jù)在分布式傳輸過程中順序發(fā)生改變,先生產(chǎn)的數(shù) 據(jù)反而后到,導(dǎo)致計算不準(zhǔn)確的一種現(xiàn)象),所以難以滿足復(fù)雜、大規(guī)模的實時計算場景,遲遲無法落 地。而2019年阿里收購flink后,投入大量研發(fā)力量,同時公司也面臨離線和實時數(shù)據(jù)統(tǒng)計口徑不一致的 問題,影響廣告、商務(wù)甚至是公司的運行決策,業(yè)務(wù)的迫切要求,技術(shù)力量的不斷加入,都促進了flink 向流批一體的發(fā)展。

在流處理引擎之上,F(xiàn)link 有以下機制: 檢查點機制和狀態(tài)機制:用于實現(xiàn)容錯、有狀態(tài)的處理; 水印機制:用于實現(xiàn)事件時鐘; 窗口和觸發(fā)器:用于限制計算范圍,并定義呈現(xiàn)結(jié)果的時間。 在同一個流處理引擎之上,F(xiàn)link 還存在另一套機制,用于實現(xiàn)高效的批處理。 用于調(diào)度和恢復(fù)的回溯法:由 Microsoh Dryad 引入,現(xiàn)在幾乎用于所有批處理器; 用于散列和排序的特殊內(nèi)存數(shù)據(jù)結(jié)構(gòu):可以在需要時,將一部分?jǐn)?shù)據(jù)從內(nèi)存溢出到硬盤上; 優(yōu)化器:盡可能地縮短生成結(jié)果的時間。 兩套機制分別對應(yīng)各自的API(DataStream API 和 DataSet API);在創(chuàng)建 Flink 作業(yè)時,并不能通過將兩者混合在一起來同時 利用 Flink 的所有功能。 在最新的版本中,F(xiàn)link 支持兩種關(guān)系型的 API,Table API 和 SQL。這兩個 API 都是批處理和流處理統(tǒng)一的 API,這意味著在無邊界的實時數(shù)據(jù)流和有邊界的歷史記錄數(shù)據(jù)流上,關(guān)系型 API 會以相同的語義執(zhí)行查詢,并產(chǎn)生相同的結(jié)果。Table API 和 SQL 借助了 Apache Calcite 來進行查詢的解析,校驗以及優(yōu)化。它們可以與 DataStream 和 DataSet API 無縫集成,并支持用戶自定義的標(biāo)量函數(shù),聚合函數(shù)以及表值函數(shù)。 Table API / SQL 正在以流批統(tǒng)一的方式成為分析型用例的主要 API。 DataStream API 是數(shù)據(jù)驅(qū)動應(yīng)用程序和數(shù)據(jù)管道的主要API。 從長遠來看,DataStream API應(yīng)該通過有界數(shù)據(jù)流完全包含DataSet API

45 . 簡述什么是Flink的check和barrier ?

1、Flink Check Flink Check 是 Apache Flink 的一個基于屬性的測試庫,它擴展了 ScalaCheck 的線性時序邏輯運算符,適用于測試 Flink 數(shù)據(jù)流轉(zhuǎn)換。 基于屬性的測試(PBT)是一種自動的黑盒測試技術(shù),它通過生成隨機輸入并檢查獲得的輸出是否滿足給定 的屬性來測試功能。 Flink Check 提供了一個有邊界的時間邏輯,用于生成函數(shù)的輸入和聲明屬性。這個邏輯是為流媒體系統(tǒng)設(shè)計的,它允許用戶定義流如何隨時間變化,以及哪些屬性應(yīng)該驗證相應(yīng)的輸出。 Flink Check隨機生成指定數(shù)量的有限輸入流前綴,并對Flink運行時產(chǎn)生的輸出流進行評估。 Flink Check 是基于 sscheck,這是Spark 的一個基于屬性的測試庫,所以它依賴于 sscheck-core 項目,其中包含了 sscheck 和 Flink Check 共同的代碼,特別是系統(tǒng)所基于的 LTLss 邏輯的實現(xiàn)。 LTLss 是一種有限字的離散時間線性時序邏輯,在 Spark Streaming 的 Property-based testing for Spark Streaming 的論文中有詳細介紹。 2、barrier Flink提供了容錯機制,能夠在應(yīng)用失敗的時候重新恢復(fù)任務(wù)。這個機制主要就是通過持續(xù)產(chǎn)生快照的方 式實現(xiàn)的。Flink快照主要包括兩部分?jǐn)?shù)據(jù)一部分是數(shù)據(jù)流的數(shù)據(jù),另一部分是operator的狀態(tài)數(shù)據(jù)。對 應(yīng)的快照機制的實現(xiàn)有主要兩個部分組成,一個是屏障(Barrier),一個是狀態(tài)(State)。 Flink 分布式快照里面的一個核心的元素就是流屏障(stream barrier)。這些屏障會被插入(injected)到數(shù)據(jù)流中,并作為數(shù)據(jù)流的一部分隨著數(shù)據(jù)流動。屏障并不會持有任何數(shù)據(jù),而是和數(shù)據(jù)一樣線性的流 動。可以看到屏障將數(shù)據(jù)流分成了兩部分?jǐn)?shù)據(jù)(實際上是多個連續(xù)的部分),一部分是當(dāng)前快照的數(shù) 據(jù),一部分下一個快照的數(shù)據(jù)。每個屏障會帶有它的快照ID。這個快照的數(shù)據(jù)都在這個屏障的前面。從 圖上看,數(shù)據(jù)是從左向右移動(右邊的先進入系統(tǒng)),那么快照n包含的數(shù)據(jù)就是右側(cè)到下一個屏障 (n-1)截止的數(shù)據(jù),圖中兩個灰色豎線之間的部分,也就是part of checkpoint n。另外屏障并不會打斷數(shù)的流動,因而屏障是非常輕量的。在同一個時刻,多個快照可以在同一個數(shù)據(jù)流中,這也就是說多個快 照可以同時產(chǎn)生。

46 . 簡述Flink狀態(tài)機制 ?

一、前言 有狀態(tài)的計算是流處理框架要實現(xiàn)的重要功能,因為稍復(fù)雜的流處理場景都需要記錄狀態(tài),然后在新流入數(shù)據(jù)的基礎(chǔ)上不斷更新狀態(tài)。下面的幾個場景都需要使用流處理的狀態(tài)功能:

數(shù)據(jù)流中的數(shù)據(jù)有重復(fù),想對重復(fù)數(shù)據(jù)去重,需要記錄哪些數(shù)據(jù)已經(jīng)流入過應(yīng)用,當(dāng)新數(shù)據(jù)流入時,根據(jù)已流入過的數(shù)據(jù)來判斷去重。 檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態(tài)的形式緩存下來。比如,判斷一個溫度傳感器數(shù)據(jù)流中的溫度是否在持續(xù)上升。 對一個時間窗口內(nèi)的數(shù)據(jù)進行聚合分析,分析一個小時內(nèi)某項指標(biāo)的75分位或99分位的數(shù)值。 一個狀態(tài)更新和獲取的流程如下圖所示,一個算子子任務(wù)接收輸入流,獲取對應(yīng)的狀態(tài),根據(jù)新的計算結(jié)果更新狀態(tài)。一個簡單的例子是對一個時間窗口內(nèi)輸入流的某個整數(shù)字段求和,那么當(dāng)算子子任務(wù)接收到新元素時,會獲取已經(jīng)存儲在狀態(tài)中的數(shù)值,然后將當(dāng)前輸入加到狀態(tài)上,并將狀態(tài)數(shù)據(jù)更新。

二、狀態(tài)類型 Flink有兩種基本類型的狀態(tài):托管狀態(tài)(Managed State)和原生狀態(tài)(Raw State)。

兩者的區(qū)別:Managed State是由Flink管理的,F(xiàn)link幫忙存儲、恢復(fù)和優(yōu)化,Raw State是開發(fā)者自己管理的,需要自己序列化。

具體區(qū)別有:

從狀態(tài)管理的方式上來說,Managed State由Flink Runtime托管,狀態(tài)是自動存儲、自動恢復(fù)的,F(xiàn)link在存儲管理和持久化上做了一些優(yōu)化。當(dāng)橫向伸縮,或者說修改Flink應(yīng)用的并行度時,狀態(tài)也能自動重新分布到多個并行實例上。Raw State是用戶自定義的狀態(tài)。 從狀態(tài)的數(shù)據(jù)結(jié)構(gòu)上來說,Managed State支持了一系列常見的數(shù)據(jù)結(jié)構(gòu),如ValueState、ListState、MapState等。Raw State只支持字節(jié),任何上層數(shù)據(jù)結(jié)構(gòu)需要序列化為字節(jié)數(shù)組。使用時,需要用戶自己序列化,以非常底層的字節(jié)數(shù)組形式存儲,F(xiàn)link并不知道存儲的是什么樣的數(shù)據(jù)結(jié)構(gòu)。 從具體使用場景來說,絕大多數(shù)的算子都可以通過繼承Rich函數(shù)類或其他提供好的接口類,在里面使用Managed State。Raw State是在已有算子和Managed State不夠用時,用戶自定義算子時使用。 對Managed State繼續(xù)細分,它又有兩種類型:Keyed State(鍵控狀態(tài))和Operator State(算子狀態(tài))。

為了自定義Flink的算子,可以重寫Rich Function接口類,比如RichFlatMapFunction。使用Keyed State時,通過重寫Rich Function接口類,在里面創(chuàng)建和訪問狀態(tài)。對于Operator State,還需進一步實現(xiàn)CheckpointedFunction接口。

2.1、Keyed State Flink 為每個鍵值維護一個狀態(tài)實例,并將具有相同鍵的所有數(shù)據(jù),都分區(qū)到同一個算子任務(wù)中,這個任務(wù)會維護和處理這個key對應(yīng)的狀態(tài)。當(dāng)任務(wù)處理一條數(shù)據(jù)時,它會自動將狀態(tài)的訪問范圍限定為當(dāng)前數(shù)據(jù)的key。因此,具有相同key的所有數(shù)據(jù)都會訪問相同的狀態(tài)。

需要注意的是鍵控狀態(tài)只能在 KeyedStream 上進行使用,可以通過 stream.keyBy(…) 來得到 KeyedStream(鍵控流) 。

Flink 提供了以下數(shù)據(jù)格式來管理和存儲鍵控狀態(tài) (Keyed State):

ValueState:存儲單值類型的狀態(tài)??梢允褂?update(T)進行更新,并通過 T value()進行檢索。 ListState:存儲列表類型的狀態(tài)??梢允褂?add(T) 或 addAll(List) 添加元素;并通過 get() 獲得整個列表。 ReducingState:用于存儲經(jīng)過 ReduceFunction 計算后的結(jié)果,使用 add(T) 增加元素。 AggregatingState:用于存儲經(jīng)過 AggregatingState 計算后的結(jié)果,使用 add(IN) 添加元素。 FoldingState:已被標(biāo)識為廢棄,會在未來版本中移除,官方推薦使用 AggregatingState 代替。 MapState:維護 Map 類型的狀態(tài)。 假設(shè)我們正在開發(fā)一個監(jiān)控系統(tǒng),當(dāng)監(jiān)控數(shù)據(jù)超過閾值一定次數(shù)后,需要發(fā)出報警信息:

import java.util

import org.apache.commons.compress.utils.Lists import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector

/** ?@author w1992wishes 2020/7/20 19:45 */ class ThresholdWarning(threshold: Long, numberOfTimes: Int) extends RichFlatMapFunction[(String, Long), (String, util.ArrayList[Long])] {

// 通過ListState來存儲非正常數(shù)據(jù)的狀態(tài) private var abnormalData: ListState[Long] = _

override def open(parameters: Configuration): Unit = { // 創(chuàng)建StateDescriptor val abnormalDataStateDescriptor = new ListStateDescriptor[Long](“abnormalData”, classOf[Long]) // 通過狀態(tài)名稱(句柄)獲取狀態(tài)實例,如果不存在則會自動創(chuàng)建 abnormalData = getRuntimeContext.getListState(abnormalDataStateDescriptor) }

override def flatMap(value: (String, Long), out: Collector[(String, util.ArrayList[Long])]): Unit = { val inputValue = value._2 // 如果輸入值超過閾值,則記錄該次不正常的數(shù)據(jù)信息 if (inputValue >= threshold) abnormalData.add(inputValue) val list = Lists.newArrayList(abnormalData.get.iterator) // 如果不正常的數(shù)據(jù)出現(xiàn)達到一定次數(shù),則輸出報警信息 if (list.size >= numberOfTimes) { out.collect((value._1 + " 超過指定閾值 ", list)) // 報警信息輸出后,清空狀態(tài) abnormalData.clear() } } }

object KeyedStateDetailTest extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment val dataStreamSource = env.fromElements( (“a”, 50L), (“a”, 80L), (“a”, 400L), (“a”, 100L), (“a”, 200L), (“a”, 200L), (“b”, 100L), (“b”, 200L), (“b”, 200L), (“b”, 500L), (“b”, 600L), (“b”, 700L))

dataStreamSource .keyBy(_._1) .flatMap(new ThresholdWarning(100L, 3)) // 超過100的閾值3次后就進行報警 .printToErr() env.execute(“Managed Keyed State”) } 2.2、Operator State Operator State可以用在所有算子上,每個算子子任務(wù)或者說每個算子實例共享一個狀態(tài),流入這個算子子任務(wù)的數(shù)據(jù)可以訪問和更新這個狀態(tài)。

算子狀態(tài)不能由相同或不同算子的另一個實例訪問。

Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu):

ListState:存儲列表類型的狀態(tài)。 UnionListState:存儲列表類型的狀態(tài),與 ListState 的區(qū)別在于:如果并行度發(fā)生變化,ListState 會將該算子的所有并發(fā)的狀態(tài)實例進行匯總,然后均分給新的 Task;而 UnionListState 只是將所有并發(fā)的狀態(tài)實例匯總起來,具體的劃分行為則由用戶進行定義。 BroadcastState:用于廣播的算子狀態(tài)。如果一個算子有多項任務(wù),而它的每項任務(wù)狀態(tài)又都相同,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)。 假設(shè)此時不需要區(qū)分監(jiān)控數(shù)據(jù)的類型,只要有監(jiān)控數(shù)據(jù)超過閾值并達到指定的次數(shù)后,就進行報警: import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer import scala.collection.JavaConversions._

object OperatorStateDetail extends App { val env = StreamExecutionEnvironment.getExecutionEnvironment // 開啟檢查點機制 env.enableCheckpointing(1000) env.setParallelism(1) // 設(shè)置并行度為1 val dataStreamSource = env.fromElements( (“a”, 50L), (“a”, 80L), (“a”, 400L), (“a”, 100L), (“a”, 200L), (“a”, 200L), (“b”, 100L), (“b”, 200L), (“b”, 200L), (“b”, 500L), (“b”, 600L), (“b”, 700L)) dataStreamSource .flatMap(new OperatorStateDetailThresholdWarning(100L, 3)) .printToErr() env.execute(“Managed Operator State”) }

class OperatorStateDetailThresholdWarning(threshold: Long, numberOfTimes: Int) extends RichFlatMapFunction[(String, Long), (String, ListBuffer[(String, Long)])] with CheckpointedFunction {

// 正常數(shù)據(jù)緩存 private var bufferedData: ListBuffer[(String, Long)] = ListBuffer(String, Long)

// checkPointedState private var checkPointedState: ListState[(String, Long)] = _

override def flatMap(value: (String, Long), out: Collector[(String, ListBuffer[(String, Long)])]): Unit = { val inputValue = value._2 // 超過閾值則進行記錄 if (inputValue >= threshold) { bufferedData += value } // 超過指定次數(shù)則輸出報警信息 if (bufferedData.size >= numberOfTimes) { // 順便輸出狀態(tài)實例的hashcode out.collect((checkPointedState.hashCode() + “閾值警報!”, bufferedData)) bufferedData.clear() } }

override def snapshotState(context: FunctionSnapshotContext): Unit = { // 在進行快照時,將數(shù)據(jù)存儲到checkPointedState checkPointedState.clear() for (element <- bufferedData) { checkPointedState.add(element) } }

override def initializeState(context: FunctionInitializationContext): Unit = { // 注冊ListStateDescriptor val descriptor = new ListStateDescriptor[(String, Long)]( “buffered-abnormalData”, TypeInformation.of(new TypeHint(String, Long) {}) )

// 從FunctionInitializationContext中獲取OperatorStateStore,進而獲取ListState checkPointedState = context.getOperatorStateStore.getListState(descriptor)

// 如果是作業(yè)重啟,讀取存儲中的狀態(tài)數(shù)據(jù)并填充到本地緩存中 if (context.isRestored) { for (element <- checkPointedState.get()) { bufferedData += element } } } }

三、狀態(tài)橫向擴展 狀態(tài)的橫向擴展問題主要是指修改Flink應(yīng)用的并行度,確切的說,每個算子的并行實例數(shù)或算子子任務(wù)數(shù)發(fā)生了變化,應(yīng)用需要關(guān)?;騿右恍┧阕幼尤蝿?wù),某份在原來某個算子子任務(wù)上的狀態(tài)數(shù)據(jù)需要平滑更新到新的算子子任務(wù)上。

Flink的Checkpoint就是一個非常好的在各算子間遷移狀態(tài)數(shù)據(jù)的機制。算子的本地狀態(tài)將數(shù)據(jù)生成快照(snapshot),保存到分布式存儲(如HDFS)上。橫向伸縮后,算子子任務(wù)個數(shù)變化,子任務(wù)重啟,相應(yīng)的狀態(tài)從分布式存儲上重建(restore)。

對于Keyed State和Operator State這兩種狀態(tài),他們的橫向伸縮機制不太相同。由于每個Keyed State總是與某個Key相對應(yīng),當(dāng)橫向伸縮時,Key總會被自動分配到某個算子子任務(wù)上,因此Keyed State會自動在多個并行子任務(wù)之間遷移。對于一個非KeyedStream,流入算子子任務(wù)的數(shù)據(jù)可能會隨著并行度的改變而改變。如上圖所示,假如一個應(yīng)用的并行度原來為2,那么數(shù)據(jù)會被分成兩份并行地流入兩個算子子任務(wù),每個算子子任務(wù)有一份自己的狀態(tài),當(dāng)并行度改為3時,數(shù)據(jù)流被拆成3支,或者并行度改為1,數(shù)據(jù)流合并為1支,此時狀態(tài)的存儲也相應(yīng)發(fā)生了變化。對于橫向伸縮問題,Operator State有兩種狀態(tài)分配方式:一種是均勻分配,另一種是將所有狀態(tài)合并,再分發(fā)給每個實例上。

四、檢查點機制 為了使 Flink 的狀態(tài)具有良好的容錯性,F(xiàn)link 提供了檢查點機制 (CheckPoints) 。通過檢查點機制,F(xiàn)link 定期在數(shù)據(jù)流上生成 checkpoint barrier ,當(dāng)某個算子收到 barrier 時,即會基于當(dāng)前狀態(tài)生成一份快照,然后再將該 barrier 傳遞到下游算子,下游算子接收到該 barrier 后,也基于當(dāng)前狀態(tài)生成一份快照,依次傳遞直至到最后的 Sink 算子上。當(dāng)出現(xiàn)異常后,F(xiàn)link 就可以根據(jù)最近的一次的快照數(shù)據(jù)將所有算子恢復(fù)到先前的狀態(tài)。

4.1、開啟檢查點 默認情況下 checkpoint 是禁用的。通過調(diào)用 StreamExecutionEnvironment 的 enableCheckpointing(n)來啟用 checkpoint,里面的 n 是進行 checkpoint 的間隔,單位毫秒。

Checkpoint 其他的屬性包括:

精確一次(exactly-once)對比至少一次(at-least-once):你可以選擇向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中傳入一個模式來選擇使用兩種保證等級中的哪一種。對于大多數(shù)應(yīng)用來說,精確一次是較好的選擇。至少一次可能與某些延遲超低(始終只有幾毫秒)的應(yīng)用的關(guān)聯(lián)較大。 checkpoint 超時:如果 checkpoint 執(zhí)行的時間超過了該配置的閾值,還在進行中的 checkpoint 操作就會被拋棄。 checkpoints 之間的最小時間:該屬性定義在 checkpoint 之間需要多久的時間,以確保流應(yīng)用在 checkpoint 之間有足夠的進展。如果值設(shè)置為了 5000,無論 checkpoint 持續(xù)時間與間隔是多久,在前一個 checkpoint 完成時的至少五秒后會才開始下一個 checkpoint。 并發(fā) checkpoint 的數(shù)目: 默認情況下,在上一個 checkpoint 未完成(失敗或者成功)的情況下,系統(tǒng)不會觸發(fā)另一個 checkpoint。這確保了拓撲不會在 checkpoint 上花費太多時間,從而影響正常的處理流程。不過允許多個 checkpoint 并行進行是可行的,對于有確定的處理延遲(例如某方法所調(diào)用比較耗時的外部服務(wù)),但是仍然想進行頻繁的 checkpoint 去最小化故障后重跑的 pipelines 來說,是有意義的。 externalized checkpoints: 你可以配置周期存儲 checkpoint 到外部系統(tǒng)中。Externalized checkpoints 將他們的元數(shù)據(jù)寫到持久化存儲上并且在 job 失敗的時候不會被自動刪除。這種方式下,如果你的 job 失敗,你將會有一個現(xiàn)有的 checkpoint 去恢復(fù)。更多的細節(jié)請看 Externalized checkpoints 的部署文檔。 在 checkpoint 出錯時使 task 失敗或者繼續(xù)進行 task:他決定了在 task checkpoint 的過程中發(fā)生錯誤時,是否使 task 也失敗,使失敗是默認的行為。 或者禁用它時,這個任務(wù)將會簡單的把 checkpoint 錯誤信息報告給 checkpoint coordinator 并繼續(xù)運行。 優(yōu)先從 checkpoint 恢復(fù)(prefer checkpoint for recovery):該屬性確定 job 是否在最新的 checkpoint 回退,即使有更近的 savepoint 可用,這可以潛在地減少恢復(fù)時間(checkpoint 恢復(fù)比 savepoint 恢復(fù)更快)。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每 1000ms 開始一次 checkpoint env.enableCheckpointing(1000); // 高級選項: // 設(shè)置模式為精確一次 (這是默認值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確認 checkpoints 之間的時間會進行 500 ms env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // Checkpoint 必須在一分鐘內(nèi)完成,否則就會被拋棄 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只允許一個 checkpoint 進行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 開啟在 job 中止后仍然保留的 externalized checkpoints env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 允許在有更近 savepoint 時回退到 checkpoint env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

4.2、保存點機制 保存點機制 (Savepoints) 是檢查點機制的一種特殊的實現(xiàn),它允許通過手工的方式來觸發(fā) Checkpoint,并將結(jié)果持久化存儲到指定路徑中,主要用于避免 Flink 集群在重啟或升級時導(dǎo)致狀態(tài)丟失。示例如下:

觸發(fā)指定id的作業(yè)的Savepoint,并將結(jié)果存儲到指定目錄下 bin/flink savepoint :jobId [:targetDirectory] 1 2 五、狀態(tài)后端 Flink 提供了多種 state backends,它用于指定狀態(tài)的存儲方式和位置。

狀態(tài)可以位于 Java 的堆或堆外內(nèi)存。取決于 state backend,F(xiàn)link 也可以自己管理應(yīng)用程序的狀態(tài)。為了讓應(yīng)用程序可以維護非常大的狀態(tài),F(xiàn)link 可以自己管理內(nèi)存(如果有必要可以溢寫到磁盤)。默認情況下,所有 Flink Job 會使用配置文件 flink-conf.yaml 中指定的 state backend。

但是,配置文件中指定的默認 state backend 會被 Job 中指定的 state backend 覆蓋。

5.1、狀態(tài)管理器分類 MemoryStateBackend 默認的方式,即基于 JVM 的堆內(nèi)存進行存儲,主要適用于本地開發(fā)和調(diào)試。

FsStateBackend 基于文件系統(tǒng)進行存儲,可以是本地文件系統(tǒng),也可以是 HDFS 等分布式文件系統(tǒng)。 需要注意而是雖然選擇使用了 FsStateBackend ,但正在進行的數(shù)據(jù)仍然是存儲在 TaskManager 的內(nèi)存中的,只有在 checkpoint 時,才會將狀態(tài)快照寫入到指定文件系統(tǒng)上。

RocksDBStateBackend RocksDBStateBackend 是 Flink 內(nèi)置的第三方狀態(tài)管理器,采用嵌入式的 key-value 型數(shù)據(jù)庫 RocksDB 來存儲正在進行的數(shù)據(jù)。等到 checkpoint 時,再將其中的數(shù)據(jù)持久化到指定的文件系統(tǒng)中,所以采用 RocksDBStateBackend 時也需要配置持久化存儲的文件系統(tǒng)。之所以這樣做是因為 RocksDB 作為嵌入式數(shù)據(jù)庫安全性比較低,但比起全文件系統(tǒng)的方式,其讀取速率更快;比起全內(nèi)存的方式,其存儲空間更大,因此它是一種比較均衡的方案。

5.2、配置方式 Flink 支持使用兩種方式來配置后端管理器:

第一種方式:基于代碼方式進行配置,只對當(dāng)前作業(yè)生效:

// 配置 FsStateBackend env.setStateBackend(new FsStateBackend(“hdfs://namenode:40010/flink/checkpoints”)); // 配置 RocksDBStateBackend env.setStateBackend(new RocksDBStateBackend(“hdfs://namenode:40010/flink/checkpoints”)); 配置 RocksDBStateBackend 時,需要額外導(dǎo)入下面的依賴:

org.apache.flink flink-statebackend-rocksdb_2.11 1.9.0

第二種方式:基于 flink-conf.yaml 配置文件的方式進行配置,對所有部署在該集群上的作業(yè)都生效: state.backend: filesystem state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints 1 2 六、狀態(tài)一致性 6.1、端到端(end-to-end) 在真實應(yīng)用中,流處理應(yīng)用除了流處理器以外還包含了數(shù)據(jù)源(例如 Kafka)和輸出到持久化系統(tǒng)。

端到端的一致性保證,意味著結(jié)果的正確性貫穿了整個流處理應(yīng)用的始終;每一個組件都保證了它自己的一致性,整個端到端的一致性級別取決于所有組件中一致性最弱的組件。具體可以劃分如下:

內(nèi)部保證:依賴checkpoint source 端:需要外部源可重設(shè)數(shù)據(jù)的讀取位置 sink 端:需要保證從故障恢復(fù)時,數(shù)據(jù)不會重復(fù)寫入外部系統(tǒng)。 而對于sink端,又有兩種具體的實現(xiàn)方式:

冪等(Idempotent)寫入:所謂冪等操作,是說一個操作,可以重復(fù)執(zhí)行很多次,但只導(dǎo)致一次結(jié)果更改,也就是說,后面再重復(fù)執(zhí)行就不起作用了。 事務(wù)性(Transactional)寫入:需要構(gòu)建事務(wù)來寫入外部系統(tǒng),構(gòu)建的事務(wù)對應(yīng)著 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應(yīng)的結(jié)果寫入 sink 系統(tǒng)中。 對于事務(wù)性寫入,具體又有兩種實現(xiàn)方式:預(yù)寫日志(WAL)和兩階段提交(2PC)。Flink DataStream API 提供了GenericWriteAheadSink 模板類和 TwoPhaseCommitSinkFunction 接口,可以方便地實現(xiàn)這兩種方式的事務(wù)性寫入。

6.2、Flink+Kafka 實現(xiàn)端到端的 exactly-once語義 端到端的狀態(tài)一致性的實現(xiàn),需要每一個組件都實現(xiàn),對于Flink + ## Kafka的數(shù)據(jù)管道系統(tǒng)(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?

內(nèi)部:利用checkpoint機制,把狀態(tài)存盤,發(fā)生故障的時候可以恢復(fù),保證內(nèi)部的狀態(tài)一致性 source:kafka consumer作為source,可以將偏移量保存下來,如果后續(xù)任務(wù)出現(xiàn)了故障,恢復(fù)的時候可以由連接器重置偏移量,重新消費數(shù)據(jù),保證一致性 sink:kafka producer作為sink,采用兩階段提交 sink,需要實現(xiàn)一個TwoPhaseCommitSinkFunction內(nèi)部的checkpoint機制。 Flink由JobManager協(xié)調(diào)各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內(nèi)存級的,也可以改為文件級的進行持久化保存。

當(dāng) checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數(shù)據(jù)流;barrier會在算子間傳遞下去。

每個算子會對當(dāng)前的狀態(tài)做個快照,保存到狀態(tài)后端。對于source任務(wù)而言,就會把當(dāng)前的offset作為狀態(tài)保存起來。下次從checkpoint恢復(fù)時,source任務(wù)可以重新提交偏移量,從上次保存的位置開始重新消費數(shù)據(jù)。

每個內(nèi)部的 transform 任務(wù)遇到 barrier 時,都會把狀態(tài)存到 checkpoint 里。

sink 任務(wù)首先把數(shù)據(jù)寫入外部 kafka,這些數(shù)據(jù)都屬于預(yù)提交的事務(wù)(還不能被消費);當(dāng)遇到 barrier 時,把狀態(tài)保存到狀態(tài)后端,并開啟新的預(yù)提交事務(wù)。

當(dāng)所有算子任務(wù)的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務(wù)發(fā)通知,確認這次 checkpoint 完成。當(dāng)sink 任務(wù)收到確認通知,就會正式提交之前的事務(wù),kafka 中未確認的數(shù)據(jù)就改為“已確認”,數(shù)據(jù)就真正可以被消費了。

所以看到,執(zhí)行過程實際上是一個兩段式提交,每個算子執(zhí)行完成,會進行“預(yù)提交”,直到執(zhí)行完sink操作,會發(fā)起“確認提交”,如果執(zhí)行失敗,預(yù)提交會放棄掉。

具體的兩階段提交步驟總結(jié)如下:

第一條數(shù)據(jù)來了之后,開啟一個 kafka 的事務(wù)(transaction),正常寫入 kafka 分區(qū)日志但標(biāo)記為未提交,這就是“預(yù)提交”, jobmanager 觸發(fā) checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態(tài)存入狀態(tài)后端,并通知 jobmanager sink 連接器收到 barrier,保存當(dāng)前狀態(tài),存入 checkpoint,通知 jobmanager,并開啟下一階段的事務(wù),用于提交下個檢查點的數(shù)據(jù) jobmanager 收到所有任務(wù)的通知,發(fā)出確認信息,表示 checkpoint 完成 sink 任務(wù)收到 jobmanager 的確認信息,正式提交這段時間的數(shù)據(jù) 外部kafka關(guān)閉事務(wù),提交的數(shù)據(jù)可以正常消費了。 所以也可以看到,如果宕機需要通過StateBackend進行恢復(fù),只能恢復(fù)所有確認提交的操作

47 . 簡述Flink廣播流 ?

其實在上一題中,在Operator States也有介紹廣播流(Broadcast State)。 廣播狀態(tài)(Broadcast State)是 Apache Flink 中支持的第三種類型的Operator State。Broadcast State使得 Flink 用戶能夠以容錯、一致、可擴縮容地將來自廣播的低吞吐的事件流數(shù)據(jù)存儲下來,被廣播到某個 operator 的所有并發(fā)實例中,然后與另一條流數(shù)據(jù)連接進行計算。 廣播狀態(tài)與其他 operator state 之間有三個主要區(qū)別: Map 的格式 有一條廣播的輸入流 operator 可以有多個不同名字的廣播狀態(tài) 1、注意事項 在使用廣播狀態(tài)時要記住以下4個重要事項:

1)使用廣播狀態(tài),operator task 之間不會相互通信 這也是為什么(Keyed)-BroadcastProcessFunction上只有廣播的一邊可以修改廣播狀態(tài)的內(nèi)容。用戶必須 保證所有 operator 并發(fā)實例上對廣播狀態(tài)的修改行為都是一致的?;蛘哒f,如果不同的并發(fā)實例擁有不同的廣播狀態(tài)內(nèi)容,將導(dǎo)致不一致的結(jié)果。 2)廣播狀態(tài)中事件的順序在各個并發(fā)實例中可能不盡相同 雖然廣播流的元素保證了將所有元素(最終)都發(fā)給下游所有的并發(fā)實例,但是元素的到達的順序可能 在并發(fā)實例之間并不相同。因此,對廣播狀態(tài)的修改不能依賴于輸入數(shù)據(jù)的順序。 3)所有 operator task 都會快照下他們的廣播狀態(tài) 在 checkpoint 時,所有的 task 都會 checkpoint 下它們的廣播狀態(tài),并不僅僅是其中一個,即使所有 task 在廣播狀態(tài)中存儲的元素是一模一樣的。這是一個設(shè)計傾向,為了避免在恢復(fù)期間從單個文件讀取而造成熱點。然而,隨著并發(fā)度的增加,checkpoint 的大小也會隨之增加,這里會存在一個并發(fā)因子p 的權(quán)衡。Flink保證了在恢復(fù)/擴縮容時不會出現(xiàn)重復(fù)數(shù)據(jù)和少數(shù)據(jù)。在以相同或更小并行度恢復(fù)時,每 個 task 會讀取其對應(yīng)的檢查點狀態(tài)。在已更大并行度恢復(fù)時,每個 task 讀取自己的狀態(tài),剩余的 task (p_newp_old)會以循環(huán)方式(round-robin)讀取檢查點的狀態(tài)。 4) RocksDBStateBackend狀態(tài)后端目前還不支持廣播狀態(tài) 廣播狀態(tài)目前在運行時保存在內(nèi)存中。因為當(dāng)前,RocksDB狀態(tài)后端還不適用于operator state。Flink 用戶應(yīng)該相應(yīng)地為其應(yīng)用程序配置足夠的內(nèi)存。 2、廣播狀態(tài)模式的應(yīng)用 一般來說廣播狀態(tài)的主要應(yīng)用場景如下: 動態(tài)規(guī)則:動態(tài)規(guī)則是一條事件流,要求吞吐量不能太高。例如,當(dāng)一個報警規(guī)則時觸發(fā)報警信息等。 我們將這個規(guī)則廣播到計算的算子的所有并發(fā)實例中。 數(shù)據(jù)豐富:例如,將用戶的詳細信息作業(yè)廣播狀態(tài)進行廣播,對包含用戶ID的交易數(shù)據(jù)流進行數(shù)據(jù)豐富

48 . 簡述什么是Flink實時topN ?

TopN 是統(tǒng)計報表和大屏非常常見的功能,主要用來實時計算排行榜。流式的TopN可以使業(yè)務(wù)方在內(nèi)存中按照某個統(tǒng)計指標(biāo)(如出現(xiàn)次數(shù))計算排名并快速出發(fā)出更新后的排行榜。我們以統(tǒng)計詞 頻為例展示一下如何快速開發(fā)一個計算TopN的Flink程序。

49 . 簡述什么是Flink的Savepoint ?

Savepoint在Flink中叫做保存點,是基于Flink檢查點機制的應(yīng)用完整快照備份機制,用來保存狀態(tài),可 以在另一個集群或者另一個時間點,從保存的狀態(tài)中將作業(yè)恢復(fù)回來,適用于應(yīng)用升級、集群遷移、 Flink集群版本更新、A/B測試以及假定場景、暫停和重啟、歸檔等場景。保存點可以視為一個 (算子 ID→State)的Map,對于每一個有狀態(tài)的算子,Key是算子ID,Value是算子的State。 在作業(yè)恢復(fù)方面,F(xiàn)link提供了應(yīng)用自動容錯機制,可以減少人為干預(yù),降低運維復(fù)雜度。同時為了提高 靈活度,也提供了手動恢復(fù)。Flink提供了外部檢查點和保存點兩種手動作業(yè)恢復(fù)方式。這里說下保存 點恢復(fù)方式。 保存點恢復(fù)方式 用戶通過命令觸發(fā),由用戶手動創(chuàng)建、清理。使用了標(biāo)準(zhǔn)化格式存儲,允許作業(yè)升級或者配置變更。用 戶在恢復(fù)時需要提供用于恢復(fù)作業(yè)狀態(tài)的保存點路徑。

其實,從保存點恢復(fù)作業(yè)并不簡單,尤其是在作業(yè)變更(如修改邏輯、修復(fù)bug)的情況下,需要考慮 如下幾點。 1)算子的順序改變 如果對應(yīng)的UID沒變,則可以恢復(fù),如果對應(yīng)的UID變了則恢復(fù)失敗。 2)作業(yè)中添加了新的算子 如果是無狀態(tài)算子,沒有影響,可以正常恢復(fù),如果是有狀態(tài)的算子,跟無狀態(tài)的算子一樣處理。 3)從作業(yè)中刪除了一個有狀態(tài)的算子 默認需要恢復(fù)保存點中所記錄的所有算子的狀態(tài),如果刪除了一個有狀態(tài)的算子,從保存點恢復(fù)的時候 被刪除的OperatorID找不到,所以會報錯,可以通過在命令中添加-allowNonRestoredState (short: -n) 跳過無法恢復(fù)的算子。 4)添加和刪除無狀態(tài)的算子 如果手動設(shè)置了UID,則可以恢復(fù),保存點中不記錄無狀態(tài)的算子,如果是自動分配的UID,那么有狀態(tài) 算子的UID可能會變(Flink使用一個單調(diào)遞增的計數(shù)器生成UID,DAG改版,計數(shù)器極有可能會變),很 有可能恢復(fù)失敗。 5)恢復(fù)的時候調(diào)整并行度 Flink1.2.0及以上版本,如果沒有使用作廢的API,則沒問題;1.2.0以下版本需要首先升級到1.2.0才可以

50 . 簡述為什么用Flink不用別的微批考慮過嗎 ?

mini-batch模式的處理過程如下: 在數(shù)據(jù)流中收集記錄; 收集若干記錄后,調(diào)度一個批處理作業(yè)進行數(shù)據(jù)處理; 在批處理運行的同時,收集下一批次的記錄。 也就是說Spark為了處理一個mini-batch,需要調(diào)度一個批處理作業(yè),相比于Flink延遲較大,Spark的處 理延遲在秒級。而Flink只需啟動一個流計算拓撲,處理持續(xù)不斷的數(shù)據(jù),F(xiàn)link的處理延遲在毫秒級 別。如果計算中涉及到多個網(wǎng)絡(luò)Shuule,Spark Streaming和Flink之間的延遲差距會進一步拉大

51 . 簡述解釋一下啥叫背壓 ?

什么是背壓 在流式處理系統(tǒng)中,如果出現(xiàn)下游消費的速度跟不上上游生產(chǎn)數(shù)據(jù)的速度,就種現(xiàn)象就叫做背壓(backpressure,有人叫反壓,不糾結(jié),本篇叫背壓)。本篇主要以Flink作為流式計算框架來簡單背壓機制,為了更好理解,只做簡單分享。

背壓產(chǎn)生的原因 下游消費的速度跟不上上游生產(chǎn)數(shù)據(jù)的速度,可能出現(xiàn)的原因如下:

(1)節(jié)點有性能瓶頸,可能是該節(jié)點所在的機器有網(wǎng)絡(luò)、磁盤等等故障,機器的網(wǎng)絡(luò)延遲和磁盤不足、頻繁GC、數(shù)據(jù)熱點等原因。 (2)數(shù)據(jù)源生產(chǎn)數(shù)據(jù)的速度過快,計算框架處理不及時。比如消息中間件kafka,生產(chǎn)者生產(chǎn)數(shù)據(jù)過快,下游flink消費計算不及時。 (3)flink算子間并行度不同,下游算子相比上游算子過小。

背壓導(dǎo)致的影響 首先,背壓不會直接導(dǎo)致系統(tǒng)的崩盤,只是處在一個不健康的運行狀態(tài)。

(1)背壓會導(dǎo)致流處理作業(yè)數(shù)據(jù)延遲的增加。 (2)影響到Checkpoint,導(dǎo)致失敗,導(dǎo)致狀態(tài)數(shù)據(jù)保存不了,如果上游是kafka數(shù)據(jù)源,在一致性的要求下,可能會導(dǎo)致offset的提交不上。 原理: 由于Flink的Checkpoint機制需要進行Barrier對齊,如果此時某個Task出現(xiàn)了背壓,Barrier流動的速度就會變慢,導(dǎo)致Checkpoint整體時間變長,如果背壓很嚴(yán)重,還有可能導(dǎo)致Checkpoint超時失敗。 (3)影響state的大小,還是因為checkpoint barrier對齊要求。導(dǎo)致state變大。

原理:接受到較快的輸入管道的barrier后,它后面數(shù)據(jù)會被緩存起來但不處理,直到較慢的輸入管道的barrier也到達。這些被緩存的數(shù)據(jù)會被放到state 里面,導(dǎo)致state變大。

52 . 簡述Flink分布式快照 ?

分布式快照可以將同一時間點Task/Operator的狀態(tài)數(shù)據(jù)全局統(tǒng)一快照處理,包括Keyed State和Operator State。 Flink的分布式快照是根據(jù)Chandy-Lamport算法量身定做的。簡單來說就是持續(xù)創(chuàng)建分布式數(shù)據(jù)流及其狀態(tài)的一致快照。 核心思想是在 input source 端插入 barrier,控制 barrier 的同步來實現(xiàn) snapshot 的備份和 exactly-once 語義。

53 . 簡述Flink SQL解析過程 ?

首先,先了解下Calcite是什么。 1、Apache Calcite是什么 Apache Calcite是一個動態(tài)數(shù)據(jù)管理框架,它具備很多典型數(shù)據(jù)庫管理系統(tǒng)的功能,如SQL解析、SQL校驗、SQL查詢優(yōu)化、SQL生成以及數(shù)據(jù)連接查詢等,但是又省略了一些關(guān)鍵的功能,如Calcite并不存儲相 關(guān)的元數(shù)據(jù)和基本數(shù)據(jù),不完全包含相關(guān)處理數(shù)據(jù)的算法等。 Calcite采用的是業(yè)界大數(shù)據(jù)查詢框架的一種通用思路,它的目標(biāo)是“one size fits all(一種方案適應(yīng)所有需求場景)”,希望能為不同計算平臺和數(shù)據(jù)源提供統(tǒng)一的查詢引擎。 Calcite作為一個強大的SQL計算引擎,在Flink內(nèi)部的SQL引擎模塊就是基于Calcite。 2、Calcite的特點 支持標(biāo)準(zhǔn)SQL語言; 獨立于編程語言和數(shù)據(jù)源,可以支持不同的前端和后端; 支持關(guān)系代數(shù)、可定制的邏輯規(guī)則和基于成本模型優(yōu)化的查詢引擎; 支持物化視圖(materialized view)的管理(創(chuàng)建、丟棄、持久化和自動識別); 基于物化視圖的Lattice和Tile機制,以應(yīng)用于OLAP分析; 支持對流數(shù)據(jù)的查詢。 3、Calcite的功能

1) SQL解析

Calcite的SQL解析是通過JavaCC實現(xiàn)的,使用JavaCC編寫SQL語法描述文件,將SQL解析成未經(jīng)校驗的 AST語法樹。 2) SQL效驗 校驗分兩部分: 無狀態(tài)的校驗:即驗證SQL語句是否符合規(guī)范。 有狀態(tài)的校驗:即通過與元數(shù)據(jù)結(jié)合驗證SQL中的Schema、Field、Function是否存在,輸入輸出類 型是否匹配等。 3) SQL查詢優(yōu)化 對上個步驟的輸出(RelNode,邏輯計劃樹)進行優(yōu)化,得到優(yōu)化后的物理執(zhí)行計劃。優(yōu)化有兩種:基 于規(guī)則的優(yōu)化和基于代價的優(yōu)化。 4) SQL生成 將物理執(zhí)行計劃生成為在特定平臺/引擎的可執(zhí)行程序,如生成符合MySQL或Oracle等不同平臺規(guī)則的 SQL查詢語句等。 5)數(shù)據(jù)連接與執(zhí)行 通過各個執(zhí)行平臺執(zhí)行查詢,得到輸出結(jié)果。 在Flink或者其他使用Calcite的大數(shù)據(jù)引擎中,一般到SQL查詢優(yōu)化即結(jié)束,由各個平臺結(jié)合Calcite的SQL 代碼生成和平臺實現(xiàn)的代碼生成,將優(yōu)化后的物理執(zhí)行計劃組合成可執(zhí)行的代碼,然后在內(nèi)存中編譯執(zhí) 行。 4、Flink SQL結(jié)合Calcite(Flink SQL解析)

一條SQL從提交到Calcite解析,優(yōu)化,到最后的Flink執(zhí)行,一般分以下過程: 1)Sql Parser: 將sql語句通過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST; 2)Sql Validator: 結(jié)合數(shù)字字典(catalog)去驗證sql語法; 3)生成Logical Plan: 將sqlNode表示的AST轉(zhuǎn)換成LogicalPlan, 用relNode表示; 4)生成 optimized LogicalPlan: 先基于calcite rules 去優(yōu)化logical Plan,基于flink定制的一些優(yōu)化rules 去優(yōu)化logical Plan; 5)生成Flink PhysicalPlan: 這里也是基于flink里頭的rules將,將optimized LogicalPlan轉(zhuǎn)成成Flink的物理執(zhí)行計劃; 6)將物理執(zhí)行計劃轉(zhuǎn)成Flink ExecutionPlan:就是調(diào)用相應(yīng)的tanslateToPlan方法轉(zhuǎn)換和利用CodeGen元編程成Flink的各種算子。

這里再提一下SQL的優(yōu)化: 5、SQL查詢優(yōu)化器 SQL優(yōu)化的發(fā)展,則可以分為兩個階段,即RBO(基于規(guī)則),和CBO(基于代價)。 邏輯優(yōu)化使用Calcite的Hep優(yōu)化器(基于規(guī)則),物理優(yōu)化階段使用了Calcite的Hep規(guī)則優(yōu)化器和 Volcano優(yōu)化器(基于代價)。 1) RBO(基于規(guī)則的優(yōu)化器)會將原有表達式裁剪掉,遍歷一系列規(guī)則(Rule),只要滿足條件就轉(zhuǎn) 換,生成最終的執(zhí)行計劃。一些常見的規(guī)則包括分區(qū)裁剪(Partition Prune)、列裁剪、謂詞下推 (Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit下推、sort下推、常量折疊(Constant Folding)、子查詢內(nèi)聯(lián)轉(zhuǎn)join等。 2) CBO(基于代價的優(yōu)化器)會將原有表達式保留,基于統(tǒng)計信息和代價模型,嘗試探索生成等價關(guān) 系表達式,最終取代價最小的執(zhí)行計劃。CBO的實現(xiàn)有兩種模型,Vol can o模型,Cascades模型。這兩種模型思想很是相似,不同點在于Cascades模型一邊遍歷SQL邏輯樹,一邊優(yōu)化,從而進一步裁剪掉一 些執(zhí)行計劃。 看一個案例: RBO(基于規(guī)則)優(yōu)化 RBO主要是開發(fā)人員在使用SQL的過程中,有些發(fā)現(xiàn)有些通用的規(guī)則,可以顯著提高SQL執(zhí)行的效率,比 如最經(jīng)典的Filter下推:

將Filter下推到Join之前執(zhí)行,這樣做的好處是減少了Join的數(shù)量,同時降低了CPU,內(nèi)存,網(wǎng)絡(luò)等方面 的開銷,提高效率。 RBO和CBO的區(qū)別大概在于:RBO只為應(yīng)用提供的rule,而CBO會根據(jù)給出的Cost信息,智能應(yīng)用rule, 求出一個Cost最低的執(zhí)行計劃。需要糾正很多人誤區(qū)的一點是,CBO其實也是基于rule的,接觸到RBO和 CBO這兩個概念的時候,很容易將他們對立起來。但實際上CBO,可以理解為就是加上Cost的RBO。 目前各大數(shù)據(jù)庫和計算引擎傾向于CBO。

54 . 簡述 什么是Flink on YARN模式 ?

Flink支持多種部署模式: 1) Standalone模式:Flink安裝在普通的Linux機器上,或者安裝在K8s中,集群的資源由Flink自行管理。 2) Yarn、Mesos、K8s等資源管理集群模式:Flink向資源集群申請資源,創(chuàng)建Flink集群。 3)云上模式:Flink可以在Google、亞馬遜云計算平臺上輕松部署

Flink on Yarn交互過程如下:

1) Client上傳Flink的jar包和配置文件到HDFS集群上 2) Client向Yarn的ResourceManager提交任務(wù)和申請資源 3) ResourceManager分配Container資源并啟動ApplicationMaster 4) ApplicationMaster加載Flink的jar包和配置文件構(gòu)建環(huán)境啟動Flink-JobManager 5) ApplicationMaster向ResourceManager申請任務(wù)資源 6) NodeManager加載Flink的jar包和配置文件構(gòu)建環(huán)境并啟動TaskManager 7) TaskManager啟動后會向JobManager發(fā)送心跳,并等待JobManager向其分配任務(wù)Flink On Yarn模式的兩種方式:Session模式和Per-Job模式 1、Session模式(適合小任務(wù)使用) 需要先申請資源,啟動JobManager和TaskManager 不需要每次提交任務(wù)再去申請資源,而是使用已經(jīng)申請好的資源,從而提高執(zhí)行效率任務(wù)提交完資源不會被釋放,因此一直會占用資源

2、Per-Job模式:(適合使用大任務(wù),且資源充足) 每次提交任務(wù)都需要去申請資源,申請資源需要時間,所有影響執(zhí)行效率(但是在大數(shù)據(jù)面前都是小 事) 每次執(zhí)行完任務(wù)資源就會立刻被釋放,不會占用資源

55 . 簡述Flink如何保證數(shù)據(jù)不丟失 ?

Checkpoint(檢查點)是Flink實現(xiàn)應(yīng)用容錯的核心機制。 Flink根據(jù)配置周期性通知Stream中各個算子的狀態(tài)來生成檢查點快照,從而將這些狀態(tài)數(shù)據(jù)定期持久化 存儲下來,F(xiàn)link程序一旦意外崩潰,重新運行程序時可以有選擇地從這些快照進行恢復(fù),將應(yīng)用恢復(fù)到 最后一次快照的狀態(tài),從此刻開始重新執(zhí)行,避免數(shù)據(jù)的丟失、重復(fù)。 默認情況下,如果設(shè)置了檢查點選項,則Flink只保留最近成功生成的一個檢查點,而當(dāng)Flink程序失敗 時,可以從最近的這個檢查點來進行恢復(fù)。但是,如果希望保留多個檢查點,并能夠根據(jù)實際需要選擇 其中一個進行恢復(fù),會更加靈活。 默認情況下,檢查點不會被保留,取消程序時即會刪除它們,但是可以通過配置保留定期檢查點,根據(jù) 配置,當(dāng)作業(yè)失敗或者取消的時候,不會自動清除這些保留的檢查點。 如果想保留檢查點,那么Flink也設(shè)計了相關(guān)實現(xiàn),可選項如下。 ExternalizedCheckpointCleanup. RETAIN_ON_CANCELLATION:取消作業(yè)時保留檢查點。在這種情況下,必須在取消后手動清理檢查點狀態(tài)。 ExternalizedCheckpointCleanup. DELETE_ON_CANCELLATION:取消作業(yè)時刪除檢查點。只有在作業(yè)失敗時檢查點狀態(tài)才可用。

56 . 簡述Flink的API可分為哪幾層 ?

(1)SQL & Tale AP!同時適用于批處理和流處理,這意味著你可以對有界數(shù)據(jù)流和無界數(shù)據(jù)流以相同的語義進行查詢,并產(chǎn)生相同的結(jié)果。除了基本查詢外, 它還支持自定義的標(biāo)量函數(shù),聚合函數(shù)以及表值函數(shù),可以滿足多樣化的查詢需求。 (2)DataStream & DataSet API 是 Flink 數(shù)據(jù)處理的核心 APL,支持使用 Java 語言或 Scala 語言進行調(diào)用,提供了數(shù)據(jù)讀取,數(shù)換和數(shù)據(jù)輸出等一系列常用操作的封裝。 (3)Stateful Stream Processing 是最低級別的抽象,它通過 Process Function 函數(shù)內(nèi)嵌到 DataStream AP1 中。ProcessEunction 是 Elink 提供的最底層 API,具有最大的靈活性,允許開發(fā)者對于時間和狀態(tài)進行細粒度的控制

57 . 簡述Flink的分區(qū)策略 ?

按照key值分區(qū) 全部發(fā)往一個分區(qū) 廣播 上下游并行度一樣時一對一發(fā)送 隨機均勻分配 輪流分配

58 . 簡述KeyedState都有哪幾類 ?

Keyed State 可以進一步劃分為下面的 5 類,它們分別是 。比較常用的: ValueState、ListState、MapState 。不太常用的: ReducingState 和 AggregationState

59 . 簡述Flink全局快照 ?

全局快照首先是一個分布式應(yīng)用,它有多個進程分布在多個服務(wù)器上: 其次,它在應(yīng)用內(nèi)部有自己的處理邏輯和狀態(tài): 第三,應(yīng)用間是可以互相通信的: 第四,在這種分布式的應(yīng)用,有內(nèi)部狀態(tài),硬件可以通信的情況下,某一時刻的全局狀態(tài),就叫做全局的快照

60 . 簡述Flink CEP 編程中當(dāng)狀態(tài)沒有到達的時候會將數(shù)據(jù)保存在哪里 ?

在 Flink CEP 的處理邏輯中,狀態(tài)沒有滿足的和遲到的數(shù)據(jù),都會存儲在一個 Map 數(shù)據(jù)結(jié)構(gòu)中,也就是說,如果我們限定判斷事件序列的時長為 5 分鐘,那么內(nèi)存中就會存儲 5 分鐘的數(shù)據(jù)

61 . 簡述Flink中的廣播變量,使用時需要注意什么 ?

我們知道Flink是并行的,計算過程可能不在一個 Slot 中進行,那么有一種情況即:當(dāng)我們需要訪問同一份數(shù)據(jù)。那么Fink中的廣播變量就是為了解決這種情況。 我們可以把廣播變量理解為是一個公共的共享變量,我們可以把一個dataset 數(shù)據(jù)集廣播出去,然后不同的task在節(jié)點上都能夠獲取到,這個數(shù)據(jù)在每個節(jié)點上只會存在一份

62 . 簡述Flink-On-Yarn常見的提交模式有哪些,分別有什么優(yōu)缺點 ?

1.yarn-session 式 這種方式需要先啟動集群,然后在提交作業(yè),接著會向varn申請一塊空間后,資源永遠保持不變。如果資源滿了,下一個就任務(wù)就無法提交,只能等到varn中其中一個作業(yè)完成后,程放了資源,那下-個作業(yè)才會正常提交,這種方式資源被限制在sesSi0n中,不能超過比較適合特定的運行環(huán)境或測試環(huán)境。 2.per-job模式 這種方式直接在yarn上提交任務(wù)運行Flink作業(yè),這種方式的好處是一個任務(wù)會對應(yīng)一個job,即每提交一個作業(yè)會根據(jù)自身的情況.向yarn中申請資源,直到作業(yè)執(zhí)行完成,并不會影響下一個作業(yè)的正常運行,除非是yarn上面沒有任何資源的情況下。一般生產(chǎn)環(huán)境是采用此方式運行。這種方式需要保證集群資源足夠

63 . 簡述什么是Flink Operator Chains ?

為了更高效地分布式執(zhí)行,F(xiàn)link會盡可能地將operator的subtask鏈接 chain) 在一起形成task。每個task在一個線程中執(zhí)行。將operatorst技成ask是非帶有效的優(yōu)化:它能減少線程之同的切授,減少消息的序列化/反成列化,減少少了是遲的同時提高整體的吞吐量。這就是我們所說的算子鏈。其實就是盡量把操作邏輯放入到同一個subtask里就是一個槽taskSlot

64 . 簡述Flink中應(yīng)用在ableAPI中的UDF有幾種 ?

scalar function: 針對一條record的一個字段的操作,返回一個字段。 table function: 針對一條record的一個字段的操作,返回多個字段 aggregate function: 針對多條記錄的一個字段操作,返回一條記錄

65 . 簡述Flink中如何進行狀態(tài)恢復(fù)?

Flink使用檢查點(Checkpoint)機制進行狀態(tài)恢復(fù),即在運行過程中將狀態(tài)保存到外部存儲系統(tǒng)(如HDFS、S3等)中,以便在節(jié)點故障或手動操作(如更新應(yīng)用程序代碼)時重新啟動時恢復(fù)狀態(tài)。Flink提供兩種類型的檢查點:增量檢查點和精確一次檢查點。

增量檢查點(Incremental Checkpoints)只保存從上一個檢查點到當(dāng)前檢查點之間發(fā)生的更改。因此,它們比精確一次檢查點更快,但在恢復(fù)時需要應(yīng)用更多的更改,因此可能需要更長的時間來恢復(fù)應(yīng)用程序狀態(tài)。

精確一次檢查點(Exactly-once Checkpoints)是最常用的檢查點類型。它會對整個應(yīng)用程序的狀態(tài)進行快照,并確保檢查點是精確一次的,即檢查點保存的狀態(tài)不包含任何重復(fù)的記錄。這種類型的檢查點是最可靠和最完整的狀態(tài)恢復(fù)方式,但需要更長的恢復(fù)時間和更多的資源。

66 . 簡述Flink中的任務(wù)并發(fā)度是怎樣控制的 ?

Flink中的任務(wù)并發(fā)度由并行度和任務(wù)槽數(shù)量共同控制。在Flink中,每個任務(wù)槽(task slot)代表一個Flink集群中的一個物理資源,可以理解為一個線程。并行度指的是同一算子并行執(zhí)行的任務(wù)槽數(shù)量。并行度越高,同一算子的任務(wù)被分配到的任務(wù)槽數(shù)量越多,任務(wù)的執(zhí)行速度也就越快。但是并行度越高,也會帶來更多的通信和協(xié)調(diào)開銷。因此,在實際使用中需要根據(jù)數(shù)據(jù)量、計算復(fù)雜度和硬件資源等因素進行調(diào)整。

在Flink中,可以通過以下方式控制任務(wù)的并發(fā)度:

全局并發(fā)度:在執(zhí)行環(huán)境中指定的并行度,是整個作業(yè)的并行度,控制著算子任務(wù)的總數(shù)。 算子并發(fā)度:在算子實例化時指定的并行度,控制著算子任務(wù)的分配數(shù)量。

67 . 簡述Flink中的批處理有哪些優(yōu)化策略 ?

Flink中批處理作業(yè)的優(yōu)化主要集中在以下幾個方面:

數(shù)據(jù)源:合理選擇數(shù)據(jù)源,減少數(shù)據(jù)傾斜和數(shù)據(jù)借助操作的開銷。

分區(qū):根據(jù)數(shù)據(jù)量和計算資源,合理設(shè)置并行度和分區(qū)數(shù),充分利用集群的計算資源。

內(nèi)存管理:Flink中使用了內(nèi)存管理機制對內(nèi)存進行管理和分配,通過優(yōu)化內(nèi)存使用方式,可以減少內(nèi)存分配和GC開銷,提高處理性能。

重用對象:避免在算子中頻繁地創(chuàng)建和銷毀對象,可以通過對象重用機制減少內(nèi)存分配和GC開銷。

算子選擇:根據(jù)具體的業(yè)務(wù)場景,選擇性能更優(yōu)的算子,比如使用聚合算子代替多個Map和Reduce算子,可以減少數(shù)據(jù)傾斜和網(wǎng)絡(luò)開銷。

并行算法:對于一些高性能的算法,可以采用并行算法來進行計算,提高處理性能。

緩存:使用緩存機制可以減少IO和網(wǎng)絡(luò)開銷,提高數(shù)據(jù)讀寫速度和處理性能。

壓縮:對于一些需要傳輸?shù)拇髷?shù)據(jù),可以使用壓縮算法來減小數(shù)據(jù)的傳輸量,提高數(shù)據(jù)傳輸速度。 總之,F(xiàn)link中的批處理作業(yè)優(yōu)化需要結(jié)合具體的業(yè)務(wù)場景和數(shù)據(jù)規(guī)模來進行,需要從多個方面入手進行優(yōu)化,以達到提高作業(yè)性能和效率的目的。

柚子快報激活碼778899分享:【大數(shù)據(jù)專題】Flink題庫

http://yzkb.51969.com/

精彩內(nèi)容

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19241440.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄