wordpress僅顯示標(biāo)題互聯(lián)網(wǎng)廣告優(yōu)化
文章目錄
- 一、LoopThreadPool模塊
- 二、實(shí)現(xiàn)思想
- (一)管理
- (二)流程
- (三)功能設(shè)計(jì)
- 三、代碼
一、LoopThreadPool模塊
TcpServer模塊: 對(duì)所有模塊的整合,通過(guò) tcpserver 模塊實(shí)例化的對(duì)象,可以非常簡(jiǎn)單的完成一個(gè)服務(wù)器的搭建。
對(duì)前面所有子模塊的整合模塊,提供給用戶(hù)用于搭建一個(gè)高性能服務(wù)器的模塊!
二、實(shí)現(xiàn)思想
(一)管理
- Acceptor對(duì)象,創(chuàng)建一個(gè)監(jiān)聽(tīng)套接字!
- EventLoop 對(duì)象,baseloop對(duì)象,實(shí)現(xiàn)對(duì)監(jiān)聽(tīng)套接字的事件監(jiān)控!
- std::vector conns,實(shí)現(xiàn)對(duì)新建連接的管理!
- EventLoopPool 對(duì)象,創(chuàng)建loop線(xiàn)程池,對(duì)新建連接進(jìn)行事件監(jiān)控和處理!
(二)流程
流程:
1. 在TcpServer中實(shí)例一個(gè)Acceptor對(duì)象,以及一個(gè)EventLoop 對(duì)象(baseloop)
2. 將Acceptor 掛在baseloop 進(jìn)行事件監(jiān)控
3. 一旦Acceptor 對(duì)象就緒了可讀事件,則執(zhí)行時(shí)間回調(diào)函數(shù)獲取新建連接!
4. 對(duì)新連接,創(chuàng)造一個(gè) Connection 進(jìn)行管理!
5. 對(duì)新連接對(duì)應(yīng)的 Connection 設(shè)置功能回調(diào) (連接完成回調(diào),消息回調(diào),關(guān)閉回調(diào),任意事件監(jiān)控!)
6. 啟動(dòng)Connettion 的非活躍鏈接的超時(shí)銷(xiāo)毀功能
7. 將新連接對(duì)應(yīng)的Connection 掛到 LoopThreadPool 中的叢書(shū)線(xiàn)程對(duì)應(yīng)的Eventloop 中進(jìn)行事件監(jiān)控!
8. 一旦Connection對(duì)應(yīng)的鏈接就緒了可讀事件,則這個(gè)時(shí)候執(zhí)行讀事件回調(diào)函數(shù),讀取數(shù)據(jù),讀取完畢后調(diào)用TcpServer設(shè)置的消息回調(diào)!
(三)功能設(shè)計(jì)
- 設(shè)置從屬線(xiàn)程池?cái)?shù)量!
2. 啟動(dòng)服務(wù)器
3. 設(shè)置各種回調(diào)函數(shù)!(連接建立完成,消息,關(guān)閉,任意) 用戶(hù)設(shè)置給TcpServer TcpServer設(shè)置獲取的新連接!
4. 是否啟動(dòng)非活躍連接超時(shí)銷(xiāo)毀功能
5. 添加任務(wù)!
三、代碼
class TcpServer {private:uint64_t _next_id; //這是一個(gè)自動(dòng)增長(zhǎng)的連接ID,int _port;int _timeout; //這是非活躍連接的統(tǒng)計(jì)時(shí)間---多長(zhǎng)時(shí)間無(wú)通信就是非活躍連接bool _enable_inactive_release;//是否啟動(dòng)了非活躍連接超時(shí)銷(xiāo)毀的判斷標(biāo)志EventLoop _baseloop; //這是主線(xiàn)程的EventLoop對(duì)象,負(fù)責(zé)監(jiān)聽(tīng)事件的處理Acceptor _acceptor; //這是監(jiān)聽(tīng)套接字的管理對(duì)象LoopThreadPool _pool; //這是從屬EventLoop線(xiàn)程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有連接對(duì)應(yīng)的shared_ptr對(duì)象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor &task, int delay) {_next_id++;_baseloop.TimerAdd(_next_id, delay, task);}//為新連接構(gòu)造一個(gè)Connection進(jìn)行管理void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//啟動(dòng)非活躍超時(shí)銷(xiāo)毀conn->Established();//就緒初始化_conns.insert(std::make_pair(_next_id, conn));}void RemoveConnectionInLoop(const PtrConnection &conn) {int id = conn->Id();auto it = _conns.find(id);if (it != _conns.end()) {_conns.erase(it);}}//從管理Connection的_conns中移除連接信息void RemoveConnection(const PtrConnection &conn) {_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//將監(jiān)聽(tīng)套接字掛到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) {_closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }//用于添加一個(gè)定時(shí)任務(wù)void RunAfter(const Functor &task, int delay) {_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start() { _pool.Create(); _baseloop.Start(); }
};