新聞中心
??spark提供了對(duì)數(shù)據(jù)的核心抽象——彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset,簡(jiǎn)稱(chēng)RDD)。RDD是一個(gè)分布式的數(shù)據(jù)集合,數(shù)據(jù)可以跨越集群中的多個(gè)機(jī)器節(jié)點(diǎn),被分區(qū)并行執(zhí)行。
?在spark中,對(duì)數(shù)據(jù)的所有操作不外乎創(chuàng)建RDD、轉(zhuǎn)化已有RDD及調(diào)用RDD操作進(jìn)行求值。spark會(huì)自動(dòng)地將RDD中的數(shù)據(jù)分發(fā)到集群中并行執(zhí)行。
專(zhuān)注于為中小企業(yè)提供網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)金牛免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千多家企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
五大特性
- a list of partitions
?RDD是一個(gè)由多個(gè)partition(某個(gè)節(jié)點(diǎn)里的某一片連續(xù)的數(shù)據(jù))組成的的list;將數(shù)據(jù)加載為RDD時(shí),一般會(huì)遵循數(shù)據(jù)的本地性(一般一個(gè)hdfs里的block會(huì)加載為一個(gè)partition)。 - a function for computing each split
?RDD的每個(gè)partition中都會(huì)有function,即函數(shù)應(yīng)用,其作用是實(shí)現(xiàn)RDD之間partition的轉(zhuǎn)換。 - a list of dependencies on other RDDs
?RDD會(huì)記錄它的依賴(lài),為了容錯(cuò)(重算,cache,checkpoint),即內(nèi)存中的RDD操作出錯(cuò)或丟失時(shí)會(huì)進(jìn)行重算。 - Optionally,a Partitioner for Key-value RDDs
?可選項(xiàng),如果RDD里面存的數(shù)據(jù)是key-value形式,則可以傳遞一個(gè)自定義的Partitioner進(jìn)行重新分區(qū),例如自定義的Partitioner是基于key進(jìn)行分區(qū),那則會(huì)將不同RDD里面的相同key的數(shù)據(jù)放到同一個(gè)partition里面。 - Optionally, a list of preferred locations to compute each split on
?可選項(xiàng),最優(yōu)的位置去計(jì)算每個(gè)分片,即數(shù)據(jù)的本地性。創(chuàng)建RDD
??spark提供了兩種創(chuàng)建RDD的方式:讀取外部數(shù)據(jù)源、將驅(qū)動(dòng)器程序中的集合進(jìn)行并行化。
并行化集合
??使用sparkContext的parallelize()方法將集合并行化。
?parallelize()方法第二個(gè)參數(shù)可指定分區(qū)數(shù)。spark會(huì)為每個(gè)分區(qū)創(chuàng)建一個(gè)task任務(wù),通常每個(gè)cpu需要2-4個(gè)分區(qū)。spark會(huì)自動(dòng)地根據(jù)集群大小設(shè)置分區(qū)數(shù),也支持通過(guò)parallelize()方法的第二個(gè)參數(shù)手動(dòng)指定。scala
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
java
List
data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD distData = sc.parallelize(data); python
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
??注:除了開(kāi)發(fā)和測(cè)試外,這種方式用得不多。這種方式需要把整個(gè)數(shù)據(jù)集先放到一臺(tái)機(jī)器的內(nèi)存中。
讀取外部數(shù)據(jù)源
??spark可接入多種hadoop支持的數(shù)據(jù)源來(lái)創(chuàng)建分布式數(shù)據(jù)集。包括:本地文件系統(tǒng)、HDFS、Cassandra、HBase、Amazon S3等。
?spark支持多種存儲(chǔ)格式,包括textFiles、SequenceFiles及其他hadoop存儲(chǔ)格式。scala
scala> val distFile = sc.textFile("data.txt") distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at
:26 java
JavaRDD
distFile = sc.textFile("data.txt"); python
>>> distFile = sc.textFile("data.txt")
RDD操作
??RDD支持兩種操作:轉(zhuǎn)化操作和行動(dòng)操作。
轉(zhuǎn)化操作
??RDD的轉(zhuǎn)化操作會(huì)返回一個(gè)新的RDD。轉(zhuǎn)化操作是惰性求值的,只有行動(dòng)操作用到轉(zhuǎn)化操作生成的RDD時(shí),才會(huì)真正進(jìn)行轉(zhuǎn)化。
?spark使用lineage(血統(tǒng))來(lái)記錄轉(zhuǎn)化操作生成的不同RDD之間的依賴(lài)關(guān)系。依賴(lài)分為窄依賴(lài)(narrow dependencies)和寬依賴(lài)(wide dependencies)。
- 窄依賴(lài)
- 子RDD的每個(gè)分區(qū)依賴(lài)于常數(shù)個(gè)父分區(qū)
- 輸入輸出一對(duì)一,結(jié)果RDD的分區(qū)結(jié)構(gòu)不變,主要是map、flatMap
- 輸入輸出一對(duì)一,但結(jié)果RDD的分區(qū)結(jié)構(gòu)發(fā)生變化,如union、coalesce
- 從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample
寬依賴(lài)
- 子RDD的每個(gè)分區(qū)依賴(lài)于所有父RDD分區(qū)
- 對(duì)單個(gè)RDD基于key進(jìn)行重組和reduce,如groupByKey、reduceByKey
對(duì)兩個(gè)RDD基于key進(jìn)行合并和重組,如join
行動(dòng)操作
??行動(dòng)操作則會(huì)向驅(qū)動(dòng)器程序返回結(jié)果或把結(jié)果寫(xiě)入外部系統(tǒng),會(huì)觸發(fā)實(shí)際的計(jì)算。
緩存方式
??RDD通過(guò)persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會(huì)被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。
?cache最終也是調(diào)用了persist方法,默認(rèn)的存儲(chǔ)級(jí)別是僅在內(nèi)存存儲(chǔ)一份。
?Spark的存儲(chǔ)級(jí)別還有好多種,存儲(chǔ)級(jí)別在object StorageLevel中定義的。
?緩存有可能丟失,RDD的緩存容錯(cuò)機(jī)制保證即使緩存丟失也能保證計(jì)算正確執(zhí)行。通過(guò)基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會(huì)被重算,由于RDD的各個(gè)Partition是相對(duì)獨(dú)立的,因此只需要計(jì)算丟失的部分即可,并不需要重算全部Partition。容錯(cuò)機(jī)制
Lineage機(jī)制
RDD的Lineage記錄的是粗粒度的特定數(shù)據(jù)Transformation操作行為。當(dāng)RDD的部分分區(qū)數(shù)據(jù)丟失時(shí),可以通過(guò)Lineage來(lái)重新運(yùn)算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。這種粗顆粒的數(shù)據(jù)模型,限制了Spark的運(yùn)用場(chǎng)合,所以Spark并不適用于所有高性能要求的場(chǎng)景,但同時(shí)相比細(xì)顆粒度的數(shù)據(jù)模型,也帶來(lái)了性能的提升。
Spark Lineage機(jī)制是通過(guò)RDD的依賴(lài)關(guān)系來(lái)執(zhí)行的
窄依賴(lài)可以在某個(gè)計(jì)算節(jié)點(diǎn)上直接通過(guò)計(jì)算父RDD的某塊數(shù)據(jù)計(jì)算得到子RDD對(duì)應(yīng)的某塊數(shù)據(jù)。
- 寬依賴(lài)則要等到父RDD所有數(shù)據(jù)都計(jì)算完成后,將父RDD的計(jì)算結(jié)果進(jìn)行hash并傳到對(duì)應(yīng)節(jié)點(diǎn)上之后才能計(jì)算子RDD。寬依賴(lài)要將祖先RDD中的所有數(shù)據(jù)塊全部重新計(jì)算,所以在長(zhǎng)“血統(tǒng)”鏈特別是有寬依賴(lài)的時(shí)候,需要在適當(dāng)?shù)臅r(shí)機(jī)設(shè)置數(shù)據(jù)檢查點(diǎn)。
Checkpoint機(jī)制
簡(jiǎn)介
- 當(dāng)RDD的action算子觸發(fā)計(jì)算結(jié)束后會(huì)執(zhí)行checkpoint;Task計(jì)算失敗的時(shí)候會(huì)從checkpoint讀取數(shù)據(jù)進(jìn)行計(jì)算。
實(shí)現(xiàn)方式(checkpoint有兩種實(shí)現(xiàn)方式,如果代碼中沒(méi)有設(shè)置checkpoint,則使用local的checkpoint模式,如果設(shè)置路徑,則使用reliable的checkpoint模式。)
LocalRDDCheckpointData:臨時(shí)存儲(chǔ)在本地executor的磁盤(pán)和內(nèi)存上。該實(shí)現(xiàn)的特點(diǎn)是比較快,適合lineage信息需要經(jīng)常被刪除的場(chǎng)景(如GraphX),可容忍executor掛掉。
- ReliableRDDCheckpointData:存儲(chǔ)在外部可靠存儲(chǔ)(如hdfs),可以達(dá)到容忍driver 掛掉情況。雖然效率沒(méi)有存儲(chǔ)本地高,但是容錯(cuò)級(jí)別最好。
忠于技術(shù),熱愛(ài)分享。歡迎關(guān)注公眾號(hào):java大數(shù)據(jù)編程,了解更多技術(shù)內(nèi)容。
網(wǎng)站欄目:5.sparkcore之RDD編程
本文地址:http://www.ef60e0e.cn/article/jpjcid.html
其他資訊
- 詳解springcloud中使用Ribbon實(shí)現(xiàn)客戶端的軟負(fù)載均衡-創(chuàng)新互聯(lián)
- 微信小程序中實(shí)現(xiàn)動(dòng)態(tài)綁定數(shù)據(jù)及動(dòng)態(tài)事件處理的示例分析-創(chuàng)新互聯(lián)
- html5的優(yōu)點(diǎn)與缺點(diǎn)大概總結(jié)-創(chuàng)新互聯(lián)
- 【三】6.Android中Context的理解及使用-創(chuàng)新互聯(lián)
- 原生js+jquery+ajax請(qǐng)求以及jsonp如何調(diào)用-創(chuàng)新互聯(lián)