1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
      Kafka分區(qū)分配計(jì)算(分區(qū)器Partitions)

      KafkaProducer在調(diào)用send方法發(fā)送消息至broker的過程中,首先是經(jīng)過攔截器Inteceptors處理,然后是經(jīng)過序列化Serializer處理,之后就到了Partitions階段,即分區(qū)分配計(jì)算階段。在某些應(yīng)用場景下,業(yè)務(wù)邏輯需要控制每條消息落到合適的分區(qū)中,有些情形下則只要根據(jù)默認(rèn)的分配規(guī)則即可。在KafkaProducer計(jì)算分配時(shí),首先根據(jù)的是ProducerRecord中的partition字段指定的序號計(jì)算分區(qū)。讀者有可能剛睡醒,看到這個(gè)ProducerRecord似曾相識,沒有關(guān)系,先看段Kafka生產(chǎn)者的示例片段:

      在申扎等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站設(shè)計(jì)、網(wǎng)站制作 網(wǎng)站設(shè)計(jì)制作按需開發(fā)網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),高端網(wǎng)站設(shè)計(jì),營銷型網(wǎng)站建設(shè),外貿(mào)網(wǎng)站制作,申扎網(wǎng)站建設(shè)費(fèi)用合理。

      Producer producer = new KafkaProducer(properties);
      String message = "kafka producer demo";
      ProducerRecord producerRecord = new ProducerRecord(topic,message);
      try {
          producer.send(producerRecord).get();
      } catch (InterruptedException e) {
          e.printStackTrace();
      } catch (ExecutionException e) {
          e.printStackTrace();
      }

      沒錯(cuò),ProducerRecord只是一個(gè)封裝了消息的對象而已,ProducerRecord一共有5個(gè)成員變量,即:

      private final String topic;//所要發(fā)送的topic
      private final Integer partition;//指定的partition序號
      private final Headers headers;//一組鍵值對,與RabbitMQ中的headers類似,kafka0.11.x版本才引入的一個(gè)屬性
      private final K key;//消息的key
      private final V value;//消息的value,即消息體
      private final Long timestamp;//消息的時(shí)間戳,可以分為Create_Time和LogAppend_Time之分,這個(gè)以后的文章中再表。123456

      在KafkaProducer的源碼(1.0.0)中,計(jì)算分區(qū)時(shí)調(diào)用的是下面的partition()方法:

      /**
       * computes partition for given record.
       * if the record has partition returns the value otherwise
       * calls configured partitioner class to compute the partition.
       */
      private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
          Integer partition = record.partition();
          return partition != null ?
                  partition :
                  partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
      }

      可以看出的確是先判斷有無指明ProducerRecord的partition字段,如果沒有指明,則再進(jìn)一步計(jì)算分區(qū)。上面這段代碼中的partitioner在默認(rèn)情況下是指Kafka默認(rèn)實(shí)現(xiàn)的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法實(shí)現(xiàn)如下:

      /**
       * Compute the partition for the given record.
       *
       * @param topic The topic name
       * @param key The key to partition on (or null if no key)
       * @param keyBytes serialized key to partition on (or null if no key)
       * @param value The value to partition on or null
       * @param valueBytes serialized value to partition on or null
       * @param cluster The current cluster metadata
       */
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
          List partitions = cluster.partitionsForTopic(topic);
          int numPartitions = partitions.size();
          if (keyBytes == null) {
              int nextValue = nextValue(topic);
              List availablePartitions = cluster.availablePartitionsForTopic(topic);
              if (availablePartitions.size() > 0) {
                  int part = Utils.toPositive(nextValue) % availablePartitions.size();
                  return availablePartitions.get(part).partition();
              } else {
                  // no partitions are available, give a non-available partition
                  return Utils.toPositive(nextValue) % numPartitions;
              }
          } else {
              // hash the keyBytes to choose a partition
              return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
          }
      }
      
      private int nextValue(String topic) {
          AtomicInteger counter = topicCounterMap.get(topic);
          if (null == counter) {
              counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
              AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
              if (currentCounter != null) {
                  counter = currentCounter;
              }
          }
          return counter.getAndIncrement();
      }

      由上源碼可以看出partition的計(jì)算方式:

      • 如果key為null,則按照一種輪詢的方式來計(jì)算分區(qū)分配
      • 如果key不為null則使用稱之為murmur的Hash算法(非加密型Hash函數(shù),具備高運(yùn)算性能及低碰撞率)來計(jì)算分區(qū)分配。

      KafkaProducer中還支持自定義分區(qū)分配方式,與org.apache.kafka.clients.producer.internals.DefaultPartitioner一樣首先實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,然后在KafkaProducer的配置中指定partitioner.class為對應(yīng)的自定義分區(qū)器(Partitioners)即可,即:

      properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");

      自定義DemoPartitioner主要是實(shí)現(xiàn)Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的計(jì)算方式,詳細(xì)參考如下:

      public class DemoPartitioner implements Partitioner {
      
          private final AtomicInteger atomicInteger = new AtomicInteger(0);
      
          @Override
          public void configure(Map configs) {}
      
          @Override
          public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
              List partitions = cluster.partitionsForTopic(topic);
              int numPartitions = partitions.size();
              if (null == keyBytes || keyBytes.length<1) {
                  return atomicInteger.getAndIncrement() % numPartitions;
              }
              //借用String的hashCode的計(jì)算方式
              int hash = 0;
              for (byte b : keyBytes) {
                  hash = 31 * hash + b;
              }
              return hash % numPartitions;
          }
      
          @Override
          public void close() {}
      }

      這個(gè)自定義分區(qū)器的實(shí)現(xiàn)比較簡單,讀者可以根據(jù)自身業(yè)務(wù)的需求來靈活實(shí)現(xiàn)分配分區(qū)的計(jì)算方式,比如:一般大型電商都有多個(gè)倉庫,可以將倉庫的名稱或者ID作為Key來靈活的記錄商品信息。


      本文的重點(diǎn)是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識點(diǎn)高級進(jìn)階干貨,希望對想成為架構(gòu)師的朋友有一定的參考和幫助

      需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

      Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )
      Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )
      Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )
      Kafka 分區(qū)分配計(jì)算(分區(qū)器 Partitions )


      當(dāng)前文章:Kafka分區(qū)分配計(jì)算(分區(qū)器Partitions)
      瀏覽路徑:http://www.ef60e0e.cn/article/gsdogo.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        三台县| 宁明县| 太仆寺旗| 昂仁县| 思南县| 江源县| 揭阳市| 湖州市| 苍南县| 永清县| 东宁县| 柯坪县| 阜宁县| 苏州市| 景德镇市| 屯门区| 永安市| 辉南县| 绥滨县| 南汇区| 西和县| 醴陵市| 邹平县| 全椒县| 开江县| 临武县| 海伦市| 洪泽县| 郸城县| 旬阳县| 土默特右旗| 门源| 麻城市| 大安市| 吉木萨尔县| 礼泉县| 靖江市| 嘉峪关市| 东平县| 衡山县| 芒康县|