/********************************************************************
File : Demultiplexor.cpp
Creation date : 2010/6/27
License : Copyright 2010 Ahmed Charfeddine, http://www.pushframework.com
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*********************************************************************/
#include "StdAfx.h"
#include "Demultiplexor.h"
#include "ServerImpl.h"
#include "Dispatcher.h"
#include "Channel.h"
#include "IOCPQueue.h"
#include "../include/Server.h"
namespace PushFramework{
// Builds a demultiplexor bound to the given server implementation.
// No thread or kernel object is created here; that happens in start().
// pServerImpl : server implementation queried later for the worker count,
//               the IOCP queue, the dispatcher and the facade.
Demultiplexor::Demultiplexor(ServerImpl *pServerImpl)
{
    this->pServerImpl = pServerImpl;
    // Give the threading state defined values so that it is never read
    // uninitialised (start() increments m_nWorkerCnt, stop() reads the rest).
    g_hShutdownEvent = NULL;
    g_nThreads = 0;
    g_phWorkerThreads = NULL;
    m_nWorkerCnt = 0;
}
// Destructor: intentionally empty, it performs no work of its own.
Demultiplexor::~Demultiplexor()
{
}
bool Demultiplexor::start()
{
g_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
g_nThreads = pServerImpl->getWorkerCount();
g_phWorkerThreads = new HANDLE[g_nThreads];
UINT nThreadID;
for ( int i = 0; i < g_nThreads; i++ )
{
g_phWorkerThreads[i] = (HANDLE)_beginthreadex(NULL,// Security
0, // Stack size - use default
KerIOCPWorkerProc, // Thread fn entry point
(void*) this, // Param for thread
0, // Init flag
&nThreadID); // Thread address
if (g_phWorkerThreads[i] == NULL )
{
return false;
}
m_nWorkerCnt++;
//CloseHandle(hWorker);
}
return true;
}
void Demultiplexor::stop()
{
//Ask all worker threads to start shutting down
SetEvent(g_hShutdownEvent);
for (int i = 0; i < g_nThreads; i++)
{
//Help threads get out of blocking - GetQueuedCompletionStatus()
pServerImpl->getIOCPQueue()->PostTerminationSignal();
}
//Let Worker Threads shutdown
WaitForMultipleObjects(g_nThreads, g_phWorkerThreads, TRUE, INFINITE);
}
// Thread entry point handed to _beginthreadex.
// WorkContext : the Demultiplexor whose proc() loop this thread runs.
// Always returns 0 once proc() has returned.
unsigned __stdcall Demultiplexor::KerIOCPWorkerProc( LPVOID WorkContext )
{
    static_cast<Demultiplexor*>(WorkContext)->proc();
    return 0;
}
// Worker thread main loop.
// Brackets its work with the facade's workerThreadBegin()/workerThreadEnd()
// hooks. Until the shutdown event is signalled, it dequeues one event at a
// time from the IOCP queue and forwards it to the dispatcher according to
// the I/O type stored in the OVERLAPPEDPLUS record. The loop also ends when
// an event with a NULL completion key is dequeued.
void Demultiplexor::proc()
{
    //CLoggingFacilities::doLoggingByArgList("Worker %x started..",::GetCurrentThreadId());
    void* perThreadCtx = pServerImpl->getFacade()->workerThreadBegin();

    DWORD dwIoSize = 0;
    CChannel *pPerSocketContext = NULL;
    LPOVERLAPPED lpOverlapped = NULL;       //Points to the completed operation's OVERLAPPED
    OVERLAPPEDPLUS *pPerIOContext = NULL;   //Record that embeds lpOverlapped as its m_ol member

    while (WAIT_OBJECT_0 != WaitForSingleObject(g_hShutdownEvent, 0)) {
        // NOTE(review): a completion key is pointer-sized; the (LPDWORD) cast
        // is kept because GetQueuedEvent is declared elsewhere - confirm it
        // is safe on 64-bit builds.
        BOOL bSuccess = pServerImpl->getIOCPQueue()->GetQueuedEvent(&dwIoSize,(LPDWORD) &pPerSocketContext,&lpOverlapped);
        //CLoggingFacilities::doLoggingByArgList("IO Completion :%d", dwIoSize);
        if (!bSuccess)
        {
            // Failed dequeue (time-out or failed I/O). The error is currently
            // ignored: the end-point disconnection that used to happen here
            // was removed. The system message is no longer formatted either,
            // since it was never used and its buffer was never freed.
            continue;
        }

        if (pPerSocketContext == NULL) {
            // StopWorkers used PostQueuedCompletionStatus to post an I/O packet with
            // a NULL CompletionKey (or if we get one for any reason). It is time to exit.
            break;
        }

        if (lpOverlapped == NULL) {
            // No per-I/O record to recover the operation type from.
            continue;
        }

        //CLoggingFacilities::doLoggingByArgList("Successful IO Completion :%d", dwIoSize);
        pPerIOContext = CONTAINING_RECORD(lpOverlapped, OVERLAPPEDPLUS, m_ol);
        switch (pPerIOContext->m_ioType)
        {
        case IORead:
            //CLoggingFacilities::doLoggingByArgList("CNetworkEventQueue::IORead %d", dwIoSize);
            pServerImpl->getDispatcher()->OnReceiveComplete(pPerSocketContext,dwIoSize);
            break;
        case IOWrite:
            //CLoggingFacilities::doLoggingByArgList("CNetworkEventQueue::IOWrite %d", dwIoSize);
            pServerImpl->getDispatcher()->OnWriteComplete(pPerSocketContext,dwIoSize);
            break;
        // The record is deleted here for the event types below; it is left
        // alone for IORead / IOWrite.
        case IOInitialize:
            pServerImpl->getDispatcher()->OnInitializeReady(pPerSocketContext);
            delete pPerIOContext;
            break;
        case IOUpload:
            //pServerImpl->getDispatcher()->OnUploadReady(pPerSocketContext);
            delete pPerIOContext;
            break;
        case IOGarbageCollection:
            //CLoggingFacilities::doLoggingByArgList("GC triggered.");
            pServerImpl->getDispatcher()->OnStartGC();
            delete pPerIOContext;
            break;
        case IOPerformanceCollection:
            //CLoggingFacilities::doLoggingByArgList("Collecting Profiling data.");
            pServerImpl->getDispatcher()->OnStartProfiling();
            delete pPerIOContext;
            break;
        default:
            break;
        }//end of switch
    }//end of while

    pServerImpl->getFacade()->workerThreadEnd(perThreadCtx);
}
}