作者:Yoho
转载注明出处
TCP Server继承自ITcpServer类和CIOHandler类
直接从Start开撸
bool TcpServerService::Start(LPCTSTR lpszBindAddress, USHORT usPort) {
//检查参数
if(!CheckParams() || !CheckStarting())
{
return false;
}
//启动事项
PrepareStart();
//创建监听Socket
if(CreateListenSocket(lpszBindAddress,usPort)){
//创建工作线程
if(CreateWorkerThreads()){
//开始接收
if(StartAccept()){
m_enState = SS_STARTED;
return true;
}
}
}
EXECUTE_RESTORE_ERROR(Stop());
return false;
}
首先检查启动参数,然后检查是否正在运行 然后调用PrepareStart(可覆盖)函数
然后就开始创建Socket:
bool TcpServerService::CreateListenSocket(LPCTSTR lpszBindAddress,USHORT usPort){
//地址是否为空
if(::IsStrEmpty(lpszBindAddress)){
lpszBindAddress = DEFAULT_IPV4_BIND_ADDRESS;//默认绑定0.0.0.0
}
HP_SOCKADDR addr;
if(::sockaddr_A_2_IN(lpszBindAddress,usPort,addr)){
m_soListen = socket(addr.family,SOCK_STREAM,IPPROTO_TCP); //创建Socket
if(m_soListen != INVALID_SOCKET){ //如果Socket返回值不是-1
::fcntl_SETFL(m_soListen, O_NONBLOCK | O_CLOEXEC);//定义非阻塞;
bool bOnOff = (m_dwKeepAliveTime > 0 && m_dwKeepAliveInterval > 0);
//设置Socket KeepAliveTime
VERIFY(IS_NO_ERROR(::SSO_KeepAliveVals(m_soListen,bOnOff,m_dwKeepAliveTime,m_dwKeepAliveInterval)));
VERIFY(IS_NO_ERROR(::SSO_ReuseAddress(m_soListen)));
//绑定地址
if(::bind(m_soListen,addr.Addr(),addr.AddrSize()) != SOCKET_ERROR){
if(TRIGGER(FirePrepareListen(m_soListen)) != HR_ERROR){
if(::listen(m_soListen,m_dwSocketListenQueue) != SOCKET_ERROR){
return true;
}else{
SetLastError(SE_SOCKET_LISTEN,__FUNCTION__,::WSAGetLastError());
}
}else{
SetLastError(SE_SOCKET_PREPARE,__FUNCTION__,ENSURE_ERROR_CANCELLED);
}
}else{
SetLastError(SE_SOCKET_BIND,__FUNCTION__,::WSAGetLastError());
}
}else{
SetLastError(SE_SOCKET_CREATE,__FUNCTION__,::WSAGetLastError());
}
}else{
SetLastError(SE_SOCKET_CREATE,__FUNCTION__,::WSAGetLastError());
}
return false;
}
首先判断绑定地址和端口是否合法 如果合法则给addr赋值结构(HP_SOCKADDR)
然后调用socket 来创建套接字 然后通过fnctl_SETFL 来设置非阻塞O_NONBLOCK | O_CLOEXEC
再通过bind 来绑定端口
通过listen来监听端口
注:Firexxx等函数都可以被继承覆盖
创建完套接字就开始创建工作线程
//创建工作线程
bool TcpServerService::CreateWorkerThreads() {
if(!m_ioDispatcher.Start(this,m_dwAcceptSocketCount,m_dwWorkerThreadCount)){
return false;//创建失败
}
const CIODispatcher::CWorkerThread* pWorkerThread = m_ioDispatcher.GetWorkerThreads();
for(DWORD i = 0;i<m_dwWorkerThreadCount;i++){
m_rcBufferMap[pWorkerThread[i].GetThreadID()] = new CBufferPtr(m_dwSocketBufferSize);
}
return true;
}
调用IODispatcher来创建线程 然后再通过GetWorkerThreads来获取工作线程 并且遍历把他放在rcBufferMap变量(unordered_map)给每个线程开辟Buffer
我们来看CIODispatcher::Start
BOOL CIODispatcher::Start(IIOHandler* pHandler, int iWorkerMaxEvents, int iWorkers, LLONG llTimerInterval)
{
ASSERT_CHECK_EINVAL(pHandler && iWorkerMaxEvents >= 0 && iWorkers >= 0);
CHECK_ERROR(!HasStarted(), ERROR_INVALID_STATE);
if(iWorkerMaxEvents == 0) iWorkerMaxEvents = DEF_WORKER_MAX_EVENTS;
if(iWorkers == 0) iWorkers = DEFAULT_WORKER_THREAD_COUNT;
m_iMaxEvents = iWorkerMaxEvents;
m_iWorkers = iWorkers;
m_pHandler = pHandler;
m_epoll = epoll_create1(EPOLL_CLOEXEC);
CHECK_ERROR_FD(m_epoll);
m_evCmd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if(IS_INVALID_FD(m_evCmd))
goto START_ERROR;
if(!VERIFY(AddFD(m_evCmd, EPOLLIN | EPOLLET, &m_evCmd)))
goto START_ERROR;
m_evExit = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC | EFD_SEMAPHORE);
if(IS_INVALID_FD(m_evExit))
goto START_ERROR;
if(!VERIFY(AddFD(m_evExit, EPOLLIN, &m_evExit)))
goto START_ERROR;
if(llTimerInterval > 0)
{
m_evTimer = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);;
if(IS_INVALID_FD(m_evTimer))
goto START_ERROR;
itimerspec its;
::MillisecondToTimespec(llTimerInterval, its.it_value);
::MillisecondToTimespec(llTimerInterval, its.it_interval);
if(!VERIFY_IS_NO_ERROR(timerfd_settime(m_evTimer, 0, &its, nullptr)))
goto START_ERROR;
if(!VERIFY(AddFD(m_evTimer, EPOLLIN | EPOLLET, &m_evTimer)))
goto START_ERROR;
}
sigset_t ss;
sigemptyset(&ss);
sigaddset(&ss, SIGPIPE);
VERIFY_IS_NO_ERROR(pthread_sigmask(SIG_BLOCK, &ss, nullptr));
m_pWorkers = make_unique<CWorkerThread[]>(m_iWorkers);
for(int i = 0; i < m_iWorkers; i++)
{
if(!VERIFY(m_pWorkers[i].Start(this, &CIODispatcher::WorkerProc)))
goto START_ERROR;
}
return TRUE;
START_ERROR:
EXECUTE_RESTORE_ERROR(Stop(FALSE));
return FALSE;
}
这里就是HP-Socket的精髓了 这里是与系统内核IO进行监听的地方 在linux下用的是Epoll 如果想在Unix下使用可以自行更改为Kqueue 在Windows下是使用IOCP 这里是分别创建了三个线程通讯eventfd 分别是命令信号/退出信号/定时器信号
然后每个线程执行CIODispatcher::WorkerProc
此时我们看到WorkerProc
int CIODispatcher::WorkerProc(PVOID pv)
{
BOOL bRun = TRUE;
unique_ptr<epoll_event[]> pEvents = make_unique<epoll_event[]>(m_iMaxEvents);
while(bRun)
{
int rs = NO_EINTR_INT(epoll_pwait(m_epoll, pEvents.get(), m_iMaxEvents, INFINITE, nullptr));
if(rs <= TIMEOUT)
ERROR_ABORT();
for(int i = 0; i < rs; i++)
{
UINT events = pEvents[i].events;
PVOID ptr = pEvents[i].data.ptr;
if(ptr == &m_evCmd)
ProcessCommand(events);
else if(ptr == &m_evTimer)
ProcessTimer(events);
else if(ptr == &m_evExit)
bRun = ProcessExit(events);
else
ProcessIo(ptr, events);
}
}
m_pHandler->OnDispatchThreadEnd(SELF_THREAD_ID);
return 0;
}
通过epoll_pwait来捕获事件 并且事件总数
然后再用for来执行判断事件类型是命令/定时器/退出/内核事件(Socket)
此时工作线程已创建完毕
然后就开始StartAccept 接收
BOOL CTcpServer::StartAccept()
{
return m_ioDispatcher.AddFD(m_soListen, _EPOLL_READ_EVENTS | EPOLLET, TO_PVOID(&m_soListen));
}
把Socket加入epoll 让epoll 捕获accept 事件
然后如果有人连接Socket 则会被WorkProc里面的epoll_wait所捕获我们再回到那里
if(ptr == &m_evCmd)
ProcessCommand(events);
else if(ptr == &m_evTimer)
ProcessTimer(events);
else if(ptr == &m_evExit)
bRun = ProcessExit(events);
else
ProcessIo(ptr, events);
此时 如果是Accept事件的话 是会触发ProcessIo的 我们再进入里面
BOOL CIODispatcher::ProcessIo(PVOID pv, UINT events)
{
if(!m_pHandler->OnBeforeProcessIo(pv, events))
return FALSE;
BOOL rs = DoProcessIo(pv, events);
m_pHandler->OnAfterProcessIo(pv, events, rs);
return rs;
}
这里首先是调用BeforeProcessIo m_pHandler(TcpServer)
BOOL CTcpServer::OnBeforeProcessIo(PVOID pv, UINT events)
{
if(pv == &m_soListen)
{
HandleAccept(events);
return FALSE;
}
TSocketObj* pSocketObj = (TSocketObj*)(pv);
if(!TSocketObj::IsValid(pSocketObj))
return FALSE;
pSocketObj->csIo.Lock();
if(!TSocketObj::IsValid(pSocketObj))
{
pSocketObj->csIo.Unlock();
return FALSE;
}
return TRUE;
}
则执行HandleAccept 否则 的话 则把对象转化成SocketObj 然后执行互斥锁来进行Socket操作
我们看下HandleAccept函数
BOOL CTcpServer::HandleAccept(UINT events)
{
if(events & _EPOLL_ALL_ERROR_EVENTS)
{
VERIFY(!HasStarted());
return FALSE;
}
while(TRUE)
{
HP_SOCKADDR addr;
socklen_t addrLen = (socklen_t)addr.AddrSize();
SOCKET soClient = ::accept(m_soListen, addr.Addr(), &addrLen);
if(soClient == INVALID_SOCKET)
{
int code = ::WSAGetLastError();
if(code == ERROR_WOULDBLOCK)
return TRUE;
else if(code == ERROR_CONNABORTED)
continue;
else if(code == ERROR_HANDLES_CLOSED)
return FALSE;
ERROR_EXIT2(EXIT_CODE_SOFTWARE, code);
}
VERIFY(::fcntl_SETFL(soClient, O_NOATIME | O_NONBLOCK | O_CLOEXEC));
CONNID dwConnID = 0;
if(!m_bfActiveSockets.AcquireLock(dwConnID))
{
::ManualCloseSocket(soClient, SHUT_RDWR);
continue;
}
TSocketObj* pSocketObj = GetFreeSocketObj(dwConnID, soClient);
addr.Copy(pSocketObj->remoteAddr);
AddClientSocketObj(dwConnID, pSocketObj);
if(TRIGGER(FireAccept(pSocketObj)) == HR_ERROR)
{
AddFreeSocketObj(pSocketObj, SCF_NONE);
continue;
}
UINT evts = (pSocketObj->IsPending() ? EPOLLOUT : 0) | (pSocketObj->IsPaused() ? 0 : EPOLLIN);
VERIFY(m_ioDispatcher.AddFD(pSocketObj->socket, evts | EPOLLRDHUP | EPOLLONESHOT, pSocketObj));
}
return TRUE;
}
此时会进入一个循环 确保所有待处理的Socket全部被Accept
accept返回过来的客户socket 首先会进行判断 如果是wouldblock则说明接收完成然后返回 connaborted 则说明要再执行一次
hanldeclosed说明Socket关闭
然后执行fcntl_SETFL(soClient, O_NOATIME | O_NONBLOCK | O_CLOEXEC) 来设置非阻塞套接字
然后通过m_bfActiveSockets.AcquireLock 来获取一个空余的ConnId
如果获取失败则关闭Socket(已满)
然后再通过GetFreeSocketObj 来生成一个空的SocketObj结构
TSocketObj* CTcpServer::GetFreeSocketObj(CONNID dwConnID, SOCKET soClient)
{
DWORD dwIndex;
TSocketObj* pSocketObj = nullptr;
if(m_lsFreeSocket.TryLock(&pSocketObj, dwIndex))
{
if(::GetTimeGap32(pSocketObj->freeTime) >= m_dwFreeSocketObjLockTime)
VERIFY(m_lsFreeSocket.ReleaseLock(nullptr, dwIndex));
else
{
VERIFY(m_lsFreeSocket.ReleaseLock(pSocketObj, dwIndex));
pSocketObj = nullptr;
}
}
if(!pSocketObj) pSocketObj = CreateSocketObj();
pSocketObj->Reset(dwConnID, soClient);
return pSocketObj;
}
TSocketObj* CTcpServer::CreateSocketObj()
{
TSocketObj* pSocketObj = (TSocketObj*)m_phSocket.Alloc(sizeof(TSocketObj));
ASSERT(pSocketObj);
return new (pSocketObj) TSocketObj(m_bfObjPool);
}
此处首先TryLock 互斥体一下 如果ID对应有存在的SocketObj则释放SocketObj 然后再通过CreateSocketObj()创建SocketObj
TSocketObj* CTcpServer::CreateSocketObj()
{
TSocketObj* pSocketObj = (TSocketObj*)m_phSocket.Alloc(sizeof(TSocketObj));
ASSERT(pSocketObj);
return new (pSocketObj) TSocketObj(m_bfObjPool);
}
这里通过Alloc(PrivateHeap这是一个专门负责管理内存的类)来生成对象 此时SocketObj已生成完毕 再回到HandleAccept中
通过addr.Copy 方法 把accept过来的地址赋值给pSocketObj里面的remoteAddr
然后再通过AddClientSocketObj加入客户Socket堆中(把SocketObj的地址 放到m_pv的偏移上面去)
然后调用FireAccept执行用户层接口 如果用户层接口返回HR_ERROR则把SocketObj放入FreeSocketObj等待回收处理
否则把他加入epoll监听 此时就已经完成了Accept事件的处理
当把客户的Socket加入epoll监听 当客户的Socket 触发Recvie Write等也会触发Epoll回调具体我们可以回到这句代码
BOOL CIODispatcher::ProcessIo(PVOID pv, UINT events)
{
if(!m_pHandler->OnBeforeProcessIo(pv, events))
return FALSE;
BOOL rs = DoProcessIo(pv, events);
m_pHandler->OnAfterProcessIo(pv, events, rs);
return rs;
}
刚刚我们分析完OnBeforeProcessIo 这次我们看DoProcessIo
BOOL CIODispatcher::DoProcessIo(PVOID pv, UINT events)
{
if(events & EPOLLERR)
return m_pHandler->OnError(pv, events);
if((events & EPOLLPRI) && !m_pHandler->OnReadyPrivilege(pv, events))
return FALSE;
if((events & EPOLLIN) && !m_pHandler->OnReadyRead(pv, events))
return FALSE;
if((events & EPOLLOUT) && !m_pHandler->OnReadyWrite(pv, events))
return FALSE;
if((events & (_EPOLL_HUNGUP_EVENTS)) && !m_pHandler->OnHungUp(pv, events))
return FALSE;
return TRUE;
}
这里通过判断事件类型分发具体函数 可以看到如果是事件类型是EPOLLIN 的话 则调用OnReadyRead 如果是EPOLLOUT 则调用OnReadyWrite
BOOL CTcpServer::OnReadyRead(PVOID pv, UINT events)
{
return HandleReceive((TSocketObj*)pv, RETRIVE_EVENT_FLAG_H(events));
}
BOOL CTcpServer::HandleReceive(TSocketObj* pSocketObj, int flag)
{
ASSERT(TSocketObj::IsValid(pSocketObj));
if(m_bMarkSilence) pSocketObj->activeTime = ::TimeGetTime();
CBufferPtr& buffer = *(m_rcBufferMap[SELF_THREAD_ID]);
//取当前线程缓冲区
int reads = flag ? -1 : MAX_CONTINUE_READS;
for(int i = 0; i < reads || reads < 0; i++)
{
int rc = (int)read(pSocketObj->socket, buffer.Ptr(), buffer.Size());
//读Socket到Buffer
if(rc > 0)
{
//调用FireReceive接口
if(TRIGGER(FireReceive(pSocketObj, buffer.Ptr(), rc)) == HR_ERROR)
{
TRACE("<C-CNNID: %Iu> OnReceive() event return 'HR_ERROR', connection will be closed !", pSocketObj->connID);
AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, ENSURE_ERROR_CANCELLED);
return FALSE;
}
}//如果返回字节为0则释放Socket
else if(rc == 0)
{
AddFreeSocketObj(pSocketObj, SCF_CLOSE, SO_RECEIVE, SE_OK);
return FALSE;
}
else
{//小于0则记录报错信息以及释放Socket
ASSERT(rc == SOCKET_ERROR);
int code = ::WSAGetLastError();
if(code == ERROR_WOULDBLOCK)
break;
AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, code);
return FALSE;
}
}
return TRUE;
}
FireReceive调用接口层的OnReceive
这次我们再看到OnReadyWrite
BOOL CTcpServer::OnReadyWrite(PVOID pv, UINT events)
{
return HandleSend((TSocketObj*)pv, RETRIVE_EVENT_FLAG_H(events));
}
BOOL CTcpServer::HandleSend(TSocketObj* pSocketObj, int flag)
{
ASSERT(TSocketObj::IsValid(pSocketObj));
//是否有内容
if(!pSocketObj->IsPending())
return TRUE;
//下锁
CReentrantCriSecLock locallock(pSocketObj->csSend);
//是否有内容
if(!pSocketObj->IsPending())
return TRUE;
BOOL isOK = TRUE;
int writes = flag ? -1 : MAX_CONTINUE_WRITES;
TBufferObjList& sndBuff = pSocketObj->sndBuff;
//获取Buff队列
for(int i = 0; i < writes || writes < 0; i++)
{
//队列出栈
TItemPtr itPtr(sndBuff, sndBuff.PopFront());
//判断是否有效
if(!itPtr.IsValid())
break;
//是否为空
ASSERT(!itPtr->IsEmpty());
//队列发送
isOK = SendItem(pSocketObj, itPtr);
if(!isOK)
break;
//如果没发送完
if(!itPtr->IsEmpty())
{//重新加入队列
sndBuff.PushFront(itPtr.Detach());
break;
}
}
return isOK;
}
BOOL CTcpServer::SendItem(TSocketObj* pSocketObj, TItem* pItem)
{
//内容是否为空 如果没发完就一直执行循环直到发送完毕为止
while(!pItem->IsEmpty())
{//write写入
int rc = (int)write(pSocketObj->socket, pItem->Ptr(), pItem->Size());
if(rc > 0)
{//调用FireSend接口
if(TRIGGER(FireSend(pSocketObj, pItem->Ptr(), rc)) == HR_ERROR)
{
TRACE("<C-CNNID: %Iu> OnSend() event should not return 'HR_ERROR' !!", pSocketObj->connID);
ASSERT(FALSE);
}
//减去发送长度
pItem->Reduce(rc);
}
else if(rc == SOCKET_ERROR)
{
int code = ::WSAGetLastError();
if(code == ERROR_WOULDBLOCK)
break;
AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_SEND, code);
return FALSE;
}
else
ASSERT(FALSE);
}
return TRUE;
}
此时接收 发送事件监听已完成 我们再看下服务器主动发送事件
BOOL CTcpServer::Send(CONNID dwConnID, const BYTE* pBuffer, int iLength, int iOffset)
{//发送事件
ASSERT(pBuffer && iLength > 0);
if(iOffset != 0) pBuffer += iOffset;
//组装Buffer
WSABUF buffer;
buffer.len = iLength;
buffer.buf = (BYTE*)pBuffer;
//发送
return SendPackets(dwConnID, &buffer, 1);
}
virtual BOOL SendPackets (CONNID dwConnID, const WSABUF pBuffers[], int iCount) {return DoSendPackets(dwConnID, pBuffers, iCount);}
BOOL CTcpServer::DoSendPackets(CONNID dwConnID, const WSABUF pBuffers[], int iCount)
{
ASSERT(pBuffers && iCount > 0);
//找到SocketObj
TSocketObj* pSocketObj = FindSocketObj(dwConnID);
if(!TSocketObj::IsValid(pSocketObj))
{
::SetLastError(ERROR_OBJECT_NOT_FOUND);
return FALSE;
}
return DoSendPackets(pSocketObj, pBuffers, iCount);
}
BOOL CTcpServer::DoSendPackets(TSocketObj* pSocketObj, const WSABUF pBuffers[], int iCount)
{
ASSERT(pSocketObj && pBuffers && iCount > 0);
int result = NO_ERROR;
//Buffer合法性检验
if(pBuffers && iCount > 0)
{//下锁
CReentrantCriSecLock locallock(pSocketObj->csSend);
if(TSocketObj::IsValid(pSocketObj))
result = SendInternal(pSocketObj, pBuffers, iCount);
else
result = ERROR_OBJECT_NOT_FOUND;
}
else
result = ERROR_INVALID_PARAMETER;
if(result != NO_ERROR)
::SetLastError(result);
return (result == NO_ERROR);
}
int CTcpServer::SendInternal(TSocketObj* pSocketObj, const WSABUF pBuffers[], int iCount)
{
int iPending = pSocketObj->Pending();
//加入队列
for(int i = 0; i < iCount; i++)
{
int iBufLen = pBuffers[i].len;
if(iBufLen > 0)
{
BYTE* pBuffer = (BYTE*)pBuffers[i].buf;
ASSERT(pBuffer);
//加入SocketObj的Buffer队列
pSocketObj->sndBuff.Cat(pBuffer, iBufLen);
}
}
//如果一开始队列为0则说明没有队列没有事件 无需处理 后来新增的要手动触发Epoll事件
if(iPending == 0 && pSocketObj->IsPending())
{
//调用
if(!m_ioDispatcher.SendCommand(DISP_CMD_SEND, pSocketObj->connID))
return ::GetLastError();
}
return NO_ERROR;
}
BOOL CIODispatcher::SendCommand(USHORT t, UINT_PTR wp, UINT_PTR lp)
{
return SendCommand(TDispCommand::Construct(t, wp, lp));
}
BOOL CIODispatcher::SendCommand(TDispCommand* pCmd)
{
m_queue.PushBack(pCmd);
return VERIFY_IS_NO_ERROR(eventfd_write(m_evCmd, 1)); //手动触发eventfd
}
我们回到WorkProc
int CIODispatcher::WorkerProc(PVOID pv)
{
BOOL bRun = TRUE;
unique_ptr<epoll_event[]> pEvents = make_unique<epoll_event[]>(m_iMaxEvents);
while(bRun)
{
int rs = NO_EINTR_INT(epoll_pwait(m_epoll, pEvents.get(), m_iMaxEvents, INFINITE, nullptr));
if(rs <= TIMEOUT)
ERROR_ABORT();
for(int i = 0; i < rs; i++)
{
UINT events = pEvents[i].events;
PVOID ptr = pEvents[i].data.ptr;
if(ptr == &m_evCmd)
ProcessCommand(events); //触发这里
else if(ptr == &m_evTimer)
ProcessTimer(events);
else if(ptr == &m_evExit)
bRun = ProcessExit(events);
else
ProcessIo(ptr, events);
}
}
m_pHandler->OnDispatchThreadEnd(SELF_THREAD_ID);
return 0;
}
BOOL CIODispatcher::ProcessCommand(UINT events)
{
if(events & _EPOLL_ALL_ERROR_EVENTS)
ERROR_ABORT();
if(!(events & EPOLLIN))
return FALSE;
BOOL isOK = TRUE;
eventfd_t v;
int rs = eventfd_read(m_evCmd, &v);//读
if(IS_NO_ERROR(rs))
{
ASSERT(v > 0);
TDispCommand* pCmd = nullptr;
//队列出栈
while(m_queue.PopFront(&pCmd))
{
m_pHandler->OnCommand(pCmd);//触发接口事件
TDispCommand::Destruct(pCmd);释放内存
}
}
else if(IS_HAS_ERROR(rs))
{
ASSERT(IS_WOULDBLOCK_ERROR());
isOK = FALSE;
}
return isOK;
}
VOID CTcpServer::OnCommand(TDispCommand* pCmd)
{
switch(pCmd->type)
{
case DISP_CMD_SEND:
HandleCmdSend((CONNID)(pCmd->wParam));
break;
case DISP_CMD_UNPAUSE:
HandleCmdUnpause((CONNID)(pCmd->wParam));
break;
case DISP_CMD_DISCONNECT:
HandleCmdDisconnect((CONNID)(pCmd->wParam), (BOOL)pCmd->lParam);
break;
}
}
VOID CTcpServer::HandleCmdSend(CONNID dwConnID)
{
//获得SocketObj
TSocketObj* pSocketObj = FindSocketObj(dwConnID);
if(TSocketObj::IsValid(pSocketObj) && pSocketObj->IsPending())
手动调用ProcessIo
m_ioDispatcher.ProcessIo(pSocketObj, EPOLLOUT);
}
则此时服务端主动发送则完成 TcpServer大致到这里