#include "StdAfx.h"
#include "BroadcastManagerImplBase.h"
#include "..\include\Protocol.h"
#include "..\include\OutgoingPacket.h"
#include "BroadcastChannel.h"
namespace PushFramework{
// Builds a manager whose priority-group chain is empty. The chain head is
// allocated later, by the first call to insertChannelIntoOrderedChain().
BroadcastManagerImplBase::BroadcastManagerImplBase()
{
    this->pChainHead = NULL;
}
// Releases the head of the priority-group chain.
// NOTE(review): the BroadcastInfo objects stored in channelMap and the
// ClientSink objects stored in sinkMap are not released here; whether the
// rest of the group chain is released depends on ChannelGroupPushInfo's
// destructor, which is not visible in this file - confirm ownership.
BroadcastManagerImplBase::~BroadcastManagerImplBase()
{
    delete this->pChainHead;
}
// Registers a new broadcast channel.
//
//  channelKey          - name under which the channel is stored in channelMap.
//  maxPacket           - forwarded to BroadcastChannel::setMaxPackets().
//  requireSubscription - forwarded to the BroadcastChannel constructor.
//  uPriority           - priority of the group the channel is placed in
//                        (see insertChannelIntoOrderedChain()).
//  uPacketQuota        - quota stored in the channel's ChannelPushInfo and
//                        consulted by disposePacket().
//
// If a channel with the same key already exists the call is ignored.
// Re-registering used to overwrite the map entry, which leaked the previous
// BroadcastInfo/BroadcastChannel and added a second chain node carrying the
// same channel name.
void BroadcastManagerImplBase::createChannel( WStrKey channelKey, unsigned int maxPacket, bool requireSubscription, unsigned int uPriority, unsigned int uPacketQuota )
{
    if (channelMap.find(channelKey) != channelMap.end())
        return;

    BroadcastChannel* pQueue = new BroadcastChannel(this, channelKey, requireSubscription);
    pQueue->setMaxPackets(maxPacket);

    BroadcastInfo* pBroadcastInfo = new BroadcastInfo;
    pBroadcastInfo->pPushInfo = insertChannelIntoOrderedChain(channelKey, uPriority, uPacketQuota);
    pBroadcastInfo->pBroadcastChannel = pQueue;

    channelMap[channelKey] = pBroadcastInfo;
}
// Drops the channel registered under channelKey, if there is one: its
// BroadcastInfo is deleted and the entry is erased from channelMap.
// Unknown keys are ignored.
// NOTE(review): the ChannelPushInfo node that insertChannelIntoOrderedChain()
// linked into the priority chain for this channel is not unlinked here.
void BroadcastManagerImplBase::removeChannel( WStrKey channelKey )
{
    channelMapT::iterator found = channelMap.find(channelKey);
    if (found == channelMap.end())
        return;

    delete found->second;
    channelMap.erase(found);
}
// Subscribes clientKey to the channel named channelKey.
// Returns false when no such channel is registered, true otherwise.
bool BroadcastManagerImplBase::subscribeClient( ClientKey clientKey, WStrKey channelKey )
{
    channelMapT::iterator found = channelMap.find(channelKey);
    const bool channelExists = (found != channelMap.end());
    if (channelExists)
    {
        found->second->pBroadcastChannel->subscribeClient(clientKey);
    }
    return channelExists;
}
// Unsubscribes clientKey from the channel named channelKey.
// Returns false when no such channel is registered, true otherwise.
bool BroadcastManagerImplBase::unsubscribeClient( ClientKey clientKey, WStrKey channelKey )
{
    channelMapT::iterator found = channelMap.find(channelKey);
    const bool channelExists = (found != channelMap.end());
    if (channelExists)
    {
        found->second->pBroadcastChannel->unsubscribeClient(clientKey);
    }
    return channelExists;
}
// Queues pPacket on the channel named channelName.
// When the channel exists the sequence is: PreEncodeOutgoingPacket(),
// ReportOnBeforePushPacket(), BroadcastChannel::pushPacket(), then
// ActivateSubscribers(). When the channel is unknown nothing is done and
// pPacket is left untouched.
void BroadcastManagerImplBase::pushPacket( OutgoingPacket* pPacket, WStrKey channelName, WStrKey killKey, int objectCategory )
{
    channelMapT::iterator found = channelMap.find(channelName);
    if (found != channelMap.end())
    {
        PreEncodeOutgoingPacket(pPacket);
        ReportOnBeforePushPacket(channelName);

        found->second->pBroadcastChannel->pushPacket(pPacket, killKey, objectCategory);

        ActivateSubscribers(channelName);
    }
}
// Forwards a removePacket(killKey, objectCategory) request to the channel
// named channelKey. Unknown channels are ignored.
void BroadcastManagerImplBase::removePacket( WStrKey killKey, int objectCategory, WStrKey channelKey )
{
    channelMapT::iterator found = channelMap.find(channelKey);
    if (found != channelMap.end())
    {
        found->second->pBroadcastChannel->removePacket(killKey, objectCategory);
    }
}
// Picks the next packet to send to `client`.
//
//  client     - key of the client asking for a packet; a ClientSink is
//               created and stored in sinkMap on the client's first call.
//  hPacket    - out: handle filled in by BroadcastChannel::getNextPacket().
//  channelKey - out: written by getPacketFromGroupChain() with the name of
//               each channel it visits.
//
// Returns the packet, or NULL when no channel yields one.
//
// The search first resumes at the channel recorded in the client's sink
// (pCurrentSinkChannel) and follows that channel's pNext links; if that
// yields nothing, every group is scanned starting from the chain head.
OutgoingPacket* BroadcastManagerImplBase::getNextPacket( ClientKey client, double& hPacket, std::string& channelKey )
{
    // Find the client's sink, creating it on first use.
    ClientSink* pSink = NULL;
    sinkMapT::iterator found = sinkMap.find(client);
    if (found == sinkMap.end())
    {
        pSink = new ClientSink();
        sinkMap[client] = pSink;
    }
    else
    {
        pSink = found->second;
    }

    // First attempt: continue from where this client last left off.
    if (pSink->pCurrentSinkChannel)
    {
        OutgoingPacket* pResumed = getPacketFromGroupChain(pSink->pCurrentSinkChannel, client, hPacket, channelKey);
        if (pResumed)
            return pResumed;
    }

    // Second attempt: walk all groups from the head of the chain.
    for (ChannelGroupPushInfo* pGroup = pChainHead; pGroup; pGroup = pGroup->pNextGroup)
    {
        OutgoingPacket* pCandidate = getPacketFromGroupChain(pGroup->pItemList, client, hPacket, channelKey);
        if (pCandidate)
            return pCandidate;
    }

    return NULL;
}
void BroadcastManagerImplBase::disposePacket( double hPacket, std::string channelKey, std::string clientKey, bool bSuccess )
{
channelMapT::iterator it = channelMap.find(channelKey);
if(it==channelMap.end())
return;
ReportOnAfterPacketIsSent(channelKey, clientKey);
//
BroadcastInfo* pBroadcastInfo = it->second;
pBroadcastInfo->pBroadcastChannel->disposePacket(hPacket, clientKey, bSuccess);
//big aspect!
if (bSuccess)
{
//Now the fun :
ClientSink* pClientSinkInfo = sinkMap[clientKey];
if (pClientSinkInfo->pCurrentSinkChannel == pBroadcastInfo->pPushInfo)
{
pClientSinkInfo->nSentPackets++;
if (pClientSinkInfo->nSentPackets == pBroadcastInfo->pPushInfo->uPacketQuota)
{
if (pBroadcastInfo->pPushInfo->pNext)
{
pClientSinkInfo->pCurrentSinkChannel = pBroadcastInfo->pPushInfo->pNext;
pClientSinkInfo->nSentPackets = 0;
}
else{
pClientSinkInfo->pCurrentSinkChannel = NULL;
pClientSinkInfo->nSentPackets = 0;
}
}
}
else
{
pClientSinkInfo->nSentPackets = 1;
pClientSinkInfo->pCurrentSinkChannel = pBroadcastInfo->pPushInfo;
}
}
}
// Unsubscribes clientKey from every registered channel.
// NOTE(review): the client's entry in sinkMap (created by getNextPacket())
// is left in place - confirm whether that is intended.
void BroadcastManagerImplBase::removeClient( ClientKey clientKey )
{
    channelMapT::iterator cursor = channelMap.begin();
    while (cursor != channelMap.end())
    {
        cursor->second->pBroadcastChannel->unsubscribeClient(clientKey);
        cursor++;
    }
}
// Creates the ChannelPushInfo for a channel and files it under the group
// matching uPriority, creating that group when it does not exist yet.
//
//  channelName  - name stored in the new ChannelPushInfo.
//  uPriority    - priority of the group the channel belongs to.
//  uPacketQuota - quota stored in the new ChannelPushInfo.
//
// Returns the newly allocated ChannelPushInfo.
//
// Groups are linked through pNextGroup in descending uPriority order: the
// scan below skips every group whose priority is strictly greater than
// uPriority and stops at the first one that is less than or equal to it.
ChannelPushInfo* BroadcastManagerImplBase::insertChannelIntoOrderedChain( std::string channelName, unsigned int uPriority, unsigned int uPacketQuota )
{
    ChannelPushInfo* pNewChannel = new ChannelPushInfo(channelName, uPacketQuota);

    // Empty chain: the new group becomes the head.
    if (!pChainHead)
    {
        pChainHead = new ChannelGroupPushInfo(uPriority);
        pChainHead->insertChannelInfo(pNewChannel);
        return pNewChannel;
    }

    // Locate the insertion point: pPrev is the last group with a higher
    // priority, pCursor the first group with a priority <= uPriority.
    ChannelGroupPushInfo* pPrev = NULL;
    ChannelGroupPushInfo* pCursor = pChainHead;
    for (; pCursor != NULL && uPriority < pCursor->uPriority; pCursor = pCursor->pNextGroup)
    {
        pPrev = pCursor;
    }

    ChannelGroupPushInfo* pOwner = pCursor;
    if (pCursor == NULL)
    {
        // Every existing group has a higher priority: append at the tail.
        // pPrev is non-NULL here because the chain is not empty.
        pOwner = new ChannelGroupPushInfo(uPriority);
        pPrev->pNextGroup = pOwner;
    }
    else if (pCursor->uPriority != uPriority)
    {
        // No group with this priority yet: splice a new one before pCursor.
        pOwner = new ChannelGroupPushInfo(uPriority);
        pOwner->pNextGroup = pCursor;
        if (pPrev == NULL)
        {
            pChainHead = pOwner;
        }
        else
        {
            pPrev->pNextGroup = pOwner;
        }
    }
    // Otherwise pCursor already is the group for this priority.

    pOwner->insertChannelInfo(pNewChannel);
    return pNewChannel;
}
// Walks a list of channels linked through ChannelPushInfo::pNext, starting
// at pStartAt, and returns the first packet any of them has for `client`.
//
//  pStartAt   - first channel to query; NULL yields NULL immediately.
//  client     - client the packet is requested for.
//  hPacket    - out: handle filled in by BroadcastChannel::getNextPacket().
//  channelKey - out: overwritten with the name of each channel visited, so
//               on success it names the channel that supplied the packet.
//
// Returns the packet, or NULL when no visited channel yields one.
//
// Channels are looked up with find(): a chain node whose channel is no
// longer in channelMap (removeChannel() erases the map entry without
// unlinking the node) is skipped. The previous operator[] lookup inserted a
// NULL entry for such a name and then dereferenced it.
OutgoingPacket* BroadcastManagerImplBase::getPacketFromGroupChain( ChannelPushInfo* pStartAt , ClientKey client, double& hPacket, std::string& channelKey )
{
    for (ChannelPushInfo* pChannelInfo = pStartAt; pChannelInfo != NULL; pChannelInfo = pChannelInfo->pNext)
    {
        channelKey = pChannelInfo->channelName;

        channelMapT::iterator it = channelMap.find(pChannelInfo->channelName);
        if (it == channelMap.end() || it->second == NULL)
            continue;

        OutgoingPacket* pPacket = it->second->pBroadcastChannel->getNextPacket(client, hPacket);
        if (pPacket != NULL)
            return pPacket;
    }
    return NULL;
}
void BroadcastManagerImplBase::disposeAllPackets()
{
for (channelMapT::iterator it = channelMap.begin();
it!=channelMap.end();
it++)
{
BroadcastChannel* pChannel = it->second->pBroadcastChannel;
pChannel->disposeAllPackets();
}
}
std::string BroadcastManagerImplBase::getChannelsNames()
{
std::stringstream ss;
ss << std::noskipws;
for (channelMapT::iterator it = channelMap.begin();
it!=channelMap.end();
it++)
{
ss << "<channel value=\"" << it->first << "\"/>";
}
return ss.str();
}
}