1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問(wèn)題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
      KafkaTopicshell操作+基準(zhǔn)測(cè)試+javaAPI-創(chuàng)新互聯(lián)
      Kafka Topic shell操作+基準(zhǔn)測(cè)試+java API
      • 1- Kafka的相關(guān)使用操作

        創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供張家港網(wǎng)站建設(shè)、張家港做網(wǎng)站、張家港網(wǎng)站設(shè)計(jì)、張家港網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)與制作、張家港企業(yè)網(wǎng)站模板建站服務(wù),10年張家港做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。
        • shell命令使用
        • Java API的使用
      • 2- Kafka的核心原理:

        • 分片和副本機(jī)制
      1 消息隊(duì)列的基本介紹 1.1 消息隊(duì)列產(chǎn)生背景

      什么是消息隊(duì)列呢?

      消息: 數(shù)據(jù) 只不過(guò)這個(gè)數(shù)據(jù)具有一種流動(dòng)狀態(tài)
      隊(duì)列: 存儲(chǔ)數(shù)據(jù)的容器 只不過(guò)這個(gè)容器具有FIFO(先進(jìn)先出)特性
      
      消息隊(duì)列: 數(shù)據(jù)在隊(duì)列中, 從隊(duì)列的一端傳遞到另一端的過(guò)程, 數(shù)據(jù)在整個(gè)隊(duì)列中產(chǎn)生一種流動(dòng)狀態(tài)
      1.2 常見(jiàn)的消息隊(duì)列的產(chǎn)品

      常見(jiàn)的消息隊(duì)列的產(chǎn)品:

      • 1- ActiveMQ: 出現(xiàn)時(shí)間比較早的一款消息隊(duì)列的組件, 目前整個(gè)社區(qū)活躍度非常低, 此軟件使用人群也在不斷的減少, 此軟件在前幾年中被Java工程師主要使用
      • 2- RabbitMQ: 目前在Java領(lǐng)域中使用非常頻繁的一款消息隊(duì)列的產(chǎn)品, 其社區(qū)活躍度相對(duì)不錯(cuò), 支持多種語(yǔ)言開(kāi)發(fā)
      • 3- RocketMQ: 是由阿里推出一款的消息隊(duì)列的中間件產(chǎn)品, 目前主要是在阿里系范圍內(nèi)使用, 目前支持的開(kāi)發(fā)語(yǔ)言相對(duì)較少一些, 比如成熟的客戶端還是JAVA
      • 4- Kafka: 是一款大數(shù)據(jù)領(lǐng)域下的消息隊(duì)列的產(chǎn)品, 主要應(yīng)用在大數(shù)據(jù)領(lǐng)域在, 在業(yè)務(wù)領(lǐng)域中使用較少
      • 5- Pulsar: 最近一兩年新起的一款消息隊(duì)列的組件, 也是Aapache頂級(jí)開(kāi)源項(xiàng)目, 目前主要由StreamNative公司進(jìn)行商業(yè)運(yùn)營(yíng)中
      1.3 消息隊(duì)列的作用是什么
      • 1- 同步轉(zhuǎn)異步
      • 2- 應(yīng)用解耦合
      • 3- 流量削峰 : 在秒殺場(chǎng)景中, 突然會(huì)有龐大的并發(fā)量, 但是過(guò)后就沒(méi)有了
      • 4- 消息驅(qū)動(dòng)系統(tǒng)
      1.4 消息隊(duì)列的兩種消費(fèi)模型
      • 點(diǎn)對(duì)點(diǎn): 數(shù)據(jù)被生產(chǎn)到容器后, 最終這個(gè)數(shù)據(jù)只能被一個(gè)消費(fèi)方來(lái)消費(fèi)數(shù)據(jù)
      • 發(fā)布訂閱: 數(shù)據(jù)被生產(chǎn)到容器后, 可以被多個(gè)消費(fèi)方同時(shí)接收到
      2 Kafka的基本介紹

      ? Kafka是Apache旗下的一款開(kāi)源免費(fèi)的消息隊(duì)列的中間件產(chǎn)品,最早是由領(lǐng)英公司開(kāi)發(fā)的, 后期共享給Apache, 目前已經(jīng)是Apache旗下的頂級(jí)開(kāi)源的項(xiàng)目, 采用語(yǔ)言為Scala

      ? 官方網(wǎng)站: http://www.kafka.apache.org

      適用場(chǎng)景: 數(shù)據(jù)傳遞工作, 需要將數(shù)據(jù)從一端傳遞到另一端, 此時(shí)可以通過(guò)Kafka來(lái)實(shí)現(xiàn), 不局限兩端的程序

      ? 在實(shí)時(shí)領(lǐng)域中, 主要是用于流式的數(shù)據(jù)處理工作

      3 Kafka的架構(gòu)
      Kafka Cluster: kafka集群
      broker: kafka的節(jié)點(diǎn)
      producer: 生產(chǎn)者
      consumer: 消費(fèi)者
      Topic: 主題/話題 理解就是一個(gè)大的邏輯容器(管道)
      	shard: 分片. 一個(gè)Topic可以被分為N多個(gè)分片, 分片的數(shù)量與節(jié)點(diǎn)數(shù)據(jù)沒(méi)有關(guān)系
      	replicas: 副本, 可以對(duì)每一個(gè)分片構(gòu)建多個(gè)副本, 副本數(shù)量最多和節(jié)點(diǎn)數(shù)量一致(包含本身) 保證數(shù)據(jù)不丟失
      zookeeper: 存儲(chǔ)管理集群的元數(shù)據(jù)信息
      4 Kafka的安裝操作

      參考Kafka的集群安裝文檔 完成整個(gè)安裝工作即可

      如果安裝后, 無(wú)法啟動(dòng), 可能遇到的問(wèn)題:

      1) 配置文件中忘記修改broker id 
      2) 忘記修改監(jiān)聽(tīng)的地址, 或者修改了但是前面的注釋沒(méi)有打開(kāi)

      如何啟動(dòng)Kafka集群:

      啟動(dòng)zookeeper集群: 每個(gè)節(jié)點(diǎn)都要執(zhí)行
      cd /export/server/zookeeper/bin
      ./zkServer.sh start
      
      啟動(dòng)完成后 需要通過(guò) jps -m 查看是否啟動(dòng) , 并且還需要通過(guò):
      ./zkServer.sh status 查看狀態(tài), 必須看到一個(gè)leader 兩個(gè)follower才認(rèn)為啟動(dòng)成功了
      
      啟動(dòng)Kafka集群: 
      
      單節(jié)點(diǎn): 每個(gè)節(jié)點(diǎn)都需要執(zhí)行
      cd /export/server/kafka_2.12-2.4.1/bin
      前臺(tái)啟動(dòng):
      	./kafka-server-start.sh ../config/server.properties
      后臺(tái)啟動(dòng):
      	nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &
      注意: 第一次啟動(dòng), 建議先前臺(tái)啟動(dòng), 觀察是否可以正常啟動(dòng), 如果OK, ctrl +C 退出, 然后掛載到后臺(tái)

      如何停止:

      單節(jié)點(diǎn): 每個(gè)節(jié)點(diǎn)都需要執(zhí)行
      cd /export/server/kafka_2.12-2.4.1/bin
      操作:
      	jps 然后通過(guò) kill -9
      	或者:
      	./kafka-server-stop.sh

      配置一鍵化腳本: 僅用于啟動(dòng)Kafka 不會(huì)啟動(dòng)zookeeper, zookeeper還是需要單獨(dú)啟動(dòng), 或者配置zookeeper的一鍵化腳本

      • 1- 創(chuàng)建一個(gè)onekey目錄: node1節(jié)點(diǎn)
      mkdir -p /export/onekey
      • 2- 將資料中提供的一鍵化腳本上傳到此目錄下
      cd /export/onekey/
      上傳即可
      • 3- 對(duì)shell腳本賦執(zhí)行的權(quán)限
      cd /export/onekey/
      chmod 755 *.sh
      • 4- 通過(guò)對(duì)應(yīng)的腳本來(lái)執(zhí)行啟動(dòng)和關(guān)閉即可
      5 Kafka的相關(guān)使用

      ? Kafka是一個(gè)消息隊(duì)列的中間件產(chǎn)品, 主要的作用: 將數(shù)據(jù)從程序一端傳遞到另一端的操作, 所以說(shuō)學(xué)習(xí)Kafka主要學(xué)習(xí)如何使用Kafka生產(chǎn)數(shù)據(jù), 以及如何使用Kafka消費(fèi)數(shù)據(jù)

      5.1 Kafka的shell命令使用
      • 1- 如何創(chuàng)建Topic : kafka-topic.sh
      ./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test01 --partitions 3 --replication-factor 2
      • 2- 查看當(dāng)前有那些topic:
      ./kafka-topics.sh  --list  --zookeeper node1:2181,node2:2181,node3:2181
      • 3- 查看某一個(gè)Topic的詳細(xì)信息:
      ./kafka-topics.sh --describe --zookeeper  node1:2181,node2:2181,node3:2181 --topic test01
      • 4- 如何刪除Topic
      ./kafka-topics.sh --delete --zookeeper node1:2181,node2:2181,node3:2181 --topic test01
      注意: 
      	默認(rèn)情況下, 刪除一個(gè)topic 僅僅是標(biāo)記刪除, 主要原因: Kafka擔(dān)心直接刪除, 會(huì)導(dǎo)致誤刪數(shù)據(jù)
      	
      	如果想執(zhí)行刪除的時(shí)候, 直接將topic完整的刪除掉: 此時(shí)需要在server.properties配置中修改一下配置為True
      		delete.topic.enable=true
      	
      	如果本身Topic中的數(shù)據(jù)量非常少, 或者沒(méi)有任何的使用, 此時(shí)Topic會(huì)自動(dòng)先執(zhí)行邏輯刪除, 然后在物理刪除, 不管是否配置delete.topic.enable
      • 5- 如何修改Topic
      Topic 僅允許增大分片, 不允許減少分片 同時(shí)也不支持修改副本的數(shù)量
      
      增大分區(qū):
      ./kafka-topics.sh  --alter --zookeeper node1:2181,node2:2181,node3:2181 --topic test01  --partitions 5
      • 6- 如何模擬生產(chǎn)者: 發(fā)送數(shù)據(jù)
      ./kafka-console-producer.sh  --broker-list node1:9092,node2:9092,node3:9092 --topic test01
      >
      • 7- 如何模擬消費(fèi)者: 消費(fèi)數(shù)據(jù)
      ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test01
      
      默認(rèn)從當(dāng)前的時(shí)間開(kāi)始消費(fèi)數(shù)據(jù), 如果想從頭開(kāi)始消費(fèi), 可以添加 --from-beginning  參數(shù)即可
      5.2 Kafka的基準(zhǔn)測(cè)試

      ? Kafka的基準(zhǔn)測(cè)試: 主要指的是將安裝完成的Kafka集群, 進(jìn)行測(cè)試操作, 測(cè)試其能否承載多大的并發(fā)量(讀寫的效率)

      ? 注意: 在進(jìn)行Kafka的基準(zhǔn)測(cè)試的時(shí)候, 受Topic的分片和副本的數(shù)量影響會(huì)比較大, 一般在測(cè)試的時(shí)候, 會(huì)構(gòu)建多個(gè)topic, 每一個(gè)topic設(shè)置不同的分片和副本的數(shù)量, 比如: 一個(gè)設(shè)置分片多一些, 副本多一些 一個(gè)設(shè)置分片多一些, 副本少些…

      • 1- 創(chuàng)建一個(gè)Topic
      ./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 --topic test02 --partitions 6 --replication-factor 1
      • 2- 測(cè)試寫入的數(shù)據(jù)的效率:
      cd /export/server/kafka/bin
      ./kafka-producer-perf-test.sh --topic test02 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
      
      屬性說(shuō)明:
      	--num-records : 發(fā)送的總消息量
      	--throughput : 指定吞吐量(限流)  -1 不限制
      	--record-size: 每條數(shù)據(jù)的大小(字節(jié))
          --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1 : 設(shè)置kafka的地址和消息發(fā)送模式
      • 3- 測(cè)試讀取消息的效率
      cd /export/server/kafka/bin
      ./kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test02 --fetch-size 1048576 --messages 5000000
      
      屬性說(shuō)明:
      	--fetch-size : 每次從Kafka端拉取數(shù)據(jù)量
      	--message: 測(cè)試的總消息量
      假設(shè)Kafka的節(jié)點(diǎn)數(shù)量是無(wú)限多的:
      	topic分片數(shù)量越多, 理論上讀寫效率越高
      	topic副本數(shù)量越多, 整體執(zhí)行效率越差
      
      一般可以將分片的數(shù)量設(shè)置為副本數(shù)量的三倍左右 可以測(cè)試出比較最佳的性能  副本調(diào)整為1
      5.3 Kafka的Java API的操作
      • 1- 創(chuàng)建一個(gè)Maven的項(xiàng)目, 導(dǎo)入相關(guān)的依賴
      aliyunhttp://maven.aliyun.com/nexus/content/groups/public/true false neverorg.apache.kafkakafka-clients2.4.1org.apache.commonscommons-io1.3.2org.slf4jslf4j-log4j121.7.6log4jlog4j1.2.16 org.apache.maven.plugins maven-compiler-plugin 3.1  1.8 1.8 
      • 2- 創(chuàng)建兩個(gè)包結(jié)構(gòu): com.kafka.producer / com.kafka.consumer
      5.3.1 演示如何將數(shù)據(jù)生產(chǎn)到Kafka
      package com.itheima.kafka.producer;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      
      import java.util.Properties;
      
      public class KafkaProducerTest {public static void main(String[] args) {// 第一步: 創(chuàng)建kafka的生產(chǎn)者核心對(duì)象: KafkaProducer  傳入相關(guān)的配置
              Properties props = new Properties();
              props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
              props.put("acks", "all");
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
              Producerproducer = new KafkaProducer<>(props);
      
              //2. 執(zhí)行發(fā)送數(shù)據(jù)操作
              for (int i = 0; i< 10; i++) {ProducerRecordproducerRecord = new ProducerRecord<>(
                          "test01", "張三"+i
                  );
                  producer.send(producerRecord);
              }
      
              //3. 執(zhí)行close 釋放資源
              producer.close();
      
          }
      }
      
      5.3.2 演示如何從Kafka消費(fèi)數(shù)據(jù)
      package com.itheima.kafka.consumer;
      
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      
      import java.time.Duration;
      import java.util.Arrays;
      import java.util.Properties;
      
      public class KafkaConsumerTest {public static void main(String[] args) {// 1- 創(chuàng)建Kafka的消費(fèi)者的核心對(duì)象: KafkaConsumer
              Properties props = new Properties();
              props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
              props.put("group.id", "test"); // 消費(fèi)者組的ID
              props.put("enable.auto.commit", "true"); // 是否自動(dòng)提交偏移量offset
              props.put("auto.commit.interval.ms", "1000"); // 自動(dòng)提交的間隔時(shí)間
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key值的反序列化的類
              props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的值反序列化的類
      
              KafkaConsumerconsumer = new KafkaConsumer<>(props);
              //2. 訂閱topic: 表示消費(fèi)者從那個(gè)topic來(lái)消費(fèi)數(shù)據(jù)  可以指定多個(gè)
              consumer.subscribe(Arrays.asList("test01"));
      
              while (true) {// 3. 從kafka中獲取消息數(shù)據(jù), 參數(shù)表示當(dāng)kafka中沒(méi)有消息的時(shí)候, 等待的超時(shí)時(shí)間, 如果過(guò)了等待的時(shí)間, 返回空對(duì)象(對(duì)象存在, 但是內(nèi)部沒(méi)有數(shù)據(jù)  相當(dāng)于空容器)
                  ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(100));
                  for (ConsumerRecordrecord : records) {long offset = record.offset();
                      String key = record.key();
                      String value = record.value();
                      // 偏移量: 每一條數(shù)據(jù) 其實(shí)就是一個(gè)偏移量 , 每個(gè)分片單獨(dú)統(tǒng)計(jì)消息到達(dá)了第幾個(gè)偏移量 偏移量從 0 開(kāi)始的
                      System.out.println("消息的偏移量為:"+offset+"; key值為:"+key + "; value的值為:"+ value);
                  }
              }
      
          }
      
      }

      你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧


      網(wǎng)頁(yè)名稱:KafkaTopicshell操作+基準(zhǔn)測(cè)試+javaAPI-創(chuàng)新互聯(lián)
      URL分享:http://www.ef60e0e.cn/article/deoicp.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        攀枝花市| 陵川县| 永川市| 安新县| 吉首市| 津市市| 古浪县| 环江| 岢岚县| 新河县| 保康县| 屯门区| 崇信县| 巫溪县| 周至县| 岳阳市| 松阳县| 营山县| 勐海县| 平顶山市| 满洲里市| 香河县| 子长县| 沧州市| 阳曲县| 万全县| 牙克石市| 葵青区| 南岸区| 钦州市| 陕西省| 巴青县| 黑河市| 互助| 登封市| 钦州市| 浦江县| 广水市| 阜康市| 荔波县| 黄石市|