新聞中心
這篇文章主要介紹Flink 算子狀態(tài)怎么用,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!
湛河網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、自適應(yīng)網(wǎng)站建設(shè)等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營維護(hù)。成都創(chuàng)新互聯(lián)從2013年成立到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)。
1. 算子狀態(tài)分類
算子狀態(tài)的作用范圍限定為算子并行子任務(wù)。這意味著由同一并行子任務(wù)所處理的所有數(shù)據(jù)都可以訪問到相同的狀態(tài),狀態(tài)對(duì)于同一子任務(wù)而言是共享的。算子狀態(tài)不能由相同或不同算子的另一個(gè)并行子任務(wù)訪問。
Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu),主要介紹當(dāng)并行度改變(擴(kuò)縮容)時(shí),從保存點(diǎn)重新啟動(dòng)時(shí),算子狀態(tài)如何分配:
列表狀態(tài)(List state):將狀態(tài)表示為一組數(shù)據(jù)的列表。
帶有算子列表狀態(tài)的算子在擴(kuò)縮容時(shí)會(huì)對(duì)列表中的條目進(jìn)行重新分配。理論上,所有并行算子任務(wù)的列表?xiàng)l目會(huì)被統(tǒng)一收集起來,隨后均勻分配到更少或更多的任務(wù)之上。如果列表?xiàng)l目的數(shù)量小于算子新設(shè)置的并行度,部分任務(wù)在啟動(dòng)時(shí)的狀態(tài)就可能為空。
聯(lián)合列表狀態(tài)(Union list state) 也將狀態(tài)表示為數(shù)據(jù)的列表。它與常規(guī)列表狀態(tài)的區(qū)別在于,在發(fā)生故障從保存點(diǎn)(savepoint)啟動(dòng)應(yīng)用程序時(shí)進(jìn)行恢復(fù),如果并行度發(fā)生改變,帶有算子聯(lián)合列表狀態(tài)的算子會(huì)在擴(kuò)縮容時(shí)把狀態(tài)列表的全部條目廣播到全部任務(wù)上,隨后由任務(wù)自己決定哪些條目應(yīng)該保留,哪些應(yīng)該丟棄。
對(duì)于同一個(gè)算子來說,假如之前的并行度為2,那么就會(huì)有兩個(gè)子任務(wù),也就是兩個(gè)狀態(tài),假如改變其并行度為3,那么就把之前的兩個(gè)狀態(tài),給每個(gè)并行子任務(wù)都發(fā)一份,這樣每個(gè)并行子任務(wù)上都有所有的狀態(tài),然后由并行子任務(wù)去決定使用哪個(gè)狀態(tài)。
廣播狀態(tài)(Broadcast state):不同于普通的算子狀態(tài),每個(gè)并行子任務(wù)的狀態(tài)相同。但是仍然是每個(gè)并行子任務(wù)訪問自己的狀態(tài),但是狀態(tài)都是一樣的。 如果一個(gè)算子有多項(xiàng)任務(wù),而它的每個(gè)并行子任務(wù)狀態(tài)又都相同,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)。
帶有算子廣播狀態(tài)的算子在擴(kuò)縮容時(shí)會(huì)把狀態(tài)拷貝到全部新任務(wù)上,這樣做的原因是廣播狀態(tài)能確保所有任務(wù)的狀態(tài)相同。在縮容的情況下,由于狀態(tài)經(jīng)過復(fù)制不會(huì)丟失,我們可以簡(jiǎn)單的停掉多出的任務(wù)。
2.算子狀態(tài)的使用
public class StateTest1_OperatorState { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // socket文本流 DataStreaminputStream = env.socketTextStream("localhost", 7777); // 轉(zhuǎn)換成SensorReading類型 DataStream dataStream = inputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); // 定義一個(gè)有狀態(tài)的map操作,統(tǒng)計(jì)當(dāng)前分區(qū)數(shù)據(jù)個(gè)數(shù) SingleOutputStreamOperator resultStream = dataStream.map(new MyCountMapper()); resultStream.print(); env.execute(); } // 自定義MapFunction public static class MyCountMapper implements MapFunction , ListCheckpointed { // 定義一個(gè)本地變量,作為算子狀態(tài) private Integer count = 0; @Override public Integer map(SensorReading value) throws Exception { count++; return count; } @Override public List snapshotState(long checkpointId, long timestamp) throws Exception { return Collections.singletonList(count); } @Override public void restoreState(List state) throws Exception { for( Integer num: state ) count += num; } } }
算子狀態(tài)的定義和普通的成員變量定義相同,但是對(duì)應(yīng)的算子處理函數(shù)要繼承對(duì)應(yīng)的接口,例如ListCheckpointed,自定義狀態(tài)進(jìn)行快照和恢復(fù)的邏輯。
以上是“Flink 算子狀態(tài)怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
網(wǎng)頁標(biāo)題:Flink算子狀態(tài)怎么用
鏈接地址:http://www.ef60e0e.cn/article/gcigih.html