// QoSExampleServer.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include "QoSClientFactory.h"
#include "QoSService.h"
#include <process.h>
#define nMaxPacket 200
#define nMaxReceiveBuffer 8192
#define nMaxSendBuffer 8192*4
#define nMaxBytesPerSend 8192
#define nMaxBytesPerceive 8192
#define nBroadcastThreshold -1
#define nSmallPrioritPacket 5000
#define nMiddlePriorityPacket 6000
#define nHigPriorityPacket -1
#define nEmergencyBufferSize 8192
class MyServer : public PushFramework::Server
{
private:
virtual PushFramework::OutgoingPacket* getTestPacket()
{
CQoSPacket* pPacket = new CQoSPacket;
pPacket->setPacketId(1);
pPacket->setChannelSourceId(1);
return pPacket;
}
virtual void disposeOutgoingPacket(PushFramework::OutgoingPacket* pPacket)
{
CQoSPacket* pQoSPacket = (CQoSPacket*) pPacket;
delete pQoSPacket;
}
};
MyServer server;
HANDLE m_hEventAbort;
typedef struct threadPushCtxt
{
unsigned int channelId;
unsigned int pushRate;//Packets per seconds.
}threadPushCtxt;
void startThread(unsigned int channelId, unsigned int pushRate);
void PushPacket(unsigned int channelId, unsigned int uPacketId);
static unsigned __stdcall threadProc(LPVOID WorkContext);
class Scenario
{
public:
virtual void CreateBroadcastingGroups() = 0;
virtual void CreatePublishers() = 0;
};
class Scenario1 :public Scenario
{
public:
virtual void CreateBroadcastingGroups()
{
server.getBroadcastManager()->createChannel("1", 100, false, 10, 10);
server.getBroadcastManager()->createChannel("2", 100, false, 10, 10);
}
virtual void CreatePublishers()
{
startThread(1, 10000);
startThread(2, 10000);
}
};
class Scenario2 :public Scenario
{
public:
virtual void CreateBroadcastingGroups()
{
server.getBroadcastManager()->createChannel("1", 100, false, 10, 10);
server.getBroadcastManager()->createChannel("2", 100, false, 5, 10);
server.getBroadcastManager()->createChannel("3", 100, false, 5, 20);
server.getBroadcastManager()->createChannel("4", 100, false, 5, 5);
}
virtual void CreatePublishers()
{
startThread(1, 10000);
startThread(2, 10000);
startThread(3, 10000);
startThread(4, 10000);
}
};
class Scenario3 :public Scenario
{
public:
virtual void CreateBroadcastingGroups()
{
server.getBroadcastManager()->createChannel("1", 100, false, 5, 10);
server.getBroadcastManager()->createChannel("2", 100, false, 5, 20);
server.getBroadcastManager()->createChannel("3", 100, false, 5, 5);
}
virtual void CreatePublishers()
{
startThread(1, 10000);
startThread(2, 10000);
startThread(3, 10000);
}
};
int _tmain(int argc, _TCHAR* argv[])
{
server.setServerInfos("QoS server");
server.registerService(1, new CQoSService, "service");
server.setClientFactory(new CQoSClientFactory);
server.setLoginExpiryDuration(100);
server.setProtocol(new QoSExampleProtocol);
PushFramework::ListenerOptions options;
options.uSendBufferSize = nMaxBytesPerSend;
options.uReadBufferSize = nMaxBytesPerceive;
options.uIntermediateSendBufferSize = nMaxSendBuffer;
options.uIntermediateReceiveBufferSize = nMaxReceiveBuffer;
server.createListener(2010, &options);
//server.setWorkerCount(1);
Scenario* pScenario = new Scenario3();
pScenario->CreateBroadcastingGroups();
try
{
server.start(true);
}
catch (std::exception& e)
{
cout << "Failed to start server. Exception : \n" << e.what() << std::endl;
return 0;
}
//Abort event.
m_hEventAbort = CreateEvent(NULL, TRUE, FALSE, NULL);
ResetEvent(m_hEventAbort);
pScenario->CreatePublishers();
int ch;
do
{
ch = _getch();
ch = toupper(ch);
} while (ch !='Q');
/*
SetEvent(m_hEventAbort);
CloseHandle(m_hEventAbort);
Sleep(100);*/
server.stop();
delete pScenario;
return 0;
}
void startThread(unsigned int channelId, unsigned int pushRate)
{
threadPushCtxt* pCtxt = new threadPushCtxt;
pCtxt->channelId = channelId;
pCtxt->pushRate = pushRate;
UINT nThreadID;
HANDLE hThread = (HANDLE)_beginthreadex(NULL,// Security
0, // Stack size - use default
threadProc, // Thread fn entry point
pCtxt, // Param for thread
0, // Init flag
&nThreadID);
}
void PushPacket(unsigned int channelId, unsigned int uPacketId)
{
//Now Pump Packets into channels :
CQoSPacket* pPacket = new CQoSPacket;
char pChannelName[10];
sprintf(pChannelName, "%d", channelId);
pPacket->setPacketId(uPacketId);
pPacket->setChannelSourceId(channelId);
server.getBroadcastManager()->pushPacket(pPacket, pChannelName);
}
static unsigned __stdcall threadProc(LPVOID WorkContext)
{
threadPushCtxt* pCtxt = reinterpret_cast<threadPushCtxt*>(WorkContext);
unsigned int uPacketID = 1;
//Periodic Timer
HANDLE m_hEventTimer = CreateEvent(NULL, TRUE, FALSE, NULL);
ResetEvent(m_hEventTimer);
//
HANDLE hWaits[2];
hWaits[0] = m_hEventTimer;
hWaits[1] = m_hEventAbort;
UINT sleepTimeMs=100;//1 second.
int nPacketsPerDeciSecond = pCtxt->pushRate / 10;
//
while (true)
{
PushPacket(pCtxt->channelId, uPacketID);
uPacketID++;
//Sleep(50);
}
return 0;
while(true)
{
MMRESULT result = timeSetEvent(sleepTimeMs, 1,
(LPTIMECALLBACK) m_hEventTimer, 0, TIME_ONESHOT|TIME_CALLBACK_EVENT_SET);
_ASSERT(result != NULL);
// Sleep here until the timer interval ends or abort occurs
if(WaitForMultipleObjects(2, hWaits, FALSE, INFINITE)==0)
{
for(int i=0;i<nPacketsPerDeciSecond;i++){
PushPacket(pCtxt->channelId, uPacketID);
uPacketID++;
}
ResetEvent(m_hEventTimer);
}
else
{
//m_hEventAbort is set : break from while.
break;
}
//
}
CloseHandle(m_hEventTimer);
//quit blocking.
return 0;
}