新聞中心
Spark Streaming的事務(wù)處理和關(guān)系型數(shù)據(jù)庫的事務(wù)的概念有所不同,關(guān)系型數(shù)據(jù)庫事務(wù)關(guān)注的是語句級別的一致性,例如銀行轉(zhuǎn)賬。而Spark Streaming的事務(wù)關(guān)注的是某次job執(zhí)行的一致性。也就是如何保證Job在處理數(shù)據(jù)的過程中做到如下兩點:
創(chuàng)新互聯(lián)專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、壽光網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5響應(yīng)式網(wǎng)站、電子商務(wù)商城網(wǎng)站建設(shè)、集團公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為壽光等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。不丟失數(shù)據(jù)
不重復(fù)處理數(shù)據(jù)
SparkStreaming程序執(zhí)行架構(gòu)大致如下:
一、我們先來說說丟失數(shù)據(jù)的情況:
Receiver接收到數(shù)據(jù)后,首先會在Executor級別上保存數(shù)據(jù)(根據(jù)StorageLevel的設(shè)置),例如socketTextStream的Receiver。在內(nèi)存和磁盤上保留2份副本數(shù)據(jù)
def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[String] = withNamedScope("socket text stream") { socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) }
如果StorageLevel設(shè)置的是只進行內(nèi)存級別的存儲,那么當(dāng)程序崩潰后,即便對Driver進行了Checkpoint,然后重新啟動程序。該部分數(shù)據(jù)也會丟失。因為Driver的Checkpoint并不對計算數(shù)據(jù)進行保存。
我們假設(shè)StorageLevel設(shè)置了磁盤級別的存儲,也不能完全保證數(shù)據(jù)不被丟失,因為Receiver并不是接收一條數(shù)據(jù)寫一次磁盤,而是按照數(shù)據(jù)塊為單位寫數(shù)據(jù)。然后將數(shù)據(jù)塊的元數(shù)據(jù)信息發(fā)送給Driver,Driver的Checkpoint記錄的數(shù)Block的元數(shù)據(jù)信息。當(dāng)數(shù)據(jù)塊寫到一半的時候,或者是元數(shù)據(jù)還沒有發(fā)送給Driver的時候,Executor崩潰了,數(shù)據(jù)也就丟失啦。
解決方案:為了減少這種情況的發(fā)送,可以在Receiver端引入WAL寫機制,因為WAL寫的頻率要比數(shù)據(jù)塊的頻率高的多。這樣,當(dāng)Executor恢復(fù)的時候,可以讀取WAL日志恢復(fù)數(shù)據(jù)塊。
但是通過WAL方式會極大的損傷Spark Streaming中Receivers接受數(shù)據(jù)的性能;
WAL也不能完全的解決數(shù)據(jù)丟失的問題,就像Oracle一樣,日志文件的寫,也是先寫到內(nèi)存中,然后根據(jù)一定的觸發(fā)條件再將數(shù)據(jù)寫到磁盤。如果還沒有來的及寫WAL日志,此時數(shù)據(jù)也會有不一致的情況(數(shù)據(jù)已經(jīng)接收,但是還沒有寫到WAL的這部分數(shù)據(jù)是恢復(fù)不出來的。)。
Spark Streaming 1.3的時候為了避免WAL的性能損失和實現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲系統(tǒng)!!!此時兼具有流的優(yōu)勢和文件系統(tǒng)的優(yōu)勢,至此,Spark Streaming+Kafka就構(gòu)建了完美的流處理世界!!!所有的Executors通過Kafka API直接消費數(shù)據(jù),直接管理Offset,所以也不會重復(fù)消費數(shù)據(jù);事務(wù)實現(xiàn)啦!!!
2. Driver崩潰,此時Job正在處理的數(shù)據(jù),包括Receiver已經(jīng)接收到還未被處理的數(shù)據(jù)將全部丟失。
解決方案:對Driver進行Checkpoint,此處的Checkpoint和RDD的Checkpoint并不一樣。
我們看看Checkpoint都包含哪些屬性:
private[streaming] class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) extends Logging with Serializable { val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll
其中g(shù)raph是DStreamGraph的實例化,它里面包含了InputDStream
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
我們以DirectKafkaInputDStream為例,其中包含了checkpointData
protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData
其中只是包含:
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) { def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = { data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]] }
就是每個batch 的唯一標(biāo)識 time 對象,以及每個KafkaRDD對應(yīng)的的Kafka偏移信息。
所以:
checkpoint 是非常高效的。沒有涉及到實際數(shù)據(jù)的存儲。一般大小只有幾十K,因為只存了Kafka的偏移量等信息。
checkpoint 采用的是序列化機制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函數(shù)應(yīng)該也會被序列化。如果采用了CheckPoint機制,而你的程序包做了做了變更,恢復(fù)后可能會有一定的問題。
二、關(guān)于數(shù)據(jù)重復(fù)處理涉及兩個方面:
數(shù)據(jù)被重復(fù)讀取:在使用Kafka的情況下,Receiver收到數(shù)據(jù)且保存到了HDFS等持久化引擎但是沒有來得及進行updateOffsets,此時Receiver崩潰后重新啟動就會通過管理Kafka的ZooKeeper中元數(shù)據(jù)再次重復(fù)讀取數(shù)據(jù),但是此時SparkStreaming認為是成功的,但是Kafka認為是失敗的(因為沒有更新offset到ZooKeeper中),此時就會導(dǎo)致數(shù)據(jù)重新消費的情況。
數(shù)據(jù)輸出多次重寫
為什么會有這個問題,因為Spark Streaming在計算的時候基于Spark Core,Spark Core天生會做以下事情導(dǎo)致Spark Streaming的部分結(jié)果重復(fù)輸出(例如數(shù)據(jù)輸出后,該Task的后續(xù)程序發(fā)生錯誤,而任務(wù)發(fā)生錯誤,Spark Core會進入如下程序):
Task重試;慢任務(wù)推測(兩個相同任務(wù)可能會同時執(zhí)行),Stage重復(fù);Job重試;
具體解決方案:
設(shè)置spark.task.maxFailures次數(shù)為1;
設(shè)置spark.speculation為關(guān)閉狀態(tài)(因為慢任務(wù)推測其實非常消耗性能,所以關(guān)閉后可以顯著提高Spark Streaming處理性能)
Spark Streaming on Kafka的話,Job失敗后可以設(shè)置auto.offset.reset為“l(fā)argest”的方式;
Exactly Once的事務(wù)處理必須滿足:
Receiver數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來源和可靠的Receiver,且通過WAL來保證數(shù)據(jù)安全。
整個應(yīng)用程序的metadata必須進行checkpoint;
最后再次強調(diào)可以通過transform和foreachRDD基于業(yè)務(wù)邏輯代碼進行邏輯控制來實現(xiàn)數(shù)據(jù)不重復(fù)消費和輸出不重復(fù)!這兩個方式類似于Spark Streaming的后門,可以做任意想象的控制操作!
備注:
1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark
2、IMF晚8點大數(shù)據(jù)實戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
分享文章:第4課:SparkStreaming的Exactly-One的事務(wù)處理-創(chuàng)新互聯(lián)
文章源于:http://www.ef60e0e.cn/article/ccsode.html