新聞中心
KafkaProducer在調(diào)用send方法發(fā)送消息至broker的過程中,首先是經(jīng)過攔截器Inteceptors處理,然后是經(jīng)過序列化Serializer處理,之后就到了Partitions階段,即分區(qū)分配計(jì)算階段。在某些應(yīng)用場景下,業(yè)務(wù)邏輯需要控制每條消息落到合適的分區(qū)中,有些情形下則只要根據(jù)默認(rèn)的分配規(guī)則即可。在KafkaProducer計(jì)算分配時(shí),首先根據(jù)的是ProducerRecord中的partition字段指定的序號計(jì)算分區(qū)。讀者有可能剛睡醒,看到這個(gè)ProducerRecord似曾相識,沒有關(guān)系,先看段Kafka生產(chǎn)者的示例片段:
在申扎等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站設(shè)計(jì)、網(wǎng)站制作 網(wǎng)站設(shè)計(jì)制作按需開發(fā)網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),高端網(wǎng)站設(shè)計(jì),營銷型網(wǎng)站建設(shè),外貿(mào)網(wǎng)站制作,申扎網(wǎng)站建設(shè)費(fèi)用合理。
Producer producer = new KafkaProducer(properties);
String message = "kafka producer demo";
ProducerRecord producerRecord = new ProducerRecord(topic,message);
try {
producer.send(producerRecord).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
沒錯(cuò),ProducerRecord只是一個(gè)封裝了消息的對象而已,ProducerRecord一共有5個(gè)成員變量,即:
private final String topic;//所要發(fā)送的topic
private final Integer partition;//指定的partition序號
private final Headers headers;//一組鍵值對,與RabbitMQ中的headers類似,kafka0.11.x版本才引入的一個(gè)屬性
private final K key;//消息的key
private final V value;//消息的value,即消息體
private final Long timestamp;//消息的時(shí)間戳,可以分為Create_Time和LogAppend_Time之分,這個(gè)以后的文章中再表。123456
在KafkaProducer的源碼(1.0.0)中,計(jì)算分區(qū)時(shí)調(diào)用的是下面的partition()方法:
/**
* computes partition for given record.
* if the record has partition returns the value otherwise
* calls configured partitioner class to compute the partition.
*/
private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
可以看出的確是先判斷有無指明ProducerRecord的partition字段,如果沒有指明,則再進(jìn)一步計(jì)算分區(qū)。上面這段代碼中的partitioner在默認(rèn)情況下是指Kafka默認(rèn)實(shí)現(xiàn)的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法實(shí)現(xiàn)如下:
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
由上源碼可以看出partition的計(jì)算方式:
- 如果key為null,則按照一種輪詢的方式來計(jì)算分區(qū)分配
- 如果key不為null則使用稱之為murmur的Hash算法(非加密型Hash函數(shù),具備高運(yùn)算性能及低碰撞率)來計(jì)算分區(qū)分配。
KafkaProducer中還支持自定義分區(qū)分配方式,與org.apache.kafka.clients.producer.internals.DefaultPartitioner一樣首先實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,然后在KafkaProducer的配置中指定partitioner.class為對應(yīng)的自定義分區(qū)器(Partitioners)即可,即:
properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");
自定義DemoPartitioner主要是實(shí)現(xiàn)Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的計(jì)算方式,詳細(xì)參考如下:
public class DemoPartitioner implements Partitioner {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public void configure(Map configs) {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes || keyBytes.length<1) {
return atomicInteger.getAndIncrement() % numPartitions;
}
//借用String的hashCode的計(jì)算方式
int hash = 0;
for (byte b : keyBytes) {
hash = 31 * hash + b;
}
return hash % numPartitions;
}
@Override
public void close() {}
}
這個(gè)自定義分區(qū)器的實(shí)現(xiàn)比較簡單,讀者可以根據(jù)自身業(yè)務(wù)的需求來靈活實(shí)現(xiàn)分配分區(qū)的計(jì)算方式,比如:一般大型電商都有多個(gè)倉庫,可以將倉庫的名稱或者ID作為Key來靈活的記錄商品信息。
本文的重點(diǎn)是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識點(diǎn)高級進(jìn)階干貨,希望對想成為架構(gòu)師的朋友有一定的參考和幫助
需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取
當(dāng)前文章:Kafka分區(qū)分配計(jì)算(分區(qū)器Partitions)
瀏覽路徑:http://www.ef60e0e.cn/article/gsdogo.html