1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問(wèn)題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
      Kafka的Lag計(jì)算誤區(qū)及正確實(shí)現(xiàn)

      前言

      消息堆積是消息中間件的一大特色,消息中間件的流量削峰、冗余存儲(chǔ)等功能正是得益于消息中間件的消息堆積能力。然而消息堆積其實(shí)是一把亦正亦邪的雙刃劍,如果應(yīng)用場(chǎng)合不恰當(dāng)反而會(huì)對(duì)上下游的業(yè)務(wù)造成不必要的麻煩,比如消息堆積勢(shì)必會(huì)影響上下游整個(gè)調(diào)用鏈的時(shí)效性,有些中間件如RabbitMQ在發(fā)生消息堆積時(shí)在某些情況下還會(huì)影響自身的性能。對(duì)于Kafka而言,雖然消息堆積不會(huì)對(duì)其自身性能帶來(lái)多大的困擾,但難免不會(huì)影響上下游的業(yè)務(wù),堆積過(guò)多有可能會(huì)造成磁盤爆滿,或者觸發(fā)日志清除策略而造成消息丟失的情況。如何利用好消息堆積這把雙刃劍,監(jiān)控是最為關(guān)鍵的一步。

      成都創(chuàng)新互聯(lián)是一家專業(yè)提供臨猗企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、H5網(wǎng)站設(shè)計(jì)、小程序制作等業(yè)務(wù)。10年已為臨猗眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。

      正文

      消息堆積是消費(fèi)滯后(Lag)的一種表現(xiàn)形式,消息中間件服務(wù)端中所留存的消息與消費(fèi)掉的消息之間的差值即為消息堆積量,也稱之為消費(fèi)滯后(Lag)量。對(duì)于Kafka而言,消息被發(fā)送至Topic中,而Topic又分成了多個(gè)分區(qū)(Partition),每一個(gè)Partition都有一個(gè)預(yù)寫式的日志文件,雖然Partition可以繼續(xù)細(xì)分為若干個(gè)段文件(Segment),但是對(duì)于上層應(yīng)用來(lái)說(shuō)可以將Partition看成最小的存儲(chǔ)單元(一個(gè)由多個(gè)Segment文件拼接的“巨型文件”)。每個(gè)Partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到Partition中。我們來(lái)看下圖,其就是Partition的一個(gè)真實(shí)寫照:
      Kafka 的 Lag 計(jì)算誤區(qū)及正確實(shí)現(xiàn)

      上圖中有四個(gè)概念:

      1. LogStartOffset:表示一個(gè)Partition的起始位移,初始為0,雖然消息的增加以及日志清除策略的影響,這個(gè)值會(huì)階段性的增大。
      2. ConsumerOffset:消費(fèi)位移,表示Partition的某個(gè)消費(fèi)者消費(fèi)到的位移位置。
      3. HighWatermark:簡(jiǎn)稱HW,代表消費(fèi)端所能“觀察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
      4. LogEndOffset:簡(jiǎn)稱LEO, 代表Partition的最高日志位移,其值對(duì)消費(fèi)者不可見(jiàn)。比如在ISR(In-Sync-Replicas)副本數(shù)等于3的情況下(如下圖所示),消息發(fā)送到Leader A之后會(huì)更新LEO的值,F(xiàn)ollower B和Follower C也會(huì)實(shí)時(shí)拉取Leader A中的消息來(lái)更新自己,HW就表示A、B、C三者同時(shí)達(dá)到的日志位移,也就是A、B、C三者中LEO最小的那個(gè)值。由于B、C拉取A消息之間延時(shí)問(wèn)題,所以HW必然不會(huì)一直與Leader的LEO相等,即LEO>=HW。
        Kafka 的 Lag 計(jì)算誤區(qū)及正確實(shí)現(xiàn)

      要計(jì)算Kafka中某個(gè)消費(fèi)者的滯后量很簡(jiǎn)單,首先看看其消費(fèi)了幾個(gè)Topic,然后針對(duì)每個(gè)Topic來(lái)計(jì)算其中每個(gè)Partition的Lag,每個(gè)Partition的Lag計(jì)算就顯得非常的簡(jiǎn)單了,參考下圖:

      由圖可知消費(fèi)Lag=HW - ConsumerOffset。對(duì)于這里大家有可能有個(gè)誤區(qū),就是認(rèn)為L(zhǎng)ag應(yīng)該是LEO與ConsumerOffset之間的差值。LEO是對(duì)消費(fèi)者不可見(jiàn)的,既然不可見(jiàn)何來(lái)消費(fèi)滯后一說(shuō)。

      那么這里就引入了一個(gè)新的問(wèn)題,HW和ConsumerOffset的值如何獲取呢?
      Kafka 的 Lag 計(jì)算誤區(qū)及正確實(shí)現(xiàn)

      首先來(lái)說(shuō)說(shuō)ConsumerOffset,Kafka中有兩處可以存儲(chǔ),一個(gè)是Zookeeper,而另一個(gè)是”consumer_offsets這個(gè)內(nèi)部topic中,前者是0.8.x版本中的使用方式,但是隨著版本的迭代更新,現(xiàn)在越來(lái)越趨向于后者。就拿1.0.0版本來(lái)說(shuō),雖然默認(rèn)是存儲(chǔ)在”consumer_offsets”中,但是保不齊用于就將其存儲(chǔ)在了Zookeeper中了。這個(gè)問(wèn)題倒也不難解決,針對(duì)兩種方式都去拉取,然后哪個(gè)有值的取哪個(gè)。不過(guò)這里還有一個(gè)問(wèn)題,對(duì)于消費(fèi)位移來(lái)說(shuō),其一般不會(huì)實(shí)時(shí)的更新,而更多的是定時(shí)更新,這樣可以提高整體的性能。那么這個(gè)定時(shí)的時(shí)間間隔就是ConsumerOffset的誤差區(qū)間之一。

      再來(lái)說(shuō)說(shuō)HW,其也是Kafka中Partition的一個(gè)狀態(tài)。有可能你會(huì)察覺(jué)到在Kafka的JMX中可以看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”這樣一個(gè)屬性,但是這個(gè)值不是LEO而是HW。

      那么怎樣正確的計(jì)算消費(fèi)的Lag呢?對(duì)Kafka熟悉的同學(xué)可能會(huì)想到Kafka中自帶的kafka-consumer_groups.sh腳本中就有Lag的信息,示例如下:

      [root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
      TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                   CLIENT-ID
      topic-test1          0          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
      topic-test1          1          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
      topic-test1          2          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
      topic-test1          3          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID

      我們深究一下kafka-consumer_groups.sh腳本,發(fā)現(xiàn)只有一句代碼:

      exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

      其含義就是執(zhí)行kafka.admin.ConsumerGroupCommand而已。進(jìn)一步深究,在ConsumerGroupCommand內(nèi)部抓住了2句關(guān)鍵代碼:

      val consumerGroupService = new KafkaConsumerGroupService(opts)
      val (state, assignments) = consumerGroupService.describeGroup()

      代碼詳解:consumerGroupService的類型是ConsumerGroupServicesealed trait類型),而KafkaConsumerGroupService只是ConsumerGroupService的一種實(shí)現(xiàn),還有一種實(shí)現(xiàn)是ZkConsumerGroupService,分別對(duì)應(yīng)新版的消費(fèi)方式(消費(fèi)位移存儲(chǔ)在__consumer_offsets中)和舊版的消費(fèi)方式(消費(fèi)位移存儲(chǔ)在zk中),詳細(xì)計(jì)算步驟參考下一段落的內(nèi)容。opt參數(shù)是指“ –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID”等參數(shù)。第2句代碼是調(diào)用describeGroup()方法來(lái)獲取具體的信息,即二元組中的assignments,這個(gè)assignments中保存了上面打印信息中的所有內(nèi)容。

      Scala小知識(shí):
      在Scala中trait(特征)相當(dāng)于Java的接口,實(shí)際上它比接口更大強(qiáng)大。與Java中的接口不同的是,它還可以定義屬性和方法的實(shí)現(xiàn)(JDK8起的接口默認(rèn)方法)。一般情況下Scala中的類只能繼承單一父類,但是如果是trait的話就可以繼承多個(gè),從結(jié)果來(lái)看是實(shí)現(xiàn)了多重繼承。被sealed聲明的trait僅能被同一文件的類繼承。

      ZkConsumerGroupService中計(jì)算消費(fèi)lag的步驟如下:

      1. 通過(guò)zk獲取一些基本信息,對(duì)應(yīng)上面打印信息中的:TOPIC、PARTITION、CONSUMER-ID等,不過(guò)不會(huì)有HOST和CLIENT-ID。
      2. 通過(guò)OffsetFetchRequest請(qǐng)求獲取消費(fèi)位移(offset),如果獲取失敗則在通過(guò)zk獲取。
      3. 通過(guò)OffsetReuqest請(qǐng)求獲取分區(qū)的LogEndOffset(簡(jiǎn)稱為L(zhǎng)EO,可見(jiàn)的LEO)。
      4. 計(jì)算LogEndOffset與消費(fèi)位移的差值來(lái)獲取lag。

      KafkaConsumerGroupService中計(jì)算消費(fèi)lag的步驟如下:

      1. 通過(guò)DescibeGroupsRequest請(qǐng)求獲取一些基本信息,不僅包括TOPIC、PARTITION、CONSUMER-ID,還有HOST和CLIENT-ID。其實(shí)還有通過(guò)
        FindCoordinatorRequest請(qǐng)求來(lái)獲取coordinator信息,如果不了解coordinator在這里也沒(méi)影響。
      2. 通過(guò)OffsetFetchRequest請(qǐng)求獲取消費(fèi)位移。
      3. 通過(guò)OffsetReuqest請(qǐng)求獲取分區(qū)的LogEndOffset(簡(jiǎn)稱為L(zhǎng)EO)。
      4. 計(jì)算LogEndOffset與消費(fèi)位移的差值來(lái)獲取lag。

      可以看到KafkaConsumerGroupService與ZkConsumerGroupService的計(jì)算Lag的方式都差不多,但是KafkaConsumerGroupService能獲取更多消費(fèi)詳情,并且ZkConsumerGroupService也被標(biāo)注為@Deprecated的了,后面內(nèi)容都針對(duì)KafkaConsumerGroupService來(lái)做說(shuō)明。既然Kafka已經(jīng)為我們提供了線程的方法來(lái)獲取Lag,那么我們有何必再重復(fù)造輪子,這里筆者寫了一個(gè)調(diào)用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是使用Scala語(yǔ)言編寫的,在Java的程序里使用類似scala.collection.Seq這樣的全名稱以防止混淆):

      String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
      ConsumerGroupCommand.ConsumerGroupCommandOptions opts =
              new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
      ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
              new ConsumerGroupCommand.KafkaConsumerGroupService(opts);
      scala.Tuple2, scala.Option>> res = kafkaConsumerGroupService.describeGroup();
      scala.collection.Seq pasSeq = res._2.get();
      scala.collection.Iterator iterable = pasSeq.iterator();
      while (iterable.hasNext()) {
          ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();
          System.out.println(String.format("\n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",
                  pas.topic().get(), pas.partition().get(), pas.offset().get(),
                  pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),
                  pas.host().get(), pas.clientId().get()));
      }

      在使用時(shí),你可以封裝一下這段代碼然后返回一個(gè)類似List的東西給上層業(yè)務(wù)代碼做進(jìn)一步的使用。ConsumerGroupCommand.PartitionAssignmentState的代碼如下:

      case class PartitionAssignmentState(
        group: String, coordinator: Option[Node], topic: Option[String],
        partition: Option[Int], offset: Option[Long], lag: Option[Long],
        consumerId: Option[String], host: Option[String],
        clientId: Option[String], logEndOffset: Option[Long])

      Scala小知識(shí):
      對(duì)于case class, 在這里你可以簡(jiǎn)單的把它看成是一個(gè)JavaBean,但是它遠(yuǎn)比JavaBean強(qiáng)大,比如它會(huì)自動(dòng)生成equals、hashCode、toString、copy、伴生對(duì)象、apply、unapply等等東西。在 scala 中,對(duì)保護(hù)(Protected)成員的訪問(wèn)比 java 更嚴(yán)格一些。因?yàn)樗辉试S保護(hù)成員在定義了該成員的的類的子類中被訪問(wèn)。而在java中,用protected關(guān)鍵字修飾的成員,除了定義了該成員的類的子類可以訪問(wèn),同一個(gè)包里的其他類也可以進(jìn)行訪問(wèn)。Scala中,如果沒(méi)有指定任何的修飾符,則默認(rèn)為 public。這樣的成員在任何地方都可以被訪問(wèn)。

      如果你正在試著運(yùn)行上面一段程序,你會(huì)發(fā)現(xiàn)編譯失敗,報(bào)錯(cuò):cannot access ‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’ in ‘kafka.admin.ConsumerGroupCommand‘。這時(shí)候需要將所引入的kafka.core包中的kafka.admin.ConsumerGroupCommand中的PartitionAssignmentState類前面的protected修飾符去掉才能編譯通過(guò)。


      本文的重點(diǎn)是你有沒(méi)有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過(guò)多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助

      需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

      Kafka 的 Lag 計(jì)算誤區(qū)及正確實(shí)現(xiàn)
      Kafka 的 Lag 計(jì)算誤區(qū)及正確實(shí)現(xiàn)
      Kafka 的 Lag 計(jì)算誤區(qū)及正確實(shí)現(xiàn)
      Kafka 的 Lag 計(jì)算誤區(qū)及正確實(shí)現(xiàn)


      分享標(biāo)題:Kafka的Lag計(jì)算誤區(qū)及正確實(shí)現(xiàn)
      本文鏈接:http://www.ef60e0e.cn/article/psdjcs.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        通化县| 太和县| 罗定市| 永康市| 岳西县| 建阳市| 石阡县| 黄冈市| 洛川县| 朝阳县| 三门峡市| 垦利县| 绥中县| 榆林市| 丹巴县| 塘沽区| 闽侯县| 宾川县| 句容市| 门源| 娱乐| 石狮市| 张家港市| 瑞金市| 揭阳市| 渭南市| 凤阳县| 长丰县| 红河县| 许昌县| 云浮市| 闸北区| 尉犁县| 永靖县| 舒兰市| 乡宁县| 台北市| 故城县| 台前县| 济阳县| 合阳县|