做自己的網(wǎng)站長(zhǎng)沙百度開(kāi)戶
Muduo網(wǎng)絡(luò)庫(kù)的EventLoop模塊是網(wǎng)絡(luò)編程框架中的核心組件,負(fù)責(zé)事件循環(huán)的驅(qū)動(dòng)和管理。以下是對(duì)EventLoop模塊的詳細(xì)介紹:
作用與功能:
- EventLoop是網(wǎng)絡(luò)服務(wù)器中負(fù)責(zé)循環(huán)的重要模塊,它持續(xù)地監(jiān)聽(tīng)、獲取和處理各種事件,如IO事件、定時(shí)器事件等。
- 它通過(guò)輪詢?cè)L問(wèn)Poller(如EPollPoller),獲取激活的Channel列表,然后使Channel根據(jù)自身情況調(diào)用相應(yīng)的回調(diào)函數(shù)來(lái)處理事件。
- EventLoop確保了每個(gè)Loop都是相互獨(dú)立的,擁有自己的事件循環(huán)、Poller監(jiān)聽(tīng)者和Channel監(jiān)聽(tīng)通道列表。
與Poller的關(guān)系:
- Poller負(fù)責(zé)從事件監(jiān)聽(tīng)器上獲取監(jiān)聽(tīng)結(jié)果,即哪些文件描述符(fd)上發(fā)生了哪些事件。
- EventLoop會(huì)輪詢?cè)L問(wèn)Poller,以獲取這些發(fā)生事件的fd及其相關(guān)事件。
與Channel的關(guān)系:
- Channel類是對(duì)文件描述符(fd)以及其相關(guān)事件的封裝。它保存了fd的感興趣事件、實(shí)際發(fā)生的事件以及每種事件對(duì)應(yīng)的處理函數(shù)。
- 當(dāng)Poller檢測(cè)到某個(gè)fd上有事件發(fā)生時(shí),EventLoop會(huì)找到對(duì)應(yīng)的Channel,并調(diào)用其上的回調(diào)函數(shù)來(lái)處理該事件。
線程模型:
- EventLoop遵循“one loop one thread”的原則,即每個(gè)EventLoop都在一個(gè)獨(dú)立的線程上運(yùn)行。
- 這種設(shè)計(jì)使得事件處理更加高效和清晰,避免了多線程環(huán)境下的競(jìng)態(tài)條件和同步問(wèn)題。
mainLoop和subLoop
在Muduo網(wǎng)絡(luò)庫(kù)中,mainLoop和subLoop都是EventLoop的實(shí)例,它們分別代表主事件循環(huán)和子事件循環(huán)。
mainLoop(主事件循環(huán))
- mainLoop是整個(gè)Muduo網(wǎng)絡(luò)庫(kù)的核心事件循環(huán)。它負(fù)責(zé)監(jiān)聽(tīng)服務(wù)器套接字(通常是listenfd),并接受來(lái)自客戶端的連接請(qǐng)求。
- mainLoop運(yùn)行一個(gè)Accrptor,包含一個(gè)Poller,用于監(jiān)聽(tīng)一個(gè)特定的非阻塞的服務(wù)器sockfd上的讀事件。當(dāng)Poller檢測(cè)到有讀事件發(fā)生時(shí)(一般是新用戶連接),mainLoop會(huì)在線程池中通過(guò)輪詢算法選擇一個(gè)subLoop來(lái)處理這個(gè)連接的讀寫(xiě)和關(guān)閉事件。Acceptor將在后續(xù)闡述。
- mainLoop遵循 “one loop one thread” 的原則,即每個(gè)mainLoop都在一個(gè)獨(dú)立的線程上運(yùn)行。這確保了事件處理的高效性和清晰性,避免了多線程環(huán)境下的競(jìng)態(tài)條件和同步問(wèn)題。
subLoop(子事件循環(huán)):
- subLoop是mainLoop的子事件循環(huán),用于處理已建立的連接的讀寫(xiě)和關(guān)閉事件。每個(gè)subLoop都在一個(gè)獨(dú)立的線程上運(yùn)行,有一個(gè)用于喚醒自身的fd和Channel,運(yùn)行一個(gè)Poller,并保存自己管理的多個(gè)Channel,以實(shí)現(xiàn)并發(fā)處理多個(gè)連接的目的。
- 當(dāng)mainLoop接受到一個(gè)新的連接請(qǐng)求時(shí),它會(huì)根據(jù)EventLoopThreadPool中的線程來(lái)選擇一個(gè)subLoop,將新創(chuàng)建的TcpConnection的Channel放入這個(gè)subLoop中。這個(gè)subLoop會(huì)接管該連接的fd,并監(jiān)聽(tīng)其上的讀寫(xiě)和關(guān)閉事件。
- subLoop中的事件處理邏輯與mainLoop類似,也是通過(guò)Poller來(lái)監(jiān)聽(tīng)fd上的事件,并調(diào)用相應(yīng)的回調(diào)函數(shù)來(lái)處理這些事件。
- 由于subLoop是獨(dú)立的線程,因此它們可以并行處理多個(gè)連接,從而提高了服務(wù)器的并發(fā)處理能力。
總的來(lái)說(shuō),mainLoop和subLoop共同構(gòu)成了Muduo網(wǎng)絡(luò)庫(kù)的事件驅(qū)動(dòng)編程框架。mainLoop負(fù)責(zé)監(jiān)聽(tīng)服務(wù)器套接字并接受連接請(qǐng)求,而subLoop則負(fù)責(zé)處理已建立的連接的讀寫(xiě)和關(guān)閉事件。通過(guò)合理的線程調(diào)度和事件處理機(jī)制,Muduo網(wǎng)絡(luò)庫(kù)能夠高效、穩(wěn)定地處理大量的并發(fā)連接請(qǐng)求。
EventLoop.h
#pragma once
#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"
#include "LogStream.h"#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>
#include <sys/types.h>class Channel;
class Poller;/*** 事件循環(huán)類 兩大模型:Channel Poller* mainLoop只負(fù)責(zé)處理IO,并返回client的fd* subLoop負(fù)責(zé)監(jiān)聽(tīng)poll,并處理相應(yīng)的回調(diào)* 兩者之間通過(guò)weakupfd進(jìn)行通信
*/
class EventLoop : noncopyable
{
public:using Functor = std::function<void()>;EventLoop();~EventLoop();// 開(kāi)啟loopvoid loop();// 退出loopvoid quit();Timestamp pollReturnTime() const { return pollReturnTime_; }// 在當(dāng)前l(fā)oop執(zhí)行cbvoid runInLoop(Functor cb);// 把cb放入隊(duì)列,喚醒subloop所在的線程,執(zhí)行cbvoid queueInLoop(Functor cb);size_t queueSize() const;// 喚醒loop所在的線程,EventLoop::queueInLoop中調(diào)用void wakeup();// EventLoop方法 =》 Poller的方法void updateChannel(Channel *channel);void removeChannel(Channel *channel);bool hasChannel(Channel *channel);// 判斷EventLoop對(duì)象是否在自己的線程中bool isInLoopThread() const {return threadId_ == CurrentThread::tid();}private:// waked up后的一個(gè)操作 void handleRead(); // 執(zhí)行回調(diào)void doPendingFunctors(); using ChannelList = std::vector<Channel *>;std::atomic_bool looping_; // 原子操作,通過(guò)CAS實(shí)現(xiàn)std::atomic_bool quit_; // 標(biāo)識(shí)退出loop循環(huán)const pid_t threadId_; // 記錄當(dāng)前l(fā)oop所屬的線程idTimestamp pollReturnTime_; // poller返回發(fā)生事件的channels的時(shí)間點(diǎn)std::unique_ptr<Poller> poller_;int wakeupFd_; // 當(dāng)mainLoop獲取一個(gè)新用戶的channel,通過(guò)輪詢算法選擇一個(gè)subloop,通過(guò)該成員喚醒subloop處理channel。使用eventfd// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client.std::unique_ptr<Channel> wakeupChannel_;// scratch variablesChannelList activeChannels_;std::atomic_bool callingPendingFunctors_; // 標(biāo)識(shí)當(dāng)前l(fā)oop是否有需要執(zhí)行的回調(diào)操作,正在執(zhí)行則為truestd::vector<Functor> pendingFunctors_; // 存儲(chǔ)loop需要執(zhí)行的所有回調(diào)操作std::mutex mutex_; // 保護(hù)pendingFunctors_線程安全
};
EventLoop.cc
#include "EventLoop.h"
#include "LogStream.h"
#include "Poller.h"
#include "Channel.h"#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <iostream>// 防止一個(gè)線程創(chuàng)建多個(gè)EventLoop threadLocal
__thread EventLoop *t_loopInThisThread = nullptr;// 定義Poller超時(shí)時(shí)間
const int kPollTimeMs = 10000;// 創(chuàng)建weakupfd,用來(lái)notify喚醒subReactor處理新來(lái)的channel
int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_FATAL << "Failed in eventfd" << errno;}return evtfd;
}EventLoop::EventLoop(): looping_(false),quit_(false),callingPendingFunctors_(false),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_))
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}// 設(shè)置weakupfd的事件類型以及發(fā)生事件后的回調(diào)操作wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// we are always reading the wakeupfd// 每一個(gè)EventLoop都將監(jiān)聽(tīng)weakupChannel的EPOLLIN讀事件了// 作用是subloop在阻塞時(shí)能夠被mainLoop通過(guò)weakupfd喚醒wakeupChannel_->enableReading();
}EventLoop::~EventLoop()
{LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_<< " destructs in thread " << CurrentThread::tid();wakeupChannel_->disableAll();wakeupChannel_->remove();::close(wakeupFd_);t_loopInThisThread = NULL;
}void EventLoop::handleRead()
{uint64_t one = 1;ssize_t n = read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";}
}void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_INFO << "EventLoop " << this << " start looping";while (!quit_){activeChannels_.clear();// 當(dāng)前EventLoop的Poll,監(jiān)聽(tīng)兩類fd,client的fd(正常通信的,在baseloop中)和 weakupfd(mainLoop 和 subLoop 通信用來(lái)喚醒sub的)pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){// Poller監(jiān)聽(tīng)哪些channel發(fā)生事件了,然后上報(bào)給EventLoop,通知channel處理相應(yīng)的事件channel->handleEvent(pollReturnTime_);}// 執(zhí)行當(dāng)前EventLoop事件循環(huán)需要處理的回調(diào)操作/*** IO線程 mainLoop 只 accept 然后返回client通信用的fd <= 用channel打包 并分發(fā)給 subloop* mainLoop事先注冊(cè)一個(gè)回調(diào)cb(需要subLoop來(lái)執(zhí)行),weakup subloop后,* 執(zhí)行下面的方法,執(zhí)行之前mainLoop注冊(cè)的cb操作(一個(gè)或多個(gè))*/doPendingFunctors();}LOG_INFO << "EventLoop " << this << " stop looping";looping_ = false;
}/*** 退出事件循環(huán)* 1、loop在自己的線程中 調(diào)用quit,此時(shí)肯定沒(méi)有阻塞在poll中* 2、在其他線程中調(diào)用quit,如在subloop(woker)中調(diào)用mainLoop(IO)的qiut** mainLoop* * Muduo庫(kù)沒(méi)有 生產(chǎn)者-消費(fèi)者線程安全的隊(duì)列 存儲(chǔ)Channel* 直接使用wakeupfd進(jìn)行線程間的喚醒 ** subLoop1 subLoop2 subLoop3*/
void EventLoop::quit()
{quit_ = true;// 2中,此時(shí),若當(dāng)前woker線程不等于mainLoop線程,將本線程在poll中喚醒if (!isInLoopThread()){wakeup();}
}void EventLoop::runInLoop(Functor cb)
{// LOG_DEBUG<<"EventLoop::runInLoop cb:" << (cb != 0);if (isInLoopThread()) // 產(chǎn)生段錯(cuò)誤{ // 在當(dāng)前l(fā)oop線程中 執(zhí)行cbLOG_DEBUG << "在當(dāng)前l(fā)oop線程中 執(zhí)行cb";cb();}else{ // 在其他loop線程執(zhí)行cb,需要喚醒其loop所在線程,執(zhí)行cbLOG_DEBUG << "在其他loop線程執(zhí)行cb,需要喚醒其loop所在線程,執(zhí)行cb";queueInLoop(cb);}
}void EventLoop::queueInLoop(Functor cb)
{{std::unique_lock<std::mutex> ulock(mutex_);pendingFunctors_.emplace_back(cb);}// 喚醒相應(yīng)的,需要執(zhí)行上面回調(diào)操作的loop線程// 若當(dāng)前線程正在執(zhí)行回調(diào)doPendingFunctors,但是又有了新的回調(diào)cb// 防止執(zhí)行完回調(diào)后又阻塞在poll上無(wú)法執(zhí)行新cb,所以預(yù)先wakeup寫(xiě)入一個(gè)數(shù)據(jù)if (!isInLoopThread() || callingPendingFunctors_) {wakeup(); // 喚醒loop所在線程}
}// 用來(lái)喚醒loop所在的線程,向wakeupfd寫(xiě)一個(gè)數(shù)據(jù),wakeupChannel就發(fā)生讀事件,當(dāng)前l(fā)oop線程就會(huì)被喚醒
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = ::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}void EventLoop::updateChannel(Channel *channel)
{// channel是發(fā)起方,通過(guò)loop調(diào)用pollpoller_->updateChannel(channel);
}void EventLoop::removeChannel(Channel *channel)
{// channel是發(fā)起方,通過(guò)loop調(diào)用pollpoller_->removeChannel(channel);
}bool EventLoop::hasChannel(Channel *channel)
{return poller_->hasChannel(channel);
}// 執(zhí)行回調(diào),由TcpServer提供的回調(diào)函數(shù)
void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;callingPendingFunctors_ = true; // 正在執(zhí)行回調(diào)操作{ // 使用swap,將原pendingFunctors_置空并且釋放,其他線程不會(huì)因?yàn)閜endingFunctors_阻塞std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor &functor : functors){functor(); // 執(zhí)行當(dāng)前l(fā)oop需要的回調(diào)操作}callingPendingFunctors_ = false;
}