1. <ul id="0c1fb"></ul>

      <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
      <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区

      RELATEED CONSULTING
      相關(guān)咨詢(xún)
      選擇下列產(chǎn)品馬上在線(xiàn)溝通
      服務(wù)時(shí)間:8:30-17:00
      你可能遇到了下面的問(wèn)題
      關(guān)閉右側(cè)工具欄

      新聞中心

      這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
      redis:從入門(mén)到入土:6.Redis消息隊(duì)列-創(chuàng)新互聯(lián)
      Redis學(xué)習(xí)
      • 一:認(rèn)識(shí)消息隊(duì)列
      • 二:基于List實(shí)現(xiàn)消息隊(duì)列
        • 1.基于List結(jié)構(gòu)模擬消息隊(duì)列
        • 2.基于List的消息隊(duì)列有哪些優(yōu)缺點(diǎn)
      • 三:基于PubSub的消息隊(duì)列
        • 1.說(shuō)明
        • 2.基于PubSub的消息隊(duì)列的優(yōu)缺點(diǎn)
      • 四:Redis消息隊(duì)列-基于Stream的消息隊(duì)列
        • 1.發(fā)送消息的命令
        • 2.讀取消息的方式之一:XREAD
          • 2.1.使用示例
          • 2.2.問(wèn)題
      • 五:基于Stream的消息隊(duì)列-消費(fèi)者組
        • 1.常用命令
        • 2.消費(fèi)者監(jiān)聽(tīng)消息的基本思路
        • 3.STREAM類(lèi)型消息隊(duì)列的XREADGROUP命令特點(diǎn)
        • 4.對(duì)比
      • 六: 基于Redis的Stream結(jié)構(gòu)作為消息隊(duì)列,實(shí)現(xiàn)異步秒殺下單
        • 1.需求
        • 2.代碼
      • 七:總結(jié)

      10年積累的成都做網(wǎng)站、網(wǎng)站制作經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶(hù)對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶(hù)得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)制作后付款的網(wǎng)站建設(shè)流程,更有介休免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。一:認(rèn)識(shí)消息隊(duì)列

      什么是消息隊(duì)列:字面意思就是存放消息的隊(duì)列。最簡(jiǎn)單的消息隊(duì)列模型包括3個(gè)角色:

      • 消息隊(duì)列:存儲(chǔ)和管理消息,也被稱(chēng)為消息代理(Message Broker)
      • 生產(chǎn)者:發(fā)送消息到消息隊(duì)列
      • 消費(fèi)者:從消息隊(duì)列獲取消息并處理消息

      使用隊(duì)列的好處在于 解耦, 所謂解耦,舉一個(gè)生活中的例子就是:快遞員(生產(chǎn)者)把快遞放到快遞柜里邊(Message Queue)去,我們(消費(fèi)者)從快遞柜里邊去拿東西,這就是一個(gè)異步,如果耦合,那么這個(gè)快遞員相當(dāng)于直接把快遞交給你,這事固然好,但是萬(wàn)一你不在家,那么快遞員就會(huì)一直等你,這就浪費(fèi)了快遞員的時(shí)間,所以這種思想在我們?nèi)粘i_(kāi)發(fā)中,是非常有必要的。

      這種場(chǎng)景在我們秒殺中就變成了:我們下單之后,利用redis去進(jìn)行校驗(yàn)下單條件,再通過(guò)隊(duì)列把消息發(fā)送出去,然后再啟動(dòng)一個(gè)線(xiàn)程去消費(fèi)這個(gè)消息,完成解耦,同時(shí)也加快我們的響應(yīng)速度。
      在這里插入圖片描述

      這里我們可以使用一些現(xiàn)成的mq,比如kafka,rabbitmq等等,我們也可以直接使用redis提供的mq方案,降低我們的部署和學(xué)習(xí)成本。

      二:基于List實(shí)現(xiàn)消息隊(duì)列 1.基于List結(jié)構(gòu)模擬消息隊(duì)列

      消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。而Redis的list數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,很容易模擬出隊(duì)列效果。

      隊(duì)列是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP來(lái)實(shí)現(xiàn)。
      不過(guò)要注意的是,當(dāng)隊(duì)列中沒(méi)有消息時(shí)RPOP或LPOP操作會(huì)返回null,并不像JVM的阻塞隊(duì)列那樣會(huì)阻塞并等待消息。因此這里應(yīng)該使用BRPOP或者BLPOP來(lái)實(shí)現(xiàn)阻塞效果。
      在這里插入圖片描述
      在這里插入圖片描述

      2.基于List的消息隊(duì)列有哪些優(yōu)缺點(diǎn)

      優(yōu)點(diǎn):

      • 利用Redis存儲(chǔ),不受限于JVM內(nèi)存上限
      • 基于Redis的持久化機(jī)制,數(shù)據(jù)安全性有保證
      • 可以滿(mǎn)足消息有序性

      缺點(diǎn):

      • 無(wú)法避免消息丟失
      • 只支持單消費(fèi)者
      三:基于PubSub的消息隊(duì)列 1.說(shuō)明

      PubSub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè)channel,生產(chǎn)者向?qū)?yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。

      SUBSCRIBE channel [channel] :訂閱一個(gè)或多個(gè)頻道
      PUBLISH channel msg :向一個(gè)頻道發(fā)送消息
      PSUBSCRIBE pattern[pattern] :訂閱與pattern格式匹配的所有頻道

      在這里插入圖片描述
      在這里插入圖片描述

      2.基于PubSub的消息隊(duì)列的優(yōu)缺點(diǎn)

      優(yōu)點(diǎn):

      • 采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費(fèi)

      缺點(diǎn):

      • 不支持?jǐn)?shù)據(jù)持久化
      • 無(wú)法避免消息丟失
      • 消息堆積有上限,超出時(shí)數(shù)據(jù)丟失.數(shù)據(jù)不會(huì)持久化,但是會(huì)緩存在客戶(hù)端
      四:Redis消息隊(duì)列-基于Stream的消息隊(duì)列

      Stream 是 Redis 5.0 引入的一種新數(shù)據(jù)類(lèi)型,可以實(shí)現(xiàn)一個(gè)功能非常完善的消息隊(duì)列。

      1.發(fā)送消息的命令

      在這里插入圖片描述

      例如:
      在這里插入圖片描述

      2.讀取消息的方式之一:XREAD 2.1.使用示例

      在這里插入圖片描述
      例如,使用XREAD讀取第一個(gè)消息:
      在這里插入圖片描述

      XREAD阻塞方式,讀取最新的消息:
      在這里插入圖片描述

      在業(yè)務(wù)開(kāi)發(fā)中,我們可以循環(huán)的調(diào)用XREAD阻塞方式來(lái)查詢(xún)最新消息,從而實(shí)現(xiàn)持續(xù)監(jiān)聽(tīng)隊(duì)列的效果,偽代碼如下
      在這里插入圖片描述

      2.2.問(wèn)題

      注意:當(dāng)我們指定起始ID為$時(shí),代表讀取最新的消息,如果我們處理一條消息的過(guò)程中,又有超過(guò)1條以上的消息到達(dá)隊(duì)列,則下次獲取時(shí)也只能獲取到最新的一條,會(huì)出現(xiàn)漏讀消息的問(wèn)題

      STREAM類(lèi)型消息隊(duì)列的XREAD命令特點(diǎn):

      • 消息可回溯
      • 一個(gè)消息可以被多個(gè)消費(fèi)者讀取
      • 可以阻塞讀取
      • 有消息漏讀的風(fēng)險(xiǎn)
      五:基于Stream的消息隊(duì)列-消費(fèi)者組

      消費(fèi)者組(Consumer Group):將多個(gè)消費(fèi)者劃分到一個(gè)組中,監(jiān)聽(tīng)同一個(gè)隊(duì)列。具備下列特點(diǎn):
      在這里插入圖片描述

      1.常用命令

      創(chuàng)建消費(fèi)者組:

      XGROUP CREATE KEY GROUPNAME ID [MKSTREAM] 
      
      key:隊(duì)列名稱(chēng)
      groupName:消費(fèi)者組名稱(chēng)
      ID:起始ID標(biāo)示,$代表隊(duì)列中最后一個(gè)消息,0則代表隊(duì)列中第一個(gè)消息
      MKSTREAM:隊(duì)列不存在時(shí)自動(dòng)創(chuàng)建隊(duì)列
      其它常見(jiàn)命令:
      

      刪除指定的消費(fèi)者組

      XGROUP DESTORY key groupName

      給指定的消費(fèi)者組添加消費(fèi)者

      XGROUP CREATECONSUMER key groupname consumername

      刪除消費(fèi)者組中的指定消費(fèi)者

      XGROUP DELCONSUMER key groupname consumername

      從消費(fèi)者組讀取消息:

      XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
      • group:消費(fèi)組名稱(chēng)
      • consumer:消費(fèi)者名稱(chēng),如果消費(fèi)者不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)消費(fèi)者
      • count:本次查詢(xún)的大數(shù)量
      • BLOCK milliseconds:當(dāng)沒(méi)有消息時(shí)最長(zhǎng)等待時(shí)間
      • NOACK:無(wú)需手動(dòng)ACK,獲取到消息后自動(dòng)確認(rèn)
      • STREAMS key:指定隊(duì)列名稱(chēng)
      • ID:獲取消息的起始ID:

      “>”:從下一個(gè)未消費(fèi)的消息開(kāi)始
      其它:根據(jù)指定id從pending-list中獲取已消費(fèi)但未確認(rèn)的消息,例如0,是從pending-list中的第一個(gè)消息開(kāi)始

      2.消費(fèi)者監(jiān)聽(tīng)消息的基本思路

      在這里插入圖片描述

      3.STREAM類(lèi)型消息隊(duì)列的XREADGROUP命令特點(diǎn)
      • 消息可回溯
      • 可以多消費(fèi)者爭(zhēng)搶消息,加快消費(fèi)速度
      • 可以阻塞讀取
      • 沒(méi)有消息漏讀的風(fēng)險(xiǎn)
      • 有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次
      4.對(duì)比

      在這里插入圖片描述

      六: 基于Redis的Stream結(jié)構(gòu)作為消息隊(duì)列,實(shí)現(xiàn)異步秒殺下單 1.需求
      • 創(chuàng)建一個(gè)Stream類(lèi)型的消息隊(duì)列,名為stream.orders
      • 修改之前的秒殺下單Lua腳本,在認(rèn)定有搶購(gòu)資格后,直接向stream.orders中添加消息,內(nèi)容包含voucherId、userId、orderId
      • 項(xiàng)目啟動(dòng)時(shí),開(kāi)啟一個(gè)線(xiàn)程任務(wù),嘗試獲取stream.orders中的消息,完成下單
      2.代碼

      Lua腳本

      -- 1.參數(shù)列表
      
      --1.1 優(yōu)惠卷id,用戶(hù)id,訂單id
      local voucherId = ARGV[1]
      local userId = ARGV[2]
      local orderId = ARGV[3]
      
      -- 2.key 庫(kù)存key,訂單key
      local stockKey = 'secKill:stock:' .. voucherId
      local orderKey = 'secKill:order:' .. voucherId
      
      -- 判斷庫(kù)存是否充足
      if (tonumber(redis.call('get', stockKey)<= 0) then
      -- 不充足,返回1
          return 1
      end;
      
      --判斷用戶(hù)是否下單 存在返回2
      if(redis.call('sismember',orderKey,userId) == 1) then
          return 2
      end
      
      --扣庫(kù)存
      redis.call('incrby', stockKey,-1)
      --下單(保存用戶(hù))
      redis.call("sadd",orderKey,userId)
      -- 發(fā)送消息隊(duì)列到隊(duì)列當(dāng)中
      redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
      -- 成功
      return 0

      VoucherOrderServiceImpl

      package com.***.flow.impl;
      
      import cn.hutool.core.bean.BeanUtil;
      import com.***.flow.entity.Vouchers;
      import com.****.flow.service.FlowProcessService;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.data.redis.connection.stream.*;
      import org.springframework.data.redis.core.StringRedisTemplate;
      import org.springframework.stereotype.Service;
      
      import javax.annotation.PostConstruct;
      import java.time.Duration;
      import java.util.List;
      import java.util.concurrent.*;
      
      @Slf4j
      @Service
      public class FlowProcessServiceImpl implements FlowProcessService {@Autowired
          private StringRedisTemplate stringRedisTemplate;
      
          @Autowired
          //線(xiàn)程池
          private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
      
          //啟動(dòng)時(shí)就開(kāi)啟線(xiàn)程池執(zhí)行任務(wù)
          @PostConstruct
          private void init() {SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());
          }
      
      
          private class VocherOrderHandler implements Runnable {@Override
              public void run() {while (true) {try {//1.獲取消息隊(duì)列的訂單信息
                          // XREADGROUP GROUP g1 c1 count 1 block streams streams.order >List>list = stringRedisTemplate.opsForStream().read(
                                  Consumer.from("g1", "c1"),
                                  StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                                  StreamOffset.create("streams:order", ReadOffset.lastConsumed())
      
                          );
      
                          //2.判斷消息獲取是否成功
                          if (list == null || list.isEmpty()) {//2.1 獲取失敗,說(shuō)明沒(méi)有消息,繼續(xù)下一次循環(huán)
                              continue;
                          }
      
      
                          //3.解析消息中的訂單信息
                          MapRecordvalues = list.get(0);
                          Vouchers vouchers = BeanUtil.fillBeanWithMap(values, new Vouchers(), true);
      
                          handleVoucherOrder(vouchers);
      
      
                          //4.ack確認(rèn)
                          stringRedisTemplate.opsForStream().acknowledge("streams:order", "g1", record.getId());
                      } catch (Exception e) {//沒(méi)有被處理,消息在pendingList 里面
      
                          log.info("處理訂單異常:", e);
                          handlePendingList();
                      }
                  }
              }
          }
      
          private void handlePendingList() {while (true) {try {//1.獲取pending_list里面的訂單信息
                      // XREADGROUP GROUP g1 c1 count 1  streams streams.order 0
                      List>list = stringRedisTemplate.opsForStream().read(
                              Consumer.from("g1", "c1"),
                              StreamReadOptions.empty().count(1),
                              StreamOffset.create("streams:order", ReadOffset.from("2"))
      
                      );
      
                      //2.判斷消息獲取是否成功
                      if (list == null || list.isEmpty()) {//2.1 獲取失敗,說(shuō)明沒(méi)有異常信息,結(jié)束循環(huán)
                          break;
                      }
      
      
                      //3.解析消息中的訂單信息
                      MapRecordvalues = list.get(0);
                      Vouchers vouchers = BeanUtil.fillBeanWithMap(values, new Vouchers(), true);
      
                      handleVoucherOrder(vouchers);
      
      
                      //4.ack確認(rèn)
                      stringRedisTemplate.opsForStream().acknowledge("streams:order", "g1", record.getId());
                  } catch (Exception e) {//沒(méi)有被處理,消息在pendingList 里面
                      log.info("處理訂單異常:", e);
                  }
              }
          }
      }
      
        
      
      }
      七:總結(jié)
      • 這個(gè)stream 內(nèi)容有些復(fù)雜,只需要記一下邏輯就好
      • 黑馬redis 入門(mén)到入土

      你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧


      本文標(biāo)題:redis:從入門(mén)到入土:6.Redis消息隊(duì)列-創(chuàng)新互聯(lián)
      當(dāng)前URL:http://www.ef60e0e.cn/article/dgsopc.html
      99热在线精品一区二区三区_国产伦精品一区二区三区女破破_亚洲一区二区三区无码_精品国产欧美日韩另类一区
      1. <ul id="0c1fb"></ul>

        <noscript id="0c1fb"><video id="0c1fb"></video></noscript>
        <noscript id="0c1fb"><listing id="0c1fb"><thead id="0c1fb"></thead></listing></noscript>

        霞浦县| 东乡族自治县| 新晃| 屯门区| 道真| 凤山县| 蒙城县| 聂拉木县| 北京市| 丰原市| 桂林市| 慈利县| 墨江| 兴和县| 珲春市| 车致| 桃园县| 阿合奇县| 建昌县| 英吉沙县| 延安市| 夏津县| 邵东县| 灵宝市| 赣榆县| 花垣县| 山丹县| 西乡县| 迁安市| 东兰县| 汝城县| 怀远县| 武城县| 连城县| 烟台市| 渝北区| 赣榆县| 临邑县| 惠东县| 玉田县| 综艺|