網(wǎng)站建設新方向開魯網(wǎng)站seo不用下載
GitHub - yzfzzz/MyWebServer: Linux高并發(fā)服務器項目,參考了TinyWebServer,將在此基礎上進行性能改進與功能增加。為方便讀者學習,附帶詳細注釋和博客!
TinyWebserver的復現(xiàn)與改進(1):服務器環(huán)境的搭建與測試-CSDN博客
TinyWebserver的復現(xiàn)與改進(2):項目的整體框架-CSDN博客
TinyWebserver的復現(xiàn)與改進(3):線程同步機制類封裝及線程池實現(xiàn)-CSDN博客
Reactor模式
今天我們將采用主從Reactor多線程模式,這是是大多數(shù)高性能服務器采用的模式
主從Reactor多線程模式要求主線程(I/O處理單元)只需負責:
- 監(jiān)聽文件描述符上是否有事件發(fā)生,
- 有的話就立即將該事件通知工作線程(邏輯單元),
- 將 socket 可讀可寫事件放入請求隊列,交給工作線程處理。
除此之外,主線程不做任何其他實質性的工作。讀寫數(shù)據(jù),接受新的連接,以及處理客戶請求均在工作線程中完成。
Reactor模式的工作流程
使用同步 I/O(以 epoll_wait 為例)實現(xiàn)的 Reactor 模式的工作流程是:
- 主線程往 epoll 內核事件表中注冊 socket 上的讀就緒事件。 (即把listenfd、和已連接的客戶端socketfd加入到epoll模型中)
- 主線程調用 epoll_wait 等待 socket 上有數(shù)據(jù)可讀。
- 當 socket 上有數(shù)據(jù)可讀時, epoll_wait 通知主線程。主線程則將 socket 可讀事件放入請求隊列。
- 正在堵塞的某個工作線程被解除堵塞,它從 socket 讀取數(shù)據(jù),并處理客戶請求,然后往 epoll 內核事件表中注冊該 socket 上的寫就緒事件。
- 當主線程調用 epoll_wait 等待 socket 可寫。
- 當 socket 可寫時,epoll_wait 通知主線程。主線程將 socket 可寫事件放入請求隊列。
- 堵塞的某個工作線程被解除堵塞,它往 socket 上寫入服務器處理客戶請求的結果。
代碼實現(xiàn)
main函數(shù)流程圖如圖所示:
初始化
創(chuàng)建線程池
// 定義一個線程池指針
threadpool<http_conn>* pool = NULL;
try {// 開辟一個線程池pool = new threadpool<http_conn>;
}catch(...)
{// 若異常則退出return 1;
}
使用new創(chuàng)建一個http_conn類型的線程池,返回的指針由pool接收。由于使用了 try……catch(...)
語句,因此如果遇到異常則退出
創(chuàng)建客戶端集合
// 開辟一塊連續(xù)的http_conn數(shù)組,保存所有正在連接的客戶端信息
http_conn* users = new http_conn[MAX_FD];
創(chuàng)建一個大小為 MAX_FD 的 http_conn 數(shù)組,當接收一個新 socket_fd 時,將會在 索引 i = socket的數(shù)組處(即 users[socket_fd]),初始化一個 http_conn 對象。這個對象保存著該客戶端的所有信息。
創(chuàng)建監(jiān)聽
// 設置監(jiān)聽
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
int ret = 0;
struct sockaddr_in address;
address.sin_addr.s_addr = INADDR_ANY; // 表示本機的所有IP都可以連接客戶端
address.sin_family = AF_INET; // 使用ipv4
address.sin_port = htons(port);// 設置端口復用
int reuse = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));// 綁定
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
if(ret == -1)
{
perror("bind");
exit(-1);
}// 開始監(jiān)聽
ret = listen(listenfd, 5);
if(ret == -1)
{
perror("listen");
exit(-1);
}
-
socket 創(chuàng)建一個套接字,AF_INET表示使用 ipv4,SOCK_STREAM表示使用流式協(xié)議,和后面的0搭配表示使用的是TCP協(xié)議
-
但此時的socket,沒有主機的端口號和ip信息,我們需要使用結構體address存儲主機的相關信息:如IP(
address.sin_addr.s_addr = INADDR_ANY
)、ipv4(address.sin_family = AF_INET
)、端口(address.sin_port = htons(port)
),其中htons()
表示 主機字節(jié)序 => 網(wǎng)絡字節(jié)序 ,因為不同的機器在內存中的存放字節(jié)的順序不同,我們需要統(tǒng)一標準 -
假設用戶先運行./server.out,關閉后再快速運行./server.out,會報錯:該端口被占用。這是因為結束./server,服務端處于FIN_WAIT狀態(tài),可以認為TCP通訊的TIME_WAIT時期需要等一小段時間。為了解決這個問題,常用
setsockopt()
表示開啟端口復用功能。 -
bind
將fd 和本地的IP和端口進行綁定,然后開始監(jiān)聽
epoll初始化
// 將listend添加到epoll模型中
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
addfd(epollfd, listenfd, false);
http_conn::m_epollfd = epollfd;
1.epoll_event用于檢測文件描述符發(fā)生了什么事情(如讀事件、寫事件等)
2.addfd
表示將listenfd加入epollfd中,這是一個自定義函數(shù)
epoll輪詢
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER,-1);
如果沒有數(shù)據(jù)發(fā)送給服務器,將一直堵塞再此處
事件處理
有新客戶端連接
// 有新的客戶端連接
if(sockfd == listenfd)
{struct sockaddr_in client_address;socklen_t client_addresslen = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addresslen);if(connfd < 0)
{printf("errno is %d\n", errno);continue;
}if(http_conn::m_user_count >= MAX_FD)
{close(connfd);continue;
}users[connfd].init(connfd, client_address);
}
可讀
// 有讀事件發(fā)生(可讀)
else if(events[i].events & EPOLLIN)
{// 有讀事件發(fā)生if(users[sockfd].read()){// 讀的到數(shù)據(jù)pool->append(users+sockfd);}else{// 讀不到數(shù)據(jù)users[sockfd].close_conn();}
}
可寫
// 有寫事件發(fā)生(可寫)
else if(events[i].events & EPOLLOUT)
{if(!users[sockfd].write()){users[sockfd].close_conn();}
}
結束
close(epollfd);
close(listenfd);
delete [] users;
delete pool;
return 0;
關閉所有的fd
完整代碼
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include "threadpool.hpp"
#include "locker.h"
#include "http_conn.h"
#include <signal.h>
#include <assert.h> #define MAX_FD 65536 // 最大的文件描述符
#define MAX_EVENT_NUMBER 10000 // 監(jiān)聽的最大事件數(shù)/* 函數(shù)指針的聲明: 類型說明符 (*函數(shù)名) (參數(shù))void(handler)(int) 聲明了一個名為 handler 的函數(shù)指針,它指向一個接受一個 int 參數(shù)并返回 void 的函數(shù)
*/
void addsig(int sig, void(handler)(int))
{// sigaction的輸入?yún)?shù)struct sigaction sa;// 指定sa內存區(qū)域的前n個字節(jié)都設置為某個特定的值('\0'),用于對新分配的內存進行初始化memset(&sa, '\0', sizeof(sa));// 寫入函數(shù)指針,指向的函數(shù)就是信號捕捉到之后的處理函數(shù)sa.sa_handler = handler;// 設置臨時阻塞信號集sigfillset(&sa.sa_mask);assert(sigaction(sig, &sa, NULL) != -1);
}int main(int argc, char* argv[])
{if(argc <= 1){// 要求輸入格式為 ./a.out 10000 其中10000是端口號 printf("usage: %s port_number\n", basename(argv[0]));return 1;}// 端口號 string -> intint port = atoi(argv[1]);// 如果向一個沒有讀端的管道寫數(shù)據(jù),不用終止進程addsig(SIGPIPE, SIG_IGN); // SIG_IGN: 忽略信號,這里指的是忽略信號 · SIGPIPE// 定義一個線程池指針threadpool<http_conn>* pool = NULL;try {// 開辟一個線程池pool = new threadpool<http_conn>;}catch(...){// 若異常則退出return 1;}// 開辟一塊連續(xù)的http_conn數(shù)組,保存所有正在連接的客戶端信息http_conn* users = new http_conn[MAX_FD];// 設置監(jiān)聽int listenfd = socket(AF_INET, SOCK_STREAM, 0);int ret = 0;struct sockaddr_in address;address.sin_addr.s_addr = INADDR_ANY;address.sin_family = AF_INET;address.sin_port = htons(port);// 設置端口復用int reuse = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));// 綁定ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));if(ret == -1){perror("bind");exit(-1);}// 開始監(jiān)聽ret = listen(listenfd, 5);if(ret == -1){perror("listen");exit(-1);}// 將listend添加到epoll模型中epoll_event events[MAX_EVENT_NUMBER];int epollfd = epoll_create(5);addfd(epollfd, listenfd, false);http_conn::m_epollfd = epollfd;while(1){// epoll輪詢,等待有數(shù)據(jù)發(fā)送int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER,-1);if((number < 0) && (errno != EINTR)){printf("epoll failture\n");break;}for(int i = 0; i < number; i++){int sockfd = events[i].data.fd;// 有新的客戶端連接if(sockfd == listenfd){struct sockaddr_in client_address;socklen_t client_addresslen = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addresslen);if(connfd < 0){printf("errno is %d\n", errno);continue;}if(http_conn::m_user_count >= MAX_FD){close(connfd);continue;}users[connfd].init(connfd, client_address);}// 若對方異常端開或錯誤else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){users[sockfd].close_conn();}// 有讀事件發(fā)生(可讀)else if(events[i].events & EPOLLIN){// 有讀事件發(fā)生if(users[sockfd].read()){// 讀的到數(shù)據(jù)pool->append(users+sockfd);}else{// 讀不到數(shù)據(jù)users[sockfd].close_conn();}}// 有寫事件發(fā)生(可寫)else if(events[i].events & EPOLLOUT){if(!users[sockfd].write()){users[sockfd].close_conn();}}}}close(epollfd);close(listenfd);delete [] users;delete pool;return 0;
}