// SocketThread.cpp : implementation file
//
#include "stdafx.h"
#include "IPtraffic.h"
#include "SocketThread.h"
#include "IPtrafficDlg.h"
#include "URL.h"
#include <fstream>
#include <iostream>
#include <string>
using namespace std;
// CSqlThread
// Runtime-class registration for CSqlThread (derived from CWinThread) with
// dynamic-creation support.
IMPLEMENT_DYNCREATE(CSqlThread, CWinThread)
// Puts a freshly created worker into its idle state: no dialog, no database
// connection and no recordset attached yet.
CSqlThread::CSqlThread()
{
	// Nothing is owned or referenced until the thread is wired up and started.
	pURLrs = NULL;
	m_pSQLDatabase = NULL;
	m_pCIPtrafficDlg = NULL;

	// Flag associated with the SQL update loop.
	m_bRun = true;

	// Maximum thread life time, in seconds.
	m_iThreadTimerLiveMax = 15;
}
// Destructor. Intentionally empty: the recordset and the database connection
// are released in ExitInstance().
CSqlThread::~CSqlThread()
{
}
// Thread start-up hook.
//
// Under m_pMutex it:
//   * increments the shared running-thread counter (*m_pbRun),
//   * takes this thread's number from *m_pThreadNO,
//   * reloads records left in "<exe path>SQLRouterList.txt" via getLastData()
//     and then deletes that file,
//   * builds the name of this thread's private temporary file.
//
// m_pMutex, m_pbRun, m_pThreadNO and m_pCIPtrafficDlg are dereferenced without
// checks; presumably the creator assigns them before the thread is resumed.
//
// Returns TRUE on success, FALSE if an exception was caught during start-up.
BOOL CSqlThread::InitInstance()
{
	BOOL bRet=FALSE;
	AFX_MANAGE_STATE(AfxGetStaticModuleState());
	m_iThreadTimeStart=CTime::GetCurrentTime();
	m_pMutex->Lock();
	try{
		*m_pbRun+=1;
		m_iThreadNo=*m_pThreadNO;
		m_FileSafe=m_pCIPtrafficDlg->m_sExePath+"SQLRouterList.txt";
		getLastData(m_FileSafe);
		DeleteFile(m_FileSafe);
		// A CString object must not be pushed through "..." as-is; hand the
		// formatter the character pointer explicitly.
		m_FileSafeTmp.Format("%sSQLRouterList%0d.txt",
			(LPCTSTR)m_pCIPtrafficDlg->m_sExePath,m_iThreadNo);
		bRet=TRUE;
	}catch(...){
		Log(CString("Thread InitInstance() Error!"));
	}
	// Reached on both the normal and the exception path, so the lock is always released.
	m_pMutex->Unlock();
	return bRet;
}
// Loads previously saved records from the text file sFileSafe into
// m_RouterTelnetList.
//
// Records are delimited by '\r'; any '\n' characters inside a record are
// stripped. Each record is inserted at the HEAD of the list, so the list ends
// up in reverse file order. Records that are empty after stripping are skipped
// (this also covers the residue after the final delimiter at end of file).
// Records of any length are accepted.
//
// A missing or unreadable file simply yields no records.
// Returns false only if an exception was caught, true otherwise.
bool CSqlThread::getLastData(CString sFileSafe)
{
	try{
		std::ifstream cIn(sFileSafe,std::ios::in);
		std::string sRecord;
		// Testing the result of getline itself stops the loop as soon as nothing
		// more could be read, so no bogus trailing entry is produced.
		while(std::getline(cIn,sRecord,'\r'))
		{
			CString sIn(sRecord.c_str());
			sIn.Replace("\n","");
			if(sIn.IsEmpty())
				continue;
			m_RouterTelnetList.AddHead(sIn);
		}
		cIn.close();
	}catch(...){
		Log(CString("getLastData() Error!:")+sFileSafe);
		return false;
	}
	return true;
}
// Appends every record of m_RouterTelnetList to the text file sFileSafe and
// removes the written records from the list.
//
// Each record is trimmed and terminated with "\r\n" (getLastData() splits on
// '\r' and strips '\n', so the terminator must stay as it is).
//
// A record is removed from the list only after it has been handed to the
// stream successfully. If the file cannot be opened or a write fails, the
// remaining records stay in the list instead of being silently discarded.
//
// Returns true if the list was empty or fully written, false on any failure.
bool CSqlThread::theLastResort(CString sFileSafe)
{
	bool bRet=false;
	try{
		if(m_RouterTelnetList.GetCount())
		{
			ofstream cOut(sFileSafe,ios::app);
			if(!cOut)
			{
				// Could not open the file: keep the data in memory.
				Log(CString("bool CSqlThread::theLastResort() Error! ")+sFileSafe);
				return false;
			}
			while(m_RouterTelnetList.GetCount())
			{
				CString s=m_RouterTelnetList.GetHead();
				s.Trim();
				cOut << (LPCTSTR)s <<"\r\n";
				if(!cOut)
				{
					// Write failed: leave this and the following records in the list.
					Log(CString("bool CSqlThread::theLastResort() Error! ")+sFileSafe);
					return false;
				}
				m_RouterTelnetList.RemoveHead();
			}
		}
		bRet=true;
	}catch(...)
	{
		Log(CString("bool CSqlThread::theLastResort() Error! ")+sFileSafe);
	}
	return bRet;
}
// Thread shutdown hook.
//
// Decrements the shared running-thread counter under the mutex, releases the
// recordset and the database connection, saves any records still held in
// memory to m_FileSafe, and finally delegates to the base class.
int CSqlThread::ExitInstance()
{
	AFX_MANAGE_STATE(AfxGetStaticModuleState());

	// One worker fewer is running.
	m_pMutex->Lock();
	*m_pbRun -= 1;
	m_pMutex->Unlock();

	try
	{
		// Recordset first ...
		if(pURLrs != NULL)
		{
			delete pURLrs;
		}
		pURLrs = NULL;

		// ... then the connection it was using.
		if(m_pSQLDatabase != NULL)
		{
			if(m_pSQLDatabase->IsOpen())
			{
				m_pSQLDatabase->Close();
			}
			delete m_pSQLDatabase;
			m_pSQLDatabase = NULL;
		}
	}
	catch(...)
	{
		Log(CString("Thread ExitInstance() Error!"));
	}

	// Whatever could not be stored in the database goes back to disk.
	theLastResort(m_FileSafe);

	return CWinThread::ExitInstance();
}
// Message map for CSqlThread (base class CWinThread); it declares no handlers.
BEGIN_MESSAGE_MAP(CSqlThread, CWinThread)
END_MESSAGE_MAP()
// CSqlThread message handlers
// Thread body.
//
// Moves every pending record from the dialog's list into this thread's own
// list (under the mutex), round-trips the records through the temporary file,
// pushes them to the database with updateSQL(), and then requests termination
// of the thread.
int CSqlThread::Run()
{
	AFX_MANAGE_STATE(AfxGetStaticModuleState());

	// Take over the dialog's queue, preserving its order.
	m_pMutex->Lock();
	for(;;)
	{
		if(m_pCIPtrafficDlg->m_RouterTelnetList.GetCount() == 0)
		{
			break;
		}
		m_RouterTelnetList.AddTail(
			m_pCIPtrafficDlg->m_RouterTelnetList.RemoveHead());
	}
	m_pMutex->Unlock();

	try
	{
		// Dump the records into this thread's temporary file ...
		theLastResort(m_FileSafeTmp);
		// ... load them back from it ...
		getLastData(m_FileSafeTmp);
		// ... store them in the database and drop the temporary file.
		updateSQL();
		DeleteFile(m_FileSafeTmp);
	}
	catch(...)
	{
		Log(CString("Thread updateSQL() Error!"));
	}

	// Queue WM_QUIT for this thread before entering the base-class Run(),
	// presumably so that its message loop ends as soon as it starts.
	PostThreadMessage(WM_QUIT, 0, 0);
	// AfxEndThread(0,FALSE);
	return CWinThread::Run();
}
// Sends a log line to the dialog window.
//
// The text is copied into a heap buffer and posted to the dialog as
// WM_COMMAND / WM_TO_LOG_FROM_THREAD with the buffer pointer in lParam.
// NOTE(review): the receiving handler is presumably responsible for freeing
// the buffer with delete[] - confirm in the dialog code.
//
// pszText: text to log; NULL is treated as an empty string.
//
// If there is no dialog window, or the message cannot be posted, nothing is
// sent and the buffer is released here so it does not leak.
void CSqlThread::Log(LPCSTR pszText)
{
	AFX_MANAGE_STATE(AfxGetStaticModuleState());
	if(pszText==NULL)
		pszText="";
	HWND hWndDlg=(m_pCIPtrafficDlg!=NULL) ? m_pCIPtrafficDlg->GetSafeHwnd() : NULL;
	// PostMessage with a NULL window handle posts to the calling thread's own
	// queue, where nobody would consume (or free) the text.
	if(hWndDlg==NULL)
		return;
	int l=lstrlen(pszText);
	LPSTR lparam=new char[l+1];
	lstrcpy(lparam,pszText);
	if(!::PostMessage(hWndDlg,WM_COMMAND,WM_TO_LOG_FROM_THREAD,(LPARAM)lparam))
	{
		// Not queued, so ownership was never transferred.
		delete [] lparam;
	}
}
// Stores the records of m_RouterTelnetList in table RAIN_TRAFFIC, one INSERT
// per record, and afterwards executes the stored procedure splitDataFast.
//
// A record is expected to look like
//   {ts 'YYYY-MM-DD [HH:MM:SS.FFF]'} <source IP> <destination IP> <packets> <bytes>
//
// Handling of the head record on each pass:
//   * 20 characters or fewer, or no '{'   -> removed silently.
//   * malformed (no '}', or a missing IP / packets / bytes field)
//                                         -> logged and removed. Leaving it at
//     the head would make the loop process the same record forever.
//   * well formed                         -> host names are resolved, the row
//     is inserted, and the record is removed only if the INSERT succeeded.
//
// The loop ends when the list is empty or *m_pbRun drops to 0 or below.
//
// NOTE(review): a well-formed record whose INSERT keeps failing for a permanent
// reason (e.g. a value the table rejects) is still retried indefinitely.
// NOTE(review): sDate and the resolved names are concatenated into the SQL
// text; only the names have their quotes doubled.
//
// Returns false if the database could not be opened (the remaining records
// stay in the list), true otherwise.
bool CSqlThread::updateSQL()
{
	AFX_MANAGE_STATE(AfxGetStaticModuleState());
	// NOTE(review): iCntSQL is never incremented, so WM_SQL_LINES always carries 0.
	int iCntSQL=0;
	while(!m_RouterTelnetList.IsEmpty() && *m_pbRun>0)
	{
		CString sLine=m_RouterTelnetList.GetHead();
		if(!openDB())
		{
			return false;
		}
		int iP=sLine.Find("{");
		if(sLine.GetLength()>20 && iP>=0)
		{
			CString sDate,sSrcIP,sDstIP,sPackets,sBytes;
			CString sDnsSrcIP,sDnsDestIP;

			//{ts 'YYYY-MM-DD [HH:MM:SS.FFF]}'
			iP=sLine.Find("}",iP+1);
			bool bValid=(iP>0);
			if(bValid)
			{
				sDate=sLine.Left(iP+1);
				sLine=sLine.Mid(iP+1);
				//Source IP
				sLine.TrimLeft();
				sSrcIP=sLine.SpanIncluding("1234567890.");
				// Must be present and must be followed by more data.
				bValid=!sSrcIP.IsEmpty() && sLine.GetLength()!=sSrcIP.GetLength();
			}
			if(bValid)
			{
				sLine=sLine.Mid(sSrcIP.GetLength());
				//Destination IP
				sLine.TrimLeft();
				sDstIP=sLine.SpanIncluding("1234567890.");
				bValid=!sDstIP.IsEmpty() && sLine.GetLength()!=sDstIP.GetLength();
			}
			if(bValid)
			{
				sLine=sLine.Mid(sDstIP.GetLength());
				//Packets
				sLine.TrimLeft();
				sPackets=sLine.SpanIncluding("1234567890");
				bValid=!sPackets.IsEmpty() && sLine.GetLength()!=sPackets.GetLength();
			}
			if(bValid)
			{
				sLine=sLine.Mid(sPackets.GetLength());
				//Bytes
				sLine.TrimLeft();
				sBytes=sLine.SpanIncluding("1234567890");
				bValid=!sBytes.IsEmpty();
			}
			if(!bValid)
			{
				// Drop the record; it can never be turned into a valid INSERT.
				Log(CString("updateSQL() malformed record discarded: ")+m_RouterTelnetList.GetHead());
				m_RouterTelnetList.RemoveHead();
				continue;
			}

			// DNS resolve
			if(m_bIPresolveDNS)
				resolveIP2Name(sSrcIP,&sDnsSrcIP);
			else{
				// Try the database lookup first, fall back to DNS.
				if(!ResolveName(sSrcIP,sDnsSrcIP))resolveIP2Name(sSrcIP,&sDnsSrcIP);
			}
			if(m_bIPresolveDNS)
				resolveIP2Name(sDstIP,&sDnsDestIP);
			else{
				if(!ResolveName(sDstIP,sDnsDestIP))resolveIP2Name(sDstIP,&sDnsDestIP);
			}
			// Double embedded single quotes so the names are valid SQL string literals.
			sDnsSrcIP.Replace("\'","\'\'");
			sDnsDestIP.Replace("\'","\'\'");
			// result:
			/*
			CREATE TABLE [dbo].[RAIN_TRAFFIC] (
			[ID] [bigint] IDENTITY (1, 1) NOT NULL ,
			[IPS] [char] (15) COLLATE Cyrillic_General_CI_AS NOT NULL ,
			[IPD] [char] (15) COLLATE Cyrillic_General_CI_AS NOT NULL ,
			[PACKETS] [int] NOT NULL ,
			[BYTES] [int] NOT NULL ,
			[RDATE] [datetime] NOT NULL ,
			[SNAME] [varchar] (256) COLLATE Cyrillic_General_CI_AS NULL ,
			[DNAME] [varchar] (256) COLLATE Cyrillic_General_CI_AS NULL
			) ON [PRIMARY]
			*/
			sLine="insert into RAIN_TRAFFIC(IPS,IPD,PACKETS,BYTES,RDATE,SNAME,DNAME) VALUES(";
			sLine=sLine +"\'"+sSrcIP+"\',"
				+"\'"+sDstIP+"\',"
				+sPackets+","
				+sBytes+","
				+sDate+"," //ts 'YYYY-MM-DD [HH:MM:SS.FFF]'
				+"\'"+sDnsSrcIP+"\',"
				+"\'"+sDnsDestIP+"\'";
			sLine+=")";
			if(execSQL(sLine)){
				::PostMessage(m_pCIPtrafficDlg->GetSafeHwnd(),WM_SQL_LINES,0,(LPARAM)iCntSQL);
				::PostMessage(m_pCIPtrafficDlg->GetSafeHwnd(),WM_DB_CNT,0,(LPARAM)(m_RouterTelnetList.GetCount()));
				// Remove only if Inserted Ok!!!
				m_RouterTelnetList.RemoveHead();
			}
		}//if(sLine.GetLength()>20 && iP>=0)
		else{
			m_RouterTelnetList.RemoveHead();
		}
	}
	Log(_T("Start Exec splitDataFast"));
	execSQL(_T("Exec splitDataFast"));
	Log(_T("Finish Exec splitDataFast"));
	::PostMessage(m_pCIPtrafficDlg->GetSafeHwnd(),WM_DB_CNT,0,(LPARAM)(m_RouterTelnetList.GetCount()));
	return true;
}
// Looks up a name for an IP address through a CURLrs recordset opened on
// m_pSQLDatabase.
//
// pstrSrcIP: the IP address text handed to the CURLrs constructor.
// sDnsIP:    receives the trimmed m_URL value of the current row when there
//            is one; if that value is empty it receives pstrSrcIP instead.
//            Left untouched when the recordset has no rows or cannot be opened.
//
// Returns true only when a non-empty name was found. On a database error the
// error is logged, the exception object is released, and false is returned.
bool CSqlThread::ResolveName(LPCSTR pstrSrcIP,CString &sDnsIP)
{
	bool bFound=false;
	try{
		// Discard any recordset left over from a previous call.
		if(pURLrs){delete pURLrs;pURLrs=NULL;}
		pURLrs=new CURLrs(pstrSrcIP,m_pSQLDatabase);
		if(pURLrs->Open())
		{
			if(!pURLrs->IsBOF() && !pURLrs->IsEOF())
			{
				pURLrs->m_URL.Trim();
				sDnsIP=pURLrs->m_URL;
				if(sDnsIP=="")
				{
					sDnsIP=pstrSrcIP;
				}else{
					bFound=true;
				}
			}
		}
		pURLrs->Close();
		delete pURLrs;pURLrs=NULL;
	}catch(CDBException *e){
		Log(CString("\'")+pstrSrcIP+"\'; Error:"+LPCSTR(e->m_strError));
		// MFC exceptions are heap objects owned by whoever catches them.
		e->Delete();
		if(pURLrs){delete pURLrs;pURLrs=NULL;}
		return false;
	}
	return bFound;
}
// Executes one SQL statement on m_pSQLDatabase.
//
// sSQL: the statement text.
//
// Returns true on success. Returns false when there is no database object or
// when the statement raised an exception; in the latter case the error is
// logged and the database object is destroyed and reset to NULL.
bool CSqlThread::execSQL(CString sSQL)
{
	if(!m_pSQLDatabase)
		return false;
	try
	{
		m_pSQLDatabase->ExecuteSQL(sSQL);
		return true;
	}catch(CDBException *e){
		Log(CString("\'")+sSQL+"\'; Error:"+LPCSTR(e->m_strError));
		// MFC exceptions are heap objects owned by whoever catches them.
		e->Delete();
	}catch(...){
		Log(CString("\'")+sSQL+"\'; Error!!!");
	}
	// Failure path shared by both handlers: drop the connection object.
	delete m_pSQLDatabase;
	m_pSQLDatabase=NULL;
	return false;
}
// Blocks until hEvent becomes signaled while still dispatching the messages
// that arrive for this thread in the meantime.
//
// Returns TRUE once the event is signaled, FALSE if the wait ends for any
// other reason than the event or pending input.
BOOL CSqlThread::WaitWithMessageLoop(HANDLE hEvent)
{
	for(;;)
	{
		// Single handle, wake on that handle or on any kind of queued input, no timeout.
		const DWORD dwWait = MsgWaitForMultipleObjects(1, &hEvent, FALSE, INFINITE, QS_ALLINPUT);

		if(dwWait == WAIT_OBJECT_0)
		{
			// The event itself fired.
			return TRUE;
		}
		if(dwWait != WAIT_OBJECT_0 + 1)
		{
			// Neither the event nor input: give up.
			return FALSE;
		}

		// Input is pending: drain the queue, then wait again.
		MSG msgPending;
		while(PeekMessage(&msgPending, NULL, 0, 0, PM_REMOVE))
		{
			TranslateMessage(&msgPending);
			DispatchMessage(&msgPending);
		}
	}
}
bool CSqlThread::openDB()
{CString sConnDB;
try{
sConnDB=m_pCIPtrafficDlg->m_sConnectString+
_T(";UID=")+m_pCIPtrafficDlg->m_dbUser+
_T(";PWD=")+m_pCIPtrafficDlg->m_dbUserPassword;
// _T(";SERVER=")+m_pCIPtrafficDlg->m_dbServer;
if(!m_pSQLDatabase)m_pSQLDatabase=new CDatabase();
m_pSQLDatabase->SetQueryTimeout(300);
if(!m_pSQLDatabase->IsOpen())
{
if(m_pSQLDatabase->OpenEx(sConnDB,CDatabase::noOdbcDialog))
{
return true;
}else{
Log(LPCSTR(" Can not CDatabase.OpenEx()1"));
}
}else
return true;
}catch(CDBException *e){
//AfxMessageBox( e->m_strError,MB_ICONEXCLAMATION );
Log(CString("Connect string:\'")+sConnDB+"\'; Error:"+LPCSTR(e->m_strError));
e->Delete();
}catch(CMemoryException *e){
Log(">(Memory overflow)");
e->Delete();
}
if(m_pSQLDatabase)
{
if(m_pSQLDatabase->IsOpen())m_pSQLDatabase->Close();
delete m_pSQLDatabase;
m_pSQLDatabase=NULL;
}
return false;
}
// Resolves a dotted IPv4 address to a host name with getnameinfo().
//
// pstrIP: the IPv4 address as text.
// pName:  receives the host name on success, otherwise a copy of pstrIP.
//
// Returns true if getnameinfo() succeeded, false if it failed (the failure is
// logged) or if either pointer argument is NULL.
bool CSqlThread::resolveIP2Name(LPCSTR pstrIP,CString *pName)
{
	if(pName==NULL)
		return false;
	// Default result: the address itself.
	*pName=pstrIP;
	if(pstrIP==NULL)
		return false;

	//-----------------------------------------
	// Set up sockaddr_in structure which is passed
	// to the getnameinfo function
	struct sockaddr_in saGNI;
	// Clear the whole structure so the reserved sin_zero bytes are not left
	// holding stack garbage.
	memset(&saGNI,0,sizeof(saGNI));
	u_short port = 27015;
	saGNI.sin_family = AF_INET;
	saGNI.sin_addr.s_addr = inet_addr(pstrIP);
	saGNI.sin_port = htons(port);

	// #define NI_MAXHOST 1025
	// #define NI_MAXSERV 32
	char hostName[NI_MAXHOST];
	char servInfo[NI_MAXSERV];
	memset(hostName,'\0',sizeof(hostName));

	//-----------------------------------------
	// Call getnameinfo
	int retVal = getnameinfo((SOCKADDR *)&saGNI,
		sizeof(saGNI),
		hostName,
		NI_MAXHOST,
		servInfo,
		NI_MAXSERV,
		NI_NUMERICSERV);
	if(retVal != 0)
	{
		CString s;
		s.Format("getnameinfo() failed:%ld",WSAGetLastError());
		Log(s);
		return false;
	}
	*pName=hostName;
	return true;
}