// CCTPReceivedData - Buffer for storing received data and information about it
// CCTPErrorInfo - Buffer for storing error information
// CCTPNet - Class, which implements CTP
// Implementation file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "NetBasic.h"
#include "DebugLog.h"
#include "CTPNet.h"
// Macro definitions for convenient log building.
// LOG: if log is non-null, writes a time stamp, a space, mess and a newline to
// the output stream *log and flushes it. The write is protected by critical
// section cs, which is locked for the duration of the block.
// NOTE: every macro below expands to a bare "if (log) { ... }" statement, so
// it must not be used as the unbraced body of an if that has an else branch.
#define LOG(log,cs,mess) \
if (log) { \
char ts[22]; \
CSingleLock lock(&cs,TRUE); \
(((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
}
// LOGA: the same as LOG, but the string representation of ip-address ip is
// written to a local buffer that mess can reference as "saddr"
#define LOGA(log,cs,mess,ip) \
if (log) { \
char ts[22],saddr[16]; \
ip.GetString(saddr); \
CSingleLock lock(&cs,TRUE); \
(((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
}
// LOGHA: the same as LOGA, but inserts the description of header head (pointer,
// printed with head->ToStream) between mess1 and mess2. The string
// representation of ip-address ip can be referenced as "saddr"
#define LOGHA(log,cs,mess1,mess2,head,ip) \
if (log) { \
char ts[22],saddr[16]; \
ip.GetString(saddr); \
CSingleLock lock(&cs,TRUE); \
((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess1;\
head->ToStream(*log); \
(((ostream&)*(log))<<mess2<<"\n").flush();\
}
// Creates a received-data record.
//   command - command code of the message
//   size    - size of the data in bytes (narrowed to unsigned int for the
//             allocation and the copy)
//   from    - address of the sender, stored as IPAddr
//   buf     - data to copy into the newly allocated buffer; may be NULL, in
//             which case the buffer is allocated but left unfilled
CCTPReceivedData::CCTPReceivedData(unsigned __int16 command, unsigned __int64 size, unsigned long from, char* buf)
{
    // Allocate the storage and fill it if a source buffer was supplied
    const unsigned int bytes=(unsigned int)size;
    pBuf=new char[bytes];
    if (buf!=NULL) {
        memcpy(pBuf,buf,bytes);
    }
    // Remember the description of the data
    this->from=IPAddr(from);
    this->size=size;
    this->command=command;
}
// Creates an error record: stores the error type, the command it relates to,
// the error code and the address involved, and stamps the record with the
// current date and time (see GetTimeStamp).
CCTPErrorInfo::CCTPErrorInfo(unsigned char type,unsigned __int16 command,int code,IPAddr addr)
{
    // Stamp first, then copy the description of the error
    GetTimeStamp(timestamp);
    this->addr=addr;
    this->code=code;
    this->command=command;
    this->type=type;
}
char* CCTPErrorInfo::GetTimeStamp(char* s)
{
CHAR date[30]="";
CHAR time[30]="";
_timeb timebuffer;
// Get date/time
_strdate(date);
_ftime(&timebuffer);
_strtime(time);
// Create string
sprintf(s,"%8s %8s.%03d",date,time,timebuffer.millitm);
return s;
}
// Writes a human-readable description of the header to out, in the form
// "{command: C, id: I, size: S[, num: N(A)][, opt: O]}". For a confirmation
// (confirmation bit set in command) the text starts with "{confirm: C" where C
// is the command code with the confirmation bit removed. The part number and
// amount are printed only for multi-packet messages (amount>1), the options
// only when they are non-zero.
void CCTPNet::Header::ToStream(ostream& out)
{
    if (command&CCTPNet::m_iConfirm) {
        out<<"{confirm: "<<(command^CCTPNet::m_iConfirm);
    } else {
        out<<"{command: "<<command;
    }
    out<<", id: "<<(unsigned int)id<<", size: "<<(unsigned int)size;
    if (amount>1) {
        out<<", num: "<<(unsigned int)number<<"("<<(unsigned int)amount<<")";
    }
    if (options) {
        // Cast like the other fields: an 8-bit field streamed as is would be
        // written as a raw character instead of a number
        out<<", opt: "<<(unsigned int)options;
    }
    out<<"}";
}
// Marks packet number i of the sent command as confirmed.
// Returns true if all uCount packets of the command are confirmed now.
// An index outside [0,uCount) is ignored and false is returned: the index
// originates from a received header and must not be trusted.
bool CCTPNet::SntCommandInfo::Confirm(unsigned int i)
{
    if (i>=uCount) return false;
    CI[i].bConfirmed=true;
    for (unsigned int j=0; j<uCount; j++) {
        if (!CI[j].bConfirmed) return false;
    }
    return true;
}
// Marks part number i of the large command as received.
// Returns true if all uCount parts of the command are received now.
// An index outside [0,uCount) is ignored and false is returned: the index
// originates from a received header and must not be trusted.
bool CCTPNet::LargeCommandInfo::GotPart(unsigned int i)
{
    if (i>=uCount) return false;
    received[i]=true;
    for (unsigned int j=0; j<uCount; j++) {
        if (!received[j]) return false;
    }
    return true;
}
// Predefined option set combining DelAfterError, NoResend and UniqueCommand
// (presumably the options intended for ping commands - confirm against the header file)
const unsigned __int8 CCTPNet::OptPing=CCTPNet::Options::DelAfterError|CCTPNet::Options::NoResend|CCTPNet::Options::UniqueCommand;
// Bit of Header::command that marks a packet as a confirmation
const unsigned __int16 CCTPNet::m_iConfirm=0x8000;
// Constructs the CTP object, creates its sockets and starts its threads.
//   receiver       - default receiver of deliveries (stored in m_DefReceiver)
//   port           - port used for both receiving and sending
//   servers        - number of server threads to start
//   times          - optional timing settings; when NULL m_Times is left as is
//   log            - optional output stream for logging; NULL disables logging
//   packetdatasize - maximum amount of data bytes in one packet
//   maxdeliverers  - upper bound for the number of deliverer threads
// The object starts in suspended state (m_bSuspended is true).
CCTPNet::CCTPNet(NetReceiver* receiver,unsigned short port,unsigned short servers,Times* times,ostream* log,unsigned __int16 packetdatasize,unsigned short maxdeliverers)
{
// Tuning
m_DefReceiver=receiver;
m_uPort=port;
if (times) m_Times=*times;
m_uPacketDataSize=packetdatasize;
m_pLog=log;
// Initialize random generator
srand((unsigned)time(NULL));
// Initialization
m_bSuspended=true;
m_SntCommands.clear();
m_Sessions.clear();
m_LargeCommands.clear();
m_Receivers.clear();
// Receive buffer: room for one header plus the maximum packet data
m_pBuffer=new char[m_uPacketDataSize+GetHeaderSize()];
m_Deliveries.clear();
m_pDeliverTrds.clear();
m_uMaxDeliverers=maxdeliverers;
m_uBusy=0;
// The result of CreateSockets is not checked here; on failure it queues an
// error delivery itself
CreateSockets();
// Must be cleared before the threads start, they poll this flag
m_bKill=false;
// Start threads and store handles
for (unsigned int i=0;i<servers;i++) {
m_pServerTrds.push_back(AfxBeginThread(CTPServerFunction,this));
}
m_pDelManTrd=AfxBeginThread(CTPDelManFunction,this);
LOG(m_pLog,m_csLog,"CTP started on port "<<port<<" with "<<servers<<" servers\n");
}
// Stops all threads and releases all resources.
// The threads are asked to stop through m_bKill and are polled every
// m_Times.uSleepOnDestroy. Threads that are still registered after
// m_Times.uPeriodDestroy are terminated forcedly.
CCTPNet::~CCTPNet()
{
    LOG(m_pLog,m_csLog,"CTP is shuting down");
    // Terminate threads
    m_bKill=true;
    DWORD started=GetTickCount();
    while ((!m_pDeliverTrds.empty()) || !m_pServerTrds.empty() || m_pDelManTrd) {
        Sleep(m_Times.uSleepOnDestroy);
        // Kill servers, deliverers and delivery manager if they are busy too long
        if (GetTickCount()-started>m_Times.uPeriodDestroy) {
            CSingleLock locks(&m_csServerTrds);
            LOCK(locks);
            // Each loop uses its own iterator name, so the code does not rely
            // on the loop variable leaking out of the for statement
            for (vector<CWinThread*>::iterator its=m_pServerTrds.begin(); its!=m_pServerTrds.end(); ++its) {
                LOG(m_pLog,m_csLog,"Server thread with handle "<<(*its)->m_hThread<<" was stopped forcedly");
                TerminateThread((*its)->m_hThread,0);
            }
            m_pServerTrds.clear();
            UNLOCK(locks);
            CSingleLock lockd(&m_csDeliverTrds);
            LOCK(lockd);
            for (vector<CWinThread*>::iterator itd=m_pDeliverTrds.begin(); itd!=m_pDeliverTrds.end(); ++itd) {
                LOG(m_pLog,m_csLog,"Deliverer thread with handle "<<(*itd)->m_hThread<<" was stopped forcedly");
                TerminateThread((*itd)->m_hThread,0);
            }
            m_pDeliverTrds.clear();
            UNLOCK(lockd);
            if (m_pDelManTrd) {
                LOG(m_pLog,m_csLog,"Delivery manager thread was stopped forcedly");
                TerminateThread(m_pDelManTrd->m_hThread,0);
                // A terminated thread can not clear the pointer itself any
                // more; without this the waiting loop would never end
                m_pDelManTrd=NULL;
            }
        }
    }
    // Free resources
    closesocket(m_SendSocket);
    closesocket(m_RecvSocket);
    FreeSntCommands();
    FreeSessions();
    FreeLargeCommands();
    FreeDeliveries();
    m_Receivers.clear();
    delete[] m_pBuffer;
    LOG(m_pLog,m_csLog,"CTP stopped");
}
bool CCTPNet::CreateSockets()
{
LOG(m_pLog,m_csLog,"Creating sockets");
CSingleLock lock(&m_csDeliveries);
// Sockets creation
m_SendSocket=socket(AF_INET, SOCK_DGRAM, 0);
m_RecvSocket=socket(AF_INET, SOCK_DGRAM, 0);
if (m_SendSocket==INVALID_SOCKET || m_RecvSocket==INVALID_SOCKET) {
LOCK(lock);
m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(0,0,WSAGetLastError(),IPAddr())));
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Failed to create sockets");
m_bSuspended=true;
return false;
}
// Sockets are to support broadcasting
BOOL broadcast=TRUE;
setsockopt(m_SendSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
setsockopt(m_RecvSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
// Binding local address with receiving socket
m_Local.sin_family=AF_INET;
m_Local.sin_port=htons(m_uPort);
m_Local.sin_addr.s_addr=htonl(INADDR_ANY);
if (bind(m_RecvSocket,(SOCKADDR*)&m_Local,sizeof(m_Local))==SOCKET_ERROR)
{
LOCK(lock);
m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(1,0,WSAGetLastError(),IPAddr())));
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Failed to bind receiving socket");
m_bSuspended=true;
return false;
}
return true;
}
// Removes options that the caller is not allowed to set:
// UniqueCommand is cleared for multi-packet messages (amount>1) and
// StartSession is always cleared.
void CCTPNet::CheckupOptions(Header& header)
{
    const bool multipart=(header.amount>1);
    // Each bit is known to be set when it is toggled, so the toggle clears it
    if (multipart && (header.options&Options::UniqueCommand)) {
        header.options^=Options::UniqueCommand;
    }
    if (header.options&Options::StartSession) {
        header.options^=Options::StartSession;
    }
}
// Sends the message stored in sb as command `command` to address `to`.
//   sb          - buffer with the message, already split into packets; a header
//                 is written in front of every packet
//   command     - command code
//   to          - address of the recipient
//   options     - header options (checked by CheckupOptions)
//   storeiffail - if false, the message is removed from the sent commands
//                 storage when sending of a packet fails
// The message is put to the sent commands storage before sending.
// Returns true if all packets were sent. If sending of a packet fails, an error
// delivery (type 2) is queued for the default receiver and false is returned.
bool CCTPNet::Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 options, bool storeiffail)
{
    // Arrange header
    Header head;
    head.amount=sb.GetPacketsCount();
    head.command=command;
    head.messize=sb.GetDataSize();
    head.options=options;
    CheckupOptions(head);
    // Declared once for both loops: reusing a variable declared in the first
    // for statement after that statement is not valid standard C++
    unsigned int i;
    // Provide data with headers
    for (i=0;i<head.amount;i++) {
        // Fill rest header information
        GetNextID(head,to);
        head.size=(unsigned __int16)sb.GetPacketSize(i);
        head.number=i;
        // Put header in the packet
        sb.PutHead(&head,i);
    }
    // Store message to be sent and pointer to it
    SntCommandInfo ci(sb,GetTickCount(),to.Solid);
    CSingleLock lock(&m_csSntCommands);
    LOCK(lock);
    m_SntCommands.push_back(ci);
    SntCommandInfoList::iterator curentry=--m_SntCommands.end();
    UNLOCK(lock);
    // Send packets
    for (i=0;i<head.amount;i++) {
        if (!SendPacket(sb.GetHeadPtr(i),to.Solid)) {
            // Read the error code right after the failed call, before any
            // other call gets a chance to change it
            const int err=WSAGetLastError();
            // Store error information
            CSingleLock dlock(&m_csDeliveries);
            LOCK(dlock);
            m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(2,head.command,err,IPAddr(to.Solid))));
            UNLOCK(dlock);
            // Delete from sent packets storage if there was an error and storeiffail equals false
            // NOTE(review): other removal paths call Free() on the entry before
            // erasing it; confirm who owns the buffers of sb in this case
            if (!storeiffail) {
                CSingleLock slock(&m_csSntCommands);
                LOCK(slock);
                m_SntCommands.erase(curentry);
                UNLOCK(slock);
            }
            // Exit sending function with return value, which shows error
            return false;
        }
    }
    // Exit sending function with return value, which shows success
    return true;
}
bool CCTPNet::SendPacket(char* buf, unsigned long to)
{
SOCKADDR_IN recip;
recip.sin_family=AF_INET;
recip.sin_port=htons(m_uPort);
recip.sin_addr.s_addr=to;
Header* head=(Header*)buf;
LOGHA(m_pLog,m_csLog,"Send packet "," to "<<saddr,head,IPAddr(to));
return (sendto(m_SendSocket,(const char*)buf,head->size,0,(SOCKADDR*)&recip,sizeof(recip))!=SOCKET_ERROR);
}
void CCTPNet::FreeSntCommands()
{
CSingleLock lock(&m_csSntCommands);
LOCK(lock);
for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();it++) {
it->Free();
}
m_SntCommands.clear();
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Sent commands storage freed");
}
void CCTPNet::FreeSessions()
{
CSingleLock lock(&m_csSessions);
LOCK(lock);
for (SessionsInfo::iterator it=m_Sessions.begin();it!=m_Sessions.end();it++) {
it->second.received.clear();
}
m_Sessions.clear();
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Sessions information storage freed");
}
void CCTPNet::FreeLargeCommands()
{
CSingleLock lock(&m_csLargeCommands);
LOCK(lock);
for (LargeCommandInfoList::iterator it=m_LargeCommands.begin();it!=m_LargeCommands.end();it++) {
it->Free();
if (it->pRD) delete it->pRD;
}
m_LargeCommands.clear();
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Large commands storage freed");
}
void CCTPNet::FreeDeliveries()
{
for (DeliveriesList::iterator it=m_Deliveries.begin();it!=m_Deliveries.end();it++) {
if (it->data) {
if (it->type==DeliveryType::ReceivedData) delete (CCTPReceivedData*)it->data; else
if (it->type==DeliveryType::ErrorInfo) delete (CCTPErrorInfo*)it->data; else
delete it->data;
}
}
m_Deliveries.clear();
LOG(m_pLog,m_csLog,"Deliveries storage freed");
}
bool CCTPNet::SaveRcvPacket(unsigned long from,Header* head)
{
CSingleLock lock(&m_csSessions);
LOCK(lock);
SessionInfo& si=GetSessionInfo(IPAddr(from),head->options&Options::Broadcast?true:false);
if (si.received.empty()) {
// First message from corresponding workstation
si.received.push_back(head->id);
} else {
// Find place among already received messages
if (Less(si.received.back(),head->id)) {
si.received.push_back(head->id);
} else {
for (SessionInfo::RcvList::iterator it=si.received.begin();it!=si.received.end();it++) {
// Already has been recieved
if ((*it)==head->id) {
LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" have been already received",head,IPAddr(from));
return false;
}
if (Less(head->id,*it)) {
// Already has been recieved
if (it==si.received.begin() && si.minwasset) {
LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" have been already received",head,IPAddr(from));
return false;
}
// New one has been recieved
si.received.insert(it,head->id);
break;
}
}
}
// Remove ambigous information
if (si.minwasset) {
while ((si.received.front()+1)==*(++si.received.begin())) {
si.received.pop_front();
}
}
}
// Session was started with this packet
if (head->options&Options::StartSession) {
si.minwasset=true;
LOGA(m_pLog,m_csLog,"Session starting packet with ID:"<<(unsigned int)head->id<<" have been received from "<<saddr,IPAddr(from));
}
UNLOCK(lock);
return true;
}
// Sends a confirmation for a received packet to address `to`.
// The confirmation is the received header itself (passed by value, so the
// caller's copy is untouched) with the confirmation bit set in the command,
// zero message size and the packet size equal to the size of a bare header.
void CCTPNet::SendConfirmation(unsigned long to,Header header)
{
    // Turn the copy of the header into a confirmation
    header.size=GetHeaderSize();
    header.messize=0;
    header.command|=m_iConfirm;
    // Send it as a packet without data
    char* packet=(char*)&header;
    SendPacket(packet,to);
}
// Processes a confirmation received from address `to`.
//   to     - address the confirmation came from
//   header - header of the confirmation packet
// Searches the sent commands storage (locked with m_csSntCommands) for the
// command being confirmed and marks the corresponding packet as confirmed.
// When all packets of a command are confirmed, the session timeout is set
// from the measured round trip and the entry is freed and erased.
// For a not unique command the entry is identified by the id of its first
// packet (header->id-header->number) and the search stops at the first match.
// For a unique command every entry with the same command code is confirmed.
void CCTPNet::ConfirmSntPacket(unsigned long to,Header* header)
{
Header* head;
bool erased=false;
// Is command unique
bool unique=(header->options&Options::UniqueCommand)&&(header->amount<=1);
CSingleLock lock(&m_csSntCommands);
LOCK(lock);
for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();) {
erased=false;
// Header of the first packet of the stored command
head=(Header*)it->sbBody.GetBufferBegin();
// Find appropriate recipient or accept any if message was broadcasted
if (it->ipTo==to || head->options&Options::Broadcast) {
if (!unique) {
// Not unique command
if (head->id==header->id-header->number) {
if (it->Confirm(header->number)) {
// All packets are confirmed: derive the timeout from the time passed
// since CI[0].dwTime, then drop the entry
SetTimeout(it->ipTo,head->options&Options::Broadcast?true:false,m_Times.uMultiplier*(GetTickCount()-(it->CI[0].dwTime)));
LOGHA(m_pLog,m_csLog,"Sent commands storage excluded entry ",", sent to "<<saddr,head,IPAddr(to));
it->Free();
m_SntCommands.erase(it);
}
// The entry was found, there is nothing more to search for
UNLOCK(lock);
return;
}
} else {
// Unique command
// The commands differ only in the confirmation bit
if ((head->command^header->command)==m_iConfirm) {
if (it->Confirm(0)) {
SetTimeout(it->ipTo,head->options&Options::Broadcast?true:false,m_Times.uMultiplier*(GetTickCount()-(it->CI[0].dwTime)));
LOGHA(m_pLog,m_csLog,"Sent commands storage excluded entry ",", sent to "<<saddr,head,IPAddr(to));
it->Free();
// erase returns the iterator to the next entry, so the loop must not
// advance once more
it=m_SntCommands.erase(it);
erased=true;
if (it==m_SntCommands.end()) break;
}
}
}
}
// Go to next sent packet
if (!erased) it++;
}
UNLOCK(lock);
}
bool CCTPNet::ArrangeLargeCommand(unsigned long from,Header* head)
{
LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" is a part of large command",head,IPAddr(from));
char* mem=NULL;
CSingleLock lock(&m_csLargeCommands);
LOCK(lock);
// Try to find packets, wich belongs to the same message in received parts
for (LargeCommandInfoList::iterator it=m_LargeCommands.begin();it!=m_LargeCommands.end();it++) {
if (it->id==head->id-head->number && it->pRD->from==from && !it->received[head->number]) {
mem=it->pRD->pBuf;
mem+=head->number*m_uPacketDataSize;
memcpy(mem,m_pBuffer+GetHeaderSize(),head->size-GetHeaderSize());
if (it->GotPart(head->number)) {
LOG(m_pLog,m_csLog,"Large command arranged");
CSingleLock lock(&m_csDeliveries);
LOCK(lock);
m_Deliveries.push_back(Delivery(GetReceiver(head->command,DeliveryType::ReceivedData),it->pRD));
UNLOCK(lock);
it->Free();
m_LargeCommands.erase(it);
return true;
} else return false;
}
}
// Part of the new message received
LargeCommandInfo lpi(head->command,head->messize,from,head->id,head->amount);
mem=lpi.pRD->pBuf;
mem+=head->number*m_uPacketDataSize;
memcpy(mem,m_pBuffer+GetHeaderSize(),head->size-GetHeaderSize());
m_LargeCommands.push_front(lpi);
m_LargeCommands.front().GotPart(head->number);
return false;
}
void CCTPNet::ResendNotConfirmedData()
{
Header* head;
DWORD time=GetTickCount();
bool erased=false;
unsigned int i=0;
// This fleag is used to keep from showing error for all parts of the large
// message
bool wasdead=false;
CSingleLock lock(&m_csSntCommands);
LOCK(lock);
for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();) {
erased=false;
head=(Header*)it->sbBody.GetBufferBegin();
for (i=0; i<it->uCount; i++) if (!it->CI[i].bConfirmed) {
// If timeout have expired
if ((unsigned int)(time-it->CI[i].dwLTime)>it->CI[i].uResend*GetTimeout(it->ipTo,head->options&Options::Broadcast?true:false)) {
if (!(head->options&Options::NoResend)) {
LOGHA(m_pLog,m_csLog,"Packet "," is to be resent to "<<saddr,((Header*)it->sbBody.GetHeadPtr(i)),IPAddr(it->ipTo));
SendPacket(it->sbBody.GetHeadPtr(i),it->ipTo);
}
it->CI[i].dwLTime=time;
it->CI[i].IncResend();
// Produce error if dead timeout occuired
if (it->CI[i].IsDeadTimeout() && !wasdead){
CSingleLock lock(&m_csDeliveries);
LOCK(lock);
m_Deliveries.push_back(Delivery(GetReceiver(head->command,DeliveryType::ErrorInfo),new CCTPErrorInfo(4,head->command,WSAGetLastError(),IPAddr(it->ipTo))));
UNLOCK(lock);
wasdead=true;
// Erase if needed
if (head->options&Options::DelAfterError) {
LOGHA(m_pLog,m_csLog,"Command, sent to "<<saddr<<", which includes packet ",", is delete from sent commands storage after generating error delivery",((Header*)it->sbBody.GetHeadPtr(i)),IPAddr(it->ipTo));
m_SntCommands.erase(it);
erased=true;
break;
}
}
}
}
// Go to next element
if (!erased) it++;
}
UNLOCK(lock);
}
void CCTPNet::AddSpecialReceiver(unsigned __int16 command, NetReceiver* receiver, DeliveryType type)
{
if (command==0) return;
for (SpecialReceiversList::iterator it=m_Receivers.begin();it!=m_Receivers.end();) {
if (it->command==command && it->type==type) {
it=m_Receivers.erase(it);
if (it==m_Receivers.end()) break;
} else it++;
}
m_Receivers.push_front(SpecialReceiver(command,receiver,type));
}
// Removes all registrations of receiver from the special receivers list,
// regardless of command and delivery type.
void CCTPNet::DeleteSpecialReceiver(NetReceiver* receiver)
{
    SpecialReceiversList::iterator entry=m_Receivers.begin();
    while (entry!=m_Receivers.end()) {
        if (entry->receiver!=receiver) {
            entry++;
        } else {
            // erase returns the iterator to the following entry
            entry=m_Receivers.erase(entry);
        }
    }
}
// Returns the receiver for deliveries of kind `type` for command `command`:
// the first matching special receiver, or the default receiver if there is
// none or if command is 0.
NetReceiver* CCTPNet::GetReceiver(unsigned __int16 command, DeliveryType type)
{
    if (command!=0) {
        SpecialReceiversList::iterator entry=m_Receivers.begin();
        while (entry!=m_Receivers.end()) {
            if (entry->command==command && entry->type==type) {
                return entry->receiver;
            }
            entry++;
        }
    }
    return m_DefReceiver;
}
// Returns the session information for address addr. If bcast is true the
// address is turned into a broadcast one first. A session which does not exist
// yet is created with default values.
// The sessions storage is not locked here.
CCTPNet::SessionInfo& CCTPNet::GetSessionInfo(IPAddr addr, bool bcast)
{
    if (bcast) {
        addr.SetBroadcast();
    }
    // insert leaves an existing entry untouched and points to it in any case
    SessionInfo& session=m_Sessions.insert(SessionsInfo::value_type(addr.Solid,SessionInfo())).first->second;
    return session;
}
// Assigns the next packet id of the session with addr to head.id.
// If there is no session for addr yet, the StartSession option is added to
// head.options before the session is looked up (and thereby created).
// The sessions storage is locked with m_csSessions until the function returns.
void CCTPNet::GetNextID(Header& head,IPAddr addr)
{
    CSingleLock guard(&m_csSessions);
    LOCK(guard);
    const bool known=(m_Sessions.find(addr.Solid)!=m_Sessions.end());
    if (!known) {
        head.options|=Options::StartSession;
    }
    const bool bcast=(head.options&Options::Broadcast)?true:false;
    SessionInfo& session=GetSessionInfo(addr,bcast);
    ++session.id;
    head.id=session.id;
}
// Returns the timeout of the session with addr (broadcast session if bcast is
// true), or the default timeout m_Times.uDefTimeout if no timeout has been set
// for the session yet.
// The sessions storage is locked with m_csSessions until the function returns.
unsigned int CCTPNet::GetTimeout(IPAddr addr, bool bcast)
{
    CSingleLock guard(&m_csSessions);
    LOCK(guard);
    SessionInfo& session=GetSessionInfo(addr,bcast);
    if (session.timeout) {
        return session.timeout;
    }
    return m_Times.uDefTimeout;
}
// Sets the timeout of the session with addr (broadcast session if bcast is
// true) if it has not been set yet; a timeout which is already set is kept.
// The value is in the units of GetTickCount (milliseconds): the callers derive
// it from GetTickCount differences and it is compared with such differences
// when resending.
// The sessions storage is locked with m_csSessions until the function returns.
void CCTPNet::SetTimeout(IPAddr addr, bool bcast, unsigned int timeout)
{
    CSingleLock lock(&m_csSessions);
    LOCK(lock);
    SessionInfo& si=GetSessionInfo(addr,bcast);
    if (!(si.timeout)) {
        // The message used to say "microseconds", which is not the unit used
        LOGA(m_pLog,m_csLog,"Timeout for session with "<<saddr<<" is considered to be "<<timeout<<" milliseconds",addr);
        si.timeout=timeout;
    }
}
unsigned int CTPServerFunction(void* pNet)
{
CCTPNet* net=(CCTPNet*)pNet;
// Necessary variables
timeval tv;
tv.tv_sec=0;
tv.tv_usec=1;
fd_set fdread;
SOCKADDR_IN sender;
int sendersize=NULL;
CCTPNet::Header* head;
DWORD time=NULL;
DWORD checktime=GetTickCount();
CSingleLock lock(&net->m_csDeliveries);
CSingleLock lockn(&net->m_csNetwork);
for(;;) {
// Wait while suspended
while (net->GetSuspended()) {
// Does killing needed
if (net->m_bKill) {
CSingleLock lock(&net->m_csServerTrds);
LOCK(lock);
net->m_pServerTrds.erase(find(net->m_pServerTrds.begin(),net->m_pServerTrds.end(),AfxGetThread()));
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Server thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
return (unsigned int)AfxGetThread();
}
// Sleep a little bit
Sleep(net->GetTimes().uSleepSuspended);
}
// Check for received data
time=GetTickCount();
FD_ZERO(&fdread);
FD_SET(net->m_RecvSocket,&fdread);
LOCK(lockn);
if (select(0,&fdread,NULL,NULL,&tv)>0) {
// Receive data
sendersize=sizeof(sender);
int ret=recvfrom(net->m_RecvSocket,net->m_pBuffer,net->GetPacketDataSize()+net->GetHeaderSize(),0,(SOCKADDR*)&sender,&sendersize);
UNLOCK(lockn);
if (ret==SOCKET_ERROR) {
// Error while receiving
LOCK(lock);
net->m_Deliveries.push_back(CCTPNet::Delivery(net->GetDefaultReceiver(),new CCTPErrorInfo(3,0,WSAGetLastError(),IPAddr(sender.sin_addr.S_un.S_addr))));
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Network receiving error");
} else {
if (ret>=net->GetHeaderSize()) {
head=(CCTPNet::Header*)net->m_pBuffer;
LOGHA(net->m_pLog,net->m_csLog,"Packet "," have been received from "<<saddr,head,IPAddr(sender.sin_addr.S_un.S_addr));
if (net->IsConfirmation(head->command)) {
net->ConfirmSntPacket(sender.sin_addr.S_un.S_addr,head);
} else {
if (net->SaveRcvPacket(sender.sin_addr.S_un.S_addr,head)) {
// New packet was got
if (head->amount>1) {
net->ArrangeLargeCommand(sender.sin_addr.S_un.S_addr,head);
} else {
LOCK(lock);
net->m_Deliveries.push_back(CCTPNet::Delivery(net->GetReceiver(head->command,CCTPNet::DeliveryType::ReceivedData),new CCTPReceivedData(head->command,head->messize,sender.sin_addr.S_un.S_addr,net->m_pBuffer+net->GetHeaderSize())));
UNLOCK(lock);
}
}
// Send confimation
net->SendConfirmation(sender.sin_addr.S_un.S_addr,*head);
}
}
}
} else UNLOCK(lockn);
// Does killing needed
if (net->m_bKill) {
CSingleLock lock(&net->m_csServerTrds);
LOCK(lock);
net->m_pServerTrds.erase(find(net->m_pServerTrds.begin(),net->m_pServerTrds.end(),AfxGetThread()));
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Server thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
return (unsigned int)AfxGetThread();
}
// Sent packets were not checked long enough
if ((time-checktime)>net->GetTimes().uPeriodCheckResend) {
// Check if resending needed
checktime=GetTickCount();
net->ResendNotConfirmedData();
}
}
}
unsigned int CTPDelManFunction(void* pNet)
{
CCTPNet* net=(CCTPNet*)pNet;
CCTPNet::Delivery del;
CSingleLock lock(&net->m_csDeliverTrds);
for(;;) {
// Does additional delivery threads needed
if (!net->GetSuspended()) {
LOCK(lock);
if (!net->m_Deliveries.empty() && net->m_pDeliverTrds.size()<net->m_uMaxDeliverers && net->m_pDeliverTrds.size()==net->m_uBusy) {
net->m_pDeliverTrds.push_back(AfxBeginThread(CTPDeliverFunction,pNet));
LOG(net->m_pLog,net->m_csLog,"Delivery manager have created deliverer with handle "<<net->m_pDeliverTrds.back()->m_hThread);
}
UNLOCK(lock);
}
// Kill server
if (net->m_bKill) {
net->m_pDelManTrd=NULL;
LOG(net->m_pLog,net->m_csLog,"Delivery manager thread stopped");
return (unsigned int)AfxGetThread();
}
Sleep(net->GetTimes().uSleepDelMan);
}
}
// Thread function of a deliverer thread. pNet points to the CCTPNet object.
// The thread takes deliveries from the front of m_Deliveries one by one
// (under m_csDeliveries), hands each one to its target receiver with the lock
// released and deletes the delivered data afterwards. It removes itself from
// m_pDeliverTrds and exits when m_bKill is set or when there have been no
// deliveries for longer than uPeriodAutoDest.
unsigned int CTPDeliverFunction(void* pNet)
{
CCTPNet* net=(CCTPNet*)pNet;
CCTPNet::Delivery del;
// True while the current pass of the outer loop has processed no delivery;
// in that case the thread sleeps before the next pass
bool bNothing=true;
// Time of the last processed delivery (or of the start of the thread)
DWORD lastdel=GetTickCount();
CSingleLock lock(&net->m_csDeliveries);
for(;;) {
bNothing=true;
for (;;) {
// Get delivery if exist
LOCK(lock);
// Leaves the loop with the lock still held; it is released below
if (net->m_Deliveries.empty()) break;
del=net->m_Deliveries.front();
net->m_Deliveries.pop_front();
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Delivery is beeing processed");
// Deliver delivery
// m_uBusy counts the deliverers which are inside a receiver callback
net->m_uBusy++;
switch (del.type) {
case CCTPNet::DeliveryType::ReceivedData:
del.target->OnReceive(del.data);
delete (CCTPReceivedData*)del.data;
break;
case CCTPNet::DeliveryType::ErrorInfo:
del.target->OnError(del.data);
delete (CCTPErrorInfo*)del.data;
break;
default:
// Unknown type: nothing is delivered, the data is only deleted
if (VALID(del.data)) delete del.data;
}
net->m_uBusy--;
lastdel=GetTickCount();
// Thread is working now
bNothing=false;
// If killing needed then do it after unlocking
if (net->m_bKill) break;
}
// The lock is held only if the inner loop was left because of the empty queue
if (lock.IsLocked()) UNLOCK(lock);
// Does killing needed (because of request or because of sponging)
if (net->m_bKill || (net->m_Deliveries.size()==0 && GetTickCount()-lastdel>net->GetTimes().uPeriodAutoDest)) {
CSingleLock lock(&net->m_csDeliverTrds);
LOCK(lock);
net->m_pDeliverTrds.erase(find(net->m_pDeliverTrds.begin(),net->m_pDeliverTrds.end(),AfxGetThread()));
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Deliverer thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
return (unsigned int)AfxGetThread();
}
// Sleep a little bit
if (bNothing) {
Sleep(net->GetTimes().uSleepNothing);
}
}
}