1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢(xún)
      選擇下列產(chǎn)品馬上在線溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問(wèn)題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
      RocketMQ怎么在SpringBoot項(xiàng)目中使用

      這篇文章給大家介紹RocketMQ怎么在Spring Boot項(xiàng)目中使用,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

      創(chuàng)新互聯(lián)專(zhuān)注為客戶(hù)提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、福田網(wǎng)絡(luò)推廣、重慶小程序開(kāi)發(fā)公司、福田網(wǎng)絡(luò)營(yíng)銷(xiāo)、福田企業(yè)策劃、福田品牌公關(guān)、搜索引擎seo、人物專(zhuān)訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供福田建站搭建服務(wù),24小時(shí)服務(wù)熱線:028-86922220,官方網(wǎng)址:www.cdcxhl.com

      前言

      MQ,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在傳統(tǒng)的互聯(lián)網(wǎng)架構(gòu)中通常使用MQ來(lái)對(duì)上下游來(lái)做解耦合。

      舉例:當(dāng)A系統(tǒng)對(duì)B系統(tǒng)進(jìn)行消息通訊,如A系統(tǒng)發(fā)布一條系統(tǒng)公告,B系統(tǒng)可以訂閱該頻道進(jìn)行系統(tǒng)公告同步,整個(gè)過(guò)程中A系統(tǒng)并不關(guān)系B系統(tǒng)會(huì)不會(huì)同步,由訂閱該頻道的系統(tǒng)自行處理。

      什么是RocketMQ?#

      官方說(shuō)明:

      隨著使用越來(lái)越多的隊(duì)列和虛擬主題,ActiveMQ IO模塊遇到了瓶頸。我們盡力通過(guò)節(jié)流,斷路器或降級(jí)來(lái)解決此問(wèn)題,但效果不佳。因此,我們那時(shí)開(kāi)始關(guān)注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。

      看到這里可以很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。

      具有以下特性:

      • 支持發(fā)布/訂閱(Pub/Sub)和點(diǎn)對(duì)點(diǎn)(P2P)消息模型

      • 能夠保證嚴(yán)格的消息順序,在一個(gè)隊(duì)列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞

      • 提供豐富的消息拉取模式,支持拉(pull)和推(push)兩種消息模式

      • 單一隊(duì)列百萬(wàn)消息的堆積能力,億級(jí)消息堆積能力

      • 支持多種消息協(xié)議,如 JMS、MQTT 等

      • 分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語(yǔ)義

      RocketMQ環(huán)境安裝#

      下載地址:https://rocketmq.apache.org/dowloading/releases/

      從官方下載二進(jìn)制或者源碼來(lái)進(jìn)行使用。源碼編譯需要Maven3.2x,JDK8

      在根目錄進(jìn)行打包:

      mvn -Prelease-all -DskipTests clean packager -U

      distribution/target/apache-rocketmq文件夾中會(huì)存在一個(gè)文件夾版,zip,tar三個(gè)可運(yùn)行的完整程序。

      使用rocketmq-4.6.0.zip:

      • 啟動(dòng)名稱(chēng)服務(wù) mqnamesrv.cmd

      • 啟動(dòng)數(shù)據(jù)中心 mqbroker.cmd -n localhost:9876

      SpringBoot環(huán)境中使用RocketMQ#

      SpringBoot 入門(mén):https://www.jb51.net/article/177449.htm

      SpringBoot 常用start:https://www.jb51.net/article/177451.htm

      當(dāng)前環(huán)境版本為:

      • SpringBoot 2.0.6.RELEASE

      • SpringCloud Finchley.RELEASE

      • SpringCldod Alibaba 0.2.1.RELEASE

      • RocketMQ 4.3.0

      在項(xiàng)目工程中導(dǎo)入:

      
      
       org.apache.rocketmq
       rocketmq-client
       ${rocketmq.version}
      
      

      由于我們這邊已經(jīng)有工程了所以就不在進(jìn)行創(chuàng)建這種過(guò)程了。主要是看看如何使用RocketMQ。

      創(chuàng)建RocketMQProperties配置屬性類(lèi),類(lèi)中內(nèi)容如下:

      @ConfigurationProperties(prefix = "rocketmq")
      public class RocketMQProperties {
       private boolean isEnable = false;
       private String namesrvAddr = "localhost:9876";
       private String groupName = "default";
       private int producerMaxMessageSize = 1024;
       private int producerSendMsgTimeout = 2000;
       private int producerRetryTimesWhenSendFailed = 2;
       private int consumerConsumeThreadMin = 5;
       private int consumerConsumeThreadMax = 30;
       private int consumerConsumeMessageBatchMaxSize = 1;
       //省略get set
      }

      現(xiàn)在我們所有子系統(tǒng)中的生產(chǎn)者,消費(fèi)者對(duì)應(yīng):

      isEnable 是否開(kāi)啟mq

      namesrvAddr 集群地址

      groupName 分組名稱(chēng)

      設(shè)置為統(tǒng)一已方便系統(tǒng)對(duì)接,如有其它需求在進(jìn)行擴(kuò)展,類(lèi)中我們已經(jīng)給了默認(rèn)值也可以在配置文件或配置中心中獲取配置,配置如下:

      #發(fā)送同一類(lèi)消息的設(shè)置為同一個(gè)group,保證唯一,默認(rèn)不需要設(shè)置,rocketmq會(huì)使用ip@pid(pid代表jvm名字)作為唯一標(biāo)示
      rocketmq.groupName=please_rename_unique_group_name
      #是否開(kāi)啟自動(dòng)配置
      rocketmq.isEnable=true
      #mq的nameserver地址
      rocketmq.namesrvAddr=127.0.0.1:9876
      #消息最大長(zhǎng)度 默認(rèn)1024*4(4M)
      rocketmq.producer.maxMessageSize=4096
      #發(fā)送消息超時(shí)時(shí)間,默認(rèn)3000
      rocketmq.producer.sendMsgTimeout=3000
      #發(fā)送消息失敗重試次數(shù),默認(rèn)2
      rocketmq.producer.retryTimesWhenSendFailed=2
      #消費(fèi)者線程數(shù)量
      rocketmq.consumer.consumeThreadMin=5
      rocketmq.consumer.consumeThreadMax=32
      #設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)為1條
      rocketmq.consumer.consumeMessageBatchMaxSize=1

      創(chuàng)建消費(fèi)者接口 RocketConsumer.java 該接口用戶(hù)約束消費(fèi)者需要的核心步驟:

      /**
       * 消費(fèi)者接口
       * 
       * @author SimpleWu
       *
       */
      public interface RocketConsumer {
      
      /**
       * 初始化消費(fèi)者
       */
       public abstract void init();
      
       /**
       * 注冊(cè)監(jiān)聽(tīng)
       * 
       * @param messageListener
       */
       public void registerMessageListener(MessageListener messageListener);
      
      }

      創(chuàng)建抽象消費(fèi)者 AbstractRocketConsumer.java:

      /**
       * 消費(fèi)者基本信息
       * 
       * @author SimpelWu
       */
      public abstract class AbstractRocketConsumer implements RocketConsumer {
      
       protected String topics;
       protected String tags;
       protected MessageListener messageListener;
       protected String consumerTitel;
       protected MQPushConsumer mqPushConsumer;
      
       /**
       * 必要的信息
       * 
       * @param topics
       * @param tags
       * @param consumerTitel
       */
       public void necessary(String topics, String tags, String consumerTitel) {
       this.topics = topics;
       this.tags = tags;
       this.consumerTitel = consumerTitel;
       }
      
       public abstract void init();
      
       @Override
       public void registerMessageListener(MessageListener messageListener) {
       this.messageListener = messageListener;
       }
       
      }

      在類(lèi)中我們必須指定這個(gè)topics,tags與消息監(jiān)聽(tīng)邏輯

      public abstract void init();該方法是用于初始化消費(fèi)者,由子類(lèi)實(shí)現(xiàn)。

      接下來(lái)我們編寫(xiě)自動(dòng)配置類(lèi)RocketMQConfiguation.java,該類(lèi)用戶(hù)初始化一個(gè)默認(rèn)的生產(chǎn)者連接,以及加載所有的消費(fèi)者。

      @EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置文件

      @Configuration 標(biāo)注為配置類(lèi)

      @ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有當(dāng)配置中指定rocketmq.isEnable = true的時(shí)候才會(huì)生效

      核心內(nèi)容如下:

      /**
       * mq配置
       * 
       * @author SimpleWu
       */
      @Configuration
      @EnableConfigurationProperties({ RocketMQProperties.class })
      @ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
      public class RocketMQConfiguation {
      
       private RocketMQProperties properties;
      
       private ApplicationContext applicationContext;
      
       private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);
      
       public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) {
       this.properties = properties;
       this.applicationContext = applicationContext;
       }
      
       /**
       * 注入一個(gè)默認(rèn)的消費(fèi)者
       * @return
       * @throws MQClientException
       */
       @Bean
       public DefaultMQProducer getRocketMQProducer() throws MQClientException {
       if (StringUtils.isEmpty(properties.getGroupName())) {
        throw new MQClientException(-1, "groupName is blank");
       }
      
       if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
        throw new MQClientException(-1, "nameServerAddr is blank");
       }
       DefaultMQProducer producer;
       producer = new DefaultMQProducer(properties.getGroupName());
      
       producer.setNamesrvAddr(properties.getNamesrvAddr());
       // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
      
       // 如果需要同一個(gè)jvm中不同的producer往不同的mq集群發(fā)送消息,需要設(shè)置不同的instanceName
       // producer.setInstanceName(instanceName);
       producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
       producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
       // 如果發(fā)送消息失敗,設(shè)置重試次數(shù),默認(rèn)為2次
       producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());
      
       try {
        producer.start();
        log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),
         properties.getNamesrvAddr());
       } catch (MQClientException e) {
        log.error(String.format("producer is error {}", e.getMessage(), e));
        throw e;
       }
       return producer;
      
       }
      
       /**
       * SpringBoot啟動(dòng)時(shí)加載所有消費(fèi)者
       */
       @PostConstruct
       public void initConsumer() {
       Map consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
       if (consumers == null || consumers.size() == 0) {
        log.info("init rocket consumer 0");
       }
       Iterator beans = consumers.keySet().iterator();
       while (beans.hasNext()) {
        String beanName = (String) beans.next();
        AbstractRocketConsumer consumer = consumers.get(beanName);
        consumer.init();
        createConsumer(consumer);
        log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,
         consumer.topics);
       }
       }
      
       /**
       * 通過(guò)消費(fèi)者信心創(chuàng)建消費(fèi)者
       * 
       * @param consumerPojo
       */
       public void createConsumer(AbstractRocketConsumer arc) {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
       consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
       consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
       consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
       consumer.registerMessageListener(arc.messageListenerConcurrently);
       /**
        * 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開(kāi)始消費(fèi)還是隊(duì)列尾部開(kāi)始消費(fèi) 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
        */
       // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
       /**
        * 設(shè)置消費(fèi)模型,集群還是廣播,默認(rèn)為集群
        */
       // consumer.setMessageModel(MessageModel.CLUSTERING);
      
       /**
        * 設(shè)置一次消費(fèi)消息的條數(shù),默認(rèn)為1條
        */
       consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
       try {
        consumer.subscribe(arc.topics, arc.tags);
        consumer.start();
        arc.mqPushConsumer=consumer;
       } catch (MQClientException e) {
        log.error("info consumer title {}", arc.consumerTitel, e);
       }
      
       }
      
      }

      然后在src/main/resources文件夾中創(chuàng)建目錄與文件META-INF/spring.factories里面添加自動(dòng)配置類(lèi)即可開(kāi)啟啟動(dòng)配置,我們只需要導(dǎo)入依賴(lài)即可:

      org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      com.xcloud.config.rocketmq.RocketMQConfiguation

      接下來(lái)在服務(wù)中導(dǎo)入依賴(lài),然后通過(guò)我們的抽象類(lèi)獲取所有必要信息對(duì)消費(fèi)者進(jìn)行創(chuàng)建,該步驟會(huì)在所有消費(fèi)者初始化完成后進(jìn)行,且只會(huì)管理是Spring Bean的消費(fèi)者。

      下面我們看看如何創(chuàng)建一個(gè)消費(fèi)者,創(chuàng)建消費(fèi)者的步驟非常簡(jiǎn)單,只需要繼承AbstractRocketConsumer然后再加上Spring的@Component就能夠完成消費(fèi)者的創(chuàng)建,我們可以在類(lèi)中自定義消費(fèi)的主題與標(biāo)簽。

      在項(xiàng)目可以根據(jù)需求當(dāng)消費(fèi)者創(chuàng)建失敗的時(shí)候是否繼續(xù)啟動(dòng)工程。

      創(chuàng)建一個(gè)默認(rèn)的消費(fèi)者 DefaultConsumerMQ.java

      @Component
      public class DefaultConsumerMQ extends AbstractRocketConsumer {
       /**
       * 初始化消費(fèi)者
       */
       @Override
       public void init() {
       // 設(shè)置主題,標(biāo)簽與消費(fèi)者標(biāo)題
       super.necessary("TopicTest", "*", "這是標(biāo)題");
       //消費(fèi)者具體執(zhí)行邏輯
       registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
        msgs.forEach(msg -> {
         System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));
        });
        // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
       });
       }
      }

      super.necessary("TopicTest", "*", "這是標(biāo)題"); 是必須要設(shè)置的,代表該消費(fèi)者監(jiān)聽(tīng)TopicTest主題下所有tags,標(biāo)題那個(gè)字段是我自己定義的,所以對(duì)于該配置來(lái)說(shuō)沒(méi)什么意義。

      我們可以在這里注入Spring的Bean來(lái)進(jìn)行任意邏輯處理。

      創(chuàng)建一個(gè)消息發(fā)送類(lèi)進(jìn)行測(cè)試

      @Override
      public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
       Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET));
       // 發(fā)送消息到一個(gè)Broker
       SendResult sendResult = defaultMQProducer.send(msg);
       // 通過(guò)sendResult返回消息是否成功送達(dá)
       System.out.printf("%s%n", sendResult);
       return null;
      }

      我們來(lái)通過(guò)Http請(qǐng)求測(cè)試:

      http://localhost:10001/demo/base/mq/hello consumer message boyd hello 
      http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿

      好了到這里簡(jiǎn)單的start算是設(shè)計(jì)完成了,后面還有一些:順序消息生產(chǎn),順序消費(fèi)消息,異步消息生產(chǎn)等一系列功能,官人可參照官方去自行處理。

      • ActiveMQ 沒(méi)經(jīng)過(guò)大規(guī)模吞吐量場(chǎng)景的驗(yàn)證,社區(qū)不高不活躍。

      • RabbitMQ 集群動(dòng)態(tài)擴(kuò)展麻煩,且與當(dāng)前程序語(yǔ)言不至于難以定制化。

      • kafka 支持主要的MQ功能,功能無(wú)法達(dá)到程序需求的要求,所以不使用,且與當(dāng)前程序語(yǔ)言不至于難以定制化。

      • rocketMQ 經(jīng)過(guò)全世界的女人的洗禮,已經(jīng)很強(qiáng)大;MQ功能較為完善,還是分布式的,擴(kuò)展性好;支持復(fù)雜MQ業(yè)務(wù)場(chǎng)景。(業(yè)務(wù)復(fù)雜可做首選)

      關(guān)于RocketMQ怎么在Spring Boot項(xiàng)目中使用就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。


      本文名稱(chēng):RocketMQ怎么在SpringBoot項(xiàng)目中使用
      網(wǎng)站URL:http://www.ef60e0e.cn/article/pjdjjh.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        民和| 阳江市| 泸溪县| 堆龙德庆县| 茌平县| 孝昌县| 崇阳县| 青冈县| 遵化市| 平凉市| 双桥区| 永顺县| 木里| 东平县| 新郑市| 子洲县| 化州市| 新巴尔虎左旗| 新闻| 南靖县| 牟定县| 吉木萨尔县| 含山县| 湖州市| 芦山县| 资源县| 定结县| 个旧市| 瑞金市| 同心县| 皋兰县| 平果县| 桦甸市| 富锦市| 灵台县| 西丰县| 南木林县| 财经| 育儿| 临城县| 遂川县|