Click here to Skip to main content
15,896,726 members
Articles / High Performance Computing / Parallel Processing

Commands Transfer Protocol (CTP) - A New Networking Protocol for Distributed or Parallel Computations

Rate me:
Please Sign up or sign in to vote.
4.85/5 (33 votes)
2 Feb 2005 · 22 min read 228.7K   7.6K   132  
This article presents an improved version of a new networking protocol for distributed or parallel computations. In general, it is suited to fast, reliable and feature-rich interchange of small messages. The protocol's implementation and a demo project are provided.
// CCTPReceivedData - Buffer for storing received data and information about it
// CCTPErrorInfo - Buffer for storing error information
// CCTPNet - Class, which implements CTP
// Implementation file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////

#include "stdafx.h"

#include "NetBasic.h"
#include "DebugLog.h"
#include "CTPNet.h"

// Macrodefinitions for handy log building

// LOG(log,cs,mess): when the stream pointer log is not NULL, writes one line
// "<timestamp> <mess>" to *log and flushes it. The write is protected by a
// CSingleLock on the critical section cs. mess may be a chain of "<<"
// insertions. ts[22] matches the "%8s %8s.%03d" stamp built by
// CCTPErrorInfo::GetTimeStamp.
// NOTE(review): expands to a bare "if (...) {...}" statement, so do not use it
// as the unbraced body of an if that has an else branch.
#define LOG(log,cs,mess) \
    if (log) { \
        char ts[22]; \
        CSingleLock lock(&cs,TRUE); \
        (((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
    }

// LOGA(log,cs,mess,ip): the same as LOG, but the string representation of the
// ip-address ip is first written (via ip.GetString) into a local buffer that
// mess can reference by the name "saddr".
// NOTE(review): like LOG, this expands to a bare "if (...) {...}" statement.
#define LOGA(log,cs,mess,ip) \
    if (log) { \
        char ts[22],saddr[16]; \
        ip.GetString(saddr); \
        CSingleLock lock(&cs,TRUE); \
        (((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
    }

// LOGHA(log,cs,mess1,mess2,head,ip): the same as LOGA, but the description of
// the header pointed to by head (head->ToStream) is inserted between mess1 and
// mess2. The string representation of the ip-address ip can be referenced in
// mess1 and mess2 by the name "saddr".
// NOTE(review): head is used as "head->", so a cast expression must be
// parenthesized by the caller; like LOG, this expands to a bare
// "if (...) {...}" statement.
#define LOGHA(log,cs,mess1,mess2,head,ip) \
    if (log) { \
        char ts[22],saddr[16]; \
        ip.GetString(saddr); \
        CSingleLock lock(&cs,TRUE); \
        ((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess1;\
        head->ToStream(*log); \
        (((ostream&)*(log))<<mess2<<"\n").flush();\
    }

// Builds a received-data record: remembers the command code, the data size and
// the sender address, and allocates an own buffer of size bytes. When buf is
// not NULL, size bytes are copied from it; otherwise the buffer content is
// left as allocated.
CCTPReceivedData::CCTPReceivedData(unsigned __int16 command, unsigned __int64 size, unsigned long from, char* buf)
{
    // The 64-bit size is narrowed to unsigned int for allocation and copying
    const unsigned int bytes=(unsigned int)size;

    // Parameters shadow the members, hence the explicit this->
    this->from=IPAddr(from);
    this->size=size;
    this->command=command;

    pBuf=new char[bytes];
    if (buf!=NULL) {
        memcpy(pBuf,buf,bytes);
    }
}

// Creates an error record from its type, the command it relates to, an error
// code and the address involved, and stamps it with the current date and time.
CCTPErrorInfo::CCTPErrorInfo(unsigned char type,unsigned __int16 command,int code,IPAddr addr)
{
    // The stamp does not depend on any of the arguments
    GetTimeStamp(timestamp);

    // Parameters shadow the members, hence the explicit this->
    this->addr=addr;
    this->code=code;
    this->command=command;
    this->type=type;
}

char* CCTPErrorInfo::GetTimeStamp(char* s)
{
    CHAR date[30]="";
    CHAR time[30]="";
    _timeb timebuffer;

    // Get date/time
    _strdate(date);
    _ftime(&timebuffer);
    _strtime(time);

    // Create string
    sprintf(s,"%8s %8s.%03d",date,time,timebuffer.millitm);
    return s;
}

// Writes a one-line, human readable description of the header to out, e.g.
// "{command: 5, id: 12, size: 40, num: 1(3), opt: 2}".
//  - A header with the confirmation bit set is shown as "confirm: <command
//    without that bit>", any other header as "command: <command>".
//  - "num: <number>(<amount>)" is shown only for multi-packet messages.
//  - "opt: <options>" is shown only when some option is set.
void CCTPNet::Header::ToStream(ostream& out)
{
    if (command&CCTPNet::m_iConfirm) {
        // The bit is known to be set, so XOR removes it
        out<<"{confirm: "<<(command^CCTPNet::m_iConfirm);
    } else {
        out<<"{command: "<<command;
    }
    out<<", id: "<<(unsigned int)id<<", size: "<<(unsigned int)size;
    if (amount>1) {
        out<<", num: "<<(unsigned int)number<<"("<<(unsigned int)amount<<")";
    }
    if (options) {
        // Cast like the other fields: an 8-bit integer would otherwise be
        // inserted into the stream as a character instead of a number
        out<<", opt: "<<(unsigned int)options;
    }
    out<<"}";
}

// Marks packet i of the sent command as confirmed.
// Returns true when every one of the uCount packets is confirmed afterwards,
// false otherwise. An index outside [0,uCount) is ignored and yields false:
// callers pass a packet number taken from a received header, so it must not
// be trusted to be in range.
bool CCTPNet::SntCommandInfo::Confirm(unsigned int i)
{
    if (i>=uCount) return false;

    CI[i].bConfirmed=true;
    for (unsigned int j=0; j<uCount; j++) {
        if (!CI[j].bConfirmed) return false;
    }

    return true;
}

// Marks part i of the large command as received.
// Returns true when all uCount parts have been received afterwards, false
// otherwise. An index outside [0,uCount) is ignored and yields false: callers
// pass a packet number taken from a received header, so it must not be
// trusted to be in range.
bool CCTPNet::LargeCommandInfo::GotPart(unsigned int i)
{
    if (i>=uCount) return false;

    received[i]=true;
    for (unsigned int j=0; j<uCount; j++) {
        if (!received[j]) return false;
    }
    return true;
}

// Predefined option combination: DelAfterError, NoResend and UniqueCommand
// (presumably intended for ping-like commands, judging by the name)
const unsigned __int8 CCTPNet::OptPing=CCTPNet::Options::DelAfterError|CCTPNet::Options::NoResend|CCTPNet::Options::UniqueCommand;
// Bit of the header's command field which marks a packet as a confirmation
const unsigned __int16 CCTPNet::m_iConfirm=0x8000;

// Sets up a CTP endpoint: stores the tuning parameters, resets the internal
// storages, allocates the packet buffer, creates the sockets and starts the
// worker threads. The object starts in the suspended state.
//   receiver       - default receiver of deliveries
//   port           - port used both for binding and as destination port
//   servers        - number of server threads to start
//   times          - optional timing settings; copied when not NULL
//   log            - optional log stream; NULL disables logging
//   packetdatasize - maximum number of data bytes in one packet
//   maxdeliverers  - upper bound for the number of deliverer threads
CCTPNet::CCTPNet(NetReceiver* receiver,unsigned short port,unsigned short servers,Times* times,ostream* log,unsigned __int16 packetdatasize,unsigned short maxdeliverers)
{
    // Tuning
    m_DefReceiver=receiver;
    m_uPort=port;
    if (times) m_Times=*times;
    m_uPacketDataSize=packetdatasize;
    m_pLog=log;

    // Initialize random generator
    srand((unsigned)time(NULL));

    // Initialization
    m_bSuspended=true;
    m_SntCommands.clear();
    m_Sessions.clear();
    m_LargeCommands.clear();
    m_Receivers.clear();
    // One buffer large enough for a full packet: data plus header
    m_pBuffer=new char[m_uPacketDataSize+GetHeaderSize()];
    m_Deliveries.clear();
    m_pDeliverTrds.clear();
    m_uMaxDeliverers=maxdeliverers;
    m_uBusy=0;

    // The result is not checked here; on failure CreateSockets queues an error
    // delivery and keeps the object suspended
    CreateSockets();

    // Must be set before the threads start, because they poll this flag
    m_bKill=false;

    // Start threads and store handles
    for (unsigned int i=0;i<servers;i++) {
        m_pServerTrds.push_back(AfxBeginThread(CTPServerFunction,this));
    }
    m_pDelManTrd=AfxBeginThread(CTPDelManFunction,this);

    LOG(m_pLog,m_csLog,"CTP started on port "<<port<<" with "<<servers<<" servers\n");
}

// Shuts the endpoint down: asks all threads to stop by raising m_bKill, waits
// until the server threads, deliverer threads and the delivery manager have
// gone, terminates forcedly those which are still alive after
// m_Times.uPeriodDestroy milliseconds, and then frees sockets, storages and
// the packet buffer.
CCTPNet::~CCTPNet()
{
    LOG(m_pLog,m_csLog,"CTP is shuting down");

    // Terminate threads
    m_bKill=true;
    DWORD time=GetTickCount();
    while ((!m_pDeliverTrds.empty()) || !m_pServerTrds.empty() || m_pDelManTrd) {
        Sleep(m_Times.uSleepOnDestroy);
        // Kill servers, deliverers and delivery manager if they are busy too long
        if (GetTickCount()-time>m_Times.uPeriodDestroy) {
            CSingleLock locks(&m_csServerTrds);
            LOCK(locks);
            for (vector<CWinThread*>::iterator its=m_pServerTrds.begin(); its!=m_pServerTrds.end(); its++) {
                LOG(m_pLog,m_csLog,"Server thread with handle "<<(*its)->m_hThread<<" was stopped forcedly");
                TerminateThread((*its)->m_hThread,0);
            }
            m_pServerTrds.clear();
            UNLOCK(locks);

            // A separate iterator is declared here: reusing the one from the
            // loop above relies on pre-standard for-scope rules
            CSingleLock lockd(&m_csDeliverTrds);
            LOCK(lockd);
            for (vector<CWinThread*>::iterator itd=m_pDeliverTrds.begin(); itd!=m_pDeliverTrds.end(); itd++) {
                LOG(m_pLog,m_csLog,"Deliverer thread with handle "<<(*itd)->m_hThread<<" was stopped forcedly");
                TerminateThread((*itd)->m_hThread,0);
            }
            m_pDeliverTrds.clear();
            UNLOCK(lockd);

            if (m_pDelManTrd) {
                LOG(m_pLog,m_csLog,"Delivery manager thread was stopped forcedly");
                TerminateThread(m_pDelManTrd->m_hThread,0);
                // The manager clears this pointer itself only on a regular
                // exit; without clearing it here the waiting loop never ends
                m_pDelManTrd=NULL;
            }
        }
    }

    // Free resources
    closesocket(m_SendSocket);
    closesocket(m_RecvSocket);
    FreeSntCommands();
    FreeSessions();
    FreeLargeCommands();
    FreeDeliveries();
    m_Receivers.clear();
    delete[] m_pBuffer;

    LOG(m_pLog,m_csLog,"CTP stopped");
}

bool CCTPNet::CreateSockets()
{
    LOG(m_pLog,m_csLog,"Creating sockets");

    CSingleLock lock(&m_csDeliveries);

    // Sockets creation
    m_SendSocket=socket(AF_INET, SOCK_DGRAM, 0);
    m_RecvSocket=socket(AF_INET, SOCK_DGRAM, 0);
    if (m_SendSocket==INVALID_SOCKET || m_RecvSocket==INVALID_SOCKET) {
        LOCK(lock);
        m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(0,0,WSAGetLastError(),IPAddr())));
        UNLOCK(lock);
        LOG(m_pLog,m_csLog,"Failed to create sockets");
        m_bSuspended=true;
        return false;
    }
    // Sockets are to support broadcasting
    BOOL broadcast=TRUE;
    setsockopt(m_SendSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
    setsockopt(m_RecvSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));

    // Binding local address with receiving socket
    m_Local.sin_family=AF_INET;
    m_Local.sin_port=htons(m_uPort);
    m_Local.sin_addr.s_addr=htonl(INADDR_ANY);
    if (bind(m_RecvSocket,(SOCKADDR*)&m_Local,sizeof(m_Local))==SOCKET_ERROR)
    {
        LOCK(lock);
        m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(1,0,WSAGetLastError(),IPAddr())));
        UNLOCK(lock);
        LOG(m_pLog,m_csLog,"Failed to bind receiving socket");
        m_bSuspended=true;
        return false;
    }

    return true;
}

// Removes option bits which the caller is not allowed to request:
// UniqueCommand is dropped for multi-packet messages and StartSession is
// always dropped.
void CCTPNet::CheckupOptions(Header& header)
{
    const bool multipart=header.amount>1;

    // In both cases the bit is known to be set, so XOR clears it
    if (multipart && (header.options&Options::UniqueCommand)) {
        header.options^=Options::UniqueCommand;
    }
    if (header.options&Options::StartSession) {
        header.options^=Options::StartSession;
    }
}

// Sends the message held in sb as command `command` to address `to`.
//   sb          - buffer with the message already split into packets; a header
//                 is written in front of every packet
//   options     - requested options; filtered by CheckupOptions
//   storeiffail - when false, the message is removed from the sent commands
//                 storage if sending of some packet fails
// The message is stored in the sent commands storage before its packets are
// sent. Returns true when all packets were passed to the socket, false when
// sending of a packet failed (an error delivery of type 2 is queued then and
// the remaining packets are not sent).
bool CCTPNet::Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 options, bool storeiffail)
{
    // Arrange header
    Header head;
    head.amount=sb.GetPacketsCount();
    head.command=command;
    head.messize=sb.GetDataSize();
    head.options=options;
    CheckupOptions(head);

    // Declared once for both loops below: reusing a variable declared in the
    // first for-statement relies on pre-standard for-scope rules
    unsigned int i;

    // Provide data with headers
    for (i=0;i<head.amount;i++) {
        // Fill rest header information
        GetNextID(head,to);
        head.size=(unsigned __int16)sb.GetPacketSize(i);
        head.number=i;

        // Put header in the packet
        sb.PutHead(&head,i);
    }

    // Store message to be sent and pointer to it
    SntCommandInfo ci(sb,GetTickCount(),to.Solid);
    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);
    m_SntCommands.push_back(ci);
    SntCommandInfoList::iterator curentry=--m_SntCommands.end();
    UNLOCK(lock);

    // Send packets
    for (i=0;i<head.amount;i++) {
        if (!SendPacket(sb.GetHeadPtr(i),to.Solid)) {
            // Take the error code right after the failed call, before any
            // other call gets a chance to change it
            const int err=WSAGetLastError();

            // Store error information
            CSingleLock lockd(&m_csDeliveries);
            LOCK(lockd);
            m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(2,head.command,err,IPAddr(to.Solid))));
            UNLOCK(lockd);

            // Delete from sent packets storage if there was an error and storeiffail equals false
            if (!storeiffail) {
                CSingleLock locks(&m_csSntCommands);
                LOCK(locks);
                m_SntCommands.erase(curentry);
                UNLOCK(locks);
            }

            // Exit sending function with return value, which shows error
            return false;
        }
    }

    // Exit sending function with return value, which shows success
    return true;
}

bool CCTPNet::SendPacket(char* buf, unsigned long to)
{
    SOCKADDR_IN recip;
    recip.sin_family=AF_INET;
    recip.sin_port=htons(m_uPort);
    recip.sin_addr.s_addr=to;
    Header* head=(Header*)buf;

    LOGHA(m_pLog,m_csLog,"Send packet "," to "<<saddr,head,IPAddr(to));

    return (sendto(m_SendSocket,(const char*)buf,head->size,0,(SOCKADDR*)&recip,sizeof(recip))!=SOCKET_ERROR);
}

void CCTPNet::FreeSntCommands()
{
    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);
    for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();it++) {
        it->Free();
    }
    m_SntCommands.clear();
    UNLOCK(lock);

    LOG(m_pLog,m_csLog,"Sent commands storage freed");
}

void CCTPNet::FreeSessions()
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    for (SessionsInfo::iterator it=m_Sessions.begin();it!=m_Sessions.end();it++) {
        it->second.received.clear();
    }
    m_Sessions.clear();
    UNLOCK(lock);

    LOG(m_pLog,m_csLog,"Sessions information storage freed");
}

void CCTPNet::FreeLargeCommands()
{
    CSingleLock lock(&m_csLargeCommands);
    LOCK(lock);
    for (LargeCommandInfoList::iterator it=m_LargeCommands.begin();it!=m_LargeCommands.end();it++) {
        it->Free();
        if (it->pRD) delete it->pRD;
    }
    m_LargeCommands.clear();
    UNLOCK(lock);

    LOG(m_pLog,m_csLog,"Large commands storage freed");
}

void CCTPNet::FreeDeliveries()
{
    for (DeliveriesList::iterator it=m_Deliveries.begin();it!=m_Deliveries.end();it++) {
        if (it->data) {
            if (it->type==DeliveryType::ReceivedData) delete (CCTPReceivedData*)it->data; else
            if (it->type==DeliveryType::ErrorInfo) delete (CCTPErrorInfo*)it->data; else
            delete it->data;
        }
    }
    m_Deliveries.clear();

    LOG(m_pLog,m_csLog,"Deliveries storage freed");
}

// Registers the id of a packet received from address `from` in the session of
// that address (the broadcast session if the Broadcast option is set).
// Returns true when the packet is new and false when it was already received.
// The session keeps an ordered list of received ids; once the session start
// is known (minwasset), leading consecutive ids are collapsed so that the
// front of the list means "everything up to here was received".
// The m_csSessions lock is held during the call; on the early returns it is
// released by the CSingleLock destructor.
bool CCTPNet::SaveRcvPacket(unsigned long from,Header* head)
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    SessionInfo& si=GetSessionInfo(IPAddr(from),head->options&Options::Broadcast?true:false);

    if (si.received.empty()) {
        // First message from corresponding workstation
        si.received.push_back(head->id);
    } else {
        // Find place among already received messages
        if (Less(si.received.back(),head->id)) {
            si.received.push_back(head->id);
        } else {
            for (SessionInfo::RcvList::iterator it=si.received.begin();it!=si.received.end();it++) {
                // Already has been recieved
                if ((*it)==head->id) {
                    LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" have been already received",head,IPAddr(from));
                    return false;
                }
                if (Less(head->id,*it)) {
                    // Already has been recieved
                    if (it==si.received.begin() && si.minwasset) {
                        LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" have been already received",head,IPAddr(from));
                        return false;
                    }
                    // New one has been recieved
                    si.received.insert(it,head->id);
                    break;
                }
            }
        }

        // Remove ambigous information
        if (si.minwasset) {
            // Stop when a single id is left: without the size check the second
            // element would be read through the end iterator after pop_front
            // has reduced the list to one element
            while (si.received.size()>1 && (si.received.front()+1)==*(++si.received.begin())) {
                si.received.pop_front();
            }
        }
    }

    // Session was started with this packet
    if (head->options&Options::StartSession) {
        si.minwasset=true;
        LOGA(m_pLog,m_csLog,"Session starting packet with ID:"<<(unsigned int)head->id<<" have been received from "<<saddr,IPAddr(from));
    }

    UNLOCK(lock);
    return true;
}

// Sends a confirmation for the packet described by header to address `to`.
// The confirmation is the same header with the confirmation bit set in the
// command, zero message size and a packet size equal to the header size.
void CCTPNet::SendConfirmation(unsigned long to,Header header)
{
    // header is a by-value copy, so the caller's header stays untouched
    header.size=GetHeaderSize();
    header.messize=0;
    header.command|=m_iConfirm;

    // The confirmation consists of the header only
    SendPacket((char*)&header,to);
}

// Handles a confirmation received from address `to`; header is the received
// confirmation header. The sent commands storage is searched (under the
// m_csSntCommands lock) for the command the confirmation belongs to:
//  - not unique command: the entry whose first packet id equals
//    header->id-header->number is looked for; the packet header->number is
//    marked confirmed and the search stops;
//  - unique command: every entry whose command code matches is confirmed.
// When all packets of an entry are confirmed, the session timeout is proposed
// (uMultiplier times the time passed since CI[0].dwTime) and the entry is
// freed and removed from the storage.
// NOTE(review): header->number comes from the received packet and is passed
// to Confirm without a range check here.
void CCTPNet::ConfirmSntPacket(unsigned long to,Header* header)
{
    Header* head;
    bool erased=false;
    // Is command unique
    bool unique=(header->options&Options::UniqueCommand)&&(header->amount<=1);

    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);

    for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();) {
        erased=false;
        // Header of the first packet of the stored command
        head=(Header*)it->sbBody.GetBufferBegin();

        // Find appropriate recipient or accept any if message was broadcasted
        if (it->ipTo==to || head->options&Options::Broadcast) {
            if (!unique) {
                // Not unique command
                if (head->id==header->id-header->number) {
                    if (it->Confirm(header->number)) {
                        SetTimeout(it->ipTo,head->options&Options::Broadcast?true:false,m_Times.uMultiplier*(GetTickCount()-(it->CI[0].dwTime)));
                        LOGHA(m_pLog,m_csLog,"Sent commands storage excluded entry ",", sent to "<<saddr,head,IPAddr(to));
                        it->Free();
                        // The iterator is not used after this erase: the
                        // function returns right below
                        m_SntCommands.erase(it);
                    }
                    UNLOCK(lock);
                    return;
                }
            } else {
                // Unique command
                // True when the codes differ in the confirmation bit only
                if ((head->command^header->command)==m_iConfirm) {
                    if (it->Confirm(0)) {
                        SetTimeout(it->ipTo,head->options&Options::Broadcast?true:false,m_Times.uMultiplier*(GetTickCount()-(it->CI[0].dwTime)));
                        LOGHA(m_pLog,m_csLog,"Sent commands storage excluded entry ",", sent to "<<saddr,head,IPAddr(to));
                        it->Free();
                        // erase returns the next entry, so the loop must not
                        // advance the iterator once more
                        it=m_SntCommands.erase(it);
                        erased=true;
                        if (it==m_SntCommands.end()) break;
                    }
                }
            }
        }

        // Go to next sent packet
        if (!erased) it++;
    }

    UNLOCK(lock);
}

bool CCTPNet::ArrangeLargeCommand(unsigned long from,Header* head)
{
    LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" is a part of large command",head,IPAddr(from));

    char* mem=NULL;
    CSingleLock lock(&m_csLargeCommands);
    LOCK(lock);

    // Try to find packets, wich belongs to the same message in received parts
    for (LargeCommandInfoList::iterator it=m_LargeCommands.begin();it!=m_LargeCommands.end();it++) {
        if (it->id==head->id-head->number && it->pRD->from==from && !it->received[head->number]) {
            mem=it->pRD->pBuf;
            mem+=head->number*m_uPacketDataSize;
            memcpy(mem,m_pBuffer+GetHeaderSize(),head->size-GetHeaderSize());
            if (it->GotPart(head->number)) {
                LOG(m_pLog,m_csLog,"Large command arranged");
                CSingleLock lock(&m_csDeliveries);
                LOCK(lock);
                m_Deliveries.push_back(Delivery(GetReceiver(head->command,DeliveryType::ReceivedData),it->pRD));
                UNLOCK(lock);
                it->Free();
                m_LargeCommands.erase(it);
                return true;
            } else return false;
        }
    }

    // Part of the new message received
    LargeCommandInfo lpi(head->command,head->messize,from,head->id,head->amount);
    mem=lpi.pRD->pBuf;
    mem+=head->number*m_uPacketDataSize;
    memcpy(mem,m_pBuffer+GetHeaderSize(),head->size-GetHeaderSize());
    m_LargeCommands.push_front(lpi);
    m_LargeCommands.front().GotPart(head->number);

    return false;
}

void CCTPNet::ResendNotConfirmedData()
{
    Header* head;

    DWORD time=GetTickCount();
    bool erased=false;
    unsigned int i=0;
    // This fleag is used to keep from showing error for all parts of the large
    // message
    bool wasdead=false;

    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);
    for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();) {
        erased=false;
        head=(Header*)it->sbBody.GetBufferBegin();
        for (i=0; i<it->uCount; i++) if (!it->CI[i].bConfirmed) {
            // If timeout have expired
            if ((unsigned int)(time-it->CI[i].dwLTime)>it->CI[i].uResend*GetTimeout(it->ipTo,head->options&Options::Broadcast?true:false)) {
                if (!(head->options&Options::NoResend)) {
                    LOGHA(m_pLog,m_csLog,"Packet "," is to be resent to "<<saddr,((Header*)it->sbBody.GetHeadPtr(i)),IPAddr(it->ipTo));
                    SendPacket(it->sbBody.GetHeadPtr(i),it->ipTo);
                }
                it->CI[i].dwLTime=time;
                it->CI[i].IncResend();

                // Produce error if dead timeout occuired
                if (it->CI[i].IsDeadTimeout() && !wasdead){
                    CSingleLock lock(&m_csDeliveries);
                    LOCK(lock);
                    m_Deliveries.push_back(Delivery(GetReceiver(head->command,DeliveryType::ErrorInfo),new CCTPErrorInfo(4,head->command,WSAGetLastError(),IPAddr(it->ipTo))));
                    UNLOCK(lock);
                    wasdead=true;

                    // Erase if needed
                    if (head->options&Options::DelAfterError) {
                        LOGHA(m_pLog,m_csLog,"Command, sent to "<<saddr<<", which includes packet ",", is delete from sent commands storage after generating error delivery",((Header*)it->sbBody.GetHeadPtr(i)),IPAddr(it->ipTo));
                        m_SntCommands.erase(it);
                        erased=true;
                        break;
                    }
                }
            }
        }

        // Go to next element
        if (!erased) it++;
    }
    UNLOCK(lock);
}

void CCTPNet::AddSpecialReceiver(unsigned __int16 command, NetReceiver* receiver, DeliveryType type)
{
    if (command==0) return;
    for (SpecialReceiversList::iterator it=m_Receivers.begin();it!=m_Receivers.end();) {
        if (it->command==command && it->type==type) {
            it=m_Receivers.erase(it);
            if (it==m_Receivers.end()) break;
        } else it++;
    }
    m_Receivers.push_front(SpecialReceiver(command,receiver,type));
}

// Removes every special receiver registration which refers to receiver.
void CCTPNet::DeleteSpecialReceiver(NetReceiver* receiver)
{
    SpecialReceiversList::iterator cur=m_Receivers.begin();
    while (cur!=m_Receivers.end()) {
        if (cur->receiver!=receiver) {
            ++cur;
            continue;
        }
        // erase hands back the entry which follows the removed one
        cur=m_Receivers.erase(cur);
    }
}

// Returns the receiver for deliveries of kind `type` concerning command
// `command`: the first matching special receiver, or the default receiver
// when there is none or when command is 0.
NetReceiver* CCTPNet::GetReceiver(unsigned __int16 command, DeliveryType type)
{
    NetReceiver* found=m_DefReceiver;

    if (command!=0) {
        for (SpecialReceiversList::iterator cur=m_Receivers.begin();cur!=m_Receivers.end();++cur) {
            if (cur->command==command && cur->type==type) {
                found=cur->receiver;
                break;
            }
        }
    }

    return found;
}

// Returns the session information kept for address addr, creating an empty
// one if there is none yet. With bcast set, the address is first turned into
// its broadcast form. No lock is taken here.
CCTPNet::SessionInfo& CCTPNet::GetSessionInfo(IPAddr addr, bool bcast)
{
    if (bcast) {
        addr.SetBroadcast();
    }

    // insert leaves an already existing entry untouched and reports where it is
    SessionsInfo::iterator pos=m_Sessions.insert(SessionsInfo::value_type(addr.Solid,SessionInfo())).first;
    return pos->second;
}

// Puts the next packet id of the session with addr into head.id. If no
// session is stored under addr yet, the StartSession option is set in head.
// The m_csSessions lock is taken and released by the CSingleLock destructor.
void CCTPNet::GetNextID(Header& head,IPAddr addr)
{
    CSingleLock guard(&m_csSessions);
    LOCK(guard);

    // The check is made before GetSessionInfo may create the session
    const bool known=m_Sessions.find(addr.Solid)!=m_Sessions.end();
    if (!known) {
        head.options|=Options::StartSession;
    }

    const bool bcast=(head.options&Options::Broadcast)?true:false;
    SessionInfo& session=GetSessionInfo(addr,bcast);
    ++session.id;
    head.id=session.id;
}

// Returns the timeout of the session with addr (the broadcast session when
// bcast is set), or m_Times.uDefTimeout while the session timeout is zero.
// The m_csSessions lock is taken and released by the CSingleLock destructor.
unsigned int CCTPNet::GetTimeout(IPAddr addr, bool bcast)
{
    CSingleLock guard(&m_csSessions);
    LOCK(guard);

    SessionInfo& session=GetSessionInfo(addr,bcast);
    if (session.timeout) {
        return session.timeout;
    }
    return m_Times.uDefTimeout;
}

// Sets the timeout of the session with addr (the broadcast session when bcast
// is set) to timeout, but only while the session timeout is still zero; a
// timeout which was already set is kept.
// The value is in milliseconds: callers in this file derive it from
// GetTickCount() differences, and it is compared with such differences.
// The m_csSessions lock is taken and released by the CSingleLock destructor.
void CCTPNet::SetTimeout(IPAddr addr, bool bcast, unsigned int timeout)
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    SessionInfo& si=GetSessionInfo(addr,bcast);
    if (!(si.timeout)) {
        // The unit in the message was corrected from "microseconds"
        LOGA(m_pLog,m_csLog,"Timeout for session with "<<saddr<<" is considered to be "<<timeout<<" milliseconds",addr);
        si.timeout=timeout;
    }
}

// Body of a server thread; pNet is the owning CCTPNet object.
// In an endless loop the thread:
//  1. waits while the object is suspended (leaving if m_bKill gets set);
//  2. polls the receiving socket and, if a packet with at least a full header
//     has arrived, either processes it as a confirmation or registers it,
//     delivers it (directly or through large command assembling) and sends a
//     confirmation back;
//  3. leaves if m_bKill is set, removing itself from m_pServerTrds;
//  4. triggers resending of unconfirmed packets once more than
//     uPeriodCheckResend has passed since the previous check.
// Returns the thread object pointer cast to unsigned int.
// NOTE(review): the packet is processed in the shared net->m_pBuffer after
// m_csNetwork has been released; with more than one server thread another
// thread may receive into the same buffer meanwhile - confirm.
// NOTE(review): head->size and head->messize are used as received, without
// being compared to the number of bytes actually read (ret).
unsigned int CTPServerFunction(void* pNet)
{
    CCTPNet* net=(CCTPNet*)pNet;

    // Necessary variables
    // Polling timeout for select: 1 microsecond
    timeval tv;
    tv.tv_sec=0;
    tv.tv_usec=1;
    fd_set fdread;
    SOCKADDR_IN sender;
    int sendersize=NULL;
    CCTPNet::Header* head;
    DWORD time=NULL;
    DWORD checktime=GetTickCount();
    // lock guards the deliveries storage, lockn guards receiving
    CSingleLock lock(&net->m_csDeliveries);
    CSingleLock lockn(&net->m_csNetwork);


    for(;;) {
        // Wait while suspended
        while (net->GetSuspended()) {
            // Does killing needed
            if (net->m_bKill) {
                CSingleLock lock(&net->m_csServerTrds);
                LOCK(lock);
                net->m_pServerTrds.erase(find(net->m_pServerTrds.begin(),net->m_pServerTrds.end(),AfxGetThread()));
                UNLOCK(lock);
                LOG(net->m_pLog,net->m_csLog,"Server thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
                return (unsigned int)AfxGetThread();
            }

            // Sleep a little bit
            Sleep(net->GetTimes().uSleepSuspended);
        }

        // Check for received data
        time=GetTickCount();
        FD_ZERO(&fdread);
        FD_SET(net->m_RecvSocket,&fdread);
        LOCK(lockn);
        if (select(0,&fdread,NULL,NULL,&tv)>0) {
            // Receive data
            sendersize=sizeof(sender);
            int ret=recvfrom(net->m_RecvSocket,net->m_pBuffer,net->GetPacketDataSize()+net->GetHeaderSize(),0,(SOCKADDR*)&sender,&sendersize);
            UNLOCK(lockn);
            if (ret==SOCKET_ERROR) {
                // Error while receiving
                LOCK(lock);
                net->m_Deliveries.push_back(CCTPNet::Delivery(net->GetDefaultReceiver(),new CCTPErrorInfo(3,0,WSAGetLastError(),IPAddr(sender.sin_addr.S_un.S_addr))));
                UNLOCK(lock);
                LOG(net->m_pLog,net->m_csLog,"Network receiving error");
            } else {
                // Datagrams shorter than a header are dropped silently
                if (ret>=net->GetHeaderSize()) {
                    head=(CCTPNet::Header*)net->m_pBuffer;
                    LOGHA(net->m_pLog,net->m_csLog,"Packet "," have been received from "<<saddr,head,IPAddr(sender.sin_addr.S_un.S_addr));

                    if (net->IsConfirmation(head->command)) {
                        net->ConfirmSntPacket(sender.sin_addr.S_un.S_addr,head);
                    } else {
                        if (net->SaveRcvPacket(sender.sin_addr.S_un.S_addr,head)) {
                            // New packet was got
                            if (head->amount>1) {
                                net->ArrangeLargeCommand(sender.sin_addr.S_un.S_addr,head);
                            } else {
                                LOCK(lock);
                                net->m_Deliveries.push_back(CCTPNet::Delivery(net->GetReceiver(head->command,CCTPNet::DeliveryType::ReceivedData),new CCTPReceivedData(head->command,head->messize,sender.sin_addr.S_un.S_addr,net->m_pBuffer+net->GetHeaderSize())));
                                UNLOCK(lock);
                            }
                        }
                        // Send confimation
                        // (also for packets which were already received)
                        net->SendConfirmation(sender.sin_addr.S_un.S_addr,*head);
                    }
                }
            }
        } else UNLOCK(lockn);

        // Does killing needed
        if (net->m_bKill) {
            CSingleLock lock(&net->m_csServerTrds);
            LOCK(lock);
            net->m_pServerTrds.erase(find(net->m_pServerTrds.begin(),net->m_pServerTrds.end(),AfxGetThread()));
            UNLOCK(lock);
            LOG(net->m_pLog,net->m_csLog,"Server thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
            return (unsigned int)AfxGetThread();
        }

        // Sent packets were not checked long enough
        if ((time-checktime)>net->GetTimes().uPeriodCheckResend) {
            // Check if resending needed
            checktime=GetTickCount();
            net->ResendNotConfirmedData();
        }
    }
}

// Body of the delivery manager thread; pNet is the owning CCTPNet object.
// While the object is not suspended, it starts one more deliverer thread per
// pass when there are pending deliveries, the limit m_uMaxDeliverers is not
// reached and all existing deliverers are busy. It leaves when m_bKill is
// set, clearing m_pDelManTrd. Returns the thread object pointer cast to
// unsigned int.
// NOTE(review): m_Deliveries.empty() is read under the m_csDeliverTrds lock,
// not under m_csDeliveries.
unsigned int CTPDelManFunction(void* pNet)
{
    CCTPNet* net=(CCTPNet*)pNet;
    // NOTE(review): del is not used in this function
    CCTPNet::Delivery del;
    CSingleLock lock(&net->m_csDeliverTrds);

    for(;;) {
        // Does additional delivery threads needed
        if (!net->GetSuspended()) {
            LOCK(lock);
            if (!net->m_Deliveries.empty() && net->m_pDeliverTrds.size()<net->m_uMaxDeliverers && net->m_pDeliverTrds.size()==net->m_uBusy) {
                net->m_pDeliverTrds.push_back(AfxBeginThread(CTPDeliverFunction,pNet));
                LOG(net->m_pLog,net->m_csLog,"Delivery manager have created deliverer with handle "<<net->m_pDeliverTrds.back()->m_hThread);
            }
            UNLOCK(lock);
        }

        // Kill server
        if (net->m_bKill) {
            // The destructor of CCTPNet waits for this pointer to become NULL
            net->m_pDelManTrd=NULL;
            LOG(net->m_pLog,net->m_csLog,"Delivery manager thread stopped");
            return (unsigned int)AfxGetThread();
        }

        Sleep(net->GetTimes().uSleepDelMan);
    }
}

// Body of a deliverer thread; pNet is the owning CCTPNet object.
// The thread repeatedly takes deliveries from the front of the deliveries
// storage and passes them to their target (OnReceive for received data,
// OnError for error information), deleting the attached data afterwards.
// It leaves, removing itself from m_pDeliverTrds, when m_bKill is set or when
// the storage is empty and nothing was delivered for longer than
// uPeriodAutoDest. Returns the thread object pointer cast to unsigned int.
// NOTE(review): m_uBusy is changed without a lock, and m_Deliveries.size() is
// read after the deliveries lock has been released.
unsigned int CTPDeliverFunction(void* pNet)
{
    CCTPNet* net=(CCTPNet*)pNet;
    CCTPNet::Delivery del;
    // True while nothing was delivered during the current pass; the thread
    // sleeps at the end of such a pass
    bool bNothing=true;
    // Time of the last processed delivery
    DWORD lastdel=GetTickCount();
    CSingleLock lock(&net->m_csDeliveries);

    for(;;) {
        bNothing=true;

        for (;;) {
            // Get delivery if exist
            LOCK(lock);
            // Leaves the loop with the lock still held; it is released below
            if (net->m_Deliveries.empty()) break;
            del=net->m_Deliveries.front();
            net->m_Deliveries.pop_front();
            UNLOCK(lock);

            LOG(net->m_pLog,net->m_csLog,"Delivery is beeing processed");

            // Deliver delivery
            net->m_uBusy++;
            switch (del.type) {
            case CCTPNet::DeliveryType::ReceivedData:
                del.target->OnReceive(del.data);
                delete (CCTPReceivedData*)del.data;
            break;
            case CCTPNet::DeliveryType::ErrorInfo:
                del.target->OnError(del.data);
                delete (CCTPErrorInfo*)del.data;
            break;
            default:
                if (VALID(del.data)) delete del.data;
            }
            net->m_uBusy--;
            lastdel=GetTickCount();

            // Thread is working now
            bNothing=false;

            // If killing needed then do it after unlocking
            if (net->m_bKill) break;
        }
        // Still locked only when the loop was left because the storage is empty
        if (lock.IsLocked()) UNLOCK(lock);

        // Does killing needed (because of request or because of sponging)
        if (net->m_bKill || (net->m_Deliveries.size()==0 && GetTickCount()-lastdel>net->GetTimes().uPeriodAutoDest)) {
            CSingleLock lock(&net->m_csDeliverTrds);
            LOCK(lock);
            net->m_pDeliverTrds.erase(find(net->m_pDeliverTrds.begin(),net->m_pDeliverTrds.end(),AfxGetThread()));
            UNLOCK(lock);
            LOG(net->m_pLog,net->m_csLog,"Deliverer thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
            return (unsigned int)AfxGetThread();
        }
        
        // Sleep a little bit
        if (bNothing) {
            Sleep(net->GetTimes().uSleepNothing);
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here


Written By
dum
Web Developer
Russian Federation Russian Federation
Lev Naumov.
MSc in Computer Science. Graduated from Computer Technologies Department of Saint-Petersburg State University of Information Technologies, Mechanics and Optics.
Worked as C/C++ and Java programmer. Now - the research worker in "CAMEL Laboratory" and PhD student in Computer Technologies Department of Saint-Petersburg State University of Information Technologies, Mechanics and Optics.
Has scientific achievements in field of physics, automata theory, cellular automata theory, cluster computing. There are some publications.

Comments and Discussions