網上有很多關于pos機服務器連接超時,c++實現基于reactor的高并發服務器的知識,也有很多人為大家解答關于pos機服務器連接超時的問題,今天pos機之家(www.www690aa.com)為大家整理了關于這方面的知識,讓我們一起來看下吧!
本文目錄一覽:
pos機服務器連接超時
基于Reactor的高并發服務器,分為反應堆模型,多線程,I/O模型,服務器,Http請求和響應五部分
全局
反應堆模型Channel描述了文件描述符以及讀寫事件,以及對應的讀寫銷毀回調函數,對應存儲arg讀寫回調對應的參數
Channel
Channel添加寫和判斷異或 |:相同為0,異為1按位與&:只有11為1,其它組合全部為0,即只有真真為真,其它一假則假去反 ~:二進制全部取反添加寫屬性:若對應為10 想要寫添加寫屬性,與100異或,的110讀寫屬性刪除寫屬性: 第三位清零,若為110,第三位清零,將寫取反011,在按位與& 010只留下讀事件// C++11 強類型枚舉enumclass FDEvent{TimeOut = 0x01, //十進制1,超時了 1ReadEvent = 0x02, //十進制2 10WriteEvent = 0x04//十進制4 二進制 100};void Channel::WriteEventEnable(bool flag){if (flag) //如果為真,添加寫屬性{// 異或 相同為0 異為1// WriteEvent 從右往左數第三個標志位1,通過異或 讓channel->events的第三位為1m_events |= static_cast<int>(FDEvent::WriteEvent); // 按位異或 int events整型32位,0/1,}else// 如果不寫,讓channel->events 對應的第三位清零{// ~WriteEvent 按位與, ~WriteEvent取反 011 然后與 channel->events按位與&運算 只有11 為 1,其它皆為0只有同為真時則真,一假則假,1為真,0為假m_events = m_events & ~static_cast<int>(FDEvent::WriteEvent); //channel->events 第三位清零之后,寫事件就不再檢測}}//判斷文件描述符是否有寫事件bool Channel::isWriteEventEnable(){return m_events & static_cast<int>(FDEvent::WriteEvent); //按位與 ,第三位都是1,則是寫,如果成立,最后大于0,如果不成立,最后為0}Dispatcher
Dispatcher作為父類函數,對應Epoll,Poll,Select模型。
反應堆模型
選擇反應堆模型在EventLoop初始化時,針對全局EventLoop,將m_dispatcher初始化為EpollDispatcher.
使用多態性,父類建立虛函數,子類繼承復函數,使用override取代父類虛函數。達到選擇反應堆模型。
m_dispatcher = new EpollDispatcher(this); //選擇模型//Dispatcher類為父類virtual ~Dispatcher(); //也虛函數,在多態時virtual int add(); //等于 = 0純虛函數,就不用定義//刪除 將某一個節點從epoll樹上刪除virtual int remove();//修改virtual int modify();//事件檢測, 用于檢測待檢測三者之一模型epoll_wait等的一系列事件上是否有事件被激活,讀/寫事件virtual int dispatch(int timeout = 2);//單位 S 超時時長//Epoll子類繼承父類,override多態性覆蓋父類函數,同時public繼承,繼承Dispatcher的私有變量class EpollDispatcher :public Dispatcher //繼承父類Dispatcher{public:EpollDispatcher(struct EventLoop* evLoop);~EpollDispatcher(); //也虛函數,在多態時// override修飾前面的函數,表示此函數是從父類繼承過來的函數,子類將重寫父類虛函數// override會自動對前面的名字進行檢查,int add() override; //等于 =純虛函數,就不用定義 //刪除 將某一個節點從epoll樹上刪除int remove() override;//修改int modify() override;//事件檢測, 用于檢測待檢測三者之一模型epoll_wait等的一系列事件上是否有事件被激活,讀/寫事件int dispatch(int timeout = 2) override;//單位 S 超時時長// 不改變的不寫,直接繼承父類EventLoop
處理所有的事件,啟動反應堆模型,處理機會文件描述符后的事件,添加任務,處理任務隊列 調用dispatcher中的添加移除,修改操作 存儲著任務隊列m_taskQ 存儲fd和對應channel對應關系:m_channelmap
相關視頻推薦
6種epoll的設計方法(單線程epoll、多線程epoll、多進程epoll)及應用場景
手把手寫一次reactor,為你的web服務器增加技術點
120行代碼實現線程池,實現異步操作,解決項目性能問題
需要C/C++ Linux服務器架構師學習資料加qun812855908獲?。ㄙY料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享
私有函數變量// CHannelElement結構體//定義任務隊列的節點 類型,文件描述符信息struct ChannelElement{ElemType type; //如何處理該節點中ChannelChannel* channel; //文件描述符信息};//私有函數變量//加入開關 EventLoop是否工作bool m_isQuit;//該指針指向之類的實例epoll,poll,selectDispatcher* m_dispatcher; //任務隊列,存儲任務,遍歷任務隊列就可以修改dispatcher檢測的文件描述符//任務隊列queue<ChannelElement*>m_taskQ;//map 文件描述符和Channel之間的對應關系 通過數組實現map<int,Channel*> m_channelmap;// 線程相關,線程ID,namethread::id m_threadID;string m_threadName; //主線程只有一個,固定名稱,初始化要分為兩個//互斥鎖,保護任務隊列mutex m_mutex;// 整型數組int m_socketPair[2]; //存儲本地通信fd通過socketpair初始化
EventLoop事件處理
m_channelmap
任務隊列ChannelElement
任務隊列
反應堆運行反應堆模型啟動之后將會在while循環中一直執行下去。首先調用dispatcher調用Epoll的wait函數,等待內核回應,根據其讀寫請求調用evLoop的enactive函數進行相關的讀寫操作。
int EventLoop::Run(){ m_isQuit = false; //不退出 //比較線程ID,當前線程ID與我們保存的線程ID是否相等 if (m_threadID != this_thread::get_id()) { //不相等時 直接返回-1 return-1; } // 循環進行時間處理 while (!m_isQuit) //只要沒有停止 死循環 { //調用初始化時選中的模型Epoll,Poll,Select m_dispatcher->dispatch(); // ProcessTaskQ(); //處理任務隊列 } return0;}enactive
根據傳入的event調用對應Channel對應的讀寫回調函數
int EventLoop::eventActive(int fd, int event){ // 判斷函數傳入的參數是否為有效 if (fd < 0) { return-1; } //基于fd從EventLoop取出對應的Channel Channel* channel = m_channelmap[fd]; //channelmap根據對應的fd取出對應的channel // 判斷取出channel的fd與當前的fd是否相同 assert(channel->getSocket() == fd); //如果為假,打印出報錯信息 if (event & (int)FDEvent::ReadEvent && channel->readCallback) //channel->readCallback不等于空 { //調用channel的讀回調函數 channel->readCallback(const_cast<void*>(channel->getArg())); } if (event & (int)FDEvent::WriteEvent && channel->writeCallback) { channel->writeCallback(const_cast<void*>(channel->getArg())); } return0;}添加任務
int EventLoop::AddTask(Channel* channel, ElemType type){ //加鎖,有可能是當前線程,也有可能是主線程 m_mutex.lock(); // 創建新節點 ChannelElement* node = new ChannelElement; node->channel = channel; node->type = type; m_taskQ.push(node); m_mutex.unlock(); // 處理節點 /* * 如當前EventLoop反應堆屬于子線程 * 1,對于鏈表節點的添加:可能是當前線程也可能是其它線程(主線程) * 1),修改fd的事件,可能是當前線程發起的,還是當前子線程進行處理 * 2),添加新的fd,和新的客戶端發起連接,添加任務節點的操作由主線程發起 * 2,主線程只負責和客戶端建立連接,判斷當前線程,不讓主線程進行處理,分給子線程 * 不能讓主線程處理任務隊列,需要由當前的子線程處理 */ if (m_threadID == this_thread::get_id()) { //當前子線程 // 直接處理任務隊列中的任務 ProcessTaskQ(); } else { //主線程 -- 告訴子線程處理任務隊列中的任務 // 1,子線程在工作 2,子線程被阻塞了:1,select,poll,epoll,如何解除其阻塞,在本代碼阻塞時長是2s // 在檢測集合中添加屬于自己(額外)的文件描述,不負責套接字通信,目的控制文件描述符什么時候有數據,輔助解除阻塞 // 滿足條件,兩個文件描述符,可以相互通信,//1,使用pipe進程間通信,進程更可,//2,socketpair 文件描述符進行通信 taskWakeup(); //主線程調用,相當于向socket添加了數據 } return0;}處理任務
從任務隊列中取出一個任務,根據其任務類型,調用反應堆模型對應,將channel在內核中的檢測進行刪除,修改,或添加
int EventLoop::ProcessTaskQ(){ //遍歷鏈表 while (!m_taskQ.empty()) { //將處理后的task從當前鏈表中刪除,(需要加鎖) // 取出頭結點 m_mutex.lock(); ChannelElement* node = m_taskQ.front(); //從頭部 m_taskQ.pop(); //把頭結點彈出,相當于刪除 m_mutex.unlock(); //讀鏈表中的Channel,根據Channel進行處理 Channel* channel = node->channel; // 判斷任務類型 if (node->type == ElemType::ADD) { // 需要channel里面的文件描述符evLoop里面的數據 //添加 -- 每個功能對應一個任務函數,更利于維護 Add(channel); } elseif (node->type == ElemType::DELETE) { //Debug("斷開了連接"); //刪除 Remove(channel); // 需要資源釋放channel 關掉文件描述符,地址堆內存釋放,channel和dispatcher的關系需要刪除 } elseif (node->type == ElemType::MODIFY) { //修改 的文件描述符事件 Modify(channel); } delete node; } return0;}int EventLoop::Add(Channel* channel){ //把任務節點中的任務添加到dispatcher對應的檢測集合里面, int fd = channel->getSocket(); //找到fd對應數組元素的位置,并存儲 if (m_channelmap.find(fd) == m_channelmap.end()) { m_channelmap.insert(make_pair(fd, channel)); //將當前fd和channel添加到map m_dispatcher->setChannel(channel); //設置當前channel int ret = m_dispatcher->add(); //加入 return ret; } return-1;}int EventLoop::Remove(Channel* channel){ //調用dispatcher的remove函數進行刪除 // 將要刪除的文件描述符 int fd = channel->getSocket(); // 判斷文件描述符是否已經在檢測的集合了 if (m_channelmap.find(fd) == m_channelmap.end()) { return-1; } //從檢測集合中刪除 封裝了poll,epoll select m_dispatcher->setChannel(channel); int ret = m_dispatcher->remove(); return ret;}int EventLoop::Modify(Channel* channel){ // 將要修改的文件描述符 int fd = channel->getSocket(); // TODO判斷 if (m_channelmap.find(fd) == m_channelmap.end()) { return-1; } //從檢測集合中刪除 m_dispatcher->setChannel(channel); int ret = m_dispatcher->modify(); return ret;}多線程ThreadPool
定義線程池,運行線程池,public函數取出線程池中某個子線程的反應堆實例EventLoop,線程池的EventLoop反應堆模型事件由主線程傳入,屬于主線程,其內部,任務隊列,fd和Channel對應關系,ChannelElement都是所有線程需要使用的數據
線程池工作
線程池運行創建子工作線程線程池運行語句在主線層運行,根據當前線程數量,申請響應的工作線程池,并將工作線程運行起來,將工作線程加入到線程池的vector數組當中。
void ThreadPool::Run(){assert(!m_isStart); //運行期間此條件不能錯//判斷是不是主線程if(m_mainLoop->getTHreadID() != this_thread::get_id()){exit(0);}// 將線程池設置狀態標志為啟動m_isStart = true;// 子線程數量大于0if (m_threadNum > 0){for (int i = 0; i < m_threadNum; ++i){WorkerThread* subThread = new WorkerThread(i); // 調用子線程subThread->Run();m_workerThreads.push_back(subThread);}}}取出工作線程池中的EventLoop
EventLoop* ThreadPool::takeWorkerEventLoop(){//由主線程來調用線程池取出反應堆模型assert(m_isStart); //當前程序必須是運行的//判斷是不是主線程if (m_mainLoop->getTHreadID() != this_thread::get_id()){exit(0);}//從線程池中找到一個子線層,然后取出里面的反應堆實例EventLoop* evLoop = m_mainLoop; //將主線程實例初始化if (m_threadNum > 0){evLoop = m_workerThreads[m_index]->getEventLoop();//雨露均沾,不能一直是一個pool->index線程m_index = ++m_index % m_threadNum;}return evLoop;}工作線程運行
在子線程中申請反應堆模型,供子線程在客戶端連接時取出 ,供類Connection使用
void WorkerThread::Run(){//創建子線程,3,4子線程的回調函數以及傳入的參數//調用的函數,以及此函數的所有者thism_thread = new thread(&WorkerThread::Running,this);// 阻塞主線程,讓當前函數不會直接結束,不知道當前子線程是否運行結束// 如果為空,子線程還沒有初始化完畢,讓主線程等一會,等到初始化完畢unique_lock<mutex> locker(m_mutex);while (m_evLoop == nullptr){m_cond.wait(locker);}}void* WorkerThread::Running(){m_mutex.lock();//對evLoop做初始化m_evLoop = new EventLoop(m_name);m_mutex.unlock();m_cond.notify_one(); //喚醒一個主線程的條件變量等待解除阻塞// 啟動反應堆模型m_evLoop->Run();}IO模型Buffer
讀寫內存結構體,添加字符串,接受套接字數據,將寫緩存區數據發送
讀寫位置移動
發送目錄int Buffer::sendData(int socket){// 判斷buffer里面是否有需要發送的數據 得到未讀數據即待發送int readable = readableSize();if (readable > 0){//發送出去buffer->data + buffer->readPos 緩存區的位置+已經讀到的位置// 管道破裂 -- 連接已經斷開,服務器繼續發數據,出現管道破裂 -- TCP協議// 當內核產生信號時,MSG_NOSIGNAL忽略,繼續保持連接// Linux的信號級別高,Linux大多數信號都會終止信號int count = send(socket, m_data + m_readPos, readable, MSG_NOSIGNAL);if (count > 0){// 往后移動未讀緩存區位置m_readPos += count;// 稍微休眠一下usleep(1); // 1微妙}return count;}return0;}發送文件
發送文件是不需要將讀取到的文件放入緩存的,直接內核發送提高文件IO效率。
int Buffer::sendData(int cfd, int fd, off_t offset, int size){int count = 0;while (offset < size){//系統函數,發送文件,linux內核提供的sendfile 也能減少拷貝次數// sendfile發送文件效率高,而文件目錄使用send//通信文件描述符,打開文件描述符,fd對應的文件偏移量一般為空,//單獨單文件出現發送不全,offset會自動修改當前讀取位置int ret = (int)sendfile(cfd, fd, &offset, (size_t)(size - offset));if (ret == -1 && errno == EAGAIN){printf("not data ....");perror("sendfile");}count += (int)offset;}return count;}TcpConnection
負責子線程與客戶端進行通信,分別存儲這讀寫銷毀回調函數->調用相關buffer函數完成相關的通信功能
TcpConnection
主線程
初始化申請讀寫緩存區,并初始化Channel,初始化子線程與客戶端與服務器進行通信時回調函數
TcpConnection::TcpConnection(int fd, EventLoop* evloop){//并沒有創建evloop,當前的TcpConnect都是在子線程中完成的m_evLoop = evloop;m_readBuf = new Buffer(10240); //10Km_writeBuf = new Buffer(10240);// 初始化m_request = new HttpRequest;m_response = new HttpResponse;m_name = "Connection-" + to_string(fd);// 服務器最迫切想知道的時候,客戶端有沒有數據到達m_channel =new Channel(fd,FDEvent::ReadEvent, processRead, processWrite, destory, this);// 把channel放到任務循環的任務隊列里面evloop->AddTask(m_channel, ElemType::ADD);}讀寫回調
讀事件將調用HttpRequest解析,客戶端發送的讀取請求。寫事件,將針對讀事件將對應的數據寫入緩存區,由寫事件進行發送。但由于效率的考慮,在讀事件時,已經設置成邊讀變發送提高效率,發送文件也將采用Linux內核提供的sendfile方法,不讀取內核直接發送,比send的效率快了,很多,在很大程度上,寫事件的寫功能基本被架空。
int TcpConnection::processRead(void* arg){TcpConnection* conn = static_cast<TcpConnection*>(arg);// 接受數據 最后要存儲到readBuf里面int socket = conn->m_channel->getSocket();int count = conn->m_readBuf->socketRead(socket);// data起始地址 readPos該讀的地址位置Debug("接收到的http請求數據: %s", conn->m_readBuf->data());if (count > 0){// 接受了http請求,解析http請求#ifdef MSG_SEND_AUTO//添加檢測寫事件conn->m_channel->writeEventEnable(true);// MODIFY修改檢測讀寫事件conn->m_evLoop->AddTask(conn->m_channel, ElemType::MODIFY);#endifbool flag = conn->m_request->parseHttpRequest(conn->m_readBuf, conn->m_response,conn->m_writeBuf, socket);if (!flag){//解析失敗,回復一個簡單的HTMLstring errMsg = "Http/1.1 400 Bad Request\\\\";conn->m_writeBuf->appendString(errMsg);}}else{#ifdef MSG_SEND_AUTO //如果被定義,//斷開連接conn->m_evLoop->AddTask(conn->m_channel, ElemType::DELETE);#endif}// 斷開連接 完全寫入緩存區再發送不能立即關閉,還沒有發送#ifndef MSG_SEND_AUTO //如果沒有被定義,conn->m_evLoop->AddTask(conn->m_channel, ElemType::DELETE);#endifreturn0;}//寫回調函數,處理寫事件,將writeBuf中的數據發送給客戶端int TcpConnection::processWrite(void* arg){Debug("開始發送數據了(基于寫事件發送)....");TcpConnection* conn = static_cast<TcpConnection*>(arg);// 發送數據int count = conn->m_writeBuf->sendData(conn->m_channel->getSocket());if (count > 0){// 判斷數據是否全部被發送出去if (conn->m_writeBuf->readableSize() == 0){// 數據發送完畢// 1,不再檢測寫事件 --修改channel中保存的事件conn->m_channel->writeEventEnable(false);// 2, 修改dispatcher中檢測的集合,往enentLoop反映模型認為隊列節點標記為modifyconn->m_evLoop->AddTask(conn->m_channel, ElemType::MODIFY);//3,若不通信,刪除這個節點conn->m_evLoop->AddTask(conn->m_channel, ElemType::DELETE);}}return0;}HttpRequest
定義http 請求結構體添加請求頭結點,解析請求行,頭,解析/處理http請求協議,獲取文件類型 發送文件/目錄 設置請求url,Method,Version ,state
處理客戶端解析請求在while循環內部,完成對請求行和請求頭的解析。解析完成之后,根據請求行,讀取客戶端需要的數據,并對應進行操作
bool HttpRequest::parseHttpRequest(Buffer* readBuf, HttpResponse* response, Buffer* sendBuf, int socket){bool flag = true;// 先解析請求行while (m_curState !=PressState::ParseReqDone){// 根據請求頭目前的請求狀態進行選擇switch (m_curState){case PressState::ParseReqLine:flag = parseRequestLine(readBuf);break;case PressState::ParseReqHeaders:flag = parseRequestHeader(readBuf);break;case PressState::ParseReqBody: //post的請求,咱不做處理// 讀取post數據break;default:break;}if (!flag){return flag;}//判斷是否解析完畢,如果完畢,需要準備回復的數據if (m_curState == PressState::ParseReqDone){// 1,根據解析出的原始數據,對客戶端請求做出處理processHttpRequest(response);// 2,組織響應數據并發送response->prepareMsg(sendBuf, socket);}}// 狀態還原,保證還能繼續處理第二條及以后的請求m_curState = PressState::ParseReqLine;//再解析請求頭return flag;}處理客戶端請求
根據請求行規則判斷是請求目錄,還是請求文件,調用Buffer相關發送目錄,和發送文件重載函數,完成通信任務。
bool HttpRequest::processHttpRequest(HttpResponse* response){if (strcasecmp(m_method.data(), "get") != 0) //strcasecmp比較時不區分大小寫{//非get請求不處理return-1;}m_url = decodeMsg(m_url); // 避免中文的編碼問題 將請求的路徑轉碼 linux會轉成utf8//處理客戶端請求的靜態資源(目錄或文件)constchar* file = NULL;if (strcmp(m_url.data(), "/") == 0) //判斷是不是根目錄{file = "./";}else{file = m_url.data() + 1; // 指針+1 把開始的 /去掉吧}//判斷file屬性,是文件還是目錄struct stat st;int ret = stat(file, &st); // file文件屬性,同時將信息傳入st保存了文件的大小if (ret == -1){//文件不存在 -- 回復404//sendHeadMsg(cfd, 404, "Not Found", getFileType(".html"), -1);//sendFile("404.html", cfd); //發送404對應的html文件response->setFileName("404.html");response->setStatusCode(StatusCode::NotFound);// 響應頭response->addHeader("Content-type", getFileType(".html"));response->sendDataFunc = sendFile;return0;}response->setFileName(file);response->setStatusCode(StatusCode::OK);//判斷文件類型if (S_ISDIR(st.st_mode)) //如果時目錄返回1,不是返回0{//把這個目錄中的內容發送給客戶端//sendHeadMsg(cfd, 200, "OK", getFileType(".html"), (int)st.st_size);//sendDir(file, cfd);// 響應頭response->addHeader("Content-type", getFileType(".html"));response->sendDataFunc = sendDir;}else{//把這個文件的內容發給客戶端//sendHeadMsg(cfd, 200, "OK", getFileType(file), (int)st.st_size);//sendFile(file, cfd);// 響應頭response->addHeader("Content-type", getFileType(file));response->addHeader("Content-length", to_string(st.st_size));response->sendDataFunc = sendFile;}returnfalse;}HttpResponse
定義http響應,添加響應頭,準備響應的數據
服務器TcpServer服務器類,復制服務器的初始化,設置監聽,啟動服務器,并接受主線程的連接請求
TcpServer工作流程
主函數傳入用戶輸入的端口和文件夾端口將作為服務器端口,文件夾將作為瀏覽器訪問的文件夾初始化TcpServer服務器實例 - 傳入端口和初始化線程個數運行服務器#include <stdlib.h>#include <unistd.h>#include "TcpServer.h"//初始化監聽的套接字// argc 輸入參數的個數// argv[0]可執行程序的名稱 // argv[1]傳入的第一個參數, port// argv[2]傳入的第二個參數 pathint main(int argc, char* argv[]){#if 0 if (argc < 3) { printf("./a.out port path\"); return-1; } unsigned short port = (unsigned short)atoi(argv[1]); //切換服務器的根目錄,將根目錄當前目錄切換到其它目錄 chdir(argv[2]); // 啟動服務器#else // VS code 調試 unsigned short port = 8080; chdir("/home/foryouos/blog");#endif // 創建服務器實例 TcpServer* server = new TcpServer(port, 4); // 服務器運行 - 啟動線程池-對監聽的套接字進行封裝,并放到主線程的任務隊列,啟動反應堆模型 // 底層的epoll也運行起來, server->Run(); return0;}初始化TcpServer啟動TcpServer檢測到客戶端請求
詳細代碼:https://github.com/foryouos/cppserver-linux/tree/main/c_simple_server/cpp_server
以上就是關于pos機服務器連接超時,c++實現基于reactor的高并發服務器的知識,后面我們會繼續為大家整理關于pos機服務器連接超時的知識,希望能夠幫助到大家!









