wifi管理網(wǎng)站百度seo規(guī)則
目錄
- 核心基礎(chǔ)使用
- 1、入門案例
- 生產(chǎn)者
- 消費(fèi)者
- 2、消息發(fā)送方式
- 方式1:同步消息
- 方式2:異步消息
- 方式3:一次性消息
- 管控臺(tái)使用過程中可能出現(xiàn)的問題
- 3、消息消費(fèi)方式
- 集群模式(默認(rèn))
- 廣播模式
- 4、順序消息
- 分析圖:
- 代碼實(shí)現(xiàn):
- 生產(chǎn)者代碼:
- 消費(fèi)者代碼:
- Tag消息消費(fèi)過濾
- 5、延遲消息
- 消費(fèi)者代碼:
- 消費(fèi)者代碼:
- 消息過濾
- 6、Tag 標(biāo)簽過濾
- 生產(chǎn)者:
- 消費(fèi)者:
- 7、SQL92 過濾
- 生產(chǎn)者代碼:
- 消費(fèi)者代碼:
- 修改配置文件
核心基礎(chǔ)使用
1、入門案例
生產(chǎn)者和消費(fèi)者都需要用到同一個(gè)依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>
生產(chǎn)者
控制臺(tái)的消息
消費(fèi)者
RocketMQ有push(推模式)和pull(拉模式)兩種消費(fèi)消息的模式,推模式就是Broker主動(dòng)將消息推送給消費(fèi)者,拉模式就是消費(fèi)者主動(dòng)從Broker將消息拉回來。推模式本質(zhì)實(shí)際上是拉模式,是基于拉模式實(shí)現(xiàn)的
consumer啟動(dòng)之后,只要不關(guān)閉,一有消息就會(huì)被這個(gè)消費(fèi)者消費(fèi)
2、消息發(fā)送方式
方式1:同步消息
這種可靠性同步地發(fā)送方式使用的比較廣泛,比如:重要的消息通知,短信通知。
消息中間件要保證消息不丟失,它里面有一個(gè)持久化機(jī)制的。
發(fā)來的消息默認(rèn)存在內(nèi)存中,但是如果消息中心宕機(jī)了,那么消息就全丟了。
所以這個(gè)消息中心里面有這樣一個(gè)存儲(chǔ)介質(zhì),消息中心最終會(huì)把消息存在【文件存儲(chǔ)】里面,每個(gè)Broker Server 有自己的文件存儲(chǔ)。
方式2:異步消息
異步消息通常用在對響應(yīng)時(shí)間敏感的業(yè)務(wù)場景,即發(fā)送端不能容忍長時(shí)間地等待Broker的響應(yīng)。
適合需要快速響應(yīng)的場景,不過可靠性比同步消息差一點(diǎn),因?yàn)槭钱惒桨l(fā)送,所以業(yè)務(wù)邏輯繼續(xù)往下走的時(shí)候,異步發(fā)送的消息可能會(huì)出現(xiàn)問題,這就是說它可靠性差點(diǎn)。
分析圖:
示意圖:
啟動(dòng)rocketmq之后發(fā)送異步消息成功。
方式3:一次性消息
性能更高,對丟失一兩條數(shù)據(jù)無所謂的,適合日志場景
代碼示意圖
生產(chǎn)者
消費(fèi)者
管控臺(tái)使用過程中可能出現(xiàn)的問題
存到消息的時(shí)間是機(jī)器時(shí)間,然后隔天再打開查詢,時(shí)間對不上
時(shí)間同步命令
用阿里的時(shí)間同步
3、消息消費(fèi)方式
集群模式(默認(rèn))
集群模式:消息是分散消費(fèi)的,分散到不同的消費(fèi)者去消費(fèi)的。
廣播模式
4、順序消息
分析圖:
消費(fèi)按照指定的順序進(jìn)行消費(fèi)
rocketMQ本身就是多線程的,默認(rèn)每個(gè)消費(fèi)者的線程數(shù)為5個(gè),每個(gè)消費(fèi)者可以有n個(gè)線程來進(jìn)行消費(fèi)。
屬于多線程消費(fèi)
每一個(gè)topic默認(rèn)有4個(gè)消息隊(duì)列 MessageQueue,如圖
順序消費(fèi)分析圖:
代碼實(shí)現(xiàn):
生產(chǎn)者代碼:
一些注解:
消費(fèi)者代碼:
Tag消息消費(fèi)過濾
“*”號(hào)表示所有消息都要消費(fèi)
想要看消息消費(fèi)前和消費(fèi)后的狀態(tài)的區(qū)別,下圖不確定是不是這么理解
注意點(diǎn):
加這個(gè)的話,每次都會(huì)從頭開始消費(fèi)-----待確定功能
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
5、延遲消息
現(xiàn)在RocketMq并不支持任意時(shí)間的延時(shí),需要設(shè)置幾個(gè)固定的延時(shí)等級(jí),
從1s到2h分別對應(yīng)著等級(jí)1到18
“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
消費(fèi)者代碼:
主要是這里的消息調(diào)用個(gè)延遲發(fā)送消息的方法而已
消費(fèi)者代碼:
這里跟其他消費(fèi)者代碼沒什么區(qū)別
消息過濾
6、Tag 標(biāo)簽過濾
生產(chǎn)者:
消費(fèi)者:
7、SQL92 過濾
RocketMQ只定義了一些基本語法來支持這個(gè)特性。你也可以很容易地?cái)U(kuò)展它。
數(shù)值比較,比如:>,>=,<,<=,BETWEEN,=;
字符比較,比如:=,<>,IN;
IS NULL** 或者 IS NOT NULL;
邏輯符號(hào) AND,OR,NOT;
常量支持類型為:
數(shù)值,比如:**123,3.1415;
字符,比如:‘a(chǎn)bc’,必須用單引號(hào)包裹起來;
NULL,特殊的常量
布爾值,TRUE 或 FALSE
只有使用push模式的消費(fèi)者才能用使用SQL92標(biāo)準(zhǔn)的sql語句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
注意: 在使用SQL過濾的時(shí)候, 需要配置參數(shù)enablePropertyFilter=true
生產(chǎn)者代碼:
消費(fèi)者代碼:
修改配置文件
報(bào)錯(cuò)的原因是因?yàn)閘inux要修改下配置
enablePropertyFilter=true
:wq 保存退出
關(guān)閉和重新啟動(dòng)nameserver和broker
關(guān)閉nameserver:
sh mqshutdown namesrv
關(guān)閉broker
sh mqshutdown broker
1.啟動(dòng)NameServer
nohup sh mqnamesrv &
2.啟動(dòng)Broker
nohup sh mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &
這里已經(jīng)變成true了
重新啟動(dòng)消費(fèi)者看會(huì)不會(huì)報(bào)錯(cuò)
過濾成功