1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問(wèn)題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
      生產(chǎn)常用Spark累加器剖析之二-創(chuàng)新互聯(lián)

      Driver端

      創(chuàng)新互聯(lián)科技有限公司專業(yè)互聯(lián)網(wǎng)基礎(chǔ)服務(wù)商,為您提供簡(jiǎn)陽(yáng)服務(wù)器托管高防服務(wù)器租用,成都IDC機(jī)房托管,成都主機(jī)托管等互聯(lián)網(wǎng)服務(wù)。
      1. Driver端初始化構(gòu)建Accumulator并初始化,同時(shí)完成了Accumulator注冊(cè),Accumulators.register(this)時(shí)Accumulator會(huì)在序列化后發(fā)送到Executor端
      2. Driver接收到ResultTask完成的狀態(tài)更新后,會(huì)去更新Value的值 然后在Action操作執(zhí)行后就可以獲取到Accumulator的值了

      Executor端

      1. Executor端接收到Task之后會(huì)進(jìn)行反序列化操作,反序列化得到RDD和function。同時(shí)在反序列化的同時(shí)也去反序列化Accumulator(在readObject方法中完成),同時(shí)也會(huì)向TaskContext完成注冊(cè)
      2. 完成任務(wù)計(jì)算之后,隨著Task結(jié)果一起返回給Driver

      結(jié)合源碼分析

      Driver端初始化

      ??Driver端主要經(jīng)過(guò)以下步驟,完成初始化操作:

      val accum = sparkContext.accumulator(0, “AccumulatorTest”)
      val acc = new Accumulator(initialValue, param, Some(name))
      Accumulators.register(this)

      Executor端反序列化得到Accumulator

      ??反序列化是在調(diào)用ResultTask的runTask方式時(shí)候做的操作:

      // 會(huì)反序列化出來(lái)RDD和自己定義的function
      val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
         ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

      ??在反序列化的過(guò)程中,會(huì)調(diào)用Accumulable中的readObject方法:

      private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
          in.defaultReadObject()
          // value的初始值為zero;該值是會(huì)被序列化的
          value_ = zero
          deserialized = true
          // Automatically register the accumulator when it is deserialized with the task closure.
          //
          // Note internal accumulators sent with task are deserialized before the TaskContext is created
          // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
          // metrics, still need to register here.
          val taskContext = TaskContext.get()
          if (taskContext != null) {
            // 當(dāng)前反序列化所得到的對(duì)象會(huì)被注冊(cè)到TaskContext中
            // 這樣TaskContext就可以獲取到累加器
            // 任務(wù)運(yùn)行結(jié)束之后,就可以通過(guò)context.collectAccumulators()返回給executor
            taskContext.registerAccumulator(this)
          }
        }

      注意

      Accumulable.scala中的value_,是不會(huì)被序列化的,@transient關(guān)鍵詞修飾了

      @volatile @transient private var value_ : R = initialValue // Current value on master

      累加器在各個(gè)節(jié)點(diǎn)的累加操作

      針對(duì)傳入function中不同的操作,對(duì)應(yīng)有不同的調(diào)用方法,以下列舉幾種(在Accumulator.scala中):

      def += (term: T) { value_ = param.addAccumulator(value_, term) }
      def add(term: T) { value_ = param.addAccumulator(value_, term) }
      def ++= (term: R) { value_ = param.addInPlace(value_, term)}

      根據(jù)不同的累加器參數(shù),有不同實(shí)現(xiàn)的AccumulableParam(在Accumulator.scala中):

      trait AccumulableParam[R, T] extends Serializable {
        /**
        def addAccumulator(r: R, t: T): R
        def addInPlace(r1: R, r2: R): R
        def zero(initialValue: R): R
      }

      不同的實(shí)現(xiàn)如下圖所示:
      生產(chǎn)常用Spark累加器剖析之二
      以IntAccumulatorParam為例:

      implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
        def addInPlace(t1: Int, t2: Int): Int = t1 + t2
        def zero(initialValue: Int): Int = 0
      }

      我們發(fā)現(xiàn)IntAccumulatorParam實(shí)現(xiàn)的是trait AccumulatorParam[T]:

      trait AccumulatorParam[T] extends AccumulableParam[T, T] {
        def addAccumulator(t1: T, t2: T): T = {
          addInPlace(t1, t2)
        }
      }

      在各個(gè)節(jié)點(diǎn)上的累加操作完成之后,就會(huì)緊跟著返回更新之后的Accumulators的value_值

      聚合操作

      在Task.scala中的run方法,會(huì)執(zhí)行如下:

      // 返回累加器,并運(yùn)行task
      // 調(diào)用TaskContextImpl的collectAccumulators,返回值的類型為一個(gè)Map
      (runTask(context), context.collectAccumulators())

      在Executor端已經(jīng)完成了一系列操作,需要將它們的值返回到Driver端進(jìn)行聚合匯總,整個(gè)順序如圖累加器執(zhí)行流程:
      生產(chǎn)常用Spark累加器剖析之二
      根據(jù)執(zhí)行流程,我們可以發(fā)現(xiàn),在執(zhí)行完collectAccumulators方法之后,最終會(huì)在DAGScheduler中調(diào)用updateAccumulators(event),而在該方法中會(huì)調(diào)用Accumulators的add方法,從而完成聚合操作:

      def add(values: Map[Long, Any]): Unit = synchronized {
        // 遍歷傳進(jìn)來(lái)的值
        for ((id, value) <- values) {
          if (originals.contains(id)) {
            // Since we are now storing weak references, we must check whether the underlying data
            // is valid.
            // 根據(jù)id從注冊(cè)的Map中取出對(duì)應(yīng)的累加器
            originals(id).get match {
              // 將值給累加起來(lái),最終將結(jié)果加到value里面
             // ++=是被重載了
              case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
              case None =>
                throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
            }
          } else {
            logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
          }
        }
      }

      獲取累加器的值

      通過(guò)accum.value方法可以獲取到累加器的值

      至此,累加器執(zhí)行完畢。

      另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。


      分享文章:生產(chǎn)常用Spark累加器剖析之二-創(chuàng)新互聯(lián)
      瀏覽路徑:http://www.ef60e0e.cn/article/dpgope.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        潼南县| 旬阳县| 达州市| 宾阳县| 汶川县| 邵东县| 澄江县| 贵定县| 观塘区| 藁城市| 革吉县| 邹平县| 英吉沙县| 清水县| 吴川市| 恩平市| 高安市| 湖州市| 四会市| 漳州市| 大宁县| 台南县| 治多县| 永川市| 大渡口区| 扎赉特旗| 西宁市| 桐城市| 安达市| 洪湖市| 金门县| 吉安县| 梅河口市| 厦门市| 策勒县| 天柱县| 宁化县| 唐河县| 新蔡县| 石景山区| 汽车|