The following source was built using Visual Studio 6.0 SP5 and Visual Studio .NET. You need to have a version of the Microsoft Platform SDK installed. Note that the debug builds of the code waste a lot of CPU cycles due to the debug trace output, so it's only worth profiling the release builds.
Overview
In the previous article we designed a reusable socket server class to make writing high-performance, socket-based servers easy. We presented a series of simple examples, from the humble echo server through to a slightly more real-world packet echo server and a fake POP3 server. This article continues to make the example server more usable in the real world by adding a business logic thread pool to the server, so that messages are processed by a thread that isn't part of the IO thread pool. This helps to maintain the scalability and performance of the server by moving potentially blocking work off into its own thread pool.
Why do we need another thread pool?
To be able to handle variable load it's often useful to have a thread pool that can be expanded and contracted depending on the current load on the server. As we pointed out in the last article, all of our asynchronous socket IO is handled by the socket server's thread pool. The threads in this pool cannot be terminated whilst they have outstanding IO operations, or those operations will be cancelled. This means that the socket server's thread pool cannot shrink unless we keep track of the IO operations associated with each worker thread and only allow a thread to terminate once all of its IO operations have completed. To maintain performance we need to make sure that the threads in the socket server's thread pool do not block: there are a finite number of them and, if they all block, no socket IO will occur until they unblock. The easiest way to ensure that the IO threads don't block is to move the business logic processing out of the IO thread pool and into a new thread pool. The IO threads then simply handle the IO, chunk the data stream into messages and pass the messages off to the business logic thread pool.
A business logic thread pool
Our requirements for the business logic thread pool are that it should be flexible and capable of increasing and decreasing the number of worker threads as the load on the server dictates. Passing work items into the thread pool should be a non-blocking operation so that the IO threads can operate at maximum efficiency, but we need to know when a work item hasn't been picked up by a thread within a certain time period so that we can add more threads to the pool. We also need to keep track of the number of idle threads that we have and, every so often, reduce the number of threads in the pool to conserve resources in times of low server load.
As you would probably expect, the thread pool uses IO Completion Ports to dispatch work items to worker threads. To monitor how long a work item takes to be picked up, and therefore to work out when we need to add more threads to the pool, we use an event. When we dispatch a work item to the IO Completion Port we wait on the event for a configurable timeout period. When a thread picks up a work item from the completion port, the first thing that it does is signal the event. If all threads are busy when we dispatch our work item, our timeout may expire before a thread signals the event. In this case we may wish to add another thread to the pool to deal with the workload. The dispatch code could look something like this:

void CThreadPool::HandleDispatch(
   ULONG_PTR completionKey,
   DWORD dwNumBytes,
   OVERLAPPED *pOverlapped)
{
   m_dispatchCompleteEvent.Reset();

   bool processed = false;

   m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped);

   // wait for a worker thread to signal the 'got message' event

   bool threadStarted = false;

   while (!processed)
   {
      DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);

      if (result == WAIT_OBJECT_0)
      {
         processed = true;
      }
      else if (result == WAIT_TIMEOUT)
      {
         if (!threadStarted &&
             m_processingThreads == m_activeThreads &&
             (size_t)m_activeThreads < m_maxThreads)
         {
            StartWorkerThread();

            threadStarted = true;
         }
      }
      else
      {
         throw CWin32Exception(_T("CThreadPool::HandleDispatch()"), GetLastError());
      }
   }
}

Whilst there are threads available to process the work items we don't need to start new threads. As soon as all of the threads in the pool are active we may time out during the dispatch and then, if we're not already running with the maximum number of threads that we've been configured for, we start a new thread. The actual code is slightly more complex as it handles shutdown requests and adjusts the timeout when we're already running at our maximum number of threads. The dispatcher needs to know how many threads we have in the pool and how many of those threads are processing, so each worker thread calls back to the thread pool to let the pool know what state it's in.
The problem with this piece of work item dispatch code is that it doesn't fulfill our requirement of being able to dispatch a work item to the pool in a non-blocking fashion. To achieve that, we add another level of indirection, and another IO Completion Port.
Non-blocking dispatch
To ensure that users wishing to dispatch a work item to the thread pool can do so without blocking, we implement the user-level dispatch function as follows:

void CThreadPool::Dispatch(
   ULONG_PTR completionKey,
   DWORD dwNumBytes /*= 0*/,
   OVERLAPPED *pOverlapped /*= 0*/)
{
   if (completionKey == 0)
   {
      throw CException(_T("CThreadPool::Dispatch()"), _T("0 is an invalid value for completionKey"));
   }

   m_dispatchPort.PostStatus(completionKey, dwNumBytes, pOverlapped);
}

The restriction on zero-valued completion keys is unfortunate, but it allows us to shut down the thread pool's dispatch thread by posting a 0 to its completion port.
The thread pool now has two IO Completion Ports. The dispatch port is serviced by a single maintenance thread which executes the blocking dispatch code shown above, and the work port is serviced by the pool's worker threads.
Shutting down dormant threads
Work items often come in batches: the thread pool gets busy, expands, services all of the work items and then becomes less busy. At this point the pool contains threads which aren't being used but which are still consuming resources. These dormant threads can safely be shut down, as the pool can expand again when load increases. The question is, how do we decide when to shut some threads down?
The maintenance thread that handles our blocking dispatch also handles checking for dormant threads. Every so often (the period is configurable) the maintenance thread uses an algorithm to determine whether it should shut some threads down. The current algorithm is as follows:

void CThreadPool::HandleDormantThreads()
{
   if ((size_t)m_activeThreads > m_minThreads)
   {
      const size_t dormantThreads = m_activeThreads - m_processingThreads;

      if (dormantThreads > m_maxDormantThreads)
      {
         const size_t threadsToShutdown = (dormantThreads - m_maxDormantThreads) / 2 + 1;

         StopWorkerThreads(threadsToShutdown);
      }
   }
}

If we have more threads than the minimum number we're allowed to have, we find out how many threads aren't currently processing work items. If that number is more than the number of dormant threads that we're allowed to have, we shut down roughly half of the surplus: the surplus divided by two (integer division) plus one, so at least one thread is always stopped. Stopping worker threads is simply a case of posting an IO completion key of 0 to the work port for each worker thread that we want to shut down.
Doing the work
We now have a thread pool that fulfills our requirements of automatic expansion and contraction depending upon load, and of non-blocking dispatch for users. The remaining thing to do is allow the derived class to provide its own implementations of the following virtual functions:

virtual bool Initialise();

virtual void Process(
   ULONG_PTR completionKey,
   DWORD dwNumBytes,
   OVERLAPPED *pOverlapped) = 0;

virtual void Shutdown();
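The growth rule in HandleDispatch() can be tried out without Win32. What follows is a minimal portable sketch under stated assumptions, not the article's implementation: it assumes C++17 and standard threads, uses a mutex-guarded std::deque in place of the work IO Completion Port and a condition variable in place of m_dispatchCompleteEvent, and lets an empty std::function play the role of the 0 completion key that stops a worker. ElasticPool and GrowthDemo are hypothetical names. The dispatch port and dormant-thread shrinking are omitted, so Dispatch() here blocks exactly as HandleDispatch() does.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical portable analogue of HandleDispatch(): a mutex-guarded deque
// stands in for the work IO Completion Port and a condition variable stands
// in for m_dispatchCompleteEvent.
class ElasticPool {
public:
    using Work = std::function<void()>;

    ElasticPool(std::size_t initialThreads, std::size_t maxThreads,
                std::chrono::milliseconds timeout)
        : maxThreads_(maxThreads), timeout_(timeout) {
        assert(initialThreads > 0 && initialThreads <= maxThreads);
        std::lock_guard<std::mutex> lock(mutex_);
        for (std::size_t i = 0; i < initialThreads; ++i) StartWorker();
    }

    ~ElasticPool() {
        {   // An empty Work plays the role of completion key 0: one per thread.
            std::lock_guard<std::mutex> lock(mutex_);
            for (std::size_t i = 0; i < threads_.size(); ++i) queue_.emplace_back();
        }
        workAvailable_.notify_all();
        for (auto &t : threads_) t.join();
    }

    // Blocks until a worker has picked the item up. If the timeout expires
    // while every thread is busy, starts at most one extra thread.
    // Assumes a single dispatching thread, like the article's maintenance thread.
    void Dispatch(Work work) {
        std::unique_lock<std::mutex> lock(mutex_);
        const std::size_t ticket = ++posted_;
        queue_.push_back(std::move(work));
        workAvailable_.notify_one();
        bool threadStarted = false;
        while (!pickedUpCv_.wait_for(lock, timeout_, [&] { return pickedUp_ >= ticket; })) {
            if (!threadStarted && processing_ == threads_.size() &&
                threads_.size() < maxThreads_) {
                StartWorker();
                threadStarted = true;
            }
        }
    }

    std::size_t ThreadCount() { std::lock_guard<std::mutex> lock(mutex_); return threads_.size(); }

private:
    // Caller must hold mutex_.
    void StartWorker() { threads_.emplace_back([this] { Run(); }); }

    void Run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            workAvailable_.wait(lock, [&] { return !queue_.empty(); });
            Work work = std::move(queue_.front());
            queue_.pop_front();
            if (!work) return;        // shutdown sentinel
            ++pickedUp_;              // like signalling the event first
            ++processing_;
            pickedUpCv_.notify_all();
            lock.unlock();
            work();
            lock.lock();
            --processing_;
        }
    }

    std::mutex mutex_;
    std::condition_variable workAvailable_, pickedUpCv_;
    std::deque<Work> queue_;
    std::vector<std::thread> threads_;
    std::size_t posted_ = 0, pickedUp_ = 0, processing_ = 0;
    const std::size_t maxThreads_;
    const std::chrono::milliseconds timeout_;
};

// Keeps the only worker busy so that the second Dispatch() times out and the
// pool grows; returns the thread count observed afterwards.
std::size_t GrowthDemo() {
    std::atomic<bool> release{false};
    ElasticPool pool(1, 3, std::chrono::milliseconds(20));
    pool.Dispatch([&] {
        while (!release) std::this_thread::sleep_for(std::chrono::milliseconds(1));
    });
    pool.Dispatch([] {});
    const std::size_t threads = pool.ThreadCount();
    release = true;  // let the first item finish before the pool joins its threads
    return threads;
}
```

As in HandleDispatch(), the pool grows only when the timeout expires and every existing thread is busy, and by at most one thread per work item; a thread that is idle but slow to reach its wait never triggers growth.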
A socket server with a business logic thread pool
Now that we have a suitable thread pool we can integrate it with our fake POP3 socket server, so that the actual processing of commands occurs in the business logic pool whilst the IO pool is left to get on with the IO operations. We can also move socket closure off to the business logic pool so that we don't block the IO threads with a lingering socket close. The first thing we need to do is create and configure our thread pool. Then we can pass a reference to it to our socket server class, which in turn passes it on to our IO threads.
CThreadPool pool(
5, // initial number of threads to create
5, // minimum number of threads to keep in the pool
10, // maximum number of threads in the pool
5, // maximum number of "dormant" threads
5000, // pool maintenance period (millis)
100, // dispatch timeout (millis)
10000); // dispatch timeout for when pool is at max threads
pool.Start();
CSocketServer server(
INADDR_ANY, // address to listen on
5001, // port to listen on
10, // max number of sockets to keep in the pool
10, // max number of buffers to keep in the pool
1024, // buffer size
pool);
server.Start();
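With the configuration above (minimum 5 threads, at most 5 dormant threads, maintenance every 5000 ms), the shrink rule in HandleDormantThreads() can be worked through by hand. The sketch below is a hypothetical, stand-alone restatement of that arithmetic; ThreadsToShutdown() and ShrinkSchedule() are illustrative names, and the schedule assumes the pool has grown to its maximum of 10 threads and then goes completely idle, with no work arriving between maintenance passes.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-alone restatement of the HandleDormantThreads() arithmetic:
// returns how many worker threads a single maintenance pass would stop.
std::size_t ThreadsToShutdown(std::size_t active, std::size_t processing,
                              std::size_t minThreads, std::size_t maxDormant) {
    assert(processing <= active);
    if (active <= minThreads) return 0;
    const std::size_t dormant = active - processing;
    if (dormant <= maxDormant) return 0;
    return (dormant - maxDormant) / 2 + 1;  // integer division, so always >= 1
}

// Pool sizes after each maintenance pass, assuming every thread stays idle
// (processing == 0) and no new work arrives between passes.
std::vector<std::size_t> ShrinkSchedule(std::size_t active, std::size_t minThreads,
                                        std::size_t maxDormant) {
    std::vector<std::size_t> sizes{active};
    for (;;) {
        const std::size_t stop = ThreadsToShutdown(active, 0, minThreads, maxDormant);
        if (stop == 0) break;
        active -= stop;
        sizes.push_back(active);
    }
    return sizes;
}
```

For that scenario the schedule is 10, 7, 5: the first pass sees 5 surplus dormant threads and stops 5 / 2 + 1 = 3, the second sees 2 surplus and stops 2, and the pool then sits at its minimum, so it returns to 5 threads after two maintenance periods (roughly 10 seconds). The rule itself does not clamp the result to the minimum thread count; with these settings it never needs to, because the maximum dormant count is not less than the minimum.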
When our socket server has a complete, distinct message to process, it can dispatch it to the thread pool for processing rather than processing it on one of its IO threads.

void CSocketServer::ProcessCommand(
   CSocketServer::Socket *pSocket,
   CIOBuffer *pBuffer)
{
   pSocket->AddRef();
   pBuffer->AddRef();

   m_pool.Dispatch(reinterpret_cast<ULONG_PTR>(pSocket), 0, pBuffer->GetAsOverlapped());
}

Since we're passing the socket and IO buffer to another thread, we have to increment their reference counts so that they don't get cleaned up from underneath us. Over in our business logic thread we can finally process the message, and then release the references we took on the socket and IO buffer.

void CThreadPoolWorkerThread::Process(
   ULONG_PTR completionKey,
   DWORD operation,
   OVERLAPPED *pOverlapped)
{
   Socket *pSocket = reinterpret_cast<Socket *>(completionKey);
   CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);

   ProcessMessage(pSocket, pBuffer);

   pSocket->Release();
   pBuffer->Release();
}

Since the socket class marshals all IO requests back to the IO thread pool, we can safely make read and write requests from our business logic thread even though the thread may be terminated before the IO requests complete.
Maintaining per-connection state
The final thing that our server may need to do is associate some internal server state with a particular socket connection, the

void *GetUserPtr() const;
void SetUserPtr(void *pData);
unsigned long GetUserData() const;
void SetUserData(unsigned long data);

These provide access to a single
Although there are two versions of the user data access functions, one for a
The example server marshals the
The complete example
The shell of a POP3 server which performs its business logic processing in a thread pool separate from its IO can be downloaded from here.
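The reference-counting hand-off in ProcessCommand() and Process() is what makes it safe for either thread to finish with the socket or buffer first. The following is a minimal portable sketch of that pattern, assuming C++11 std::atomic and std::thread; RefCounted and HandOff() are hypothetical stand-ins for the article's Socket and CIOBuffer classes, not their actual implementations.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-in for a reference-counted object such as a socket or
// IO buffer. It starts with one reference, owned by its creator.
class RefCounted {
public:
    explicit RefCounted(std::atomic<int> &destroyed) : destroyed_(destroyed) {}

    void AddRef() { refs_.fetch_add(1, std::memory_order_relaxed); }

    // Deletes the object when the last reference is dropped.
    void Release() {
        if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            ++destroyed_;
            delete this;
        }
    }

private:
    ~RefCounted() = default;  // only Release() may destroy the object

    std::atomic<int> refs_{1};
    std::atomic<int> &destroyed_;
};

// Mirrors ProcessCommand()/Process(): take an extra reference before handing
// the object to another thread, and let that thread release it when done.
// The caller then drops its own reference; whichever Release() runs last
// deletes the object, so neither thread frees it from under the other.
void HandOff(RefCounted *item, std::atomic<int> &processed) {
    assert(item != nullptr);
    item->AddRef();                      // reference owned by the worker
    std::thread worker([item, &processed] {
        ++processed;                     // stand-in for ProcessMessage()
        item->Release();                 // worker drops its reference
    });
    item->Release();                     // caller drops its reference
    worker.join();
}
```

Because each thread owns exactly one reference, the object is destroyed exactly once regardless of which thread finishes first.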
The server has a call to
As with the other examples, simply telnet to localhost 5001 to test the server. The server runs until a named event is set and then shuts down. The very simple Server Shutdown program, available here, provides the off switch.