新聞中心
本篇文章為大家展示了怎么徹底搞懂Kafka消息大小相關(guān)參數(shù)設(shè)置的規(guī)則,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。
創(chuàng)新互聯(lián)建站是一家專注于成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)與策劃設(shè)計(jì),姑蘇網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)建站做網(wǎng)站,專注于網(wǎng)站建設(shè)10多年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:姑蘇等地區(qū)。姑蘇做網(wǎng)站價(jià)格咨詢:13518219792
前段時(shí)間接到用戶要求,調(diào)整某個(gè)主題在 Kafka 集群消息大小為 4M。
根據(jù) Kafka 消息大小規(guī)則設(shè)定,生產(chǎn)端自行將 max.request.size 調(diào)整為 4M 大小,Kafka 集群為該主題設(shè)置主題級(jí)別參數(shù) max.message.bytes 的大小為 4M。
以上是針對(duì) Kafka 2.2.x 版本的設(shè)置,需要注意的是,在某些舊版本當(dāng)中,還需要調(diào)整相關(guān)關(guān)聯(lián)參數(shù),比如 replica.fetch.max.bytes 等。
從上面例子可看出,Kafka 消息大小的設(shè)置還是挺復(fù)雜的一件事,而且還分版本,需要注意的參數(shù)巨多,而且每個(gè)都長(zhǎng)得差不多,不但分版本,還需要注意生產(chǎn)端、broker、消費(fèi)端的設(shè)置,而且還要區(qū)分 broker 級(jí)別還是 topic 級(jí)別的設(shè)置,而且還需要清楚知道每個(gè)配置的含義。
下面通過(guò)相關(guān)參數(shù)的解析說(shuō)明,再結(jié)合實(shí)戰(zhàn)測(cè)試,幫助你快速搞明白這些參數(shù)的含義以及規(guī)則。
broker
broker 關(guān)于消息體大小相關(guān)的參數(shù)主要有 message.max.bytes、replica.fetch.min.bytes、replica.fetch.max.bytes、replica.fetch.response.max.bytes
1、message.max.bytes
Kafka 允許的最大 record batch size,什么是 record batch size ?簡(jiǎn)單來(lái)說(shuō)就是 Kafka 的消息集合批次,一個(gè)批次當(dāng)中會(huì)包含多條消息,生產(chǎn)者中有個(gè)參數(shù) batch.size,指的是生產(chǎn)者可以進(jìn)行消息批次發(fā)送,提高吞吐量,以下是 message.max.bytes 參數(shù)作用的源碼:
kafka.log.Log#analyzeAndValidateRecords
以上源碼可以看出 message.max.bytes 并不是限制消息體大小的,而是限制一個(gè)批次的消息大小,所以我們需要注意生產(chǎn)端對(duì)于 batch.size 的參數(shù)設(shè)置需要小于 message.max.bytes。
以下附帶 Kafka 官方解釋:
The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that the they can fetch record batches this large.
In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.
This can be set per topic with the topic level
max.message.bytes
config.
翻譯如下:
Kafka 允許的最大記錄批量。 如果增加此數(shù)量,并且有一些消費(fèi)者的年齡大于 0.10.2,則消費(fèi)者的獲取大小也必須增加,以便他們可以獲取如此大的記錄批次。
在最新的消息格式版本中,為了提高效率,始終將記錄分組。 在以前的消息格式版本中,未壓縮的記錄不會(huì)分組,并且在這種情況下,此限制僅適用于單個(gè)記錄。
可以使用主題級(jí)別 “max.message.bytes” 配置針對(duì)每個(gè)主題進(jìn)行設(shè)置。
2、replica.fetch.min.bytes、replica.fetch.max.bytes、replica.fetch.response.max.bytes
kafka 的分區(qū)如果是多副本,那么 follower 副本就會(huì)源源不斷地從 leader 副本拉取消息進(jìn)行復(fù)制,這里也會(huì)有相關(guān)參數(shù)對(duì)消息大小進(jìn)行設(shè)置,其中 replica.fetch.max.bytes 是限制拉取分區(qū)中消息的大小,在 0.8.2 以前的版本中,如果 replica.fetch.max.bytes < message.max.bytes,就會(huì)造成 follower 副本復(fù)制不了消息。不過(guò)在后面的版本當(dāng)中,已經(jīng)對(duì)這個(gè)問(wèn)題進(jìn)行了修復(fù)。
replica.fetch.max.bytes 參見 2.2.x 版本的官方解釋:
The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).
翻譯如下:
嘗試為每個(gè)分區(qū)獲取的消息的字節(jié)數(shù)。 這不是絕對(duì)最大值,如果獲取的第一個(gè)非空分區(qū)中的第一個(gè)記錄批處理大于此值,那么仍將返回記錄批處理以確保進(jìn)度。代理接受的最大記錄批處理大小是通過(guò) message.max.bytes(代理配置)或 max.message.bytes(主題配置)定義的。
replica.fetch.min.bytes、replica.fetch.response.max.bytes 同理。
topic
1、max.message.bytes
該參數(shù)跟 message.max.bytes 參數(shù)的作用是一樣的,只不過(guò) max.message.bytes 是作用于某個(gè) topic,而 message.max.bytes 是作用于全局。
producer
1、max.request.size
該參數(shù)挺有意思的,看了 Kafka 生產(chǎn)端發(fā)送相關(guān)源碼后,發(fā)現(xiàn)消息在 append 到 RecordAccumulator 之前,會(huì)校驗(yàn)該消息是否大于 max.request.size,具體邏輯如下:
org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize
從以上源碼得出結(jié)論,Kafka 會(huì)首先判斷本次消息大小是否大于 maxRequestSize,如果本次消息大小 maxRequestSize,則直接拋出異常,不會(huì)繼續(xù)執(zhí)行追加消息到 batch。
并且還會(huì)在 Sender 線程發(fā)送數(shù)據(jù)到 broker 之前,會(huì)使用 max.request.size 限制發(fā)送請(qǐng)求數(shù)據(jù)的大小:
org.apache.kafka.clients.producer.internals.Sender#sendProducerData
也就是說(shuō),max.request.size 參數(shù)具備兩個(gè)特性:
1)限制單條消息大小
2)限制發(fā)送請(qǐng)求大小
參見 2.2.x 版本的官方解釋:
The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.
翻譯如下:
請(qǐng)求的最大大小(以字節(jié)為單位)。 此設(shè)置將限制生產(chǎn)者將在單個(gè)請(qǐng)求中發(fā)送的記錄批數(shù),以避免發(fā)送大量請(qǐng)求。 這實(shí)際上也是最大記錄批次大小的上限。 請(qǐng)注意,服務(wù)器對(duì)記錄批大小有自己的上限,該上限可能與此不同。
2、batch.size
batch.size 是 Kafka producer 非常重要的參數(shù),它的值對(duì) Producer 的吞吐量有著非常大的影響,因?yàn)槲覀冎溃占揭慌⒃侔l(fā)送到 broker,比每條消息都請(qǐng)求一次 broker,性能會(huì)有顯著的提高,但 batch.size 設(shè)置得非常大又會(huì)給機(jī)器內(nèi)存帶來(lái)極大的壓力,因此需要在項(xiàng)目中合理地增減 batch.size 值,才能提高 producer 的吞吐量。
org.apache.kafka.clients.producer.internals.RecordAccumulator#append
以上,將消息追加到消息緩沖區(qū)時(shí),會(huì)嘗試追加到一個(gè) ProducerBatch,如果 ProducerBatch 滿了,就去緩存區(qū)申請(qǐng) batch.size 大小的緩存創(chuàng)建一個(gè)新的 ProducerBatch 繼續(xù)追加消息。需要注意的是,如果消息大小本身就比 batch.size 大,這種情況每個(gè) ProducerBatch 只會(huì)包含一條消息。
最終 RecordAccumulator 緩存區(qū)看起來(lái)是這樣的:
參見 2.2.x 版本的官方解釋:
The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
No attempt will be made to batch records larger than this size.
Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.
翻譯如下:
每當(dāng)將多個(gè)記錄發(fā)送到同一分區(qū)時(shí),生產(chǎn)者將嘗試將記錄一起批處理成更少的請(qǐng)求。 這有助于提高客戶端和服務(wù)器的性能。 此配置控制默認(rèn)的批處理大小(以字節(jié)為單位)。
不會(huì)嘗試批處理大于此大小的記錄。
發(fā)送給代理的請(qǐng)求將包含多個(gè)批次,每個(gè)分區(qū)一個(gè),并包含可發(fā)送的數(shù)據(jù)。
較小的批處理量將使批處理變得不那么普遍,并且可能會(huì)降低吞吐量(零的批處理量將完全禁用批處理)。 非常大的批處理大小可能會(huì)浪費(fèi)一些內(nèi)存,因?yàn)槲覀兛偸窃陬A(yù)期其他記錄時(shí)分配指定批處理大小的緩沖區(qū)。
那么針對(duì) max.request.size 、batch.size 之間大小的調(diào)優(yōu)就尤其重要,通常來(lái)說(shuō),max.request.size 大于 batch.size,這樣每次發(fā)送消息通常會(huì)包含多個(gè) ProducerBatch。
consumer
1、fetch.min.bytes、fetch.max.bytes、max.partition.fetch.bytes
1)fetch.max.bytes
參見 2.2.x 版本的官方解釋:
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via
message.max.bytes
(broker config) ormax.message.bytes
(topic config). Note that the consumer performs multiple fetches in parallel.
翻譯如下:
服務(wù)器為獲取請(qǐng)求應(yīng)返回的最大數(shù)據(jù)量。 使用者將批量獲取記錄,并且如果獲取的第一個(gè)非空分區(qū)中的第一個(gè)記錄批次大于此值,則仍將返回記錄批次以確保使用者可以取得進(jìn)展。 因此,這不是絕對(duì)最大值。 代理可接受的最大記錄批處理大小是通過(guò)“ message.max.bytes”(代理配置)或“ max.message.bytes”(主題配置)定義的。 請(qǐng)注意,使用者并行執(zhí)行多個(gè)提取。
fetch.min.bytes、max.partition.fetch.bytes 同理。
實(shí)戰(zhàn)測(cè)試
針對(duì)以上相關(guān)參數(shù)配置的解讀,還需要對(duì) max.request.size、batch.size、message.max.bytes(或者 max.message.bytes)三個(gè)參數(shù)進(jìn)一步驗(yàn)證。
1、測(cè)試消息大于 max.request.size 是否會(huì)被攔截
設(shè)置:
max.request.size = 1000, record-size = 2000
使用 kafka-producer-perf-test.sh 腳本測(cè)試:
$ {kafka_path}/bin/kafka-producer-perf-test.sh --topic test-topic2 --num-records 500000000000 --record-size 20000 --throughput 1 --producer-props bootstrap.servers=localhost:9092,localhost:9093,localhost:9094 acks=-1 max.request.size=1000
測(cè)試結(jié)果:
可以得出結(jié)論,成功攔截了大于 max.request.size 的消息。
2、測(cè)試 max.message.bytes 參數(shù)用于校驗(yàn)批次大小還是校驗(yàn)消息大小
設(shè)置:
record-size = 500 batch.size = 2000 linger.ms = 1000 max.message.bytes = 1000 // 在控制臺(tái)調(diào)整主題級(jí)別配置即可
使用 kafka-producer-perf-test.sh 腳本測(cè)試:
$ {kafka_path}/bin/kafka-producer-perf-test.sh --topic test-topic1 --num-records 500000000000 --record-size 500 --throughput 5 --producer-props bootstrap.servers=localhost:9092,localhost:9093,localhost:9094 acks=-1 batch.size=2000 linger.ms=1000
測(cè)試結(jié)果:
當(dāng) max.message.bytes = 2500 時(shí):
可以得出結(jié)論,max.message.bytes 參數(shù)校驗(yàn)的是批次大小,而不是消息大小。
3、測(cè)試消息大小比 batch.size 還大的情況下,是否還會(huì)發(fā)送消息,當(dāng) max.message.bytes 參數(shù)小于消息大小時(shí),是否會(huì)報(bào)錯(cuò)
record-size = 1000 batch.size = 500 linger.ms = 1000
使用 kafka-producer-perf-test.sh 腳本測(cè)試:
$ {kafka_path}/bin/kafka-producer-perf-test.sh --topic test-topic1 --num-records 500000000000 --record-size 1000 --throughput 5 --producer-props bootstrap.servers=localhost:9092,localhost:9093,localhost:9094 acks=-1 batch.size=500 linger.ms=1000
測(cè)試結(jié)果:
可以得出結(jié)論,即使消息大小比 batch.size 還大,依然會(huì)繼續(xù)發(fā)送消息。
當(dāng) max.message.bytes = 900 時(shí):
可以得出結(jié)論,即使 batch.size < max.message.bytes,但由于消息大小比 batch.size 大的情況下依然會(huì)發(fā)送消息,如果沒(méi)有 max.request.size 參數(shù)控制消息大小的話,就有可能會(huì)報(bào)錯(cuò)。
這也說(shuō)明了文章開頭為什么直接修改 max.request.size 和 max.message.bytes 即可,而不需要調(diào)整 batch.size 的原因。
上述內(nèi)容就是怎么徹底搞懂Kafka消息大小相關(guān)參數(shù)設(shè)置的規(guī)則,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
分享文章:怎么徹底搞懂Kafka消息大小相關(guān)參數(shù)設(shè)置的規(guī)則
分享地址:http://www.ef60e0e.cn/article/pjoceh.html