wordpress更改路徑湖北網(wǎng)站seo設(shè)計
十八、day18
到目前為止,我們以及學習了單線程同步/異步服務(wù)器、多線程IOServicePool和多線程IOThreadPool模型,今天學習如何通過asio協(xié)程實現(xiàn)并發(fā)服務(wù)器。
并發(fā)服務(wù)器有以下幾種好處:
- 協(xié)程比線程更輕量,創(chuàng)建和銷毀協(xié)程的開銷較小,適合高并發(fā)場景
- 協(xié)程通常在單線程中運行,避免了多線程帶來的資源競爭和同步問題,從而減少了內(nèi)存使用
- 將回調(diào)函數(shù)改寫為順序調(diào)用,讓異步的函數(shù)能夠以同步的方式寫出來的同時不降低性能,提高開發(fā)效率
- 協(xié)程調(diào)度比線程調(diào)度更輕量化,因為協(xié)程是運行在用戶空間的,線程切換需要在用戶空間和內(nèi)核空間切換
首先需將C++語言標準換為C++20標準,協(xié)程是在C++20之后引入的新標準
1. 官方案例
asio官網(wǎng)提供了一個協(xié)程并發(fā)編程的案例,如下
#include <iostream>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;awaitable<void> echo(tcp::socket socket) {try {char data[1024];for (;;) {std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);}}catch (std::exception& e) {std::cout << "Echo exception is " << e.what() << std::endl;}
}awaitable<void> listener() {auto executor = co_await this_coro::executor;tcp::acceptor acceptor(executor, { tcp::v4(), 10086 });for (;;) {tcp::socket socket = co_await acceptor.async_accept(use_awaitable);co_spawn(executor, echo(std::move(socket)), detached);}
}int main()
{try {boost::asio::io_context io_context(1); // 1被用于提供有關(guān)所需并發(fā)級別的提示boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);signals.async_wait([&](auto, auto) { // 處理幾個信號就傳入幾個參數(shù),這里使用auto自動推斷io_context.stop();});co_spawn(io_context, listener(), detached);io_context.run();}catch (std::exception& e) {std::cout << "Exception is " << e.what() << std::endl;}
}
a. 聲明
using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
namespace this_coro = boost::asio::this_coro;
- awaitable?:用于定義可以在協(xié)程中使用的異步操作,可以通過?co_await?關(guān)鍵字等待異步任務(wù)的完成,使異步的函數(shù)能夠以同步的方式寫出來的同時不降低性能
- co_spawn:用于啟動新的協(xié)程的函數(shù),可以用它來創(chuàng)建新的異步任務(wù)并在指定的執(zhí)行上下文中運行
- detached:指示器,表示創(chuàng)建的協(xié)程不需要等待其結(jié)果。使用?detached?后,協(xié)程會在后臺獨立運行
- use_awaitable:適配器,指示以協(xié)程的方式使用 Boost.Asio 的異步操作,它使得異步操作可以與?co_await?關(guān)鍵字結(jié)合使用。適配器允許將異步操作的結(jié)果直接與協(xié)程的執(zhí)行流結(jié)合,使得異步調(diào)用能夠以同步的方式寫出,從而避免了手動管理回調(diào)函數(shù)
- co_await?關(guān)鍵字的作用:
- 當協(xié)程遇到 co_await 時,它會掛起執(zhí)行,直到被等待的異步操作完成。這允許當前線程釋放 CPU,去處理其他任務(wù)或協(xié)程。
- 一旦等待的操作完成,協(xié)程會自動恢復執(zhí)行,繼續(xù)從掛起的地方運行。這樣可以避免復雜的回調(diào)地獄,提供更直觀的控制流。
- co_await 會自動獲取異步操作的結(jié)果并將其返回給調(diào)用者。例如,如果等待的是一個返回值的異步操作,結(jié)果會被賦給相應(yīng)的變量。
- 如果在 co_await 等待的異步操作中發(fā)生異常,協(xié)程可以捕獲這些異常,方便進行錯誤處理。
b. echo()
awaitable<void> echo(tcp::socket socket) {try {char data[1024];for (;;) {std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);}}catch (std::exception& e) {std::cout << "Echo exception is " << e.what() << std::endl;}
}
awaitable<void>類型允許函數(shù)在執(zhí)行時可以被暫停和恢復,這使得它能夠與 co_await 一起使用,所以函數(shù)返回類型必須是awaitable<void>。
echo?函數(shù)能夠高效處理多個客戶端連接而不阻塞線程,主要是因為:
- echo 函數(shù)使用 socket.async_read_some 和 async_write 方法進行異步讀寫操作。這意味著當函數(shù)執(zhí)行這些操作時,它不會阻塞當前線程,而是可以在等待 I/O 完成時讓出控制權(quán)。
- 使用協(xié)程和 co_await,當 I/O 操作掛起時,協(xié)程會被暫停并釋放線程。這使得同一線程可以處理其他任務(wù)或更多的連接,而不需要為每個連接創(chuàng)建新的線程。
- 服務(wù)器的主循環(huán)(io_context.run())會持續(xù)運行,處理所有已準備好的異步操作。這樣一來,多個連接可以并發(fā)處理,而不需要多個線程同時活躍。
- 當協(xié)程通過 co_await 等待 I/O 操作時,它不會占用 CPU 資源。主線程可以繼續(xù)接受新的連接或處理其他已完成的操作,從而提高并發(fā)能力。
其中
std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);
該段代碼使用co_await?關(guān)鍵字等待異步讀取操作完成,并將讀取的字節(jié)數(shù)存儲到n中。和之前異步服務(wù)器異步操作需要綁定回調(diào)函數(shù)不同,這里通過協(xié)程實現(xiàn)的并發(fā)服務(wù)器讀寫通過co_await?關(guān)鍵字和use_awaitable適配器組合使用,會自動處理異步操作的結(jié)果。當調(diào)用?socket.async_read_some?時,協(xié)程會暫停執(zhí)行,并在操作完成時恢復。這個機制隱藏了回調(diào)的復雜性,使得代碼更簡潔和易讀。當異步操作完成時,協(xié)程會自動繼續(xù)執(zhí)行,并將結(jié)果傳遞給 n 。
co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
同理,異步寫函數(shù)也以同步的方式使用,不需要顯示bind回調(diào)函數(shù),co_await?關(guān)鍵字會等待異步讀取操作完成,而適配器use_awaitable允許將異步操作的結(jié)果直接與協(xié)程的執(zhí)行流結(jié)合
c. listener()
awaitable<void> listener() {auto executor = co_await this_coro::executor;tcp::acceptor acceptor(executor, { tcp::v4(), 10086 });for (;;) {tcp::socket socket = co_await acceptor.async_accept(use_awaitable);co_spawn(executor, echo(std::move(socket)), detached);}
}
該函數(shù)不斷監(jiān)聽 TCP 端口,接受來自客戶端的連接。每當有新連接到達時,它會啟動一個 echo 協(xié)程來處理該連接。這種設(shè)計使得服務(wù)器能夠同時處理多個客戶端連接而不會阻塞,提高了并發(fā)處理能力。
auto executor = co_await this_coro::executor;
獲取執(zhí)行器:
- this_coro::executor?是特殊的上下文,用于獲取當前協(xié)程的執(zhí)行器(executor),它定義了協(xié)程將在哪個上下文(io_context)中運行
- co_await?關(guān)鍵字使得協(xié)程在獲取執(zhí)行器時可以暫停,并在獲取到執(zhí)行器后恢復執(zhí)行。
co_spawn(executor, echo(std::move(socket)), detached);
- 啟動處理協(xié)程:
- co_spawn?啟動一個新的協(xié)程
- executor?指定了新的協(xié)程的執(zhí)行上下文
- echo(std::move(socket)) 創(chuàng)建一個新的 echo 協(xié)程來處理該連接。std::move(socket) 將 socket 移動到 echo 協(xié)程中,避免不必要的拷貝。移動socket之后,上面的socket便無法發(fā)揮作用,因為該socket已經(jīng)被移動至echo中。
- detached 表示新協(xié)程的執(zhí)行不需要主協(xié)程等待其完成。
d. main()
int main()
{try {boost::asio::io_context io_context(1); // 1被用于提供有關(guān)所需并發(fā)級別的提示boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);signals.async_wait([&](auto, auto) { // 處理幾個信號就傳入幾個參數(shù),這里使用auto自動推斷io_context.stop();});co_spawn(io_context, listener(), detached);io_context.run();}catch (std::exception& e) {std::cout << "Exception is " << e.what() << std::endl;}
}
io_context有多個重載,這里使用的重載原型為
explicit io_context(int concurrency_hint);
concurrency_hint用來提示實現(xiàn)該類的系統(tǒng),它應(yīng)當允許多少個線程(不是協(xié)程)同時運行。
- concurrency_hint=0時,則I/O操作的實現(xiàn)將使用默認的并發(fā)級別,此時,io_context 將根據(jù)內(nèi)部實現(xiàn)和系統(tǒng)資源自動決定使用多少線程;
- concurrency_hint=1時,則I/O操作的實現(xiàn)將嘗試最小化線程的創(chuàng)建,并且不會創(chuàng)建額外的工作線程,常表示僅使用一個線程來處理所有 I/O 操作,適用于大多數(shù)簡單的應(yīng)用場景,避免不必要的線程開銷;
- concurrency_hint>1時,則I/O操作的實現(xiàn)將允許同時運行多個工作線程,允許程序在多個線程中并行處理 I/O 操作,從而提高性能。
signals.async_wait([&](auto, auto) { // 處理幾個信號就傳入幾個參數(shù),這里使用auto自動推斷io_context.stop();});
信號處理,當遇到退出信號(ctrl+c或強制終止信號)時,執(zhí)行l(wèi)ambd函數(shù),停止ioc的運行。
co_spawn(io_context, listener(), detached);
啟動一個listener協(xié)程,開始監(jiān)聽客戶端連接,并且這個協(xié)程的執(zhí)行不需要主協(xié)程等待其完成。
2. 客戶端
#include <iostream>
#include <boost/asio.hpp>const int MAX_LENGTH = 1024;int main()
{try {boost::asio::io_context ioc;boost::asio::ip::tcp::endpoint remote_ep(boost::asio::ip::address::from_string("127.0.0.1"), 10086);boost::asio::ip::tcp::socket sock(ioc);boost::system::error_code error = boost::asio::error::host_not_found;sock.connect(remote_ep, error);if (error) {std::cout << "connect failed, code is " << error.value() << " error msg is " << error.what() << std::endl;return 0;}std::cout << "Enter message: ";char request[MAX_LENGTH];std::cin.getline(request, MAX_LENGTH);size_t request_length = strlen(request);boost::asio::write(sock, boost::asio::buffer(request, request_length));char reply[MAX_LENGTH];size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply, request_length));std::cout << "reply is " << std::string(reply, reply_length) << std::endl;getchar();}catch (std::exception& e) {std::cerr << "Exception is " << e.what() << std::endl;}return 0;
}
和之前的客戶端處理基本類似,只不過忽略了消息節(jié)點封裝和序列號操作。
3. 修改之前的服務(wù)器函數(shù)
void CSession::Start() {auto shared_this = shared_from_this();//開啟接收協(xié)程co_spawn(_io_context, [=]()->awaitable<void> {try {for (;!_b_close;) {_recv_head_node->Clear();std::size_t n = co_await boost::asio::async_read(_socket,boost::asio::buffer(_recv_head_node->_data, HEAD_TOTAL_LEN),use_awaitable);if (n == 0) {std::cout << "receive peer closed" << endl;Close();_server->ClearSession(_uuid);co_return;}//獲取頭部MSGID數(shù)據(jù)short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);//網(wǎng)絡(luò)字節(jié)序轉(zhuǎn)化為本地字節(jié)序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg_id is " << msg_id << endl;//id非法if (msg_id > MAX_LENGTH) {std::cout << "invalid msg_id is " << msg_id << endl;_server->ClearSession(_uuid);co_return;}short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);//網(wǎng)絡(luò)字節(jié)序轉(zhuǎn)化為本地字節(jié)序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg_len is " << msg_len << endl;//長度非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length is " << msg_len << endl;_server->ClearSession(_uuid);co_return;}_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);//讀出包體n = co_await boost::asio::async_read(_socket,boost::asio::buffer(_recv_msg_node->_data, _recv_msg_node->_total_len), use_awaitable);if (n == 0) {std::cout << "receive peer closed" << endl;Close();_server->ClearSession(_uuid);co_return;}_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';cout << "receive data is " << _recv_msg_node->_data << endl;//投遞給邏輯線程LogicSystem::GetInstance().PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));}}catch (std::exception& e) {std::cout << "exception is " << e.what() << endl;Close();_server->ClearSession(_uuid);}}, detached);
}
在新的Session中,不需要綁定回調(diào)函數(shù)進行處理,而是通過關(guān)鍵字co_await 和適配器use_awaitable,使異步函數(shù)通過同步方式寫出來,在一個函數(shù)中進行數(shù)據(jù)的粘包處理、網(wǎng)絡(luò)序列-本地序列轉(zhuǎn)換、序列化處理,并將消息投遞至邏輯隊列。
通過協(xié)程實現(xiàn)并發(fā)服務(wù)器可大大減少代碼量,相比異步編程更加直觀,但受限于平臺,目前C++20的協(xié)程說是協(xié)程庫,實際上只是開放了無棧協(xié)程的協(xié)議,正兒八經(jīng)的官方協(xié)程還未發(fā)布。