#include "StdAfx.h"
#include "dtipsocket.h"
Datatal::DtIpSocket::DtIpSocket(void)
{
m_sdClient = INVALID_SOCKET;
m_nRemotePort = NULL;
strcpy(m_szRemoteHost, "127.0.0.1");
//Create events
m_hNewDataEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
m_bDisableNagle = false;
m_bDisableRecvBuf = false;
m_bDisableSendBuf = false;
}
Datatal::DtIpSocket::~DtIpSocket(void)
{
WSACleanup();
}
void Datatal::DtIpSocket::Connect()
{
if (m_szRemoteHost[0] == NULL)
throw DtSocketException(ERROR_INVALID_DATA, "Connect failed", "Remotehost was not specified");
if (m_nRemotePort == NULL)
throw DtSocketException(ERROR_INVALID_DATA, "Connect failed", "Remove port was not specified");
DisableNagle();
try
{
Connect(m_szRemoteHost, m_nRemotePort);
}
catch (DtSocketException& ex)
{
WriteLog(LP_HIGH, "Connect", "Connect failed: %s", ex.ToString());
Disconnect();
}
}
void Datatal::DtIpSocket::DisableNagle()
{
m_bDisableNagle = true;
}
/// Disable winsocks internal sendbuffer and use own buffering only.
void Datatal::DtIpSocket::DisableSendBuffer()
{
m_bDisableSendBuf = true;
}
/// Disable winsocks internal recvbuffer and use own buffering only.
void Datatal::DtIpSocket::DisableRecvBuffer()
{
m_bDisableRecvBuf = true;
}
void Datatal::DtIpSocket::SetRemoteHost(const char* szHostName)
{
if (strlen(szHostName) > 128)
return;
strcpy(m_szRemoteHost, szHostName);
}
void Datatal::DtIpSocket::SetRemotePort(int nRemotePort)
{
m_nRemotePort = nRemotePort;
}
//Init everything. Start our workerthread when we are done
void Datatal::DtIpSocket::Connect(const char* szHostName, int nRemotePort)
{
DWORD dwFlags; //socket flags
struct sockaddr_in SockAddr; //used to map ipbased address
struct hostent FAR *pHostEnt; //used to map stringbased address
HANDLE hConnect; //Temporary connect event
HANDLE hEvents[2]; //Create a array of the events to wait for:
DWORD dwRes = 0;
if (GetState() == DTSS_CONNECTING)
return;
if (GetState() == DTSS_CONNECTED)
return;
if (GetState() == DTSS_ERROR)
Disconnect();
SetState(DTSS_CONNECTING);
SetRemoteHost(szHostName);
SetRemotePort(nRemotePort);
WSADATA Wsa;
if ( int i = WSAStartup(MAKEWORD(2,0), &Wsa) )
{
SetState(DTSS_DISCONNECTED);
throw DtSocketException(i, "Connect failed", "WSAStartup failed");
}
//Create the socket
dwFlags = WSA_FLAG_OVERLAPPED;
if (m_sdClient == INVALID_SOCKET)
{
m_sdClient = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, NULL, WSA_FLAG_OVERLAPPED);
if (m_sdClient == INVALID_SOCKET)
{
SetState(DTSS_DISCONNECTED);
throw DtSocketException(WSAGetLastError(), "Connect failed", "WSASocket failed.");
}
}
//Set our port
SockAddr.sin_family=AF_INET;
SockAddr.sin_port=htons((u_short)m_nRemotePort);
if (m_szRemoteHost[0]>='0' && m_szRemoteHost[0]<='9')
{
//It's an address of type xxx.xxx.xxx.xxx
SockAddr.sin_addr.s_addr=inet_addr(m_szRemoteHost);
if (SockAddr.sin_addr.s_addr==INADDR_NONE)
{
char szError[256];
sprintf(szError, "Connect, inet_addr(%s) failed", m_szRemoteHost);
SetState(DTSS_DISCONNECTED);
throw DtSocketException(GetLastError(), "Connect failed", szError);
}
}
else
{
if ((pHostEnt=gethostbyname(m_szRemoteHost))==NULL)
{
char szError[256];
sprintf(szError, "Connect, gethostbyname(%s) failed", m_szRemoteHost);
SetState(DTSS_DISCONNECTED);
throw DtSocketException(GetLastError(), "Connect failed", szError);
}
else
memcpy(&SockAddr.sin_addr,pHostEnt->h_addr,pHostEnt->h_length);
}
//Create the event for Socket Connect operations
hConnect = WSACreateEvent();
if ( !hConnect )
{
SetState(DTSS_DISCONNECTED);
throw DtSocketException(GetLastError(), "Connect failed", "WSACreateEvent() for hConnect failed.");
}
//wait for connect or stop
hEvents[0] = m_hStopEvent;
hEvents[1] = hConnect;
ResetEvent(hConnect);
if (m_bDisableNagle)
{
INT option_value = 1;
if (setsockopt(m_sdClient,SOL_SOCKET, TCP_NODELAY, (char *)&option_value, sizeof(option_value)) == SOCKET_ERROR )
{
WriteLog(1, "Connect", "Failed to disable Nagle algoritm: %d", WSAGetLastError());
}
} // if (m_bDisableNagle)
if (m_bDisableSendBuf)
{
int nLen = 0;
dwRes = setsockopt (m_sdClient, SOL_SOCKET, SO_SNDBUF, (char*)&nLen, sizeof (nLen));
if (dwRes != 0)
throw DtSocketException(0, "Socket buffer", "Failed to disable winsock outbuffer");
} // if (m_bDisableRecvBuf)
if (m_bDisableRecvBuf)
{
int nLen = 0;
dwRes = setsockopt (m_sdClient, SOL_SOCKET, SO_RCVBUF, (char*)&nLen, sizeof (nLen));
if (dwRes != 0)
throw DtSocketException(0, "Socket buffer", "Failed to disable winsock inbuffer");
}
//Select which events who will Signal our Event
if(WSAEventSelect( m_sdClient, hConnect, FD_CONNECT|FD_CLOSE ) == SOCKET_ERROR)
{
SetState(DTSS_DISCONNECTED);
throw DtSocketException(WSAGetLastError(), "Connect failed", "WSACreateEvent() for hConnect failed.");
}
//Connect to Host...
if (WSAConnect( m_sdClient, (const struct sockaddr *)&SockAddr , sizeof(SockAddr), NULL, NULL, NULL, NULL ) == SOCKET_ERROR)
{
DWORD dwError = (DWORD)WSAGetLastError();
if (dwError == WSAEWOULDBLOCK)
{
dwRes = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE);
switch(dwRes)
{
case WAIT_OBJECT_0:
WriteLog(Datatal::LP_LOW, "Connect", "Die event was signalled");
WSACloseEvent(hConnect);
if (IsThreadRunning()) StopThread();
return;
case WAIT_OBJECT_0 + 1:
WriteLog(Datatal::LP_NORMAL, "Connect", "WSAConnect() OK!");
break;
default:
WSACloseEvent(hConnect);
SetState(DTSS_DISCONNECTED);
throw DtSocketException(GetLastError(), "Connect failed", "WaitForMultipleObjects() failed.");
}
}
else
{
SetState(DTSS_ERROR);
throw DtSocketException(WSAGetLastError(), "Connect failed", "WSAConnect failed.");
}
}
WSANETWORKEVENTS NetworkEvents;
memset(&NetworkEvents, 0, sizeof(NetworkEvents));
if ( WSAEnumNetworkEvents ( m_sdClient, hConnect, &NetworkEvents) == SOCKET_ERROR)
{
SetState(DTSS_ERROR);
throw DtSocketException(WSAGetLastError(), "Connect failed", "WSAEnumNetworkEvents() failed.");
}
if (NetworkEvents.lNetworkEvents & FD_CONNECT)
{
if (NetworkEvents.iErrorCode[FD_CONNECT_BIT] == 0)
{
if (!IsThreadRunning()) StartThread();
SetState(DTSS_CONNECTED);
HandleConnect();
return;
}
else
{
SetState(DTSS_ERROR);
if (!IsThreadRunning() && GetReconnect()) StartThread(); // Start thread if reconnect is set.
throw DtSocketException(NetworkEvents.iErrorCode[FD_CONNECT_BIT], "Connect failed", "FD_CONNECT failed.");
}
}
if (NetworkEvents.lNetworkEvents & FD_CLOSE)
{
SetState(DTSS_DISCONNECTED);
throw DtSocketException(NetworkEvents.iErrorCode[FD_CLOSE_BIT], "Connect failed", "FD_CLOSE set!");
}
SetState(DTSS_ERROR);
}
bool Datatal::DtIpSocket::Disconnect(bool bStopThread)
{
Datatal::DtSocketBase::Disconnect(bStopThread);
// Not connected = dont do anything.
if (m_sdClient != INVALID_SOCKET)
{
shutdown(m_sdClient, SD_BOTH);
closesocket( m_sdClient );
m_sdClient = INVALID_SOCKET;
} // if (m_bConnected)
return true;
}
void Datatal::DtIpSocket::ThreadFunc(HANDLE hStopEvent)
{
//Async operations
OVERLAPPED osReader; // Read operations
OVERLAPPED osWriter; // Write operations
bool bCanRun = true; // Set to false if server should stop
bool bPendingRead = false; //Pending read operations
bool bPendingWrite = false; //Pending write operations
bool bNewData = false; // Got new data to send.
bool bReadCompleted = false; // true if a read have been completed.
DWORD dwReadbytes = 0; //Number of bytes read
WSABUF wsaInBuf; //Temporary buffer used for reading
WSABUF wsaOutBuf; //Tempbuf used for writing
DWORD dwRes; //Resultcode
DWORD dwFlags; //WSARecv/WSASend flags
DWORD dwWritten; //Number of bytes that have been written
HANDLE hEvents[4] = {NULL, NULL, NULL, NULL};
BOOL bSkipWait = false;
// Create the event for Socket Read operations
memset(&osReader, 0, sizeof(OVERLAPPED));
osReader.hEvent = WSACreateEvent();
if ( !osReader.hEvent )
{
WriteLog(1, "misc", "WSACreateEvent() for osReader failed! Error %d.",GetLastError());
return;
}
// Create the event for Socket Write operations
memset(&osWriter, 0, sizeof(OVERLAPPED));
osWriter.hEvent = WSACreateEvent();
if ( !osWriter.hEvent )
{
WriteLog(1, "misc", "WSACreateEvent() for osWriter failed! Error %d.",GetLastError());
return;
}
//Init our buffers
wsaInBuf.buf = new char[_SOCKETLIB_WORK_SIZE_];
wsaInBuf.len = _SOCKETLIB_WORK_SIZE_;
memset(wsaInBuf.buf, 0, _SOCKETLIB_WORK_SIZE_);
wsaOutBuf.buf = new char[_SOCKETLIB_WORK_SIZE_];
memset(wsaOutBuf.buf, 0, _SOCKETLIB_WORK_SIZE_);
wsaOutBuf.len = 0;
hEvents[0] = hStopEvent;
hEvents[1] = m_hNewDataEvent;
hEvents[2] = osWriter.hEvent;
hEvents[3] = osReader.hEvent;
//Should only run if we can
while (bCanRun)
{
// If an error occured, wait for user action.
if (GetState() == DTSS_ERROR || GetState() == DTSS_CONNECTING)
{
WriteLog(LP_NORMAL, "Error", "In wainting loop");
if (GetReconnect() && GetState() == DTSS_ERROR)
{
WriteLog(LP_NORMAL, "Error", "Disconnecting due to error");
Disconnect();
}
dwRes = WaitForSingleObject(m_hStopEvent, 1000);
if (dwRes == WAIT_OBJECT_0)
bCanRun = false;
continue;
}
// We've been disconnected, wait until disconnect is done and then connect.
if (GetState() == DTSS_DISCONNECTED)
{
bReadCompleted = false;
bPendingRead = false;
bPendingWrite = false;
bNewData = false;
WriteLog(LP_NORMAL, "Error", "In waiting loop2");
//Check if we should reconnect.
if (GetReconnect())
{
WriteLog(LP_NORMAL, "Connect", "Trying to (re)connect...");
Connect();
}
if (!IsConnected())
{
/// Let's wait 15 seconds before we try again.
WriteLog(3, "misc", "ThreadFunc, Waiting 15 seconds or on die event..");
dwRes = WaitForSingleObject(m_hStopEvent, 15000);
if (dwRes == WAIT_OBJECT_0)
bCanRun = false;
continue;
} // if (!IsConnected())
}
if (bCanRun && IsConnected())
{
//Wait if we got some data, else init a read.
if (bPendingRead)
dwRes = WaitForMultipleObjects(4, hEvents, FALSE, INFINITE);
else
dwRes = 1000; //used to invoke a WSARecv.
DWORD dwBytesWritten = 0;
switch(dwRes)
{
//stopEvent
//=======================================================
case WAIT_OBJECT_0:
bCanRun = false;
continue;
break;
// SendData event
//=======================================================
case WAIT_OBJECT_0 + 1:
WriteLog(3, "Send", "New data triggered.");
ResetEvent(m_hNewDataEvent);
bNewData = true;
break;
// Overlapped write is done.
//=======================================================
case WAIT_OBJECT_0 + 2:
WSAGetOverlappedResult(m_sdClient, &osWriter, &dwBytesWritten, FALSE, &dwFlags);
WriteLog(Datatal::LP_LOW, "Send", "Write completed, %d bytes", dwBytesWritten);
ResetEvent(osWriter.hEvent);
HandleSendComplete();
bPendingWrite = false;
if (m_lOutBuffers.pFirst)
bNewData = true;
break;
//osreader, pending read is completed
//=======================================================
case WAIT_OBJECT_0 + 3:
WriteLog(3, "Read", "Read completed.");
ResetEvent(osReader.hEvent);
bReadCompleted = true;
break;
case 1000:
break;
default:
WriteLog(1, "misc", "ThreadFunc, Incorrect dwRes: %d, error: %d", dwRes, GetLastError());
break;
}
if (!bCanRun)
break;
// no pending read. Initiate one.
dwReadbytes = 0;
if (!bPendingRead)
{
dwFlags = 0;
WriteLog(Datatal::LP_LOW, "Read", "WSARecv");
memset(wsaInBuf.buf, 0, _SOCKETLIB_WORK_SIZE_);
dwRes = WSARecv(m_sdClient, &wsaInBuf, 1, &dwReadbytes , &dwFlags , &osReader, NULL);
if (dwRes == SOCKET_ERROR)
{
//What Error?
DWORD dwError = WSAGetLastError();
if (dwError != ERROR_IO_PENDING)
{
char szLog[256];
sprintf(szLog, "WSARecv() failed! Error %d.", dwError);
SetState(DTSS_ERROR);
HandleError(dwError, szLog);
continue;
}
bPendingRead = true;
bReadCompleted = false;
WriteLog(Datatal::LP_LOW, "Read", "pending read...");
} //if (dwRes == SOCKET_ERROR)
else
{
// 0 bytes recieved = closed socket
if (!dwReadbytes)
{
SetState(DTSS_ERROR);
HandleError(0, "0 bytes recieved, closing socket...");
continue;
}
bPendingRead = false;
bReadCompleted = true;
WriteLog(Datatal::LP_LOW, "Read", "Completed directly %d bytes", dwReadbytes);
}
}
// No pending writes and new data to send.
// =======================================================================
if (bReadCompleted)
{
//If a read was not completed directly, dwReadBytes = 0, fetch overlapped result.
if (dwReadbytes == 0)
{
dwFlags = 0;
WriteLog(Datatal::LP_LOW, "Read", "WSAGetOverlappedResult");
if(!WSAGetOverlappedResult(m_sdClient, &osReader, &dwReadbytes, FALSE, &dwFlags))
{
char szLog[512];
sprintf(szLog, "GetOverlappedResult() failed on Read! Bytes read is %d. Flags=%d.", dwReadbytes, dwFlags);
SetState(DTSS_ERROR);
HandleError(GetLastError(), szLog);
continue;
}
}
//Closed?
if ( !dwReadbytes )
{
SetState(DTSS_ERROR);
HandleError(WSAECONNRESET, "Socket closed by Host when waiting for Read..");
continue;
}
//Lock our inputbuffer
m_CritRead.Lock();
WriteLog(Datatal::LP_NORMAL, "Read", "%d bytes read", dwReadbytes);
HandleReceive(wsaInBuf.buf, dwReadbytes);;
m_CritRead.Unlock();
bReadCompleted = false;
bPendingRead = false;
} //if (!bPendingRead && bReadCompleted)
// No pending writes and new data to send.
// =======================================================================
if (!bPendingWrite && bNewData)
{
//Lock buffer and fetch data
m_CritWrite.Lock();
if (m_lOutBuffers.pFirst && !bPendingWrite)
{
Outbuffer* pBuffer = m_lOutBuffers.pFirst;
if (pBuffer->nSize - m_nBufferPos <= _SOCKETLIB_WORK_SIZE_)
{
memcpy(wsaOutBuf.buf, pBuffer->pBuffer + m_nBufferPos, pBuffer->nSize - m_nBufferPos);
wsaOutBuf.len = (int)pBuffer->nSize - m_nBufferPos;
m_lOutBuffers.RemoveFirst();
WriteLog(Datatal::LP_LOW, "Send", "First/Last outbuffers: %X/%X", m_lOutBuffers.pFirst, m_lOutBuffers.pLast);
m_nBufferPos = 0;
}
else
{
memcpy(wsaOutBuf.buf, pBuffer->pBuffer + m_nBufferPos, _SOCKETLIB_WORK_SIZE_);
m_nBufferPos += _SOCKETLIB_WORK_SIZE_;
wsaOutBuf.len = _SOCKETLIB_WORK_SIZE_;
}
dwWritten = 0;
dwFlags = 0;
if (WSASend(m_sdClient, &wsaOutBuf, 1, &dwWritten , dwFlags , &osWriter, NULL) == SOCKET_ERROR)
{
//Overlapped?
if (GetLastError() != ERROR_IO_PENDING)
{
m_CritWrite.Unlock();
SetState(DTSS_ERROR);
HandleError(WSAGetLastError(), "WSASend() failed!");
continue;
}
bPendingWrite = true; //only set pending write if we do not complete directly.
}
else
{
// 0 bytes recieved = closed socket
if (!dwWritten)
{
SetState(DTSS_ERROR);
HandleError(WSAECONNRESET, "0 bytes sent");
m_CritWrite.Unlock();
continue;
}
HandleSendComplete();
}
}
m_CritWrite.Unlock();
bNewData = false;
} //if (!bPendingWrite && bNewData)
} //valid socket
} // while
if (wsaInBuf.buf)
delete[] wsaInBuf.buf;
if (wsaOutBuf.buf)
delete[] wsaOutBuf.buf;
WSACloseEvent(osWriter.hEvent);
WSACloseEvent(osReader.hEvent);
}