新聞中心
前言
消息堆積是消息中間件的一大特色,消息中間件的流量削峰、冗余存儲(chǔ)等功能正是得益于消息中間件的消息堆積能力。然而消息堆積其實(shí)是一把亦正亦邪的雙刃劍,如果應(yīng)用場(chǎng)合不恰當(dāng)反而會(huì)對(duì)上下游的業(yè)務(wù)造成不必要的麻煩,比如消息堆積勢(shì)必會(huì)影響上下游整個(gè)調(diào)用鏈的時(shí)效性,有些中間件如RabbitMQ在發(fā)生消息堆積時(shí)在某些情況下還會(huì)影響自身的性能。對(duì)于Kafka而言,雖然消息堆積不會(huì)對(duì)其自身性能帶來(lái)多大的困擾,但難免不會(huì)影響上下游的業(yè)務(wù),堆積過(guò)多有可能會(huì)造成磁盤爆滿,或者觸發(fā)日志清除策略而造成消息丟失的情況。如何利用好消息堆積這把雙刃劍,監(jiān)控是最為關(guān)鍵的一步。
成都創(chuàng)新互聯(lián)是一家專業(yè)提供臨猗企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、H5網(wǎng)站設(shè)計(jì)、小程序制作等業(yè)務(wù)。10年已為臨猗眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。
正文
消息堆積是消費(fèi)滯后(Lag)的一種表現(xiàn)形式,消息中間件服務(wù)端中所留存的消息與消費(fèi)掉的消息之間的差值即為消息堆積量,也稱之為消費(fèi)滯后(Lag)量。對(duì)于Kafka而言,消息被發(fā)送至Topic中,而Topic又分成了多個(gè)分區(qū)(Partition),每一個(gè)Partition都有一個(gè)預(yù)寫式的日志文件,雖然Partition可以繼續(xù)細(xì)分為若干個(gè)段文件(Segment),但是對(duì)于上層應(yīng)用來(lái)說(shuō)可以將Partition看成最小的存儲(chǔ)單元(一個(gè)由多個(gè)Segment文件拼接的“巨型文件”)。每個(gè)Partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到Partition中。我們來(lái)看下圖,其就是Partition的一個(gè)真實(shí)寫照:
上圖中有四個(gè)概念:
- LogStartOffset:表示一個(gè)Partition的起始位移,初始為0,雖然消息的增加以及日志清除策略的影響,這個(gè)值會(huì)階段性的增大。
- ConsumerOffset:消費(fèi)位移,表示Partition的某個(gè)消費(fèi)者消費(fèi)到的位移位置。
- HighWatermark:簡(jiǎn)稱HW,代表消費(fèi)端所能“觀察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
- LogEndOffset:簡(jiǎn)稱LEO, 代表Partition的最高日志位移,其值對(duì)消費(fèi)者不可見(jiàn)。比如在ISR(In-Sync-Replicas)副本數(shù)等于3的情況下(如下圖所示),消息發(fā)送到Leader A之后會(huì)更新LEO的值,F(xiàn)ollower B和Follower C也會(huì)實(shí)時(shí)拉取Leader A中的消息來(lái)更新自己,HW就表示A、B、C三者同時(shí)達(dá)到的日志位移,也就是A、B、C三者中LEO最小的那個(gè)值。由于B、C拉取A消息之間延時(shí)問(wèn)題,所以HW必然不會(huì)一直與Leader的LEO相等,即LEO>=HW。
要計(jì)算Kafka中某個(gè)消費(fèi)者的滯后量很簡(jiǎn)單,首先看看其消費(fèi)了幾個(gè)Topic,然后針對(duì)每個(gè)Topic來(lái)計(jì)算其中每個(gè)Partition的Lag,每個(gè)Partition的Lag計(jì)算就顯得非常的簡(jiǎn)單了,參考下圖:
由圖可知消費(fèi)Lag=HW - ConsumerOffset。對(duì)于這里大家有可能有個(gè)誤區(qū),就是認(rèn)為L(zhǎng)ag應(yīng)該是LEO與ConsumerOffset之間的差值。LEO是對(duì)消費(fèi)者不可見(jiàn)的,既然不可見(jiàn)何來(lái)消費(fèi)滯后一說(shuō)。
那么這里就引入了一個(gè)新的問(wèn)題,HW和ConsumerOffset的值如何獲取呢?
首先來(lái)說(shuō)說(shuō)ConsumerOffset,Kafka中有兩處可以存儲(chǔ),一個(gè)是Zookeeper,而另一個(gè)是”consumer_offsets這個(gè)內(nèi)部topic中,前者是0.8.x版本中的使用方式,但是隨著版本的迭代更新,現(xiàn)在越來(lái)越趨向于后者。就拿1.0.0版本來(lái)說(shuō),雖然默認(rèn)是存儲(chǔ)在”consumer_offsets”中,但是保不齊用于就將其存儲(chǔ)在了Zookeeper中了。這個(gè)問(wèn)題倒也不難解決,針對(duì)兩種方式都去拉取,然后哪個(gè)有值的取哪個(gè)。不過(guò)這里還有一個(gè)問(wèn)題,對(duì)于消費(fèi)位移來(lái)說(shuō),其一般不會(huì)實(shí)時(shí)的更新,而更多的是定時(shí)更新,這樣可以提高整體的性能。那么這個(gè)定時(shí)的時(shí)間間隔就是ConsumerOffset的誤差區(qū)間之一。
再來(lái)說(shuō)說(shuō)HW,其也是Kafka中Partition的一個(gè)狀態(tài)。有可能你會(huì)察覺(jué)到在Kafka的JMX中可以看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”這樣一個(gè)屬性,但是這個(gè)值不是LEO而是HW。
那么怎樣正確的計(jì)算消費(fèi)的Lag呢?對(duì)Kafka熟悉的同學(xué)可能會(huì)想到Kafka中自帶的kafka-consumer_groups.sh腳本中就有Lag的信息,示例如下:
[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic-test1 0 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 1 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 2 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
topic-test1 3 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID
我們深究一下kafka-consumer_groups.sh腳本,發(fā)現(xiàn)只有一句代碼:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"
其含義就是執(zhí)行kafka.admin.ConsumerGroupCommand而已。進(jìn)一步深究,在ConsumerGroupCommand內(nèi)部抓住了2句關(guān)鍵代碼:
val consumerGroupService = new KafkaConsumerGroupService(opts)
val (state, assignments) = consumerGroupService.describeGroup()
代碼詳解:consumerGroupService的類型是ConsumerGroupServicesealed trait類型),而KafkaConsumerGroupService只是ConsumerGroupService的一種實(shí)現(xiàn),還有一種實(shí)現(xiàn)是ZkConsumerGroupService,分別對(duì)應(yīng)新版的消費(fèi)方式(消費(fèi)位移存儲(chǔ)在__consumer_offsets中)和舊版的消費(fèi)方式(消費(fèi)位移存儲(chǔ)在zk中),詳細(xì)計(jì)算步驟參考下一段落的內(nèi)容。opt參數(shù)是指“ –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID”等參數(shù)。第2句代碼是調(diào)用describeGroup()方法來(lái)獲取具體的信息,即二元組中的assignments,這個(gè)assignments中保存了上面打印信息中的所有內(nèi)容。
Scala小知識(shí):
在Scala中trait(特征)相當(dāng)于Java的接口,實(shí)際上它比接口更大強(qiáng)大。與Java中的接口不同的是,它還可以定義屬性和方法的實(shí)現(xiàn)(JDK8起的接口默認(rèn)方法)。一般情況下Scala中的類只能繼承單一父類,但是如果是trait的話就可以繼承多個(gè),從結(jié)果來(lái)看是實(shí)現(xiàn)了多重繼承。被sealed聲明的trait僅能被同一文件的類繼承。
ZkConsumerGroupService中計(jì)算消費(fèi)lag的步驟如下:
- 通過(guò)zk獲取一些基本信息,對(duì)應(yīng)上面打印信息中的:TOPIC、PARTITION、CONSUMER-ID等,不過(guò)不會(huì)有HOST和CLIENT-ID。
- 通過(guò)OffsetFetchRequest請(qǐng)求獲取消費(fèi)位移(offset),如果獲取失敗則在通過(guò)zk獲取。
- 通過(guò)OffsetReuqest請(qǐng)求獲取分區(qū)的LogEndOffset(簡(jiǎn)稱為L(zhǎng)EO,可見(jiàn)的LEO)。
- 計(jì)算LogEndOffset與消費(fèi)位移的差值來(lái)獲取lag。
KafkaConsumerGroupService中計(jì)算消費(fèi)lag的步驟如下:
- 通過(guò)DescibeGroupsRequest請(qǐng)求獲取一些基本信息,不僅包括TOPIC、PARTITION、CONSUMER-ID,還有HOST和CLIENT-ID。其實(shí)還有通過(guò)
FindCoordinatorRequest請(qǐng)求來(lái)獲取coordinator信息,如果不了解coordinator在這里也沒(méi)影響。 - 通過(guò)OffsetFetchRequest請(qǐng)求獲取消費(fèi)位移。
- 通過(guò)OffsetReuqest請(qǐng)求獲取分區(qū)的LogEndOffset(簡(jiǎn)稱為L(zhǎng)EO)。
- 計(jì)算LogEndOffset與消費(fèi)位移的差值來(lái)獲取lag。
可以看到KafkaConsumerGroupService與ZkConsumerGroupService的計(jì)算Lag的方式都差不多,但是KafkaConsumerGroupService能獲取更多消費(fèi)詳情,并且ZkConsumerGroupService也被標(biāo)注為@Deprecated的了,后面內(nèi)容都針對(duì)KafkaConsumerGroupService來(lái)做說(shuō)明。既然Kafka已經(jīng)為我們提供了線程的方法來(lái)獲取Lag,那么我們有何必再重復(fù)造輪子,這里筆者寫了一個(gè)調(diào)用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是使用Scala語(yǔ)言編寫的,在Java的程序里使用類似scala.collection.Seq這樣的全名稱以防止混淆):
String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions opts =
new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
new ConsumerGroupCommand.KafkaConsumerGroupService(opts);
scala.Tuple2, scala.Option>> res = kafkaConsumerGroupService.describeGroup();
scala.collection.Seq pasSeq = res._2.get();
scala.collection.Iterator iterable = pasSeq.iterator();
while (iterable.hasNext()) {
ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();
System.out.println(String.format("\n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",
pas.topic().get(), pas.partition().get(), pas.offset().get(),
pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),
pas.host().get(), pas.clientId().get()));
}
在使用時(shí),你可以封裝一下這段代碼然后返回一個(gè)類似List
case class PartitionAssignmentState(
group: String, coordinator: Option[Node], topic: Option[String],
partition: Option[Int], offset: Option[Long], lag: Option[Long],
consumerId: Option[String], host: Option[String],
clientId: Option[String], logEndOffset: Option[Long])
Scala小知識(shí):
對(duì)于case class, 在這里你可以簡(jiǎn)單的把它看成是一個(gè)JavaBean,但是它遠(yuǎn)比JavaBean強(qiáng)大,比如它會(huì)自動(dòng)生成equals、hashCode、toString、copy、伴生對(duì)象、apply、unapply等等東西。在 scala 中,對(duì)保護(hù)(Protected)成員的訪問(wèn)比 java 更嚴(yán)格一些。因?yàn)樗辉试S保護(hù)成員在定義了該成員的的類的子類中被訪問(wèn)。而在java中,用protected關(guān)鍵字修飾的成員,除了定義了該成員的類的子類可以訪問(wèn),同一個(gè)包里的其他類也可以進(jìn)行訪問(wèn)。Scala中,如果沒(méi)有指定任何的修飾符,則默認(rèn)為 public。這樣的成員在任何地方都可以被訪問(wèn)。
如果你正在試著運(yùn)行上面一段程序,你會(huì)發(fā)現(xiàn)編譯失敗,報(bào)錯(cuò):cannot access ‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’ in ‘kafka.admin.ConsumerGroupCommand‘。這時(shí)候需要將所引入的kafka.core包中的kafka.admin.ConsumerGroupCommand中的PartitionAssignmentState類前面的protected修飾符去掉才能編譯通過(guò)。
本文的重點(diǎn)是你有沒(méi)有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過(guò)多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助
需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取
分享標(biāo)題:Kafka的Lag計(jì)算誤區(qū)及正確實(shí)現(xiàn)
本文鏈接:http://www.ef60e0e.cn/article/psdjcs.html