中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

高德地圖有外資背景嗎優(yōu)化大師官方

高德地圖有外資背景嗎,優(yōu)化大師官方,網(wǎng)站開發(fā)要懂英文嗎,vps服務(wù)器10元一月背景: 本文會介紹多種案例,教大家如何使用rocketmq。 一般rocketmq使用在微服務(wù)項目中,屬于分模塊使用。這里使用springboot單體項目來模擬使用。 本文以windows系統(tǒng)來做案例。 下載rocketmq和啟動: RocketMQ 在 windows 上運行…

背景:

本文會介紹多種案例,教大家如何使用rocketmq。

一般rocketmq使用在微服務(wù)項目中,屬于分模塊使用。這里使用springboot單體項目來模擬使用。

本文以windows系統(tǒng)來做案例。

下載rocketmq和啟動:

RocketMQ 在 windows 上運行 - 知乎 (zhihu.com)icon-default.png?t=N6B9https://zhuanlan.zhihu.com/p/644944370?

一、創(chuàng)建springboot項目

一直next進行下去就可以了。

二、pom文件依賴

   <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><!-- 還有其它需要的jar包自由引入(注:fastjson不要使用低于1.2.60版本,會有安全漏洞) --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

?三、代碼實現(xiàn)

1.1目錄結(jié)構(gòu)

1.2生產(chǎn)者消費者解釋

  • 生產(chǎn)者:?

?什么是生產(chǎn)者?就比喻個簡單的例子。比如我們要新增用戶,那么這個新增保存動作可以認為是生產(chǎn)者,他產(chǎn)生了數(shù)據(jù),要將數(shù)據(jù)保存進數(shù)據(jù)庫。

  • 消費者:

什么是消費者??就比喻個簡單的例子。用戶在新增的時候他會調(diào)用接口,用于保存到數(shù)據(jù)庫,那么處理這個數(shù)據(jù)的方法你可以理解為消費者。不過在mq中,生產(chǎn)者是將消息發(fā)送到mq服務(wù)隊列中,會根據(jù)主題Topic的不同,發(fā)往不同的頻道。而消費者只需要監(jiān)聽這個Topic主題即可。只要這個topic有消息來了,那么消費者就會進行消費。后面代碼里有詳細的注釋告知大家如何使用生產(chǎn)者和消費者。

1.3application.yml配置文件

# Tomcat
server:tomcat:uri-encoding: UTF-8max-threads: 1000min-spare-threads: 30servlet:context-path: /port: 8090rocketmq:name-server: 127.0.0.1:9876 # 訪問地址producer:group: Pro_Group # 必須指定groupsend-message-timeout: 3000 # 消息發(fā)送超時時長,默認3sretry-times-when-send-failed: 3 # 同步發(fā)送消息失敗重試次數(shù),默認2retry-times-when-send-async-failed: 3 # 異步發(fā)送消息失敗重試次數(shù),默認2

1.4生產(chǎn)者服務(wù)

import com.alibaba.fastjson.JSON;
import com.example.rocketmqdemo.model.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import java.util.List;@Slf4j
@Component
public class MQProducerService {@Value("${rocketmq.producer.send-message-timeout}")private Integer messageTimeOut;// 建議正常規(guī)模項目統(tǒng)一用一個TOPICprivate static final String topic = "RLT_TEST_TOPIC";// 直接注入使用,用于發(fā)送消息到broker服務(wù)器@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** Tag:用于區(qū)分過濾同一主題下的不同業(yè)務(wù)類型的消息,非常實用* 普通發(fā)送(這里的參數(shù)對象User可以隨意定義,可以發(fā)送個對象,也可以是字符串等)*/public void send(User user) {rocketMQTemplate.convertAndSend(topic + ":tag1", user);
//        rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等價于上面一行}/*** 發(fā)送同步消息(阻塞當(dāng)前線程,等待broker響應(yīng)發(fā)送結(jié)果,這樣不太容易丟失消息)* (msgBody也可以是對象,sendResult為返回的發(fā)送結(jié)果)*/public SendResult sendMsg(String msgBody) {SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));return sendResult;}/*** 發(fā)送異步消息(通過線程池執(zhí)行發(fā)送到broker的消息任務(wù),執(zhí)行完后回調(diào):在SendCallback中可處理相關(guān)成功失敗時的邏輯)* (適合對響應(yīng)時間敏感的業(yè)務(wù)場景)*/public void sendAsyncMsg(String msgBody) {rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 處理消息發(fā)送成功邏輯log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {// 處理消息發(fā)送異常邏輯log.info("【sendMsg】sendResult={}", "發(fā)送異常" + throwable.getMessage());}});}/*** 發(fā)送延時消息(上面的發(fā)送同步消息,delayLevel的值就為0,因為不延時)* 在start版本中 延時消息一共分為18個等級分別為:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h*/public void sendDelayMsg(String msgBody, int delayLevel) {rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);}/*** 發(fā)送單向消息(只負責(zé)發(fā)送消息,不等待應(yīng)答,不關(guān)心發(fā)送結(jié)果,如日志)*/public void sendOneWayMsg(String msgBody) {rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());}/*** 發(fā)送帶tag的消息,直接在topic后面加上":tag"*/public SendResult sendTagMsg(String msgBody) {return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build());}/**** 服務(wù)生產(chǎn)者,順序消息* 把消息確保投遞到同一條queue* 保證了消息的順序性*/public void sendFIFOMsg(List<User> users) {//順序消息//選擇器規(guī)則構(gòu)建rocketMQTemplate.setMessageQueueSelector((list, message, o) -> {int id = Integer.valueOf((String) o);int hash = (id % list.size());return list.get(hash);});if (!CollectionUtils.isEmpty(users)) {for (User user : users) {MessageBuilder.withPayload(users.toString()).build();rocketMQTemplate.sendOneWayOrderly(topic+":sendFIFOMsg", user, String.valueOf(user.getId()));}}}
}

1.5消費者服務(wù)

import com.alibaba.fastjson.JSON;
import com.example.rocketmqdemo.model.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;@Slf4j
@Component
public class MQConsumerService {// Tag:用于區(qū)分過濾同一主題下的不同業(yè)務(wù)類型的消息,非常實用// topic需要和生產(chǎn)者的topic一致,consumerGroup屬性是必須指定的,內(nèi)容可以隨意// selectorExpression的意思指的就是tag,默認為“*”,不設(shè)置的話會監(jiān)聽所有消息@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One")public class ConsumerSend implements RocketMQListener<User> {// 監(jiān)聽到消息就會執(zhí)行此方法@Overridepublic void onMessage(User user) {log.info("tag1監(jiān)聽到消息:user={}", JSON.toJSONString(user));}}// 注意:這個ConsumerSend2和上面ConsumerSend在沒有添加tag做區(qū)分時,不能共存,// 不然生產(chǎn)者發(fā)送一條消息,這兩個都會去消費,如果類型不同會有一個報錯,所以實際運用中最好加上tag,寫這只是讓你看知道就行@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two",selectorExpression = "xxx")public class ConsumerSend2 implements RocketMQListener<String> {@Overridepublic void onMessage(String str) {log.info("ConsumerSend2監(jiān)聽到消息:str={}", str);}}// MessageExt:是一個消息接收通配符,不管發(fā)送的是String還是對象,都可接收,當(dāng)然也可以像上面明確指定類型(我建議還是指定類型較方便)@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")public class Consumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("tag2監(jiān)聽到消息:msg={}", msg);}}/*** 消費者順序消費消息* 順序消費*/@Service@RocketMQMessageListener(consumerGroup = "Orderly-Consumer", topic = "RLT_TEST_TOPIC",selectorExpression = "sendFIFOMsg", consumeMode = ConsumeMode.ORDERLY)public class OrderlyConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println("線程"+Thread.currentThread()+"內(nèi)容為:"+ new String(message.getBody())+"隊列序號:"+message.getQueueId()+",消息msgId:"+message.getMsgId());}}
}

1.6User實體類

import lombok.Data;@Data
public class User {private String id;private String name;private Integer age;private String sex;private String desc;}

1.7開始調(diào)用生產(chǎn)者服務(wù)、消費者自動監(jiān)聽消費

import com.example.rocketmqdemo.model.User;
import com.example.rocketmqdemo.producer.MQProducerService;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;@RestController
@RequestMapping("/rocketmq")
public class MqController {@Autowiredprivate MQProducerService mqProducerService;@GetMapping("/send")public void send() {User user = new User();user.setAge(28);user.setName("曹震");user.setSex("男");mqProducerService.send(user);}@GetMapping("/sendTag")public ResponseEntity<SendResult> sendTag() {SendResult sendResult = mqProducerService.sendTagMsg("帶有tag的字符消息");return ResponseEntity.ok(sendResult);}@GetMapping("/sendMsg")public ResponseEntity<SendResult> sendMsg() {SendResult sendResult = mqProducerService.sendMsg("曹震測試");return ResponseEntity.ok(sendResult);}@GetMapping("/sendFIFOMsg")public void sendFIFOMsg() {List<User> users = new ArrayList<>();User user = new User();user.setId("1");user.setSex("男");user.setName("曹震");user.setAge(28);user.setDesc("創(chuàng)建訂單");users.add(user);User user1 = new User();user1.setId("2");user1.setSex("男");user1.setName("賈耀旗");user1.setAge(25);user1.setDesc("創(chuàng)建訂單");users.add(user1);User user2 = new User();user2.setId("1");user2.setSex("男");user2.setName("曹震");user2.setAge(28);user2.setDesc("訂單付款");users.add(user2);User user3 = new User();user3.setId("1");user3.setSex("男");user3.setName("曹震");user3.setAge(28);user3.setDesc("訂單完成");users.add(user3);User user4 = new User();user4.setId("1");user4.setSex("男");user4.setName("曹震");user4.setAge(28);user4.setDesc("訂單推送");users.add(user4);User user5 = new User();user5.setId("2");user5.setSex("男");user5.setName("賈耀旗");user5.setAge(25);user5.setDesc("訂單付款");users.add(user5);User user6 = new User();user6.setId("2");user6.setSex("男");user6.setName("賈耀旗");user6.setAge(25);user6.setDesc("訂單完成");users.add(user6);mqProducerService.sendFIFOMsg(users);}}

1.8我們來啟動服務(wù),看下效果

我們以

@GetMapping("/sendFIFOMsg")
public void sendFIFOMsg() {} 這個方法進行測試??梢钥闯鲞@里的代碼其實是內(nèi)容順序是亂的,我們先看調(diào)用成功后的結(jié)果:

?

線程Thread[ConsumeMessageThread_3,5,main]內(nèi)容為:{"id":"1","name":"曹震","age":28,"sex":"男","desc":"創(chuàng)建訂單"}隊列序號:1,消息msgId:A9FE29E30E3800DAD5DC7F03A485001C
2023-08-25 15:55:45.162  INFO 3640 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A485001C cost: 1 ms
線程Thread[ConsumeMessageThread_4,5,main]內(nèi)容為:{"id":"2","name":"賈耀旗","age":25,"sex":"男","desc":"創(chuàng)建訂單"}隊列序號:2,消息msgId:A9FE29E30E3800DAD5DC7F03A486001E
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_4] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A486001E cost: 1 ms
線程Thread[ConsumeMessageThread_4,5,main]內(nèi)容為:{"id":"2","name":"賈耀旗","age":25,"sex":"男","desc":"訂單付款"}隊列序號:2,消息msgId:A9FE29E30E3800DAD5DC7F03A4860026
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_4] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4860026 cost: 0 ms
線程Thread[ConsumeMessageThread_4,5,main]內(nèi)容為:{"id":"2","name":"賈耀旗","age":25,"sex":"男","desc":"訂單完成"}隊列序號:2,消息msgId:A9FE29E30E3800DAD5DC7F03A4870028
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_4] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4870028 cost: 0 ms
線程Thread[ConsumeMessageThread_5,5,main]內(nèi)容為:{"id":"1","name":"曹震","age":28,"sex":"男","desc":"訂單付款"}隊列序號:1,消息msgId:A9FE29E30E3800DAD5DC7F03A4860020
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_5] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4860020 cost: 0 ms
線程Thread[ConsumeMessageThread_5,5,main]內(nèi)容為:{"id":"1","name":"曹震","age":28,"sex":"男","desc":"訂單完成"}隊列序號:1,消息msgId:A9FE29E30E3800DAD5DC7F03A4860022
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_5] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4860022 cost: 0 ms
線程Thread[ConsumeMessageThread_5,5,main]內(nèi)容為:{"id":"1","name":"曹震","age":28,"sex":"男","desc":"訂單推送"}隊列序號:1,消息msgId:A9FE29E30E3800DAD5DC7F03A4860024
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_5] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4860024 cost: 0 ms

?我們可以看到已經(jīng)進行了消費操作,大家有沒有看到同一個id的用戶他們消費隊列信息是一樣的

思考:我們在創(chuàng)建數(shù)據(jù)的時候,明明數(shù)據(jù)的順序不是一致的,我們將消息發(fā)送到隊列中,這個時候應(yīng)該是按照FIFO的形式去消費才對,應(yīng)該是亂的順序消費才對。為什么這里會把同一個id的信息在一起消費呢?而且還是按照創(chuàng)建訂單順序去消費的?

對了,我們在使用mq的時候會出現(xiàn)兩筆訂單,處理訂單流程順序的問題,比如:訂單1還沒有處理完,訂單2也發(fā)消息給mq了,這時候應(yīng)該回去消費訂單2,那么訂單1怎么?這個過程中還可能造成臟數(shù)據(jù)問題。

那么我們就需要保證訂單的順序消費了,那么順序消費怎么處理呢?可以看上面代碼。我們看到生產(chǎn)者有將用戶的id進行hash計算,然后得到值,這個值相同的數(shù)據(jù)放在同一隊列中,這樣是不是就保證了消息的順序消費?

?

四 、思考

我們上面已經(jīng)保證了數(shù)據(jù)的順序消費,那么如何保證數(shù)據(jù)不丟失呢?如何保證數(shù)據(jù)重復(fù)消費問題?

大家可以思考下。后續(xù)我會繼續(xù)在本文章中進行補充和代碼實踐。

?

http://www.risenshineclean.com/news/48803.html

相關(guān)文章:

  • 煙臺網(wǎng)站建設(shè).comseo自然搜索優(yōu)化排名
  • 網(wǎng)站域名過期怎么做重慶網(wǎng)站快速排名提升
  • 大上海小程序開發(fā)搜索引擎優(yōu)化的主題
  • 萬維網(wǎng)的網(wǎng)站抖音優(yōu)化排名
  • 南寧美麗南方官方網(wǎng)站建設(shè)意見企業(yè)網(wǎng)站建設(shè)的基本流程
  • 網(wǎng)站開通后百度廣告收費
  • 固始做網(wǎng)站網(wǎng)絡(luò)銷售哪個平臺最好
  • 幼兒園50個主題網(wǎng)絡(luò)圖關(guān)鍵詞優(yōu)化的作用
  • 張家港安監(jiān)站網(wǎng)址應(yīng)用商店下載
  • 網(wǎng)站空間有什么用外貿(mào)網(wǎng)站營銷推廣
  • 外貿(mào)網(wǎng)站建設(shè)步驟網(wǎng)店營銷
  • 動漫制作專業(yè)可以專升本嗎上海有哪些優(yōu)化網(wǎng)站推廣公司
  • 瀘州網(wǎng)站開發(fā)公司襄陽百度開戶
  • 娛樂手機網(wǎng)站開發(fā)優(yōu)化大師破解版app
  • 個人做網(wǎng)站的流程中國網(wǎng)絡(luò)優(yōu)化公司排名
  • 機械網(wǎng)站推廣怎么做公眾號軟文素材
  • 怎么做企業(yè)招聘網(wǎng)站希愛力的作用與功效
  • 濟南做網(wǎng)站推廣哪家好網(wǎng)銷是做什么的
  • 優(yōu)秀排版設(shè)計網(wǎng)站聊城網(wǎng)站開發(fā)
  • 邢臺做移動網(wǎng)站費用國內(nèi)新聞最新
  • 做酸菜視頻網(wǎng)站今日頭條國際新聞
  • 什么是網(wǎng)絡(luò)營銷?網(wǎng)絡(luò)營銷的特點有哪些?昆明seo關(guān)鍵詞
  • 做公司網(wǎng)站需要準(zhǔn)備什么開魯網(wǎng)站seo不用下載
  • 北京市昌平建設(shè)工程招標(biāo)網(wǎng)站世界杯比分查詢
  • 做ppt哪個網(wǎng)站好nba今日數(shù)據(jù)
  • 建設(shè)一個視頻網(wǎng)站友鏈提交入口
  • 移動互聯(lián)網(wǎng)網(wǎng)站建設(shè)廣告宣傳費用一般多少
  • 鞋子 東莞網(wǎng)站建設(shè)營銷軟件代理推廣
  • 網(wǎng)站的制作優(yōu)化網(wǎng)站收費標(biāo)準(zhǔn)
  • 青浦網(wǎng)絡(luò)公司網(wǎng)站官方推廣平臺