好的網(wǎng)站具備什么條件外包公司值得去嗎
服務(wù)通信
分布式系統(tǒng)通信兩種方式:
- 直接遠(yuǎn)程調(diào)用(同步)
- 借助第三方間接通信(異步)
同步通訊的問題
Feign就屬于同步通訊。存在的如下問題
- 耦合度高,每次添加新的模塊就要修改原有模塊的代碼
- 性能下降,調(diào)用者需要等待服務(wù)者返回的結(jié)果,如果調(diào)用鏈過長,則響應(yīng)的時間越長
- 資源浪費,在等待的過程中,不會釋放CPU與內(nèi)存資源,在高并發(fā)的場景下占用浪費資源過大
- 級聯(lián)失敗,當(dāng)調(diào)用鏈中一個服務(wù)宕機(jī),那么調(diào)用者也會出現(xiàn)問題。
異步調(diào)用方案
異步調(diào)用常見的實現(xiàn)方式為事件驅(qū)動模式
事件驅(qū)動模式優(yōu)點:
- 服務(wù)解耦,添加模塊不需要更改其他服務(wù)的代碼
- 性能提升,在用戶請求的模塊可以直接返回結(jié)果,不需要等待其他服務(wù)執(zhí)行完畢后再返回結(jié)果
- 服務(wù)沒有強(qiáng)依賴關(guān)系,一個服務(wù)宕機(jī)不會影響到其他服務(wù)
- 流量削峰
缺點:
- 依賴了第三方組件,第三方組件需要保證可靠性、安全性、吞吐能力
- 架構(gòu)復(fù)雜,業(yè)務(wù)沒有明顯流程線,不好追蹤管理
- 一致性問題
MQ
MQ:Message Queue消息隊列,是消息在傳輸?shù)倪^程中保存消息的容器。多用于分布式系統(tǒng)之間進(jìn)行通信
Kafka適用于數(shù)據(jù)量大但對數(shù)據(jù)安全性不高的場景比如說日志的傳輸
RabbitMQ與RocketMQ適用于對數(shù)據(jù)安全要求較高的場景,比如說業(yè)務(wù)之間的傳輸信息
滿足什么條件才可以使用MQ?
- 生產(chǎn)者不需要從消費者處獲取任何信息
- 容許短暫不一致性
- 使用MQ的效果收益大于管理MQ成本
RabbitMQ的下載
在虛擬機(jī)上啟動dokcer服務(wù)后拉去rabbitmq鏡像
systemctl start docker
docker pull rabbitmq
RabbitMQ的啟動
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq \
--hostname mql \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:latest
命令解釋:
-e RABBITMQ_DEFAULT_USER=admin :指定登錄賬號為admin
-e RABBITMQ_DEFAULT_PASS=admin :指定登錄密碼為admin
--name mq :容器名為mq
--hostname mq1 主機(jī)名為mq1(做集群時使用,不添加也可以)
-p 15672:15672 端口映射
-p 5672:5672
-d 后臺允許
rabbitmq:latest
訪問15672端口輸入密碼登錄
可能會遇到的問題
1、關(guān)閉防火墻后訪問端口仍然無法訪問15672端口
解決方法:
開啟防火墻
systemctl start firewalld
開放端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
重新加載配置文件
firewall-cmd --reload
2、即使開放了端口15672也無法訪問頁面
解決方法:
如果是docker拉取的rabbitmq鏡像,需要手動進(jìn)入容器下載rabbitmq的管理插件
進(jìn)入容器
docker exec -it 容器名 bash
下載rabbitmq的管理插件
rabbitmq-plugins enable rabbitmq_management
修改配置文件
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
退出鏡像
exit
重啟rabbitmq
docker restart 容器名
RabbitMQ的結(jié)構(gòu)與概念
RabbitMQ中的幾個概念
- channel:操作MQ的工具
- exchange:路由消息到隊列中
- queue:緩存消息
- virtualhost:虛擬主機(jī),是對queue、exchange等資源的邏輯分組
常見消息模型
不使用交換機(jī)的
- 基本消息隊列
- 工作消息隊列
使用交換機(jī)的
- Fanout Exchange:廣播
- Direct Exchange:路由
- Topic Exchange:主題
簡單消息隊列的實現(xiàn)
只存在三種角色:
- publisher:消息發(fā)布者,將消息發(fā)送到隊列queue
- queue:消息隊列,負(fù)責(zé)接受并緩存消息
- consumer:訂閱隊列,處理隊列中的消息
示例代碼:
引入依賴
<dependencies><!--rabbitMQ的Java客戶端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>
</dependencies>
/*** 發(fā)送消息方*/
public class Producer_Hello {public static void main(String[] args) throws IOException, TimeoutException {//1、創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//2、設(shè)置參數(shù)connectionFactory.setHost("192.168.116.131");connectionFactory.setPort(5672);//默認(rèn)也是5672connectionFactory.setVirtualHost("/");//設(shè)置虛擬機(jī) 默認(rèn)值是/connectionFactory.setUsername("admin");//默認(rèn)值是guestconnectionFactory.setPassword("admin");//默認(rèn)值是guest//3、創(chuàng)建連接ConnectionConnection connection = connectionFactory.newConnection();//4、創(chuàng)建ChannelChannel channel = connection.createChannel();//5、創(chuàng)建隊列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments* 參數(shù)* 1.queue:隊列名稱* 2.durable:是否持久化* 3.exclusive:* *是否獨占。只能有一個消費者監(jiān)聽這隊列當(dāng)* *Connection關(guān)閉時,是否刪除隊列全* 4.autoDelete:是否自動刪除。當(dāng)沒有Consumer時,自動刪除掉* 5.arguments:參數(shù)。*///如果沒有一個交helloWorld的隊列,那么會自動創(chuàng)建一個channel.queueDeclare("hello_World",true,false,false,null);//6、發(fā)送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body* 1、exchange交換機(jī)(簡單模式下不會使用交換機(jī),默認(rèn)使用"")* 2、routingKey:路由名稱* 3、props:配置信息* 4、body:發(fā)送消息數(shù)據(jù)*/String body="Hello";channel.basicPublish("","hello_World",null,body.getBytes());//7、釋放資源channel.close();connection.close();}
}
首先看到目前沒有連接
打斷點啟動
當(dāng)Connection connection = connectionFactory.newConnection()運行結(jié)束后。查看控制臺連接信息
接下來啟動消費者
public class Consumer_Hello {public static void main(String[] args) throws Exception {//1、創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//2、設(shè)置參數(shù)connectionFactory.setHost("192.168.116.131");connectionFactory.setPort(5672);//默認(rèn)也是5672connectionFactory.setVirtualHost("/");//設(shè)置虛擬機(jī) 默認(rèn)值是/connectionFactory.setUsername("admin");//默認(rèn)值是guestconnectionFactory.setPassword("admin");//默認(rèn)值是guest//3、創(chuàng)建連接ConnectionConnection connection = connectionFactory.newConnection();//4、創(chuàng)建ChannelChannel channel = connection.createChannel();//5、創(chuàng)建隊列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments* 參數(shù)* 1.queue:隊列名稱* 2.durable:是否持久化,當(dāng)mq重啟之后,還在* 3.exclusive:* *是否獨占。只能有一個消費者監(jiān)聽這隊列當(dāng)* *Connection關(guān)閉時,是否刪除隊列全* 4.autoDelete:是否自動刪除。當(dāng)沒有Consumer時,自動刪除掉* 5.arguments:參數(shù)。*///如果沒有一個交helloWorld的隊列,那么會自動創(chuàng)建一個channel.queueDeclare("hello_World",true,false,false,null);/*** String queue, boolean autoAck, Consumer callback* queue:隊列名稱* autoAck:是否自動確認(rèn)* callback:回調(diào)對象*/Consumer consumer =new DefaultConsumer(channel){/*回調(diào)方法,收到消息后,自動執(zhí)行*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("envelope:"+envelope.getExchange());System.out.println("properties:"+properties);System.out.println("body:"+new String(body));}};channel.basicConsume("hello_World",true,consumer);//消費者需要監(jiān)聽因此不需要關(guān)閉資源}
}
生產(chǎn)者與消費者都需要聲明隊列是為了避免隊列不存在的情況
SpringAMQP的使用
AMQP是用于在應(yīng)用程序或之間傳遞業(yè)務(wù)消息的開放標(biāo)準(zhǔn)。該協(xié)議與語言和平臺無關(guān),更符合微服務(wù)中獨立性的要求。而SpringAMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板來發(fā)送和接收消息。包含兩部分,其中spring-amqp是基礎(chǔ)抽象,spring-rabbit是底層的默認(rèn)實現(xiàn)
生產(chǎn)者實現(xiàn)
引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
在application.yml配置如下信息
spring:rabbitmq:host: 192.168.116.131port: 5672username: adminpassword: adminvirtual-host: /
編寫測試類
@SpringBootTest
@RunWith(SpringRunner.class)
public class test {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSend2SimpleQueue() throws Exception {String queueName ="hello";String message = "hello, spring amqp";rabbitTemplate.convertAndSend(queueName,message);}
}
運行測試觀察rabbit控制臺
消費者實現(xiàn)
引入依賴和配置相關(guān)信息與消費者相同,不同的是,編寫一個監(jiān)聽器去監(jiān)聽隊列
@Component
public class SpringRabbitListener {@RabbitListener(queues = "hello")public void listenSimpleQueueMessage(String msg){System.out.println("接收到消息:"+msg);}
}
啟動引導(dǎo)類觀察控制臺
Work Queue工作隊列
提高消息處理速度, 避免消息的堆積問題
案例實現(xiàn):
生產(chǎn)者1秒內(nèi)生產(chǎn)50條消息
@Testpublic void testWorkQueueSendMessage() throws Exception {String queueName ="hello";String message = "hello, spring amqp__";for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
而消費者代碼如下
@Component
public class SpringRabbitListener {@RabbitListener(queues = "hello")public void listenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("消費者1接收到消息:"+msg);Thread.sleep(30);}@RabbitListener(queues = "hello")public void listenWorkQueueMessage2(String msg) throws InterruptedException {System.out.println("====消費者2接收到消息:"+msg);Thread.sleep(50);}
}
運行結(jié)果如下
可以看到每個消費者各處理25條,消費者1處理更快處理結(jié)束不會去處理更多的消息而是等待消費者2處理結(jié)束。
這種情況是因為Rabbit中存在消息預(yù)取的行為,當(dāng)消息處理前會從Channel中提前拿去一部分消息(類似于輪詢平均分配)后再去處理,當(dāng)我們希望處理更快的設(shè)備能夠讀取更多的消息時,我們可以設(shè)置消息預(yù)取限制。在application.yml文件中添加如下配置
spring:rabbitmq:host: 192.168.116.131port: 5672username: adminpassword: adminvirtual-host: /listener:simple:prefetch: 1 #每次最多獲取一條消息,處理完成后才能獲取下一條消息
修改完后再次執(zhí)行觀察控制臺
可以看到消費能力更強(qiáng)的處理消息更多。
工作隊列模式應(yīng)用于任務(wù)過重或任務(wù)過多的場景(比如說發(fā)送短信)
發(fā)布訂閱模式
前兩種模式只是將消息發(fā)送給一個消費者,而發(fā)布訂閱模式可以將消息發(fā)送給多個消費者。實現(xiàn)方式是加入了exchange(交換機(jī))。exchange只負(fù)責(zé)路由,不負(fù)責(zé)存儲。路由失敗則消息丟失。
Fanout交換機(jī)(廣播模式)
@Configuration
public class FanoutConfig {//聲明交換機(jī)@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}//聲明隊列@Beanpublic Queue queue1(){return new Queue("fanoutQueue1");}@Beanpublic Queue queue2(){return new Queue("fanoutQueue2");}//聲明綁定關(guān)系@Beanpublic Binding binding1(Queue queue1,FanoutExchange exchange){return BindingBuilder.bind(queue1).to(exchange);}@Beanpublic Binding binding2(Queue queue2,FanoutExchange exchange){return BindingBuilder.bind(queue2).to(exchange);}
}
重啟消費者觀察Rabbit控制臺
編寫監(jiān)聽器
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanoutQueue1")public void listenFanoutQueueMessage1(String msg){System.out.println("從隊列queue1中獲取到消息:"+msg);}@RabbitListener(queues = "fanoutQueue2")public void listenFanoutQueueMessage2(String msg){System.out.println("從隊列queue2中獲取到消息:"+msg);}
}
編寫生產(chǎn)者測試類
@Testpublic void testFanoutQueueSendMessage() throws Exception {String exchangeName = "fanoutExchange";String message = "hello, fanout";rabbitTemplate.convertAndSend(exchangeName,"",message);}
啟動觀察Rabbit控制臺
Direct交換機(jī)(路由模式)
- 每一個Queue都與Exchange設(shè)置一個BindingKey
- 發(fā)布者發(fā)送消息時,指定消息的RoutingKey
- Exchange將消息路由到BindingKey與消息RoutingKey一致的隊列
案例實現(xiàn)
如果和Fanout模式一樣去聲明綁定關(guān)系的話,會比較麻煩,編寫代碼較多,我們可以采用注解的方式去聲明綁定關(guān)系。
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue("directQueue1"),exchange = @Exchange(value = "directExchange",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueueMessage1(String msg){System.out.println("從隊列queue1中獲取到消息:"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("directQueue2"),exchange = @Exchange(value = "directExchange",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueueMessage2(String msg){System.out.println("從隊列queue1中獲取到消息:"+msg);}
}
運行消費者后觀察Rabbit控制臺
編寫生產(chǎn)者測試類代碼
@Testpublic void testDirectQueueSendMessage() throws Exception {String exchangeName = "directExchange";String message = "hello, direct";rabbitTemplate.convertAndSend(exchangeName, "blue", message);rabbitTemplate.convertAndSend(exchangeName, "red", message + " red");rabbitTemplate.convertAndSend(exchangeName, "yellow", message + " yellow");}
運行觀察控制臺
TopicExchange(話題模式)
案例實現(xiàn)
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue("topicQueue1"),exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueueMessage1(String msg){System.out.println("從中國話題隊列中獲取到消息:"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("topicQueue2"),exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueueMessage2(String msg){System.out.println("從新聞話題隊列中獲取到消息:"+msg);}
}
運行觀察Rabbit控制臺
編寫生產(chǎn)者測試類代碼
@Testpublic void testTopicQueueSendMessage() throws Exception {String exchangeName = "topicExchange";String message = "hello, topic";rabbitTemplate.convertAndSend(exchangeName, "china.news", message+" 中國新聞");rabbitTemplate.convertAndSend(exchangeName, "china.#", message + "晴朗");rabbitTemplate.convertAndSend(exchangeName, "#.news", message + "戰(zhàn)爭");}
運行觀察Rabbit控制臺
發(fā)送三條消息但共有5條消息
消息轉(zhuǎn)換器
在簡單消息隊列的實現(xiàn)中,我們發(fā)送消息發(fā)送的是字節(jié)數(shù)組。但是接收的消息反而是String類型的字符。那是因為。Spring中對消息的處理是由org.springframework.amqp.support.converter.MessageConverter處理默認(rèn)使用SimpleMessageConverter來實現(xiàn)序列化(基于JDK的ObjectOutputStream實現(xiàn))
進(jìn)行一個測試,創(chuàng)建一個object.queue隊列,發(fā)送一個Map類型的數(shù)據(jù)
@Testpublic void testSendObject() throws Exception {Map<String, Object> map = new HashMap<>();map.put("name","zmbwcx");String queueName = "object.queue";rabbitTemplate.convertAndSend(queueName,map);}
觀察Rabbit控制臺
消息內(nèi)容被JDK序列化為上圖內(nèi)容,這種序列化方式不安全且占用內(nèi)存更大。增加了傳輸成本。
我們可以修改為JSON的序列化方式,具體操作如下
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
重新發(fā)送一條消息,觀察Rabbit控制臺
生產(chǎn)者與消費者應(yīng)該使用同一個消息轉(zhuǎn)換器,因此,消費者也應(yīng)進(jìn)行相同操作