1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時間:8:30-17:00
      你可能遇到了下面的問題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      第4課:SparkStreaming的Exactly-One的事務(wù)處理-創(chuàng)新互聯(lián)

      Spark Streaming的事務(wù)處理和關(guān)系型數(shù)據(jù)庫的事務(wù)的概念有所不同,關(guān)系型數(shù)據(jù)庫事務(wù)關(guān)注的是語句級別的一致性,例如銀行轉(zhuǎn)賬。而Spark Streaming的事務(wù)關(guān)注的是某次job執(zhí)行的一致性。也就是如何保證Job在處理數(shù)據(jù)的過程中做到如下兩點:

      創(chuàng)新互聯(lián)專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、壽光網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5響應(yīng)式網(wǎng)站電子商務(wù)商城網(wǎng)站建設(shè)、集團公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為壽光等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
      • 不丟失數(shù)據(jù)

      • 不重復(fù)處理數(shù)據(jù)

      SparkStreaming程序執(zhí)行架構(gòu)大致如下:

      第4課:Spark Streaming的Exactly-One的事務(wù)處理

      一、我們先來說說丟失數(shù)據(jù)的情況:

      1. Receiver接收到數(shù)據(jù)后,首先會在Executor級別上保存數(shù)據(jù)(根據(jù)StorageLevel的設(shè)置),例如socketTextStream的Receiver。在內(nèi)存和磁盤上保留2份副本數(shù)據(jù)

      def socketTextStream(
          hostname: String,
          port: Int,
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
        ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
        socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
      }

      如果StorageLevel設(shè)置的是只進行內(nèi)存級別的存儲,那么當(dāng)程序崩潰后,即便對Driver進行了Checkpoint,然后重新啟動程序。該部分數(shù)據(jù)也會丟失。因為Driver的Checkpoint并不對計算數(shù)據(jù)進行保存。

      我們假設(shè)StorageLevel設(shè)置了磁盤級別的存儲,也不能完全保證數(shù)據(jù)不被丟失,因為Receiver并不是接收一條數(shù)據(jù)寫一次磁盤,而是按照數(shù)據(jù)塊為單位寫數(shù)據(jù)。然后將數(shù)據(jù)塊的元數(shù)據(jù)信息發(fā)送給Driver,Driver的Checkpoint記錄的數(shù)Block的元數(shù)據(jù)信息。當(dāng)數(shù)據(jù)塊寫到一半的時候,或者是元數(shù)據(jù)還沒有發(fā)送給Driver的時候,Executor崩潰了,數(shù)據(jù)也就丟失啦。

      解決方案:為了減少這種情況的發(fā)送,可以在Receiver端引入WAL寫機制,因為WAL寫的頻率要比數(shù)據(jù)塊的頻率高的多。這樣,當(dāng)Executor恢復(fù)的時候,可以讀取WAL日志恢復(fù)數(shù)據(jù)塊。

      但是通過WAL方式會極大的損傷Spark Streaming中Receivers接受數(shù)據(jù)的性能;

      WAL也不能完全的解決數(shù)據(jù)丟失的問題,就像Oracle一樣,日志文件的寫,也是先寫到內(nèi)存中,然后根據(jù)一定的觸發(fā)條件再將數(shù)據(jù)寫到磁盤。如果還沒有來的及寫WAL日志,此時數(shù)據(jù)也會有不一致的情況(數(shù)據(jù)已經(jīng)接收,但是還沒有寫到WAL的這部分數(shù)據(jù)是恢復(fù)不出來的。)。

      Spark Streaming 1.3的時候為了避免WAL的性能損失和實現(xiàn)Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲系統(tǒng)!!!此時兼具有流的優(yōu)勢和文件系統(tǒng)的優(yōu)勢,至此,Spark Streaming+Kafka就構(gòu)建了完美的流處理世界!!!所有的Executors通過Kafka API直接消費數(shù)據(jù),直接管理Offset,所以也不會重復(fù)消費數(shù)據(jù);事務(wù)實現(xiàn)啦!!!

      2. Driver崩潰,此時Job正在處理的數(shù)據(jù),包括Receiver已經(jīng)接收到還未被處理的數(shù)據(jù)將全部丟失。

      解決方案:對Driver進行Checkpoint,此處的Checkpoint和RDD的Checkpoint并不一樣。

      我們看看Checkpoint都包含哪些屬性:

      private[streaming]
      class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
        extends Logging with Serializable {
        val master = ssc.sc.master
        val framework = ssc.sc.appName
        val jars = ssc.sc.jars
        val graph = ssc.graph
        val checkpointDir = ssc.checkpointDir
        val checkpointDuration = ssc.checkpointDuration
        val pendingTimes = ssc.scheduler.getPendingTimes().toArray
        val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
        val sparkConfPairs = ssc.conf.getAll

      其中g(shù)raph是DStreamGraph的實例化,它里面包含了InputDStream

      private val inputStreams = new ArrayBuffer[InputDStream[_]]()

      我們以DirectKafkaInputDStream為例,其中包含了checkpointData

      protected[streaming] override val checkpointData =
        new DirectKafkaInputDStreamCheckpointData

      其中只是包含:

      class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
        def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
          data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
        }

      就是每個batch 的唯一標(biāo)識 time 對象,以及每個KafkaRDD對應(yīng)的的Kafka偏移信息。

      所以:

       checkpoint 是非常高效的。沒有涉及到實際數(shù)據(jù)的存儲。一般大小只有幾十K,因為只存了Kafka的偏移量等信息。

       checkpoint 采用的是序列化機制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函數(shù)應(yīng)該也會被序列化。如果采用了CheckPoint機制,而你的程序包做了做了變更,恢復(fù)后可能會有一定的問題。

      二、關(guān)于數(shù)據(jù)重復(fù)處理涉及兩個方面:

      1. 數(shù)據(jù)被重復(fù)讀取:在使用Kafka的情況下,Receiver收到數(shù)據(jù)且保存到了HDFS等持久化引擎但是沒有來得及進行updateOffsets,此時Receiver崩潰后重新啟動就會通過管理Kafka的ZooKeeper中元數(shù)據(jù)再次重復(fù)讀取數(shù)據(jù),但是此時SparkStreaming認為是成功的,但是Kafka認為是失敗的(因為沒有更新offset到ZooKeeper中),此時就會導(dǎo)致數(shù)據(jù)重新消費的情況。

      2. 數(shù)據(jù)輸出多次重寫

        為什么會有這個問題,因為Spark Streaming在計算的時候基于Spark Core,Spark Core天生會做以下事情導(dǎo)致Spark Streaming的部分結(jié)果重復(fù)輸出(例如數(shù)據(jù)輸出后,該Task的后續(xù)程序發(fā)生錯誤,而任務(wù)發(fā)生錯誤,Spark Core會進入如下程序):

        Task重試;慢任務(wù)推測(兩個相同任務(wù)可能會同時執(zhí)行),Stage重復(fù);Job重試;

       具體解決方案:

      設(shè)置spark.task.maxFailures次數(shù)為1;

      設(shè)置spark.speculation為關(guān)閉狀態(tài)(因為慢任務(wù)推測其實非常消耗性能,所以關(guān)閉后可以顯著提高Spark Streaming處理性能)

      Spark Streaming on Kafka的話,Job失敗后可以設(shè)置auto.offset.reset為“l(fā)argest”的方式;

      Exactly Once的事務(wù)處理必須滿足:

      1. Receiver數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來源和可靠的Receiver,且通過WAL來保證數(shù)據(jù)安全。

      2. 整個應(yīng)用程序的metadata必須進行checkpoint;

      最后再次強調(diào)可以通過transform和foreachRDD基于業(yè)務(wù)邏輯代碼進行邏輯控制來實現(xiàn)數(shù)據(jù)不重復(fù)消費和輸出不重復(fù)!這兩個方式類似于Spark Streaming的后門,可以做任意想象的控制操作!

      備注:

      1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark
      2、IMF晚8點大數(shù)據(jù)實戰(zhàn)YY直播頻道號:68917580
      3、新浪微博: http://www.weibo.com/ilovepains

      另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


      分享文章:第4課:SparkStreaming的Exactly-One的事務(wù)處理-創(chuàng)新互聯(lián)
      文章源于:http://www.ef60e0e.cn/article/ccsode.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        台东市| 高邑县| 当涂县| 津南区| 怀宁县| 高青县| 卫辉市| 湛江市| 安泽县| 抚远县| 太原市| 北辰区| 钦州市| 股票| 丽江市| 井冈山市| 五常市| 禹城市| 昆明市| 灌南县| 交口县| 奉化市| 平阳县| 呼图壁县| 龙井市| 会泽县| 德惠市| 嫩江县| 洪泽县| 梓潼县| 桓台县| 繁昌县| 白朗县| 名山县| 宝清县| 祁东县| 容城县| 那曲县| 友谊县| 芦溪县| 和田市|