1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢(xún)
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問(wèn)題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
      (版本定制)第12課:SparkStreaming源碼解讀之Executor容錯(cuò)安全性

      本期內(nèi)容:

      創(chuàng)新互聯(lián)是一家以網(wǎng)絡(luò)技術(shù)公司,為中小企業(yè)提供網(wǎng)站維護(hù)、成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、網(wǎng)站備案、服務(wù)器租用、國(guó)際域名空間、軟件開(kāi)發(fā)、微信小程序開(kāi)發(fā)等企業(yè)互聯(lián)網(wǎng)相關(guān)業(yè)務(wù),是一家有著豐富的互聯(lián)網(wǎng)運(yùn)營(yíng)推廣經(jīng)驗(yàn)的科技公司,有著多年的網(wǎng)站建站經(jīng)驗(yàn),致力于幫助中小企業(yè)在互聯(lián)網(wǎng)讓打出自已的品牌和口碑,讓企業(yè)在互聯(lián)網(wǎng)上打開(kāi)一個(gè)面向全國(guó)乃至全球的業(yè)務(wù)窗口:建站歡迎聯(lián)系:028-86922220

          1、Executor的WAL容錯(cuò)機(jī)制

          2、消息重放

      Executor的安全容錯(cuò)主要是數(shù)據(jù)的安全容錯(cuò),那為什么不考慮數(shù)據(jù)計(jì)算的安全容錯(cuò)呢?

      原因是計(jì)算的時(shí)候Spark Streaming是借助于Spark Core上RDD的安全容錯(cuò)的,所以天然的安全可靠的。

      Executor的安全容錯(cuò)主要有:

          1、數(shù)據(jù)副本:

               有兩種方式:a.借助底層的BlockManager,BlockManager做備份,通過(guò)傳入的StorageLevel進(jìn)行備份。

                                    b. WAL方式進(jìn)行容錯(cuò)。

          2、接受到數(shù)據(jù)之后,不做副本,但是數(shù)據(jù)源支持存放,所謂存放就是可以反復(fù)的讀取源數(shù)據(jù)。

      容錯(cuò)的弊端:耗時(shí)間、耗空間。

          

      簡(jiǎn)單的看下源代碼:

      /** Store block and report it to driver */
      def pushAndReportBlock(
          receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
        ) {
      val blockId = blockIdOption.getOrElse(nextBlockId)
      val time = System.currentTimeMillis
      val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
        logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
      val numRecords = blockStoreResult.numRecords
      val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
      trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
        logDebug(s"Reported block $blockId")
      }

      private val receivedBlockHandler: ReceivedBlockHandler = {
      if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
      throw new SparkException(
      "Cannot enable receiver write-ahead log without checkpoint directory set. " +
      "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
      "See documentation for more details.")
          }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) //通過(guò)WAL容錯(cuò)
        } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) //通過(guò)BlockManager進(jìn)行容錯(cuò)
        }
      }
      def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
      var numRecords = None: Option[Long]
      val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
            numRecords = Some(arrayBuffer.size.toLong)
            blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
      tellMaster = true)
      case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
      tellMaster = true)
            numRecords = countIterator.count
            putResult
      case ByteBufferBlock(byteBuffer) =>
            blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
      throw new SparkException(
      s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
        }
      if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
        }
      BlockManagerBasedStoreResult(blockId, numRecords)
      }

      簡(jiǎn)單流程圖:

      (版本定制)第12課:Spark Streaming源碼解讀之Executor容錯(cuò)安全性(版本定制)第12課:Spark Streaming源碼解讀之Executor容錯(cuò)安全性

      參考博客:http://blog.csdn.net/hanburgud/article/details/51471089


      本文標(biāo)題:(版本定制)第12課:SparkStreaming源碼解讀之Executor容錯(cuò)安全性
      轉(zhuǎn)載源于:http://www.ef60e0e.cn/article/pijshh.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        肇源县| 重庆市| 通江县| 安阳县| 镇平县| 天镇县| 广昌县| 浑源县| 长兴县| 永吉县| 玛曲县| 丘北县| 明星| 喀喇| 鄯善县| 长沙市| 南丹县| 贵溪市| 盖州市| 平利县| 舞钢市| 察雅县| 将乐县| 饶平县| 顺平县| 梁河县| 汪清县| 汉沽区| 正定县| 浙江省| 石阡县| 龙山县| 南靖县| 汝南县| 武陟县| 三门县| 高雄市| 道孚县| 无极县| 两当县| 广德县|