rabbitmq安裝部署
生產(chǎn)者(producter)隊列消息的產(chǎn)生者,復制生產(chǎn)消息,并將消息傳入隊列生產(chǎn)者代碼:
(相關資料圖)
import pikaimport jsoncredentials = pika.PlainCredentials("admin","admin")#mq用戶名和密碼,用于認證#虛擬隊列需要指定參數(shù)virtual_host,如果是默認的可以不填connection = pika.BlockingConnection(pika.ConnectionParameters(host="10.0.0.24",port=5672,virtual_host="/",credentials=credentials))channel = connection.channel()# 創(chuàng)建一個AMQP信道#聲明隊列,并設置durable為True,為了避免rabbitMq-server掛掉數(shù)據(jù)丟失,將durable設為Truechannel.queue_declare(queue="1",durable=True)for i in range(10): # 創(chuàng)建10個q message = json.dumps({"OrderId":"1000%s"%i}) # exchange表示交換器,可以精確的指定消息應該發(fā)到哪個隊列中,route_key設置隊列的名稱,body表示發(fā)送的內容 channel.basic_publish(exchange="",routing_key="1",body=message) print(message)connection.close()操作前通過pika生命一個認證用的憑證,然后用pika創(chuàng)建rabbitmq的塊連接,再用上面的連接創(chuàng)建一個AMQP信道 。創(chuàng)建消息隊列的連接時,需要指定ip,斷開,虛擬主機,憑證。
然后根據(jù)上面的信道,聲明一個隊列,
我們可以看到,下面信道點隊列聲明里的queue參數(shù)值就隊列的名字。這里是遍歷0到9,然后打印了下消息,這里的生成的消息,是json序列化后的數(shù)據(jù)。然后將數(shù)據(jù)作為i,信道點基礎發(fā)布的body參數(shù)的值。上面信道點隊列聲明是創(chuàng)建一個隊列,隊列名字是’1‘,下面我們用信道點基本發(fā)布,是將我們創(chuàng)建的消息體發(fā)送到隊列中,路由_key就是指定隊列名稱,指定發(fā)布消息到哪個隊列,消息是作為body的參數(shù),
最后,需要將這個消息隊列的連接關閉。
我們通過頁面可以看到,已經(jīng)創(chuàng)建好了這個隊列,隊列名字為1,并且已經(jīng)通過遍歷生成的10個消息,調用十次信道點基礎發(fā)布方法,將這十個產(chǎn)生的消息發(fā)布到消息隊列中
我們可以再看下,可以看到我們創(chuàng)建的消息的具體內容。
消費者(consumer):隊列消息的接收者,扶著接收并處理消息隊列中的消息
import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters( host="10.0.0.24", port=5672, virtual_host="/", credentials=credentials))channel = connection.channel()#聲明消息隊列,消息在這個隊列中傳遞,如果不存在,則創(chuàng)建隊列channel.queue_declare(queue="1",durable=True)# 定義一個回調函數(shù)來處理消息隊列中消息,這里是打印出來def callback(ch,method,properties,body): ch.basic_ack(delivery_tag=method.delivery_tag) print(body.decode())#告訴rabbitmq,用callback來接收消息channel.basic_consume("1",callback)#開始接收信息,并進入阻塞狀態(tài),隊列里有信息才會調用callback進行處理channel.start_consuming()獲取消息,創(chuàng)建憑證,連接,信道,然后什么一下隊列。指定我們要獲取哪個隊列中的消息,如果沒有這個隊列,就會創(chuàng)建這個隊列,存在,那么后面使用這個信道,就會從這個隊列中獲取數(shù)據(jù)。信道是通過rabbitmq的連接對象來生成的,連接對象中放了連接用的憑證。所以,信道點基礎消費方法,指定是哪個消息隊列,那么就會從這個隊列中獲取消息。然后傳參回調函數(shù)。而回調函數(shù)中,
我們可以看到,基礎消費方法里面有消息回調,就是上面我們自定義的回調函數(shù)
這個方法定義了回調函數(shù)的寫法。第一個參數(shù)是信道
第二個參數(shù)是方法,第三個參數(shù)是屬性,第四個是body,這些不用管,只需要按如下格式,就可以從body,做個解碼,就將信道點基礎消費中指定的隊列中的消息,取出來了,我們是用回調函數(shù)來接收消息,當需要獲取消息的時候,就需要執(zhí)行信道點開始消費的方法。這里好像是遍歷隊列一個一個的將消息獲取出來。那么怎樣實現(xiàn),實時監(jiān)聽消息,實時消費呢
RabbitMq持久化MQ默認建立的臨時的queue和exchange,如果不聲明持久化,一旦rabbitmq掛掉,queue,exchange將會全部丟失,所以我們一般在創(chuàng)建queue或者exchange的時候會聲明持久化
# 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表消息隊列持久化存儲,F(xiàn)alse 非持久化存儲result = channel.queue_declare(queue = "python-test",durable = True)
使用True
重啟消息隊列服務
消息隊列還在,但是消息被清空了
當我改為false的時候,因為隊列1已經(jīng)存在,并且是Tue聲明的,所以這里就報錯了
我們設置為false,然后聲明一個不存在的隊列2
創(chuàng)建好了隊列,并且10個消息
重啟一下消息隊列服務
剛剛上面創(chuàng)建的隊列2已經(jīng)不存在,這已經(jīng)不是消息被清空了,而是隊列直接被清除了
也就是這個Ture,是保留隊列用的,持久化隊列的。
channel.queue_declare(queue="2",durable=True)
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建.durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test", durable = True)
注意:如果已存在一個非持久化的queue或exchange,執(zhí)行上述代碼會報錯,因為當前狀態(tài)不能更該queue 或 exchange存儲屬性,需要刪除重建,如果queue和exchange中一個聲明了持久化,另一個沒有聲明持久化,則不允許綁定
我們在1處改了,但是在2處沒有修改。結果有問題。
隊列2不存在,所以沒有將消息放進去
而exchange這里,沒有寫將消息推送到聲明的python-test里面,所以里面也沒有消息
這次是聲明的exchange,并且將消息推送到python-test里面
還是沒有看到有東西呀
我們這里發(fā)布個消息,可以看到,是需要路由的
加上路由,再次執(zhí)行程序
由于隊列2 不存在,好像還是不行
我在這里給它bind一個路由
感覺還是沒有弄明白,先放棄了
原來是如下方式呀。
首先,在python-test2里面,
給exchange綁定隊列1和2
1和2目前的消息數(shù)量
我往路由1里面push一個消息
push成功
然后再看隊列1里面,可以看到多了一條剛剛push的消息
接下來用程序實現(xiàn),聲明exchange,然后發(fā)布方法不變,發(fā)布到exchage中,因為已經(jīng)綁定了兩個路由了,這里指定路由key,根據(jù)路由key,可以將消息push到對應的隊列中去
我們可以看到,之前是頁面點擊push了一條,上面程序push了十條到exchange,現(xiàn)在這個隊列就有11條數(shù)據(jù)??墒沁@個exchange和隊列的綁定,是我自己在頁面上綁定的,這個應該不合理。以后有時間看下,怎么用程序綁定。
我們可以看到,應該是程序中缺少使用這個綁定方法吧
雖然exchange和queue都聲明了持久化,但如果消息只存在內存里,rabbitmq重啟后,內存里的東西還是會丟失,所以必須聲明消息也是持久化,從內存轉存到到硬盤
# 向隊列插入數(shù)值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化channel.basic_publish(exchange = "",routing_key = "python-test",body = message, properties=pika.BasicProperties(delivery_mode = 2))
我們這里先重啟一下rabbitmq,把之前的寫入隊列的消息清空
不過我們看到,這里已經(jīng)有持久化存儲的消息了,之前好像是頁面點擊推送的消息
總共一條,持久化1條。持久化的,即使重啟服務,消息也不會丟失
我們再去推送一條
可以看到剛剛推送的這條也是持久化存儲的
我們在發(fā)布的方法里面,添加屬性發(fā)布的模式是2,
剛才是2條持久化的,現(xiàn)在新增10條數(shù)據(jù),且是持久化的消息
如果改成1
可以看到,剛剛新增了10條消息,但是這10條消息沒有持久化。
消費者(consume)調用callback函數(shù)時,會存在處理消息失敗的風險,如果處理失敗,則消息會丟失,但是也可以選擇消費者處理失敗時,將消息回退給rabbitmq,重新再被消費者消費,這個時候需要設置確認標識。
channel.basic_consume(callback,queue = "python-test",# no_ack 設置成 False,在調用callback函數(shù)時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉 no_ack = False)
目前隊列2中有10條沒有持久化的,有12條持久化的消息
執(zhí)行消費程序
再看隊列2中,可以看到之前12條持久化和10條沒有持久化的消息數(shù)據(jù)都已經(jīng)被消費了。我們可以看到消費者這里,多了一個消費者。消費者有個tag,還有ack的確認。在詳情那里,也可以看到 消費者數(shù)量是1
我們push了一條消息,但是沒有發(fā)現(xiàn)推送到隊列中,難道是因為隊列綁定exchange的原因?
push的時候,有個持久化的選擇,發(fā)現(xiàn)還是沒有push進去
在exchange這里push了,
發(fā)現(xiàn)隊列1有數(shù)據(jù),2沒有消息
往路由key這里發(fā)送多次消息
還是沒有,難道上面都是失敗的發(fā)送嘛
我們再看消費者程序,我們看到運行程序之后,這個程序一直沒有退出,處于監(jiān)聽狀態(tài),正如我們在隊列中看到的那樣,有個消費者是up狀態(tài),也就是這個消費者一直在監(jiān)聽我們上面的那個隊列,程序并沒有退出。因此,我們上面在頁面push的sss之類的消息,都被這個消費者消費掉了,因此沒有看到新增的消息。
我們將上面的消費者程序停掉之后,就可以看到隊列下面已經(jīng)顯示沒有消費者了,然后再推送消息的時候,頁面選擇持久化,
我們可以看到,推送的消息,是持久化的。由上面的學習,了解到,消息是否持久化,好像是取決于生產(chǎn)者的設置,而不是說消息沒有持久化,我給它用命令持久化一下,至于是否可以用命令持久化一下,本來不需要持久化的消息,暫且不考慮。
在上一章中,我們創(chuàng)建了一個工作隊列,工作隊列模式的設想是每一條消息只會被轉發(fā)給一個消費者。本章將會講解完全不一樣的場景: 我們會把一個消息轉發(fā)給多個消費者,這種模式稱之為發(fā)布-訂閱模式。RabbitMq消息模式的核心思想是:一個生產(chǎn)者并不會直接往一個隊列中發(fā)送消息,事實上,生產(chǎn)者根本不知道它發(fā)送的消息將被轉發(fā)到哪些隊列。實際上,生產(chǎn)者只能把消息發(fā)送給一個exchange,exchange只做一件簡單的事情:一方面它們接收從生產(chǎn)者發(fā)送過來的消息,另一方面,它們把接收到的消息推送給隊列。一個exchage必須清楚地知道如何處理一條消息. rabbitmq的發(fā)布與訂閱要借助交換機(Exchange)的原理實現(xiàn):
Exchange 一共有三種工作模式:fanout, direct, topicd
這種模式下,傳遞到exchange的消息將會==轉發(fā)到所有于其綁定的queue上
不需要指定routing_key,即使指定了也是無效的。需要提前將exchange和queue綁定,一個exchange可以綁定多個queue,一個queue可以綁定多個exchange。需要先啟動訂閱者,此模式下的隊列是consume隨機生成的,發(fā)布者僅僅發(fā)布消息到exchange,由exchange轉消息至queue。exchange交換器首先我們創(chuàng)建一個fanout類型的交換器,我們稱之為:python-test:
channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")
廣播模式交換器很簡單,從字面意思也能理解,它其實就是把接收到的消息推送給所有它知道的隊列。? 想查看當前系統(tǒng)中有多少個exchange,可以從控制臺查看
可以看到有很多以amq.*開頭的交換器,以及(AMQP default)默認交換器,這些是默認創(chuàng)建的交換器。? 在前面,我們并不知道交換器的存在,但是依然可以將消息發(fā)送到隊列中,那其實并不是因為我們可以不使用交換器,實際上是我們使用了默認的交換器(我們通過指定交換器為字字符串:""),回顧一下我們之前是如何發(fā)送消息的:
channel.basic_publish(exchange="",routing_key="1",body=message)
第一個參數(shù)是交換器的名字,空字符串表示它是一個默認或無命名的交換器,消息將會由指定的路由鍵(第二個參數(shù),routingKey,后面會講)轉發(fā)到隊列。? 你可能會有疑問:既然exchange可以指定為空字符串(""),那么可否指定為null?? ? 答案是:不能!
通過跟蹤發(fā)布消息的代碼,在AMQImpl類中的Publish()方面中,可以看到,不光是exchange不能為null,同時routingKey路由鍵也不能為null,否則會拋出異常:
在前面的例子中,我們使用的隊列都是有具體的隊列名,創(chuàng)建命名隊列是很必要的,因為我們需要將消費者指向同一名字的隊列。因此,要想在生產(chǎn)者和消費者中間共享隊列就必須要使用命名隊列。
import pikaimport jsoncredentials = pika.PlainCredentials("admin", "admin") # mq用戶名和密碼# 虛擬隊列需要指定參數(shù) virtual_host,如果是默認的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel=connection.channel()# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")for i in range(10): message=json.dumps({"OrderId":"1000%s"%i})# 向隊列插入數(shù)值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置 channel.basic_publish(exchange = "python-test",routing_key = "",body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message)connection.close()import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 創(chuàng)建臨時隊列,隊列名傳空字符,consumer關閉后,隊列自動刪除result = channel.queue_declare("4")# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")# 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應該到哪個隊列去channel.queue_bind(exchange = "python-test",queue = "4")# 定義一個回調函數(shù)來處理消息隊列中的消息,這里是打印出來def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode())channel.basic_consume(result.method.queue,callback,# 設置成 False,在調用callback函數(shù)時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉 auto_ack = False)channel.start_consuming()import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 創(chuàng)建臨時隊列,隊列名傳空字符,consumer關閉后,隊列自動刪除result = channel.queue_declare("2",durable=True)# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")# 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應該到哪個隊列去channel.queue_bind(exchange = "python-test",queue = "2")# 定義一個回調函數(shù)來處理消息隊列中的消息,這里是打印出來def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode())channel.basic_consume(result.method.queue,callback,# 設置成 False,在調用callback函數(shù)時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉 auto_ack = False)channel.start_consuming()當前的隊列如下
發(fā)布消息,exchange類型不對
下面這就是直連類型
進去之后把找個已經(jīng)存在的exchange刪除了,這個暫時沒用
發(fā)布,這里也沒有指的路由key
可以看到新建的exchange類型是fanout
因為沒有綁定隊列,所以程序推送的消息,好像是丟失了
開啟訂閱者1,聲明隊列4并綁定到前面創(chuàng)建的python-test這個exchange。
查看,隊列4已經(jīng)創(chuàng)建
有個消費者正連接著4
并且訂閱者1聲明的隊列,也跟指定的exchange已經(jīng)綁定了,路由key,默認就是用的隊列名稱
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg "durable" for queue "2" in vhost "/": received "false" but current is "true"")
開啟訂閱者2,但是報錯了,因為隊列2已經(jīng)存在了,并且是Ture,是持久化的,而這里信道點隊列聲明2,是沒有指定那個參數(shù),那就是默認是Flase,非持久化的隊列,重啟下服務這個隊列就不存在了。因此保持了。我們先將這個已經(jīng)存在的隊列刪除,然后重新聲明一下吧,或者是直接給它加個持久化的參數(shù)也行
加上之后,就能正常開啟這個訂閱者2了
我們創(chuàng)建的4,是非持久化的隊列,這里這個d的標記,可能就是durable參數(shù),是否持久化隊列的意思吧
我們重新執(zhí)行一次發(fā)布者程序,發(fā)布者并沒有指定路由key,只是指定了exchange,而訂閱者1和2程序里面,都是有綁定這個exchange的
我們可以看到,訂閱者1獲取到了發(fā)布到這個exchage的消息
訂閱者2也獲取到了發(fā)布到這個exchage的消息
再來看下這個exchange的情況
它對應的兩個隊列
隊列2有個消費者
隊列4也有個消費者,這兩個消費者各自對應一個隊列,每個消費者請求過來是的端口不同,消費者tag不同。兩個隊列中的消息,都被訂閱者程序獲取并打印在pycharm上進行消費了,因此,隊列中也就沒有數(shù)據(jù)了。
難道,一個隊列,就是一個訂閱者嗎?當發(fā)布者發(fā)布消息的時候,難道是基礎發(fā)布方法里面,指定exchange,不指定路由key,這樣就會將生產(chǎn)者生產(chǎn)的消息,發(fā)送給所有綁定這個exchange的隊列嗎,而訂閱者和隊列一一對應,然后每個訂閱者就從自己對應的隊列中將這個消息消費掉嗎?
把兩個訂閱者,都停止掉,查看目前這兩個隊列,都是沒有消息的。
我執(zhí)行發(fā)布者程序,發(fā)布消息,指定exchange,不指定路由key。
我們可以看到,這種情況下,的確是將消息發(fā)布給所有綁定這個exchange的隊列了,如下,2和4隊列都綁定了,所以都接收到了十條消息。
我們發(fā)布消息的參數(shù),指定消息是持久化的,因為隊列2是個持久化的隊列,因此,進入隊列2的消息也是持久化的
由于聲明隊列4,不是持久化的隊列,因此,即使發(fā)布消息時,指定消息是持久化的,但是實際上這個消息也是沒有在這個非持久化的隊列中進行持久化,也只是臨時的罷了。
我開啟訂閱者1
訂閱者1對應著隊列4,隊列4的消息已經(jīng)被消費了,已經(jīng)在上圖中打印出來了。
開啟訂閱者2
訂閱者2對應的隊列是2,也將消息消費掉了,并在訂閱者2程序中打印了出來
如果,隊列或者消息是臨時的,消費者還沒消費的消息,因為重啟服務,那么就會丟失消息,消費者應該就消費不到那個丟失的消息了。
這種工作模式的原理是消息發(fā)送至exchange,exchange根據(jù)**路由鍵(routing_key)**轉發(fā)到相對應的queue上。
import pikaimport jsoncredentials = pika.PlainCredentials("admin", "admin") # mq用戶名和密碼# 虛擬隊列需要指定參數(shù) virtual_host,如果是默認的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel=connection.channel()# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="direct")for i in range(10): message=json.dumps({"OrderId":"1000%s"%i})# 指定 routing_key。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = "python-test",routing_key = "OrderId",body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message)connection.close()import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 創(chuàng)建臨時隊列,隊列名傳空字符,consumer關閉后,隊列自動刪除result = channel.queue_declare("",exclusive=True)# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="direct")# 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應該到哪個隊列去channel.queue_bind(exchange = "python-test",queue = result.method.queue,routing_key="OrderId")# 定義一個回調函數(shù)來處理消息隊列中的消息,這里是打印出來def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode())#channel.basic_qos(prefetch_count=1)# 告訴rabbitmq,用callback來接受消息channel.basic_consume(result.method.queue,callback,# 設置成 False,在調用callback函數(shù)時,未收到確認標識,消息會重回隊列。True,無論調用callback成功與否,消息都被消費掉 auto_ack = False)channel.start_consuming()將之前測試用的exchanges刪除,隊列也刪除
使用direct類型的exchange,發(fā)布消息
沒有隊列生成
開啟消費者程序,exchange聲明的類型是direct,隊列綁定exchange,指定路由key,這個路由key,并沒有這個名字的隊列
開啟上面的消費者程序之后,就生成了一個隊列。這個生成的隊列,進入可以看到是有消費者在監(jiān)聽這個隊列的。這個隊列,以上面命名的路由key,來綁定了前面定義的exchange。
我們進入這個exchange查看下,路由key,定向到某個隊列
我們看下發(fā)布消息的程序,就是exchange聲明里面,定義了direct方式,而基礎發(fā)布方法里面,就指定發(fā)布到上面定義的exchange,然后指定路由key為之前執(zhí)行消費者程序時,隨機生成名字的隊列,綁定exchange時使用的路由key。這樣,我們發(fā)布消息的時候,發(fā)布給exchange,就會根據(jù)路由key,然后找到對應的隊列,將消息推送到這個隊列中。
由于我們的訂閱者,一直在監(jiān)聽,當上面發(fā)布消息到隊列中后,訂閱者就從exchange下根據(jù)路由key,找到對應的隊列,然后將隊列中的消息消費,打印到pycharm上,
這種模式和第二種差不多,exchange也是通過路由鍵routing_key來轉發(fā)消息到指定的queue。不同之處在于:**routing_key使用正則表達式支持模糊匹配,**但匹配規(guī)則又與常規(guī)正則表達式不同,比如"#"是匹配全部,“*”是匹配一個詞。舉例:routing_key =“#orderid#”,意思是將消息轉發(fā)至所有 routing_key 包含 “orderid” 字符的隊列中。代碼和模式二 類似,
我們用上面的代碼改 一下,再復制處兩個訂閱者,只需要修改下路由key為帶2的 帶3的數(shù)字就可以
我們再改一下
我們看頁面,可以看到又多了兩個隊列了
可以看到這個exchange對應三個隊列,路由key都是帶有OrderId,
我們將路由key,改為匹配的方式,然后發(fā)布消息
演示失敗
參考鏈接:https://blog.csdn.net/weixin_45144837/article/details/104335115
標簽: