分类 内核 下的文章

HP-Socket 的 SSLServer 继承自 TcpServer,加解密算法部分由 OpenSSL 实现,所以使用 SSL 必须确保 OpenSSL 支持库存在
首先要调用 SetupSSLContext 来进行 SSL 初始化

// Sets up the server's SSL context by forwarding every argument to
// m_sslCtx.Initialize() with the session mode fixed to SSL_SM_SERVER.
// Returns whatever Initialize() returns.
virtual BOOL SetupSSLContext(int iVerifyMode = SSL_VM_NONE, LPCTSTR lpszPemCertFile = nullptr, LPCTSTR lpszPemKeyFile = nullptr, LPCTSTR lpszKeyPasswod = nullptr, LPCTSTR lpszCAPemCertFileOrPath = nullptr, Fn_SNI_ServerNameCallback fnServerNameCallback = nullptr)
{
    const BOOL bInitialized = m_sslCtx.Initialize(SSL_SM_SERVER, iVerifyMode, lpszPemCertFile, lpszPemKeyFile, lpszKeyPasswod, lpszCAPemCertFileOrPath, fnServerNameCallback);

    return bInitialized;
}
//会调用 CSSLContext 类的 Initialize

// Initializes this SSL context object exactly once.
//   enSessionMode        - stored in m_enSessionMode before the first context is created
//   remaining arguments  - forwarded to AddContext() / SetServerNameCallback()
// Returns TRUE on success. Returns FALSE with ERROR_INVALID_STATE if the object
// is already valid, or FALSE (after Cleanup()) if the first context could not
// be added at index 0.
BOOL CSSLContext::Initialize(EnSSLSessionMode enSessionMode, int iVerifyMode, LPCTSTR lpszPemCertFile, LPCTSTR lpszPemKeyFile, LPCTSTR lpszKeyPasswod, LPCTSTR lpszCAPemCertFileOrPath, HP_Fn_SNI_ServerNameCallback fnServerNameCallback)
{
    ASSERT(!IsValid());

    // Refuse to initialize twice.
    if(IsValid())
    {
        ::SetLastError(ERROR_INVALID_STATE);
        return FALSE;
    }

    // AddContext() reads the session mode, so it has to be set first.
    m_enSessionMode = enSessionMode;

    const int iFirstIndex = AddContext(iVerifyMode, lpszPemCertFile, lpszPemKeyFile, lpszKeyPasswod, lpszCAPemCertFileOrPath);

    // The very first context must land in slot 0; anything else is a failure.
    if(iFirstIndex != 0)
    {
        Cleanup();
        return FALSE;
    }

    // Slot 0 becomes the context stored in m_sslCtx.
    m_sslCtx = GetContext(0);

    // Register the server-name callback.
    SetServerNameCallback(fnServerNameCallback);

    return TRUE;
}
// Validates the start-up preconditions of the SSL server: the SSL context
// must be valid, after which the base-class checks decide the result.
BOOL CSSLServer::CheckParams()
{
    if(m_sslCtx.IsValid())
        return __super::CheckParams();

    // SSL context has not been set up yet.
    SetLastError(SE_SSL_ENV_NOT_READY, __FUNCTION__, ERROR_NOT_READY);
    return FALSE;
}
// Runs the base-class start preparation, then copies the server's buffer and
// socket-object pool settings into the SSL session pool and prepares it.
void CSSLServer::PrepareStart()
{
    __super::PrepareStart();

    // Item (buffer) settings come from the buffer-related server settings.
    const auto itemCapacity = GetSocketBufferSize();
    m_sslPool.SetItemCapacity(itemCapacity);

    const auto itemPoolSize = GetFreeBufferObjPool();
    m_sslPool.SetItemPoolSize(itemPoolSize);

    const auto itemPoolHold = GetFreeBufferObjHold();
    m_sslPool.SetItemPoolHold(itemPoolHold);

    // Session settings come from the socket-object-related server settings.
    const auto sessionLockTime = GetFreeSocketObjLockTime();
    m_sslPool.SetSessionLockTime(sessionLockTime);

    const auto sessionPoolSize = GetFreeSocketObjPool();
    m_sslPool.SetSessionPoolSize(sessionPoolSize);

    const auto sessionPoolHold = GetFreeSocketObjHold();
    m_sslPool.SetSessionPoolHold(sessionPoolHold);

    m_sslPool.Prepare();
}
// Creates a new SSL_CTX, configures it, loads the certificate/key material and
// appends it to m_lsSslCtxs.
// Returns the index of the new context in m_lsSslCtxs, or -1 on failure
// (SSL_CTX allocation failed, or LoadCertAndKey() rejected the inputs).
int CSSLContext::AddContext(int iVerifyMode, LPCTSTR lpszPemCertFile, LPCTSTR lpszPemKeyFile, LPCTSTR lpszKeyPasswod, LPCTSTR lpszCAPemCertFileOrPath)
{
    int iIndex        = -1;
    SSL_CTX* sslCtx    = SSL_CTX_new(SSLv23_method());

    // SSL_CTX_new() returns NULL when the context cannot be created; every call
    // below would dereference it, so bail out with the failure index instead.
    // Note: no error code is set on this path.
    if(sslCtx == nullptr)
        return iIndex;

    // Quiet shutdown: do not send a close_notify alert to the peer on shutdown.
    SSL_CTX_set_quiet_shutdown(sslCtx, 1);
    // Peer verification mode, no custom verify callback.
    SSL_CTX_set_verify(sslCtx, iVerifyMode, nullptr);
    // Allowed cipher suites: everything except anonymous and NULL-encryption ones.
    SSL_CTX_set_cipher_list(sslCtx, "ALL:!aNULL:!eNULL");

    if(m_enSessionMode == SSL_SM_SERVER)
    {
        // Server mode: give every context its own session-id context, taken
        // from a process-wide atomically incremented counter.
        static volatile ULONG s_session_id_context = 0;
        ULONG session_id_context = ::InterlockedIncrement(&s_session_id_context);

        SSL_CTX_set_session_id_context(sslCtx, (BYTE*)&session_id_context, sizeof(session_id_context));
    }

    // Load certificate / key / CA material; free the context if that fails.
    if(!LoadCertAndKey(sslCtx, iVerifyMode, lpszPemCertFile, lpszPemKeyFile, lpszKeyPasswod, lpszCAPemCertFileOrPath))
        SSL_CTX_free(sslCtx);
    else
    {
        // Success: the new context's index is the current size of the list.
        iIndex = (int)m_lsSslCtxs.size();
        m_lsSslCtxs.push_back(sslCtx);
    }

    return iIndex;
}


// Loads CA verification material and the certificate/private-key pair into sslCtx.
//   lpszCAPemCertFileOrPath - optional; a CA file or a CA directory (decided by inspecting the path)
//   lpszPemCertFile         - optional; when given, lpszPemKeyFile is required as well
//   lpszKeyPasswod          - optional password handed to OpenSSL as password-callback user data
// Returns TRUE on success. On failure returns FALSE and sets the last error to a
// code that identifies the failing step (see the individual branches below).
BOOL CSSLContext::LoadCertAndKey(SSL_CTX* sslCtx, int iVerifyMode, LPCTSTR lpszPemCertFile, LPCTSTR lpszPemKeyFile, LPCTSTR lpszKeyPasswod, LPCTSTR lpszCAPemCertFileOrPath)
{
    USES_CONVERSION;

    if(lpszCAPemCertFileOrPath != nullptr)
    {
        // Exactly one of these ends up non-null, depending on whether the path
        // is a regular file or a directory.
        LPCTSTR lpszCAPemCertFile = nullptr;
        LPCTSTR lpszCAPemCertPath = nullptr;

        CFile fCAPemCertFile(lpszCAPemCertFileOrPath, O_RDONLY | O_CLOEXEC);

        if(!fCAPemCertFile.IsExist())
        {
            ::SetLastError(ERROR_FILE_NOT_FOUND);
            return FALSE;
        }

        if(fCAPemCertFile.IsFile())
            lpszCAPemCertFile = lpszCAPemCertFileOrPath;
        else if(fCAPemCertFile.IsDirectory())
            lpszCAPemCertPath = lpszCAPemCertFileOrPath;
        else
        {
            // Neither a file nor a directory.
            ::SetLastError(ERROR_BAD_FILE_TYPE);
            return FALSE;
        }
        // Register the CA file / directory as trusted verification locations.
        if(!SSL_CTX_load_verify_locations(sslCtx, T2CA(lpszCAPemCertFile), T2CA(lpszCAPemCertPath)))
        {
            ::SetLastError(ERROR_INVALID_DATA);
            return FALSE;
        }
        // Additionally enable OpenSSL's default verification paths.
        if(!SSL_CTX_set_default_verify_paths(sslCtx))
        {
            ::SetLastError(ERROR_FUNCTION_FAILED);
            return FALSE;
        }
        // Server mode with peer verification requested: set the list of CA
        // names for client certificates.
        if(m_enSessionMode == SSL_SM_SERVER && iVerifyMode & SSL_VM_PEER)
        {
            // NOTE(review): the original path is passed here even when it was
            // classified as a directory above - confirm that
            // SSL_load_client_CA_file() is meant to receive a directory.
            STACK_OF(X509_NAME)* caCertNames = SSL_load_client_CA_file(T2CA(lpszCAPemCertFileOrPath));

            if(caCertNames == nullptr)
            {
                ::SetLastError(ERROR_EMPTY);
                return FALSE;
            }
            // Install the loaded names as the context's client CA list.
            SSL_CTX_set_client_CA_list(sslCtx, caCertNames);
        }
    }

    if(lpszPemCertFile != nullptr)
    {
        CFile fPemCertFile(lpszPemCertFile, O_RDONLY | O_CLOEXEC);

        // The certificate must be an existing regular file.
        if(!fPemCertFile.IsFile())
        {
            ::SetLastError(ERROR_FILE_NOT_FOUND);
            return FALSE;
        }

        // A certificate without a private-key file is rejected.
        if(lpszPemKeyFile == nullptr)
        {
            ::SetLastError(ERROR_INVALID_PARAMETER);
            return FALSE;
        }

        CFile fPemKeyFile(lpszPemKeyFile, O_RDONLY | O_CLOEXEC);

        if(!fPemKeyFile.IsFile())
        {
            ::SetLastError(ERROR_FILE_NOT_FOUND);
            return FALSE;
        }
        
        // Hand the key password to OpenSSL as password-callback user data
        // before the key file is read.
        if(lpszKeyPasswod != nullptr)
            SSL_CTX_set_default_passwd_cb_userdata(sslCtx, (void*)T2CA(lpszKeyPasswod));

        // Any failure to load the private key is reported as a bad password.
        if(!SSL_CTX_use_PrivateKey_file(sslCtx, T2CA(lpszPemKeyFile), SSL_FILETYPE_PEM))
        {
            ::SetLastError(ERROR_INVALID_PASSWORD);
            return FALSE;
        }

        if(!SSL_CTX_use_certificate_chain_file(sslCtx, T2CA(lpszPemCertFile)))
        {
            ::SetLastError(ERROR_INVALID_DATA);
            return FALSE;
        }

        // Final consistency check between the loaded key and certificate.
        if(!SSL_CTX_check_private_key(sslCtx))
        {
            ::SetLastError(ERROR_INVALID_ACCESS);
            return FALSE;
        }
    }

    return TRUE;
}

至此完成了服务器SSL上下文初始化
然后是对 CheckParams、PrepareStart 进行覆盖
判断ssl上下文是否初始化成功 以及sslContextpool初始化


// Validates the start-up preconditions of the SSL server: the SSL context
// must be valid, after which the base-class checks decide the result.
BOOL CSSLServer::CheckParams()
{
    if(m_sslCtx.IsValid())
        return __super::CheckParams();

    // SSL context has not been set up yet.
    SetLastError(SE_SSL_ENV_NOT_READY, __FUNCTION__, ERROR_NOT_READY);
    return FALSE;
}

// Runs the base-class start preparation, then copies the server's buffer and
// socket-object pool settings into the SSL session pool and prepares it.
void CSSLServer::PrepareStart()
{
    __super::PrepareStart();

    // Item (buffer) settings come from the buffer-related server settings.
    const auto itemCapacity = GetSocketBufferSize();
    m_sslPool.SetItemCapacity(itemCapacity);

    const auto itemPoolSize = GetFreeBufferObjPool();
    m_sslPool.SetItemPoolSize(itemPoolSize);

    const auto itemPoolHold = GetFreeBufferObjHold();
    m_sslPool.SetItemPoolHold(itemPoolHold);

    // Session settings come from the socket-object-related server settings.
    const auto sessionLockTime = GetFreeSocketObjLockTime();
    m_sslPool.SetSessionLockTime(sessionLockTime);

    const auto sessionPoolSize = GetFreeSocketObjPool();
    m_sslPool.SetSessionPoolSize(sessionPoolSize);

    const auto sessionPoolHold = GetFreeSocketObjHold();
    m_sslPool.SetSessionPoolHold(sessionPoolHold);

    m_sslPool.Prepare();
}

重写了FireAccept 实现客户SSL上下文初始化

// Accept hook of the SSL server: runs the regular accept notification first
// and, unless it reports HR_ERROR, attaches a fresh SSL session to the socket
// object and sends the first handshake data.
// Returns the result of DoFireAccept().
EnHandleResult CSSLServer::FireAccept(TSocketObj* pSocketObj)
{
    const EnHandleResult result = DoFireAccept(pSocketObj);

    // The accept notification failed: no SSL session is created.
    if(result == HR_ERROR)
        return result;

    // Take a session from the SSL session pool.
    CSSLSession* pSession = m_sslPool.PickFreeSession();
    // Store the session in the socket object's reserved2 slot.
    VERIFY(SetConnectionReserved2(pSocketObj, pSession));
    // Send whatever handshake data the session has queued.
    VERIFY(::ProcessHandShake(this, pSocketObj, pSession) == HR_OK);

    return result;
}
//我们可以看到PickFreeSession
// Returns a session that is ready for a new connection.
// A session from the free list is reused only when it has been free for at
// least m_dwSessionLockTime; otherwise a new CSSLSession is allocated.
// The returned session has been renewed with m_sslCtx and lpszHostName.
CSSLSession* CSSLSessionPool::PickFreeSession(LPCSTR lpszHostName)
{
    DWORD dwIndex;
    CSSLSession* pSession = nullptr;

    // Try to lock one entry of the free list, obtaining the session and its index.
    if(m_lsFreeSession.TryLock(&pSession, dwIndex))
    {
        // Free for long enough: keep the session and release the slot with nullptr.
        // NOTE(review): presumably this removes the session from the list - confirm.
        if(::GetTimeGap32(pSession->GetFreeTime()) >= m_dwSessionLockTime)
            VERIFY(m_lsFreeSession.ReleaseLock(nullptr, dwIndex));
        else
        {
            // Freed too recently: hand it back to the list and do not use it.
            VERIFY(m_lsFreeSession.ReleaseLock(pSession, dwIndex));
            pSession = nullptr;
        }
    }

    // Nothing reusable: allocate a new session on the shared item pool.
    if(!pSession) pSession = new CSSLSession(m_itPool);

    ASSERT(pSession);
    // Bind the session to the SSL context and host name.
    return pSession->Renew(m_sslCtx, lpszHostName);
}


// Prepares this (currently invalid) session for a new connection:
// creates the SSL object and its two memory BIOs, starts the handshake in the
// role given by the context's session mode, and picks send/receive buffers
// from the item pool. Returns this.
CSSLSession* CSSLSession::Renew(const CSSLContext& sslCtx, LPCSTR lpszHostName)
{
    ASSERT(!IsValid());

    // New SSL object on the default context.
    m_ssl = SSL_new(sslCtx.GetDefaultContext());

    // Memory BIOs: OpenSSL reads from m_bioRecv and writes to m_bioSend.
    m_bioSend = BIO_new(BIO_s_mem());
    m_bioRecv = BIO_new(BIO_s_mem());
    SSL_set_bio(m_ssl, m_bioRecv, m_bioSend);

    if(sslCtx.GetSessionMode() != SSL_SM_SERVER)
    {
        USES_CONVERSION;

        // Client side: set the server name only for a non-empty host name
        // that is not an IP address.
        const bool bSetHostName = lpszHostName && lpszHostName[0] != 0 && !::IsIPAddress(A2CT(lpszHostName));

        if(bSetHostName)
            SSL_set_tlsext_host_name(m_ssl, lpszHostName);

        // Start the handshake in the client role.
        SSL_connect(m_ssl);
    }
    else
    {
        // Start the handshake in the server role.
        SSL_accept(m_ssl);
    }

    // One pool item each for outgoing and incoming data.
    m_pitSend = m_itPool.PickFreeItem();
    m_pitRecv = m_itPool.PickFreeItem();

    m_bufSend.buf = m_pitSend->Ptr();
    m_bufRecv.buf = m_pitRecv->Ptr();

    // Handshake is now in progress.
    m_enStatus = SSL_HSS_PROC;

    return this;
}

此时PickFreeSession已获取到空余Session 我们则把SocketObj 与 Session 连接起来

// Stores pReserved2 in the socket object's reserved2 field.
// Returns FALSE (and stores nothing) when TSocketObj::IsExist() rejects the object.
BOOL CTcpServer::SetConnectionReserved2(TSocketObj* pSocketObj, PVOID pReserved2)
{
    if(!TSocketObj::IsExist(pSocketObj))
        return FALSE;

    pSocketObj->reserved2 = pReserved2;

    return TRUE;
}
//然后再执行握手 把SSL相关信息发给客户
ProcessHandShake
// Drains the session's send channel and pushes every chunk to the socket via
// pThis->DoSendPackets(), holding the session's send lock for the whole drain.
// Returns HR_OK once the send channel is empty, HR_ERROR if a send fails.
template<class T, class S> EnHandleResult ProcessHandShake(T* pThis, S* pSocketObj, CSSLSession* pSession)
{
    CCriSecLock locallock(pSession->GetSendLock());

    for(;;)
    {
        // Refill the session's send buffer from its send channel.
        VERIFY(pSession->ReadSendChannel());
        const WSABUF& chunk = pSession->GetSendBuffer();

        // Nothing left to send.
        if(chunk.len == 0)
            return HR_OK;

        if(!pThis->DoSendPackets(pSocketObj, &chunk, 1))
            return HR_ERROR;
    }
}
//然后调用了TcpServer 的DoSendPackets来发送数据 直到BIO里面没有数据了则握手完成 至此SSL Accept完成

我们再来看看

// Receive hook of the SSL server: looks up the SSL session stored in the
// socket object's reserved2 slot and lets ::ProcessReceive() handle the data.
EnHandleResult CSSLServer::FireReceive(TSocketObj* pSocketObj, const BYTE* pData, int iLength)
{
    CSSLSession* pSslSession = nullptr;
    PVOID* ppReserved        = (PVOID*)&pSslSession;

    // Fetch the session that was attached to this connection.
    GetConnectionReserved2(pSocketObj, ppReserved);
    ASSERT(pSslSession);

    // Hand the received bytes to the SSL helper.
    return ::ProcessReceive(this, pSocketObj, pSslSession, pData, iLength);
}

// Feeds received bytes into the SSL session and delivers the resulting
// plaintext to pThis.
//   pData/iLength - data as read from the socket
// Returns HR_ERROR when writing to or reading from the session fails, or when
// a handshake send or one of the fired events reports HR_ERROR; otherwise the
// result of the last step that ran.
template<class T, class S> EnHandleResult ProcessReceive(T* pThis, S* pSocketObj, CSSLSession* pSession, const BYTE* pData, int iLength)
{
    // Push the received bytes into the session's receive channel.
    if(!pSession->WriteRecvChannel(pData, iLength))
        return HR_ERROR;

    EnHandleResult result            = HR_OK;
    // Snapshot of the handshake status taken before the loop; it is updated
    // locally below once the handshake has been reported.
    EnSSLHandShakeStatus enStatus    = pSession->GetStatus();

    while(TRUE)
    {
        // Read the next chunk from the session into its receive buffer.
        if(!pSession->ReadRecvChannel())
            return HR_ERROR;
        // Handshake was still in progress on entry and the session is ready
        // now: flush pending handshake data and fire the handshake event once.
        if(enStatus == SSL_HSS_PROC && pSession->IsReady())
        {
            result = ProcessHandShake(pThis, pSocketObj, pSession);

            if(result == HR_ERROR)
                break;

            enStatus = SSL_HSS_SUCC;
            result     = pThis->DoFireHandShake(pSocketObj);

            if(result == HR_ERROR)
                break;
        }

        const WSABUF& buffer = pSession->GetRecvBuffer();

        // No more data available.
        if(buffer.len == 0)
            break;
        // Deliver the chunk to the receive event.
        result = pThis->DoFireReceive(pSocketObj, (const BYTE*)buffer.buf, buffer.len);

        if(result == HR_ERROR)
            break;
    }
    // Still handshaking: send whatever handshake data is pending.
    if(result != HR_ERROR && pSession->IsHandShaking())
        result = ::ProcessHandShake(pThis, pSocketObj, pSession);

    return result;
}
// Reads via SSL_read() into the receive buffer.
// On success m_bufRecv.len holds the number of bytes read (0 when nothing was
// read and the error is not fatal). Also promotes the status from
// SSL_HSS_PROC to SSL_HSS_SUCC once OpenSSL reports the handshake as finished.
// Returns FALSE only when IsFatalError() classifies the SSL_read() result as fatal.
BOOL CSSLSession::ReadRecvChannel()
{
    const int bytes = SSL_read(m_ssl, m_bufRecv.buf, m_pitRecv->Capacity());

    if(bytes > 0)
    {
        m_bufRecv.len = bytes;
    }
    else
    {
        if(IsFatalError(bytes))
            return FALSE;

        // Non-fatal: nothing to read right now.
        m_bufRecv.len = 0;
    }

    const bool bHandShakeDone = (m_enStatus == SSL_HSS_PROC && SSL_is_init_finished(m_ssl));

    if(bHandShakeDone)
        m_enStatus = SSL_HSS_SUCC;

    return TRUE;
}

//此处因为SSL_set_bio 所以 通过BIO_write(m_bioRecv, pData, iLength); SSL_read会读m_bioRecv的内容
接收完毕我们来看发送 这里多处HandShake是因为 握手过来的包可能被Receive捕获所以要让他继续

// Sends the given buffers on connection dwConnID through its SSL session.
// Returns FALSE with ERROR_OBJECT_NOT_FOUND when the connection is not valid;
// otherwise the result of ::ProcessSend().
BOOL CSSLServer::SendPackets(CONNID dwConnID, const WSABUF pBuffers[], int iCount)
{
    ASSERT(pBuffers && iCount > 0);

    TSocketObj* pSocketObj = FindSocketObj(dwConnID);

    if(TSocketObj::IsValid(pSocketObj))
    {
        // Fetch the SSL session attached to the connection.
        CSSLSession* pSslSession = nullptr;
        GetConnectionReserved2(pSocketObj, (PVOID*)&pSslSession);

        return ::ProcessSend(this, pSocketObj, pSslSession, pBuffers, iCount);
    }

    ::SetLastError(ERROR_OBJECT_NOT_FOUND);
    return FALSE;
}

// Writes the caller's buffers into the SSL session and sends every resulting
// chunk through pThis->DoSendPackets().
// Returns FALSE with ERROR_INVALID_STATE when there is no session or it is not
// ready; FALSE when a send fails; TRUE once the send channel has been drained.
template<class T, class S> BOOL ProcessSend(T* pThis, S* pSocketObj, CSSLSession* pSession, const WSABUF * pBuffers, int iCount)
{
    // Check before taking the lock.
    if(pSession == nullptr || !pSession->IsReady())
    {
        ::SetLastError(ERROR_INVALID_STATE);
        return FALSE;
    }

    CCriSecLock locallock(pSession->GetSendLock());

    // Readiness is checked again now that the send lock is held.
    if(!pSession->IsReady())
    {
        ::SetLastError(ERROR_INVALID_STATE);
        return FALSE;
    }

    // Feed the caller's data into the session.
    VERIFY(pSession->WriteSendChannel(pBuffers, iCount));

    for(;;)
    {
        // Pull the next chunk out of the session's send channel.
        VERIFY(pSession->ReadSendChannel());
        const WSABUF& chunk = pSession->GetSendBuffer();

        if(chunk.len == 0)
            break;

        // Pass the chunk on to the socket send path.
        if(!pThis->DoSendPackets(pSocketObj, &chunk, 1))
            return FALSE;
    }

    return TRUE;
}

// Writes each non-empty buffer of pBuffers[0..iCount) to the send channel, in
// order. Stops at the first failing write and returns FALSE; returns TRUE when
// every non-empty buffer was written.
BOOL CSSLSession::WriteSendChannel(const WSABUF pBuffers[], int iCount)
{
    ASSERT(pBuffers && iCount > 0);

    for(int idx = 0; idx < iCount; idx++)
    {
        const WSABUF& current = pBuffers[idx];

        // Empty buffers are skipped.
        if(current.len > 0 && !WriteSendChannel((const BYTE*)current.buf, current.len))
            return FALSE;
    }

    return TRUE;
}
// Writes iLength bytes via SSL_write().
// Returns FALSE only when SSL_write() wrote nothing and IsFatalError()
// classifies the result as fatal; TRUE otherwise.
BOOL CSSLSession::WriteSendChannel(const BYTE* pData, int iLength)
{
    ASSERT(IsReady());
    ASSERT(pData && iLength > 0);

    const int bytes = SSL_write(m_ssl, pData, iLength);

    if(bytes > 0)
    {
        // A successful write is expected to consume the whole buffer.
        ASSERT(bytes == iLength);
        return TRUE;
    }

    return IsFatalError(bytes) ? FALSE : TRUE;
}

至此SSLServer工作流程完毕

BIO工作流程:
socket->(加密数据)bio read -> ssl read -> 原文

SSL_write(原文) -> bio write(加密数据)-> socket

作者:Yoho
转载注明出处

TCP Server继承自ITcpServer类和CIOHandler类
直接从Start开撸

// Starts the server on the given address and port.
// Sequence: parameter / state checks, PrepareStart(), listen socket, worker
// threads, accept registration. If any step after PrepareStart() fails, Stop()
// is run through EXECUTE_RESTORE_ERROR and false is returned.
bool TcpServerService::Start(LPCTSTR lpszBindAddress, USHORT usPort) {
    // Validate parameters and the current state.
    if(!CheckParams() || !CheckStarting())
    {
        return false;
    }

    // Start-up preparation.
    PrepareStart();

    // Listen socket -> worker threads -> start accepting; the chain stops at
    // the first step that fails.
    const bool bStarted = CreateListenSocket(lpszBindAddress,usPort)
                       && CreateWorkerThreads()
                       && StartAccept();

    if(bStarted){
        m_enState = SS_STARTED;
        return true;
    }

    EXECUTE_RESTORE_ERROR(Stop());
    return false;
}

首先检查启动参数,然后检查是否正在运行 然后调用PrepareStart(可覆盖)函数
然后就开始创建Socket:

// Creates, configures, binds and starts listening on m_soListen.
// An empty bind address falls back to DEFAULT_IPV4_BIND_ADDRESS.
// Returns true on success; on failure records the failing step through
// SetLastError() and returns false.
bool TcpServerService::CreateListenSocket(LPCTSTR lpszBindAddress,USHORT usPort){
    // Empty address: use the default bind address.
    if(::IsStrEmpty(lpszBindAddress)){
        lpszBindAddress = DEFAULT_IPV4_BIND_ADDRESS;
    }

    HP_SOCKADDR addr;

    // Convert address + port into a socket address structure.
    if(!::sockaddr_A_2_IN(lpszBindAddress,usPort,addr)){
        SetLastError(SE_SOCKET_CREATE,__FUNCTION__,::WSAGetLastError());
        return false;
    }

    // Create the TCP socket.
    m_soListen = socket(addr.family,SOCK_STREAM,IPPROTO_TCP);

    if(m_soListen == INVALID_SOCKET){
        SetLastError(SE_SOCKET_CREATE,__FUNCTION__,::WSAGetLastError());
        return false;
    }

    // Set the non-blocking / close-on-exec flags.
    ::fcntl_SETFL(m_soListen, O_NONBLOCK | O_CLOEXEC);

    // Keep-alive is switched on only when both time and interval are positive.
    bool bOnOff = (m_dwKeepAliveTime > 0 && m_dwKeepAliveInterval > 0);

    VERIFY(IS_NO_ERROR(::SSO_KeepAliveVals(m_soListen,bOnOff,m_dwKeepAliveTime,m_dwKeepAliveInterval)));
    VERIFY(IS_NO_ERROR(::SSO_ReuseAddress(m_soListen)));

    // Bind to the requested address.
    if(::bind(m_soListen,addr.Addr(),addr.AddrSize()) == SOCKET_ERROR){
        SetLastError(SE_SOCKET_BIND,__FUNCTION__,::WSAGetLastError());
        return false;
    }

    // Fire the prepare-listen event; HR_ERROR cancels the start.
    if(TRIGGER(FirePrepareListen(m_soListen)) == HR_ERROR){
        SetLastError(SE_SOCKET_PREPARE,__FUNCTION__,ENSURE_ERROR_CANCELLED);
        return false;
    }

    // Start listening.
    if(::listen(m_soListen,m_dwSocketListenQueue) == SOCKET_ERROR){
        SetLastError(SE_SOCKET_LISTEN,__FUNCTION__,::WSAGetLastError());
        return false;
    }

    return true;
}

首先判断绑定地址和端口是否合法 如果合法则给addr赋值结构(HP_SOCKADDR)
然后调用 socket 来创建套接字,再通过 fcntl_SETFL 设置 O_NONBLOCK | O_CLOEXEC(非阻塞)
再通过bind 来绑定端口
通过listen来监听端口
注:Firexxx等函数都可以被继承覆盖

创建完套接字就开始创建工作线程

//创建工作线程
bool TcpServerService::CreateWorkerThreads() {

    if(!m_ioDispatcher.Start(this,m_dwAcceptSocketCount,m_dwWorkerThreadCount)){
        return false;//创建失败
    }
    const CIODispatcher::CWorkerThread* pWorkerThread = m_ioDispatcher.GetWorkerThreads();
    for(DWORD i = 0;i<m_dwWorkerThreadCount;i++){
       m_rcBufferMap[pWorkerThread[i].GetThreadID()] = new CBufferPtr(m_dwSocketBufferSize);
    }
    return true;
}

调用IODispatcher来创建线程 然后再通过GetWorkerThreads来获取工作线程 并且遍历把他放在rcBufferMap变量(unordered_map)给每个线程开辟Buffer
我们来看CIODispatcher::Start

// Starts the dispatcher: creates the epoll instance, the command / exit /
// (optional) timer descriptors, blocks SIGPIPE and launches the worker threads.
//   pHandler         - handler that receives the dispatched events
//   iWorkerMaxEvents - size of each worker's event array; 0 selects DEF_WORKER_MAX_EVENTS
//   iWorkers         - number of worker threads; 0 selects DEFAULT_WORKER_THREAD_COUNT
//   llTimerInterval  - timer period in milliseconds; no timer is created unless it is > 0
// Returns TRUE on success. Any failure after the epoll instance was created
// jumps to START_ERROR, which runs Stop(FALSE) and returns FALSE.
BOOL CIODispatcher::Start(IIOHandler* pHandler, int iWorkerMaxEvents, int iWorkers, LLONG llTimerInterval)
{
    ASSERT_CHECK_EINVAL(pHandler && iWorkerMaxEvents >= 0 && iWorkers >= 0);
    CHECK_ERROR(!HasStarted(), ERROR_INVALID_STATE);

    // Replace zero values with the defaults.
    if(iWorkerMaxEvents == 0)    iWorkerMaxEvents = DEF_WORKER_MAX_EVENTS;
    if(iWorkers == 0)            iWorkers = DEFAULT_WORKER_THREAD_COUNT;

    m_iMaxEvents = iWorkerMaxEvents;
    m_iWorkers     = iWorkers;
    m_pHandler     = pHandler;

    m_epoll = epoll_create1(EPOLL_CLOEXEC);
    CHECK_ERROR_FD(m_epoll);

    // Command eventfd, registered edge-triggered. The address of the member is
    // used as the epoll user data so WorkerProc can recognise the source.
    m_evCmd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

    if(IS_INVALID_FD(m_evCmd))
        goto START_ERROR;

    if(!VERIFY(AddFD(m_evCmd, EPOLLIN | EPOLLET, &m_evCmd)))
        goto START_ERROR;

    // Exit eventfd: semaphore mode, registered without EPOLLET.
    m_evExit = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC | EFD_SEMAPHORE);

    if(IS_INVALID_FD(m_evExit))
        goto START_ERROR;

    if(!VERIFY(AddFD(m_evExit, EPOLLIN, &m_evExit)))
        goto START_ERROR;

    // Optional periodic timer on the monotonic clock.
    if(llTimerInterval > 0)
    {
        m_evTimer = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);;

        if(IS_INVALID_FD(m_evTimer))
            goto START_ERROR;

        itimerspec its;

        // Same value for the first expiration and the repeat interval.
        ::MillisecondToTimespec(llTimerInterval, its.it_value);
        ::MillisecondToTimespec(llTimerInterval, its.it_interval);

        if(!VERIFY_IS_NO_ERROR(timerfd_settime(m_evTimer, 0, &its, nullptr)))
            goto START_ERROR;

        if(!VERIFY(AddFD(m_evTimer, EPOLLIN | EPOLLET, &m_evTimer)))
            goto START_ERROR;
    }

    // Block SIGPIPE in the calling thread before the workers are created.
    sigset_t ss;
    sigemptyset(&ss);
    sigaddset(&ss, SIGPIPE);

    VERIFY_IS_NO_ERROR(pthread_sigmask(SIG_BLOCK, &ss, nullptr));

    m_pWorkers = make_unique<CWorkerThread[]>(m_iWorkers);

    // Every worker runs CIODispatcher::WorkerProc.
    for(int i = 0; i < m_iWorkers; i++)
    {
        if(!VERIFY(m_pWorkers[i].Start(this, &CIODispatcher::WorkerProc)))
            goto START_ERROR;
    }

    return TRUE;

START_ERROR:
    // Common failure path: tear down through Stop(FALSE).
    EXECUTE_RESTORE_ERROR(Stop(FALSE));
    return FALSE;
}

这里就是 HP-Socket 的精髓了:这里是向系统内核注册并监听 IO 事件的地方。在 Linux 下用的是 Epoll,如果想在 Unix 下使用可以自行更改为 Kqueue,在 Windows 下则是使用 IOCP。这里分别创建了三个用于通知工作线程的描述符:命令信号(eventfd)、退出信号(eventfd)、定时器信号(timerfd,仅当 llTimerInterval > 0 时创建)
然后每个线程执行CIODispatcher::WorkerProc

此时我们看到WorkerProc

// Worker thread body: waits on the shared epoll instance and dispatches every
// ready event by comparing its user-data pointer with the dispatcher's own
// descriptors (command, timer, exit); anything else is treated as socket IO.
// The loop ends when ProcessExit() returns FALSE; the handler is then told
// that this dispatch thread has ended. Always returns 0.
int CIODispatcher::WorkerProc(PVOID pv)
{
    unique_ptr<epoll_event[]> pEvents = make_unique<epoll_event[]>(m_iMaxEvents);

    for(BOOL bRun = TRUE; bRun; )
    {
        // Wait without a timeout for up to m_iMaxEvents events.
        const int nReady = NO_EINTR_INT(epoll_pwait(m_epoll, pEvents.get(), m_iMaxEvents, INFINITE, nullptr));

        if(nReady <= TIMEOUT)
            ERROR_ABORT();

        for(int idx = 0; idx < nReady; idx++)
        {
            const epoll_event& current = pEvents[idx];

            UINT events = current.events;
            PVOID ptr   = current.data.ptr;

            if(ptr == &m_evCmd)
            {
                ProcessCommand(events);
            }
            else if(ptr == &m_evTimer)
            {
                ProcessTimer(events);
            }
            else if(ptr == &m_evExit)
            {
                // The remaining events of this batch are still processed.
                bRun = ProcessExit(events);
            }
            else
            {
                ProcessIo(ptr, events);
            }
        }
    }

    m_pHandler->OnDispatchThreadEnd(SELF_THREAD_ID);

    return 0;
}

通过 epoll_pwait 来捕获事件,并返回就绪事件的总数
然后再用for来执行判断事件类型是命令/定时器/退出/内核事件(Socket)
此时工作线程已创建完毕

然后就开始StartAccept 接收

// Registers the listen socket with the IO dispatcher for edge-triggered read
// events; the address of m_soListen is the user data that identifies it later.
BOOL CTcpServer::StartAccept()
{
    const auto listenEvents = _EPOLL_READ_EVENTS | EPOLLET;
    PVOID pListenKey        = TO_PVOID(&m_soListen);

    return m_ioDispatcher.AddFD(m_soListen, listenEvents, pListenKey);
}

把Socket加入epoll 让epoll 捕获accept 事件
然后如果有客户端连接 Socket,则会被 WorkerProc 里面的 epoll_pwait 所捕获,我们再回到那里

            // Excerpt from CIODispatcher::WorkerProc: the event source is
            // identified by comparing the epoll user-data pointer with the
            // dispatcher's own descriptors.
            if(ptr == &m_evCmd)
                ProcessCommand(events);
            else if(ptr == &m_evTimer)
                ProcessTimer(events);
            else if(ptr == &m_evExit)
                bRun = ProcessExit(events);
            else
                // Any other pointer (including the listen socket) is socket IO.
                ProcessIo(ptr, events);

此时 如果是Accept事件的话 是会触发ProcessIo的 我们再进入里面

// Runs one IO event through the handler: OnBeforeProcessIo() may veto it
// (FALSE is returned and nothing else runs); otherwise DoProcessIo() handles
// the event and OnAfterProcessIo() is told the outcome, which is also returned.
BOOL CIODispatcher::ProcessIo(PVOID pv, UINT events)
{
    if(m_pHandler->OnBeforeProcessIo(pv, events))
    {
        const BOOL bHandled = DoProcessIo(pv, events);
        m_pHandler->OnAfterProcessIo(pv, events, bHandled);

        return bHandled;
    }

    return FALSE;
}

这里首先是调用BeforeProcessIo m_pHandler(TcpServer)

// Pre-dispatch hook for IO events.
// Events on the listen socket are handled right here via HandleAccept() and
// FALSE is returned so no further processing happens. For a client socket the
// object's csIo lock is taken; TRUE is returned with the lock still held.
// NOTE(review): the matching Unlock() is not in this function - presumably it
// happens in OnAfterProcessIo(); confirm.
BOOL CTcpServer::OnBeforeProcessIo(PVOID pv, UINT events)
{
    if(pv == &m_soListen)
    {
        HandleAccept(events);
        return FALSE;
    }

    TSocketObj* pSocketObj = (TSocketObj*)(pv);

    // Check before locking.
    if(!TSocketObj::IsValid(pSocketObj))
        return FALSE;

    pSocketObj->csIo.Lock();

    // Validity is checked again now that the lock is held.
    if(TSocketObj::IsValid(pSocketObj))
        return TRUE;

    pSocketObj->csIo.Unlock();
    return FALSE;
}

如果 pv 是监听 Socket,则执行 HandleAccept;否则把对象转换成 SocketObj,然后加锁(csIo)以便进行后续的 Socket 操作
我们看下HandleAccept函数


// Accepts pending connections on the listen socket in a loop.
// Returns TRUE when accept() reports ERROR_WOULDBLOCK; FALSE on an error event
// or when accept() reports ERROR_HANDLES_CLOSED.
BOOL CTcpServer::HandleAccept(UINT events)
{
    // An error event on the listen socket is only expected while the server
    // is not in the started state.
    if(events & _EPOLL_ALL_ERROR_EVENTS)
    {
        VERIFY(!HasStarted());
        return FALSE;
    }

    while(TRUE)
    {
        HP_SOCKADDR addr;

        socklen_t addrLen    = (socklen_t)addr.AddrSize();
        SOCKET soClient        = ::accept(m_soListen, addr.Addr(), &addrLen);

        if(soClient == INVALID_SOCKET)
        {
            int code = ::WSAGetLastError();

            // WOULDBLOCK: accept has nothing more to return, done.
            if(code == ERROR_WOULDBLOCK)
                return TRUE;
            // CONNABORTED: try the next accept.
            else if(code == ERROR_CONNABORTED)
                continue;
            // HANDLES_CLOSED: give up on the listen socket.
            else if(code == ERROR_HANDLES_CLOSED)
                return FALSE;
            
            // Any other error code goes to ERROR_EXIT2.
            ERROR_EXIT2(EXIT_CODE_SOFTWARE, code);
        }

        // Set the file status flags on the client socket (including non-blocking).
        VERIFY(::fcntl_SETFL(soClient, O_NOATIME | O_NONBLOCK | O_CLOEXEC));

        CONNID dwConnID = 0;

        // Obtain a connection id; if none can be acquired the client socket is
        // closed and the loop moves on.
        if(!m_bfActiveSockets.AcquireLock(dwConnID))
        {
            ::ManualCloseSocket(soClient, SHUT_RDWR);
            continue;
        }

        TSocketObj* pSocketObj = GetFreeSocketObj(dwConnID, soClient);

        // Record the peer address and register the object under its id.
        addr.Copy(pSocketObj->remoteAddr);
        AddClientSocketObj(dwConnID, pSocketObj);

        // Accept event returned HR_ERROR: hand the object to AddFreeSocketObj()
        // and do not register it with epoll.
        if(TRIGGER(FireAccept(pSocketObj)) == HR_ERROR)
        {
            AddFreeSocketObj(pSocketObj, SCF_NONE);
            continue;
        }

        // EPOLLOUT only if data is already pending, EPOLLIN unless paused;
        // the registration is one-shot (EPOLLONESHOT).
        UINT evts = (pSocketObj->IsPending() ? EPOLLOUT : 0) | (pSocketObj->IsPaused() ? 0 : EPOLLIN);
        VERIFY(m_ioDispatcher.AddFD(pSocketObj->socket, evts | EPOLLRDHUP | EPOLLONESHOT, pSocketObj));
    }

    return TRUE;
}

此时会进入一个循环 确保所有待处理的Socket全部被Accept
accept 返回客户 socket 后首先会进行判断:如果错误码是 wouldblock,则说明已全部接收完成,直接返回;如果是 connaborted,则说明要再执行一次 accept
如果是 handles closed,则说明监听 Socket 已关闭
然后执行fcntl_SETFL(soClient, O_NOATIME | O_NONBLOCK | O_CLOEXEC) 来设置非阻塞套接字
然后通过m_bfActiveSockets.AcquireLock 来获取一个空余的ConnId
如果获取失败则关闭Socket(已满)
然后再通过GetFreeSocketObj 来生成一个空的SocketObj结构

// Returns a socket object reset for (dwConnID, soClient).
// An object from the free list is reused only when it has been free for at
// least m_dwFreeSocketObjLockTime; otherwise a new one is created.
TSocketObj* CTcpServer::GetFreeSocketObj(CONNID dwConnID, SOCKET soClient)
{
    DWORD dwSlot;
    TSocketObj* pCandidate = nullptr;

    if(m_lsFreeSocket.TryLock(&pCandidate, dwSlot))
    {
        if(::GetTimeGap32(pCandidate->freeTime) < m_dwFreeSocketObjLockTime)
        {
            // Freed too recently: give it back to the list and do not use it.
            VERIFY(m_lsFreeSocket.ReleaseLock(pCandidate, dwSlot));
            pCandidate = nullptr;
        }
        else
        {
            // Free for long enough: keep it and release the slot with nullptr.
            VERIFY(m_lsFreeSocket.ReleaseLock(nullptr, dwSlot));
        }
    }

    // Nothing reusable: create a new object.
    if(pCandidate == nullptr)
        pCandidate = CreateSocketObj();

    pCandidate->Reset(dwConnID, soClient);

    return pCandidate;
}

// Allocates storage for one TSocketObj from m_phSocket and constructs the
// object in that storage (placement new) on top of m_bfObjPool.
TSocketObj* CTcpServer::CreateSocketObj()
{
    TSocketObj* pStorage = (TSocketObj*)m_phSocket.Alloc(sizeof(TSocketObj));

    ASSERT(pStorage);

    TSocketObj* pCreated = new (pStorage) TSocketObj(m_bfObjPool);

    return pCreated;
}

此处首先对空闲 SocketObj 列表执行 TryLock,取出一个空闲的 SocketObj:如果它的空闲时间已达到锁定时间(m_dwFreeSocketObjLockTime),则直接复用;否则把它放回列表,再通过 CreateSocketObj() 新建一个 SocketObj

// Allocates storage for one TSocketObj from m_phSocket and constructs the
// object in that storage (placement new) on top of m_bfObjPool.
TSocketObj* CTcpServer::CreateSocketObj()
{
    TSocketObj* pStorage = (TSocketObj*)m_phSocket.Alloc(sizeof(TSocketObj));

    ASSERT(pStorage);

    TSocketObj* pCreated = new (pStorage) TSocketObj(m_bfObjPool);

    return pCreated;
}

这里通过Alloc(PrivateHeap这是一个专门负责管理内存的类)来生成对象 此时SocketObj已生成完毕 再回到HandleAccept中
通过addr.Copy 方法 把accept过来的地址赋值给pSocketObj里面的remoteAddr
然后再通过AddClientSocketObj加入客户Socket堆中(把SocketObj的地址 放到m_pv的偏移上面去)
然后调用FireAccept执行用户层接口 如果用户层接口返回HR_ERROR则把SocketObj放入FreeSocketObj等待回收处理
否则把他加入epoll监听 此时就已经完成了Accept事件的处理
把客户的 Socket 加入 epoll 监听之后,当客户的 Socket 触发 Receive、Write 等事件时也会触发 Epoll 回调,具体我们可以回到这句代码

// Runs one IO event through the handler: OnBeforeProcessIo() may veto it
// (FALSE is returned and nothing else runs); otherwise DoProcessIo() handles
// the event and OnAfterProcessIo() is told the outcome, which is also returned.
BOOL CIODispatcher::ProcessIo(PVOID pv, UINT events)
{
    if(m_pHandler->OnBeforeProcessIo(pv, events))
    {
        const BOOL bHandled = DoProcessIo(pv, events);
        m_pHandler->OnAfterProcessIo(pv, events, bHandled);

        return bHandled;
    }

    return FALSE;
}

刚刚我们分析完OnBeforeProcessIo 这次我们看DoProcessIo

// Maps the epoll event bits to handler callbacks.
// EPOLLERR short-circuits to OnError(). Otherwise the callbacks run in the
// order priority data -> read -> write -> hang-up, and the first one that
// returns FALSE stops the chain with FALSE. Returns TRUE when every callback
// that applied succeeded.
BOOL CIODispatcher::DoProcessIo(PVOID pv, UINT events)
{
    if(events & EPOLLERR)
        return m_pHandler->OnError(pv, events);

    if(events & EPOLLPRI)
    {
        if(!m_pHandler->OnReadyPrivilege(pv, events))
            return FALSE;
    }

    if(events & EPOLLIN)
    {
        if(!m_pHandler->OnReadyRead(pv, events))
            return FALSE;
    }

    if(events & EPOLLOUT)
    {
        if(!m_pHandler->OnReadyWrite(pv, events))
            return FALSE;
    }

    if(events & (_EPOLL_HUNGUP_EVENTS))
    {
        if(!m_pHandler->OnHungUp(pv, events))
            return FALSE;
    }

    return TRUE;
}

这里通过判断事件类型分发具体函数 可以看到如果是事件类型是EPOLLIN 的话 则调用OnReadyRead 如果是EPOLLOUT 则调用OnReadyWrite

// Read-ready callback: treats pv as the socket object and forwards to
// HandleReceive() with the flag extracted from the event mask.
BOOL CTcpServer::OnReadyRead(PVOID pv, UINT events)
{
    TSocketObj* pSocketObj = (TSocketObj*)pv;
    const int flag         = RETRIVE_EVENT_FLAG_H(events);

    return HandleReceive(pSocketObj, flag);
}

// Reads from the socket into the calling thread's buffer and fires the receive
// event for every chunk.
//   flag - non-zero: keep reading until the socket would block;
//          zero: at most MAX_CONTINUE_READS reads.
// Returns FALSE after the socket object was handed to AddFreeSocketObj()
// (receive event returned HR_ERROR, read returned 0, or a read error other
// than ERROR_WOULDBLOCK); TRUE otherwise.
BOOL CTcpServer::HandleReceive(TSocketObj* pSocketObj, int flag)
{
    ASSERT(TSocketObj::IsValid(pSocketObj));

    if(m_bMarkSilence) pSocketObj->activeTime = ::TimeGetTime();

    // Buffer registered for the current thread.
    CBufferPtr& buffer = *(m_rcBufferMap[SELF_THREAD_ID]);

    // A negative limit means "no limit".
    const int reads = flag ? -1 : MAX_CONTINUE_READS;

    for(int i = 0; reads < 0 || i < reads; i++)
    {
        // Read from the socket into the buffer.
        const int rc = (int)read(pSocketObj->socket, buffer.Ptr(), buffer.Size());

        if(rc == 0)
        {
            // read() returned 0: release the socket object as closed.
            AddFreeSocketObj(pSocketObj, SCF_CLOSE, SO_RECEIVE, SE_OK);
            return FALSE;
        }

        if(rc < 0)
        {
            ASSERT(rc == SOCKET_ERROR);

            const int code = ::WSAGetLastError();

            // Nothing more to read for now.
            if(code == ERROR_WOULDBLOCK)
                break;

            // Real error: release the socket object with the error code.
            AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, code);
            return FALSE;
        }

        // rc > 0: pass the bytes to the receive event.
        if(TRIGGER(FireReceive(pSocketObj, buffer.Ptr(), rc)) == HR_ERROR)
        {
            TRACE("<C-CNNID: %Iu> OnReceive() event return 'HR_ERROR', connection will be closed !", pSocketObj->connID);

            AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_RECEIVE, ENSURE_ERROR_CANCELLED);
            return FALSE;
        }
    }

    return TRUE;
}

FireReceive调用接口层的OnReceive

这次我们再看到OnReadyWrite

// Write-ready callback: treats pv as the socket object and forwards to
// HandleSend() with the flag extracted from the event mask.
BOOL CTcpServer::OnReadyWrite(PVOID pv, UINT events)
{
    TSocketObj* pSocketObj = (TSocketObj*)pv;
    const int flag         = RETRIVE_EVENT_FLAG_H(events);

    return HandleSend(pSocketObj, flag);
}


// Sends queued items from the socket object's send list.
//   flag - non-zero: no limit on the number of items;
//          zero: at most MAX_CONTINUE_WRITES items.
// Returns TRUE when there was nothing to send or sending went fine (possibly
// leaving a partially sent item at the front of the list); FALSE when
// SendItem() failed.
BOOL CTcpServer::HandleSend(TSocketObj* pSocketObj, int flag)
{
    ASSERT(TSocketObj::IsValid(pSocketObj));

    // Check for pending data before taking the lock.
    if(!pSocketObj->IsPending())
        return TRUE;

    CReentrantCriSecLock locallock(pSocketObj->csSend);

    // Check again now that the send lock is held.
    if(!pSocketObj->IsPending())
        return TRUE;

    // A negative limit means "no limit".
    const int writes = flag ? -1 : MAX_CONTINUE_WRITES;
    TBufferObjList& pendingItems = pSocketObj->sndBuff;

    for(int i = 0; writes < 0 || i < writes; i++)
    {
        // Take the item at the front of the list.
        TItemPtr itPtr(pendingItems, pendingItems.PopFront());

        // List is empty.
        if(!itPtr.IsValid())
            break;

        ASSERT(!itPtr->IsEmpty());

        // Send failed: report it.
        if(!SendItem(pSocketObj, itPtr))
            return FALSE;

        // Item only partially sent: put the rest back at the front and stop.
        if(!itPtr->IsEmpty())
        {
            pendingItems.PushFront(itPtr.Detach());
            break;
        }
    }

    return TRUE;
}

// Writes one item to the socket until it is empty or the socket would block.
// Fires the send event for every successful write and shrinks the item by the
// number of bytes written.
// Returns FALSE after a write error other than ERROR_WOULDBLOCK (the socket
// object is then handed to AddFreeSocketObj()); TRUE otherwise, including when
// the item is only partially sent.
BOOL CTcpServer::SendItem(TSocketObj* pSocketObj, TItem* pItem)
{
    while(!pItem->IsEmpty())
    {
        const int rc = (int)write(pSocketObj->socket, pItem->Ptr(), pItem->Size());

        if(rc > 0)
        {
            // Notify the send event; it is not supposed to return HR_ERROR.
            if(TRIGGER(FireSend(pSocketObj, pItem->Ptr(), rc)) == HR_ERROR)
            {
                TRACE("<C-CNNID: %Iu> OnSend() event should not return 'HR_ERROR' !!", pSocketObj->connID);
                ASSERT(FALSE);
            }

            // Drop the bytes that were written.
            pItem->Reduce(rc);
            continue;
        }

        if(rc != SOCKET_ERROR)
        {
            // Unexpected return value from write().
            ASSERT(FALSE);
            continue;
        }

        const int code = ::WSAGetLastError();

        // Socket cannot take more data right now.
        if(code == ERROR_WOULDBLOCK)
            break;

        AddFreeSocketObj(pSocketObj, SCF_ERROR, SO_SEND, code);
        return FALSE;
    }

    return TRUE;
}

此时接收 发送事件监听已完成 我们再看下服务器主动发送事件

// Sends iLength bytes starting at pBuffer + iOffset on connection dwConnID by
// wrapping them in a single WSABUF and calling SendPackets().
BOOL CTcpServer::Send(CONNID dwConnID, const BYTE* pBuffer, int iLength, int iOffset)
{
    ASSERT(pBuffer && iLength > 0);

    // Skip the offset, if any.
    if(iOffset != 0)
        pBuffer += iOffset;

    // Describe the data as one buffer.
    WSABUF packet;
    packet.len = iLength;
    packet.buf = (BYTE*)pBuffer;

    return SendPackets(dwConnID, &packet, 1);
}
// Sends a group of buffers on connection dwConnID; forwards to DoSendPackets().
virtual BOOL SendPackets(CONNID dwConnID, const WSABUF pBuffers[], int iCount)
{
    return DoSendPackets(dwConnID, pBuffers, iCount);
}

// Looks up the socket object for dwConnID and forwards to the overload that
// takes the object. Returns FALSE with ERROR_OBJECT_NOT_FOUND when the
// connection is not valid.
BOOL CTcpServer::DoSendPackets(CONNID dwConnID, const WSABUF pBuffers[], int iCount)
{
    ASSERT(pBuffers && iCount > 0);

    // Resolve the connection id to its socket object.
    TSocketObj* pTarget = FindSocketObj(dwConnID);

    if(TSocketObj::IsValid(pTarget))
        return DoSendPackets(pTarget, pBuffers, iCount);

    ::SetLastError(ERROR_OBJECT_NOT_FOUND);
    return FALSE;
}

// Queues the buffers on the socket object under its send lock.
// Returns TRUE on success. On failure the last error is set to
// ERROR_INVALID_PARAMETER (bad buffers), ERROR_OBJECT_NOT_FOUND (object no
// longer valid) or the code returned by SendInternal(), and FALSE is returned.
BOOL CTcpServer::DoSendPackets(TSocketObj* pSocketObj, const WSABUF pBuffers[], int iCount)
{
    ASSERT(pSocketObj && pBuffers && iCount > 0);

    // Assume bad arguments until they have been checked.
    int result = ERROR_INVALID_PARAMETER;

    if(pBuffers && iCount > 0)
    {
        // The lock covers only this scope.
        CReentrantCriSecLock locallock(pSocketObj->csSend);

        result = TSocketObj::IsValid(pSocketObj)
               ? SendInternal(pSocketObj, pBuffers, iCount)
               : ERROR_OBJECT_NOT_FOUND;
    }

    if(result == NO_ERROR)
        return TRUE;

    ::SetLastError(result);
    return FALSE;
}

// Appends every non-empty buffer to the socket object's send list and, when
// the list went from empty to non-empty, posts a DISP_CMD_SEND command.
// Returns NO_ERROR, or ::GetLastError() when posting the command fails.
int CTcpServer::SendInternal(TSocketObj* pSocketObj, const WSABUF pBuffers[], int iCount)
{
    // Pending amount before anything is appended.
    const int iPendingBefore = pSocketObj->Pending();

    for(int idx = 0; idx < iCount; idx++)
    {
        const int iBufLen = pBuffers[idx].len;

        // Empty buffers are skipped.
        if(iBufLen <= 0)
            continue;

        BYTE* pBuffer = (BYTE*)pBuffers[idx].buf;
        ASSERT(pBuffer);

        // Append to the socket object's send list.
        pSocketObj->sndBuff.Cat(pBuffer, iBufLen);
    }

    // If data was already pending before this call no command is posted here;
    // only the empty -> non-empty transition has to trigger the send by hand.
    const bool bBecamePending = (iPendingBefore == 0 && pSocketObj->IsPending());

    if(bBecamePending && !m_ioDispatcher.SendCommand(DISP_CMD_SEND, pSocketObj->connID))
        return ::GetLastError();

    return NO_ERROR;
}
// Builds a command object from (t, wp, lp) and posts it through the overload
// that takes the command pointer.
BOOL CIODispatcher::SendCommand(USHORT t, UINT_PTR wp, UINT_PTR lp)
{
    TDispCommand* const pNewCmd = TDispCommand::Construct(t, wp, lp);

    return SendCommand(pNewCmd);
}
// Queues the command and signals the command eventfd so that a worker thread
// picks it up. Returns whether the eventfd write succeeded.
BOOL CIODispatcher::SendCommand(TDispCommand* pCmd)
{
    m_queue.PushBack(pCmd);

    // Writing 1 to the eventfd raises the command event by hand.
    const BOOL bSignalled = VERIFY_IS_NO_ERROR(eventfd_write(m_evCmd, 1));

    return bSignalled;
}

我们回到 WorkerProc

// Worker thread body: waits on the shared epoll instance and dispatches every
// ready event by comparing its user-data pointer with the dispatcher's own
// descriptors (command, timer, exit); anything else is treated as socket IO.
// The loop ends when ProcessExit() returns FALSE; the handler is then told
// that this dispatch thread has ended. Always returns 0.
int CIODispatcher::WorkerProc(PVOID pv)
{
    unique_ptr<epoll_event[]> pEvents = make_unique<epoll_event[]>(m_iMaxEvents);

    for(BOOL bRun = TRUE; bRun; )
    {
        // Wait without a timeout for up to m_iMaxEvents events.
        const int nReady = NO_EINTR_INT(epoll_pwait(m_epoll, pEvents.get(), m_iMaxEvents, INFINITE, nullptr));

        if(nReady <= TIMEOUT)
            ERROR_ABORT();

        for(int idx = 0; idx < nReady; idx++)
        {
            const epoll_event& current = pEvents[idx];

            UINT events = current.events;
            PVOID ptr   = current.data.ptr;

            if(ptr == &m_evCmd)
            {
                // A command posted through SendCommand() arrives here.
                ProcessCommand(events);
            }
            else if(ptr == &m_evTimer)
            {
                ProcessTimer(events);
            }
            else if(ptr == &m_evExit)
            {
                // The remaining events of this batch are still processed.
                bRun = ProcessExit(events);
            }
            else
            {
                ProcessIo(ptr, events);
            }
        }
    }

    m_pHandler->OnDispatchThreadEnd(SELF_THREAD_ID);

    return 0;
}
// Handles an event on the command eventfd: reads the eventfd, then pops every
// queued command, passes it to the handler and destroys it.
// Aborts on an error event. Returns FALSE when the event carries no EPOLLIN or
// when reading the eventfd fails; TRUE otherwise.
BOOL CIODispatcher::ProcessCommand(UINT events)
{
    if(events & _EPOLL_ALL_ERROR_EVENTS)
        ERROR_ABORT();

    if(!(events & EPOLLIN))
        return FALSE;

    BOOL isOK = TRUE;

    eventfd_t v;

    // Read (and thereby reset) the eventfd counter.
    int rs = eventfd_read(m_evCmd, &v);

    if(IS_NO_ERROR(rs))
    {
        ASSERT(v > 0);

        TDispCommand* pCmd = nullptr;

        // Drain the command queue.
        while(m_queue.PopFront(&pCmd))
        {
            // Let the handler process the command ...
            m_pHandler->OnCommand(pCmd);
            // ... then release the command object.
            TDispCommand::Destruct(pCmd);
        }
    }
    else if(IS_HAS_ERROR(rs))
    {
        // The only failure expected here is "would block".
        ASSERT(IS_WOULDBLOCK_ERROR());

        isOK = FALSE;
    }

    return isOK;
}
// Dispatches a queued command by its type; wParam carries the connection id
// and, for disconnect, lParam carries a BOOL argument.
// Unknown command types are ignored.
VOID CTcpServer::OnCommand(TDispCommand* pCmd)
{
    const auto cmdType = pCmd->type;

    if(cmdType == DISP_CMD_SEND)
    {
        HandleCmdSend((CONNID)(pCmd->wParam));
    }
    else if(cmdType == DISP_CMD_UNPAUSE)
    {
        HandleCmdUnpause((CONNID)(pCmd->wParam));
    }
    else if(cmdType == DISP_CMD_DISCONNECT)
    {
        HandleCmdDisconnect((CONNID)(pCmd->wParam), (BOOL)pCmd->lParam);
    }
}
// Handles a DISP_CMD_SEND command: if the connection is still valid and has
// data pending, runs the regular IO path with a synthetic EPOLLOUT event.
VOID CTcpServer::HandleCmdSend(CONNID dwConnID)
{
    // Resolve the connection id to its socket object.
    TSocketObj* pSocketObj = FindSocketObj(dwConnID);

    if(TSocketObj::IsValid(pSocketObj) && pSocketObj->IsPending())
    {
        // Invoke ProcessIo() by hand, as if epoll had reported the socket writable.
        m_ioDispatcher.ProcessIo(pSocketObj, EPOLLOUT);
    }
}

则此时服务端主动发送则完成 TcpServer大致到这里

作者:Yoho
转载注明出处

一个高性能网络库-HP-Socket 是一个由国人写的框架
HP-Socket 官方网站:http://www.jessma.org 已拉闸- -
HP-Socket 源代码下载地址:https://github.com/ldcsaa/HP-Socket
是一个把底层封装的很好的框架
这里只看了TCP-Server部分和SSL-Server部分
印象最深的是他这个框架疯狂利用类包装

目录解析(Linux版):

  • common 通用类文件夹
    • GlobalDef.h 全局宏定义 对基本数据类型进行了重定义 方便跨平台
    • STLHelper.h STL 封装,封装了 STL 的函数
    • BufferPool.h Buffer缓冲池(队列)
    • BufferPtr.h Buffer项
    • FileHelper.h File包装类
    • FuncHelper.h 常用函数包装
    • IODispatcher.h IO派遣类(线程) 负责监听IO事件以及分发事件
    • PrivateHeap.h 内存堆分发
    • RWLock.h 读写锁(R/W)
    • RingBuffer.h Buffer处理类
    • SysHelper.h 系统函数包装
    • Thread.h 线程函数包装
  • SocketHelper.h Socket函数包装
  • SocketInterface.h 回调接口包装
  • HPTypeDef.h 特有结构定义
  • TcpServer.h
  • SslServer.h

因为操作系统的设计缺陷,内核与用户态之间的通信是低效的:用户态只能得到内核"有消息过来"的通知,但并不知道是谁发的。这就导致用户层需要从头枚举自身的消息队列,来看是哪一个产生了消息,时间复杂度为 O(n)。自 IOCP/EPOLL/KQUEUE 出现后,内核会把产生的消息、消息类型以及对应的对象一并发给用户层,用户层可以直接定位产生消息的对象,时间复杂度为 O(1),并且是利用内存拷贝的方式实现的

IOCP EPOLL KQUEUE的差别

IOCP是告诉你消息已经处理好了 你来做后续
EPOLL是告诉你消息可以处理了
KQUEUE设计的更加简单便捷原理类似EPOLL