網(wǎng)站建設(shè)有幾種方式北京網(wǎng)站優(yōu)化外包
異步讀寫api
- 異步寫操作
- async_write_some
- async_send
- 異步讀操作
- async_read_some
- async_receive
??定義一個Session類,主要是為了服務(wù)端專門為客戶端服務(wù)創(chuàng)建的管理類
class Session {
public:Session(std::shared_ptr<asio::ip::tcp::socket> socket);void Connect(const asio::ip::tcp::endpoint& ep);
private:std::shared_ptr<asio::ip::tcp::socket> _socket;
};
異步寫操作
??在介紹異步寫之前,需要先封裝一個Node結(jié)構(gòu),用來管理發(fā)送的數(shù)據(jù)
class MsgNode
{friend class Session;
public:MsgNode(const char* msg, int total_len):_total_len(total_len),_cur_len(0){_msg = new char[total_len];memcpy(_msg, msg, total_len);}MsgNode(int total_len):_total_len(total_len),_cur_len(0){_msg = new char[_total_len];}~MsgNode(){delete[] _msg;}
private:char* _msg;int _total_len;int _cur_len;
};
??其中,_msg
表示要發(fā)送的數(shù)據(jù),_cur_len
表示已經(jīng)發(fā)送的長度,而_total_len
表示數(shù)據(jù)的總長度
async_write_some
??通過源碼可以看出,async_write_some
需要兩個參數(shù)。第一個參數(shù)是buffer
結(jié)構(gòu)的數(shù)據(jù),用來放需要發(fā)送的數(shù)據(jù);第二個參數(shù)是一個回調(diào)函數(shù),這個回調(diào)函數(shù)又有兩個參數(shù),一個是用來存放錯誤碼的對象,另一個是無符號整數(shù)(這個無符號整數(shù)代表的就是當前具體發(fā)送數(shù)據(jù)的大小)
??當調(diào)用完async_write_some之后(即一次異步寫操作結(jié)束之后),系統(tǒng)會調(diào)用這個回調(diào)函數(shù)。
void Session::WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode> node)
{if (node->_cur_len + bytes_transferred <= node->_total_len){node->_cur_len += bytes_transferred;this->_socket->async_write_some(boost::asio::buffer(node->_msg + node->_cur_len, node->_total_len - node->_cur_len),std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node));}
}void Session::WriteToSocketErr(const std::string& buf)
{_send_node = std::make_shared<MsgNode>(buf.c_str(), buf.size());_socket->async_write_some(boost::asio::buffer(buf.c_str(), buf.size()),std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node));
}
??在以上代碼中,先在WriteToSocketErr
函數(shù)中創(chuàng)建一個消息結(jié)點,然后調(diào)用async_write_some
將數(shù)據(jù)發(fā)送出去。當一次寫操作結(jié)束之后。系統(tǒng)會將錯誤碼和已寫入數(shù)據(jù)的長度作為參數(shù)給回調(diào)函數(shù)。
if (node->_cur_len + bytes_transferred <= node->_total_len)
??在回調(diào)函數(shù)中判斷是否已經(jīng)將數(shù)據(jù)全部發(fā)送出去了,如果沒有,則更新_cur_len
,然后繼續(xù)執(zhí)行異步發(fā)送操作
??但是,以上代碼邏輯中存在一個漏洞。在異步執(zhí)行的邏輯中,代碼調(diào)用的順序是不確定的。
??舉個例子,當需要連續(xù)兩次發(fā)送hello world
//連續(xù)兩次調(diào)用
WriteToSocketErr("HelloWorld");
WriteToSocketErr("HelloWorld");
??可能會發(fā)生第一次進行寫入的時候只寫入了Hello
,這時按照邏輯需要執(zhí)行回調(diào)函數(shù),當在回調(diào)函數(shù)中發(fā)現(xiàn)數(shù)據(jù)并沒有發(fā)送完全,于是再次調(diào)用async_write_some
想繼續(xù)寫入World
,但此時第二次調(diào)用WriteToSocketErr("HelloWorld");
中,已經(jīng)提前一步調(diào)用了async_write_some
并將數(shù)據(jù)全部寫完,然后才輪到第一次發(fā)送時的回調(diào)函數(shù)將剩下的World
繼續(xù)發(fā)完。這最終導(dǎo)致的結(jié)果時對方收到的數(shù)據(jù)為HelloHelloWorldWorld
.
??為了確保發(fā)送順序的問題,可以在Session類中定義一個隊列用來管理需要發(fā)送的結(jié)點和i一個布爾類型變量用來表示當前是否有數(shù)據(jù)正在被發(fā)送(初始化為false)
class Session{
public:Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket):_socket(socket),_send_pending(false){}void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);void WriteToSocket(const std::string &buf);
private:std::queue<std::shared_ptr<MsgNode>> _send_queue;std::shared_ptr<asio::ip::tcp::socket> _socket;bool _send_pending;
};
??此時再對寫操作進行改進
void Session::WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{if (ec.value() != 0){std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;return;}std::shared_ptr<MsgNode>&node = _send_queue.front();node->_cur_len += bytes_transferred;if (node->_cur_len + bytes_transferred < node->_total_len)//還沒有發(fā)送完{_socket->async_write_some(boost::asio::buffer(node->_msg + node->_cur_len, node->_total_len - bytes_transferred),std::bind(&WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}_send_queue.pop();if (_send_queue.empty()){_send_pending = false;}else{std::shared_ptr<MsgNode>& node = _send_queue.front();_socket->async_write_some(boost::asio::buffer(node->_msg, node->_total_len),std::bind(&Session::WriteCallBack, std::placeholders::_1, std::placeholders::_2));}
}void Session::WriteToSocket(const std::string& buf)
{_send_queue.push(std::make_shared<MsgNode>(buf.c_str(), buf.size()));if (_send_pending)//當前有消息正在發(fā){return;}_socket->async_write_some(boost::asio::buffer(buf.c_str(), buf.size()),std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));_send_pending = true;
}
??在WriteToSocket
函數(shù)中,先不著急將數(shù)據(jù)立馬發(fā)送出去,而是將數(shù)據(jù)節(jié)點放入到發(fā)送隊列中,然后判斷當前是否有數(shù)據(jù)正在發(fā)送,如果有就返回避免沖突;沒有就直接調(diào)用async_write_some
,在回調(diào)函數(shù)中,永遠都是取出隊首的結(jié)點進行發(fā)送,如果判斷隊首的元素數(shù)據(jù)已經(jīng)發(fā)送完了就pop
掉,并且檢查隊列中是否還有需要發(fā)送的元素:如果有,繼續(xù)執(zhí)行發(fā)送邏輯;如果沒有就將_send_pending
置為false表示當前已經(jīng)沒有數(shù)據(jù)正在發(fā)送了。
async_send
??async_send
的作用是直接將所有數(shù)據(jù)全部發(fā)送完,代碼邏輯也比async_write_some
要簡單一些
void Session::WriteAllToSocket(const std::string& buf)
{_send_queue.push(std::make_shared<MsgNode>(buf.c_str(), buf.size()));if (_send_pending){return;}_socket->async_send(boost::asio::buffer(buf.c_str(), buf.size()),std::bind(&Session::WriteAllCallBck, this, std::placeholders::_1, std::placeholders::_2));_send_pending = true;
}void Session::WriteAllCallBck(const boost::system::error_code& ec, std::size_t bytes_tranferred)
{if (ec.value() != 0){std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;return;}_send_queue.pop();if (_send_queue.empty()){_send_pending = false;}else{std::shared_ptr<MsgNode>& node = _send_queue.front();_socket->async_send(boost::asio::buffer(node->_msg, node->_total_len),std::bind(&Session::WriteAllCallBck, this, std::placeholders::_1, std::placeholders::_2));}
}
注意
??async_send
和async_write_some
不要放在一起使用,因為async_send
底層還是多次調(diào)用的async_write_some
。如果一起使用,還是會引發(fā)數(shù)據(jù)沖突的問題
異步讀操作
??為了準備讀操作,需要在Session類中添加數(shù)據(jù)結(jié)點_recv_node
和一個布爾變量_recv_pending
class Session
{
public:Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket):_socket(socket),_send_pending(false),_recv_pending(false){}void Connect(boost::asio::ip::tcp::endpoint& ep);void WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode>);void WriteToSocketErr(const std::string& buf);void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);void WriteToSocket(const std::string& buf);void WriteAllToSocket(const std::string& buf);void WriteAllCallBck(const boost::system::error_code& ec, std::size_t bytes_tranferred);void ReadFromSocket();void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);void ReadAllFromSocket();void ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);private:std::shared_ptr<boost::asio::ip::tcp::socket> _socket;std::shared_ptr<MsgNode> _send_node;std::queue<std::shared_ptr<MsgNode>> _send_queue;std::shared_ptr<MsgNode> _recv_node;bool _recv_pending;bool _send_pending;
};
??由于接收的數(shù)據(jù)在TCP緩沖區(qū)里面已經(jīng)是排好序了的,所以并不需要隊列來維護順序
async_read_some
其實異步讀和異步寫的邏輯類似,這里就不多介紹了
void Session::ReadFromSocket()
{if (_recv_pending){return;}_recv_node = std::make_shared<MsgNode>(RECVSIZE);_socket->async_read_some(boost::asio::buffer(_recv_node->_msg, _recv_node->_total_len),std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));_recv_pending = true;
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{if (ec.value() != 0){std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;return;}if (_recv_node->_cur_len + bytes_transferred < _recv_node->_total_len){_recv_node->_cur_len += bytes_transferred;_socket->async_read_some(boost::asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len),std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}_recv_pending = false;
}
async_receive
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{if (ec.value() != 0){std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;return;}if (_recv_node->_cur_len + bytes_transferred < _recv_node->_total_len){_recv_node->_cur_len += bytes_transferred;_socket->async_read_some(boost::asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len),std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}_recv_pending = false;
}void Session::ReadAllFromSocket()
{if (_recv_pending){return;}_recv_node = std::make_shared<MsgNode>(RECVSIZE);_socket->async_receive(boost::asio::buffer(_recv_node->_msg, _recv_node->_total_len),std::bind(&Session::ReadAllCallBack, this, std::placeholders::_1, std::placeholders::_2));_recv_pending = true;
}
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{if (ec.value() != 0){std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;return;}_recv_pending = false;
}