中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

站酷設(shè)計(jì)官方網(wǎng)站磁力下載

站酷設(shè)計(jì)官方網(wǎng)站,磁力下載,有自建服務(wù)器做網(wǎng)站的嗎,個(gè)人養(yǎng)老金查詢文章目錄 1.場(chǎng)景描述 1.1 場(chǎng)景11.2 場(chǎng)景2 2.原理3.實(shí)戰(zhàn)開發(fā) 3.1 建表3.2 集成mybatis-plus3.3 集成RabbitMq 3.3.1 安裝mq3.3.2 springBoot集成mq 3.4 具體實(shí)現(xiàn) 3.4.1 mq配置類3.4.2 生產(chǎn)者3.4.3 消費(fèi)者 1.場(chǎng)景描述 消息中間件是分布式系統(tǒng)常用的組件,無論是異…

文章目錄

  • 1.場(chǎng)景描述
    • 1.1 場(chǎng)景1
    • 1.2 場(chǎng)景2
  • 2.原理
  • 3.實(shí)戰(zhàn)開發(fā)
    • 3.1 建表
    • 3.2 集成mybatis-plus
    • 3.3 集成RabbitMq
      • 3.3.1 安裝mq
      • 3.3.2 springBoot集成mq
    • 3.4 具體實(shí)現(xiàn)
      • 3.4.1 mq配置類
      • 3.4.2 生產(chǎn)者
      • 3.4.3 消費(fèi)者

1.場(chǎng)景描述

消息中間件是分布式系統(tǒng)常用的組件,無論是異步化、解耦、削峰等都有廣泛的應(yīng)用價(jià)值。我們通常會(huì)認(rèn)為,消息中間件是一個(gè)可靠的組件——這里所謂的可靠是指,只要我把消息成功投遞到了消息中間件,消息就不會(huì)丟失,即消息肯定會(huì)至少保證消息能被消費(fèi)者成功消費(fèi)一次,這是消息中間件最基本的特性之一,也就是我們常說的“AT LEAST ONCE”,即消息至少會(huì)被“成功消費(fèi)一遍”。

1.1 場(chǎng)景1

什么意思呢?舉個(gè)例子:一個(gè)消息M發(fā)送到了消息中間件,消息投遞到了消費(fèi)程序A,A接受到了消息,然后進(jìn)行消費(fèi),但在消費(fèi)到一半的時(shí)候程序重啟了,這時(shí)候這個(gè)消息并沒有標(biāo)記為消費(fèi)成功,這個(gè)消息還會(huì)繼續(xù)投遞給這個(gè)消費(fèi)者,直到其消費(fèi)成功了,消息中間件才會(huì)停止投遞。
這種情景就會(huì)出現(xiàn)消息可能被多次地投遞。

1.2 場(chǎng)景2

還有一種場(chǎng)景是程序A接受到這個(gè)消息M并完成消費(fèi)邏輯之后,正想通知消息中間件“我已經(jīng)消費(fèi)成功了”的時(shí)候,程序就重啟了,那么對(duì)于消息中間件來說,這個(gè)消息并沒有成功消費(fèi)過,所以他還會(huì)繼續(xù)投遞。這時(shí)候?qū)τ趹?yīng)用程序A來說,看起來就是這個(gè)消息明明消費(fèi)成功了,但是消息中間件還在重復(fù)投遞。

以上兩個(gè)場(chǎng)景對(duì)于消息隊(duì)列來說就是同一個(gè)messageId的消息重復(fù)投遞下來了。

我們利用消息id來判斷消息是否已經(jīng)消費(fèi)過,如果該信息被消費(fèi)過,那么消息表中已經(jīng) 會(huì)有一條數(shù)據(jù),由于消費(fèi)時(shí)會(huì)先執(zhí)行插入操作,此時(shí)會(huì)因?yàn)橹麈I沖突無法重復(fù)插入,我們就利用這個(gè)原理來進(jìn)行冪等的控制,消息內(nèi)容可以用json格式來進(jìn)行傳輸?shù)摹?/p>

3.實(shí)戰(zhàn)開發(fā)

3.1 建表

DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (`message_id` varchar(50) NOT NULL COMMENT '消息ID',`message_content` varchar(2000) DEFAULT NULL COMMENT '消息內(nèi)容',`status` int DEFAULT '0' COMMENT '消費(fèi)狀態(tài)(0-未消費(fèi)成功;1-消費(fèi)成功)',`retry_times` int DEFAULT '0' COMMENT '重試次數(shù)',`type` int DEFAULT '0' COMMENT '消費(fèi)類型',PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.2 集成mybatis-plus

《springBoot集成mybatisPlus》

3.3 集成RabbitMq

3.3.1 安裝mq

推薦使用docker安裝rabbitmq,還未安裝的可以參考以下信息:

  • docker安裝

3.3.2 springBoot集成mq

  • 1.添加依賴
 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3.4 生產(chǎn)者具體實(shí)現(xiàn)

3.4.1 mq配置類

  • DirectRabbitConfig
    具體如何開啟可以參考《rabbitMq實(shí)現(xiàn)死信隊(duì)列》
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitmqConfig {//正常交換機(jī)的名字public final static String  EXCHANGE\_NAME = "exchange\_name";//正常隊(duì)列的名字public final static String QUEUE\_NAME="queue\_name";//死信交換機(jī)的名字public final static String  EXCHANGE\_DEAD = "exchange\_dead";//死信隊(duì)列的名字public final static String QUEUE\_DEAD="queue\_dead";//死信路由keypublic final static String DEAD\_KEY="dead.key";//創(chuàng)建正常交換機(jī)@Bean(EXCHANGE\_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)//持久化 mq重啟后數(shù)據(jù)還在.durable(true).build();}//創(chuàng)建正常隊(duì)列@Bean(QUEUE\_NAME)public Queue queue(){//正常隊(duì)列和死信進(jìn)行綁定 轉(zhuǎn)發(fā)到 死信隊(duì)列,配置參數(shù)Map<String,Object>map=getMap();return new Queue(QUEUE\_NAME,true,false,false,map);}//正常隊(duì)列綁定正常交換機(jī) 設(shè)置規(guī)則 執(zhí)行綁定 定義路由規(guī)則 requestmaping映射@Beanpublic Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,@Qualifier(EXCHANGE\_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange)//路由規(guī)則.with("app.#").noargs();}//創(chuàng)建死信隊(duì)列@Bean(QUEUE\_DEAD)public Queue queueDead(){return new Queue(QUEUE\_DEAD,true,false,false);}//創(chuàng)建死信交換機(jī)@Bean(EXCHANGE\_DEAD)public Exchange exchangeDead(){return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD).durable(true) //持久化 mq重啟后數(shù)據(jù)還在.build();}//綁定死信隊(duì)列和死信交換機(jī)@Beanpublic Binding deadBinding(){return BindingBuilder.bind(queueDead()).to(exchangeDead())//路由規(guī)則 正常路由key.with(DEAD\_KEY).noargs();}/\*\*獲取死信的配置信息\*\*\*/public Map<String,Object>getMap(){//3種方式 任選其一,選擇其他方式之前,先把交換機(jī)和隊(duì)列刪除了,在啟動(dòng)項(xiàng)目,否則報(bào)錯(cuò)。//方式一Map<String,Object> map=new HashMap<>(16);//死信交換器名稱,過期或被刪除(因隊(duì)列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);//死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)置,則使用消息的原來的路由鍵值map.put("x-dead-letter-routing-key", DEAD\_KEY);//方式二//消息的過期時(shí)間,單位:毫秒;達(dá)到時(shí)間 放入死信隊(duì)列// map.put("x-message-ttl",5000);//方式三//隊(duì)列最大長度,超過該最大值,則將從隊(duì)列頭部開始刪除消息;放入死信隊(duì)列一條數(shù)據(jù)// map.put("x-max-length",3);return map;}}
  • 延遲隊(duì)列配置
    具體如何開啟可以參考《rabbitMq實(shí)現(xiàn)死信隊(duì)列》

由于rabbitMq中不直接支持死信隊(duì)列,需要我們利用插件rabbitmq_delayed_messgae_exchage進(jìn)行開啟

/*** 定義延遲交換機(jī)*/
@Configuration
public class RabbitMQDelayedConfig {//隊(duì)列private static final String DELAYQUEUE = "delayedqueue";//交換機(jī)private static final String DELAYEXCHANGE = "delayedExchange";@Beanpublic Queue delayqueue(){return new Queue(DELAYQUEUE);}//自定義延遲交換機(jī)@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");/*** 1、交換機(jī)名稱* 2、交換機(jī)類型* 3、是否需要持久化* 4、是否需要自動(dòng)刪除* 5、其他參數(shù)*/return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);}//綁定隊(duì)列和延遲交換機(jī)@Beanpublic Binding delaybinding(){return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();}
}

3.4.2 生產(chǎn)者

  • 1.消費(fèi)隊(duì)列的生產(chǎn)者
import com.example.shop.config.RabbitmqConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class Sender_Direct {@Autowiredprivate AmqpTemplate rabbitTemplate;/*** 用于消費(fèi)訂單** @param orderId*/public void send2Direct(String orderId) {//創(chuàng)建消費(fèi)對(duì)象,并指定全局唯一ID(這里使用UUID,也可以根據(jù)業(yè)務(wù)規(guī)則生成,只要保證全局唯一即可)MessageProperties messageProperties = new MessageProperties();rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "內(nèi)容設(shè)置",  message -> {//設(shè)置消息的id為唯一messageProperties.setMessageId(UUID.randomUUID().toString());messageProperties.setContentType("text/plain");messageProperties.setContentEncoding("utf-8");message.getMessageProperties().setMessageId(orderId);return message;});}}

3.4.3 消費(fèi)者

1.開啟手動(dòng)ack配置

spring:application:name: shoprabbitmq:host: 192.168.1.102port: 5673virtual-host: /username: guestpassword: guestlistener:simple:# 表示消費(fèi)者消費(fèi)成功消息以后需要手工的進(jìn)行簽收(ack確認(rèn)),默認(rèn)為 autoacknowledge-mode: manual

消費(fèi)者要配置ack重試機(jī)制,具體參考前幾篇文章,使用的是mysql消息ID的唯一性,有時(shí)候可能生成一樣的訂單,具體的沒有進(jìn)行實(shí)驗(yàn),內(nèi)容是json生成的,可以執(zhí)行業(yè)務(wù)

import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.des.Bean.MessageIdempotent;
import com.example.des.Bean.Shop;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class Receiver_Direct {private static final Integer delayTimes = 30;//延時(shí)消費(fèi)時(shí)間,單位:秒@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = {"smsQueue"})public void receiveD(Message message, Channel channel) throws IOException {try {// 獲取消息IdString messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody());//獲取消息//向數(shù)據(jù)庫插入數(shù)據(jù)MessageIdempotent messageIdempotent = new MessageIdempotent();messageIdempotent.setMessageId(messageId);messageIdempotent.setMessageContent(msg);messageIdempotent.setRetryTimes(0);System.out.println(messageIdempotent.toString());Boolean save = true;   //設(shè)置保存成功,消息投遞失敗是在確認(rèn)模式那里if (!save) {//說明屬于重重復(fù)請(qǐng)求//1、處理消息內(nèi)容的業(yè)務(wù),解析json數(shù)據(jù)//2、創(chuàng)建訂單,并保存Boolean flag = consumeOrder(new Shop());if (flag){//投入延遲隊(duì)列,如果30分鐘訂單還沒有消費(fèi),就刪除訂單rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{//設(shè)置發(fā)送消息的延長時(shí)間 單位:ms,表示30分鐘message1.getMessageProperties().setDelay(1000*60*30);return message1;});//更新消息狀態(tài),消費(fèi)成功,channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}else {//延遲投入死信,進(jìn)行重試channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}} else {//1、處理消息內(nèi)容的業(yè)務(wù),解析json數(shù)據(jù)//2、創(chuàng)建訂單,并保存//投入死信隊(duì)列channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}catch (Exception e){System.out.println("錯(cuò)誤信息");}}private boolean consumeOrder(Shop shop) {return true;}@RabbitListener(queues = {" delay.queue.demo.delay.queue"})public void dead(String payload, Message message, Channel channel) throws IOException {System.out.println("死信隊(duì)列:"+payload);//刪除消息 將數(shù)據(jù)庫狀態(tài)更新為失敗,更新郵件或者消息通知,有時(shí)候可以人工消費(fèi)long deliveryTag=message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag,true);}@RabbitListener(queues = "delayedqueue")public void receivemsg(Message messages){//查詢有沒有被消費(fèi),也就是更新成功,有時(shí)候需要樂觀鎖}
}

至此mq的消息重復(fù)以及冪等的信息處理就很完美的解決了,當(dāng)然本文以數(shù)據(jù)庫為例進(jìn)行實(shí)現(xiàn),感興趣的可以嘗試使用redis來進(jìn)行實(shí)現(xiàn)

http://www.risenshineclean.com/news/41502.html

相關(guān)文章:

  • wordpress 圖片 cdn臺(tái)州seo優(yōu)化公司
  • WordPress建站 用插件怎么推廣軟件
  • 深圳建設(shè)交易中心網(wǎng)站首頁福州百度關(guān)鍵詞優(yōu)化
  • 網(wǎng)站建設(shè)我要自學(xué)網(wǎng)優(yōu)化網(wǎng)站關(guān)鍵詞的技巧
  • 如何選定目標(biāo)關(guān)鍵詞及網(wǎng)站欄目名稱的確定企業(yè)建網(wǎng)站一般要多少錢
  • 太倉網(wǎng)站設(shè)計(jì)早晨設(shè)計(jì)泰州seo
  • 正版?zhèn)髌媸钟喂俜骄W(wǎng)站市場(chǎng)調(diào)研報(bào)告范文3000字
  • 南郊網(wǎng)站建設(shè)報(bào)價(jià)ks刷粉網(wǎng)站推廣馬上刷
  • 可以做微課ppt模板 網(wǎng)站有哪些網(wǎng)站制作開發(fā)
  • 為政府做網(wǎng)站的公司百度怎么發(fā)帖做推廣
  • wordpress登陸可見插件外貿(mào)網(wǎng)站推廣優(yōu)化
  • 北京注冊(cè)公司地址費(fèi)用外貿(mào)建站seo
  • wordpress建站需要寫代碼嗎seo是哪里
  • 大網(wǎng)絡(luò)公司做網(wǎng)站武漢seo服務(wù)外包
  • 廣州做網(wǎng)站的企業(yè)品牌傳播策劃方案
  • 成都網(wǎng)站建設(shè)排名百度瀏覽器網(wǎng)頁版入口
  • 網(wǎng)頁設(shè)計(jì)賺錢網(wǎng)站百度網(wǎng)站推廣怎么做
  • 小清新博客網(wǎng)站就業(yè)培訓(xùn)機(jī)構(gòu)有哪些
  • 很大氣的網(wǎng)站 營銷網(wǎng)絡(luò)營銷的認(rèn)知
  • 網(wǎng)站項(xiàng)目宣傳片編程培訓(xùn)
  • 動(dòng)態(tài)網(wǎng)站設(shè)計(jì)論文3000字登錄注冊(cè)入口
  • 沛縣做網(wǎng)站xlec中國關(guān)鍵詞網(wǎng)站
  • 做的比較好的政府網(wǎng)站臺(tái)州seo公司
  • 網(wǎng)站推廣外包百度推廣怎么才能效果好
  • 南寧哪里有做網(wǎng)站的公司關(guān)鍵詞優(yōu)化的方法有哪些
  • 什么網(wǎng)站可以快速做3d效果圖鄭州專業(yè)的網(wǎng)站公司
  • 哪里有做網(wǎng)站培訓(xùn)的百度熱議排名軟件
  • 動(dòng)漫設(shè)計(jì)與制作課程網(wǎng)站優(yōu)化設(shè)計(jì)公司
  • 重慶李家沱網(wǎng)站建設(shè)提高工作效率的方法不正確的是
  • 網(wǎng)站優(yōu)化包括整站優(yōu)化嗎惠州seo網(wǎng)站管理