License: The Code Project Open License (CPOL)
A reusable, high performance, socket server class - Part 2
By Len Holgate
To maintain performance a socket server shouldn't make blocking calls from its IO threads. This article builds on the previous one to add a business logic thread pool to our example server.
VC6, VC7, Win2K, WinXP, MFC, Dev
The following source was built using Visual Studio 6.0 SP5 and Visual Studio .NET. You need to have a version of the Microsoft Platform SDK installed.
Note that the debug builds of the code waste a lot of CPU cycles due to the debug trace output. It's only worth profiling the release builds.
In the previous article we designed a reusable socket server class to make writing high performance socket based servers easy. We presented a series of simple examples, from the humble echo server through to a slightly more real-world packet echo server and a fake POP3 server. This article continues to make the example server more usable in the real world by adding a business logic thread pool to the server so that messages are processed by a thread that isn't part of the IO thread pool. This helps to maintain the scalability and performance of the server by moving potentially blocking work off into its own thread pool.
To be able to handle variable load it's often useful to have a thread pool that can be expanded and contracted depending on the current load on the server. As we pointed out in the last article, all of our asynchronous socket IO is handled by the socket server's thread pool. The threads in this pool cannot be terminated whilst they have outstanding IO operations or the operations will be terminated. This means that the socket server's thread pool cannot shrink unless we keep track of the IO operations associated with each worker thread and only allow a thread to terminate when all of its IO operations have completed. To maintain performance we need to make sure that the threads in the socket server's thread pool do not block: there are a finite number of them, and if they all block then no socket IO will occur until they unblock. The easiest way to ensure that the IO threads don't block is to move the business logic processing out of the IO thread pool and into a new thread pool. The IO threads then simply handle the IO, chunk the data stream into messages and pass the messages off to the business logic thread pool.
Our requirements for the business logic thread pool are that it should be flexible and capable of increasing and decreasing the number of worker threads as the load on the server dictates. Passing work items into the thread pool should be a non-blocking operation so that the IO threads can operate at maximum efficiency, but we need to be able to know when a work item hasn't been picked up by a thread within a certain time period so that we can add more threads to the pool. We also need to keep track of the number of idle threads that we have and, every so often, reduce the number of threads in the pool to conserve resources in times of low server loading.
As you would probably expect, the thread pool uses IO Completion Ports to dispatch work items to worker threads. To be able to monitor how long a work item waits before a thread picks it up, and therefore work out when we need to add more threads to the pool, we use an event. When we dispatch a work item to the IO Completion Port we wait on the event for a configurable timeout period. When a thread picks up a work item from the completion port the first thing that it does is signal the event. If all threads are busy when we dispatch our work item our timeout may expire before a thread signals the event. In this case we may wish to add another thread to the pool to deal with the workload. The dispatch code could look something like this:
void CThreadPool::HandleDispatch(
   ULONG_PTR completionKey,
   DWORD dwNumBytes,
   OVERLAPPED *pOverlapped)
{
   m_dispatchCompleteEvent.Reset();
   bool processed = false;
   m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped);
   // wait for someone to toggle the 'got message' event?
   bool threadStarted = false;
   while (!processed)
   {
      DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);
      if (result == WAIT_OBJECT_0)
      {
         processed = true;
      }
      else if (result == WAIT_TIMEOUT)
      {
         if (!threadStarted &&
             m_processingThreads == m_activeThreads &&
             (size_t)m_activeThreads < m_maxThreads)
         {
            StartWorkerThread();
            threadStarted = true;
         }
      }
      else
      {
         throw CWin32Exception(_T("CThreadPool::Dispatch()"), GetLastError());
      }
   }
}
Whilst there are threads available to process the work items we don't need to start new threads. As soon as all of the threads in the pool are active we may time out during the dispatch and then, if we're not already running with the maximum number of threads that we've been configured for, we start a new thread. The actual code is slightly more complex as it handles shutdown requests and adjusts the timeout when we're already running at our maximum number of threads. The dispatcher needs to know how many threads we have in the pool and how many of those threads are processing, so each worker thread calls back to the thread pool to let the pool know what state it's in.
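The wait-and-expand loop can also be sketched without the Win32 types. The following is a minimal, portable illustration only: `ManualResetEvent` and `DispatchAndWait()` are hypothetical names, a standard condition variable stands in for the Win32 event, and the thread counts are passed in as plain values rather than read from the pool.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>

// Portable stand-in for a Win32 manual-reset event (an assumption of this sketch).
class ManualResetEvent
{
public:
   void Set()
   {
      {
         std::lock_guard<std::mutex> lock(m_mutex);
         m_signalled = true;
      }
      m_cond.notify_all();
   }
   void Reset()
   {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_signalled = false;
   }
   // Returns true if the event was signalled before the timeout expired.
   bool Wait(std::chrono::milliseconds timeout)
   {
      std::unique_lock<std::mutex> lock(m_mutex);
      return m_cond.wait_for(lock, timeout, [this] { return m_signalled; });
   }
private:
   std::mutex m_mutex;
   std::condition_variable m_cond;
   bool m_signalled = false;
};

// Waits for a worker to signal that it has picked up the work item.
// Each time the wait times out we check whether every thread is busy and
// there is room to grow; if so we start at most one extra worker.
// Returns true if an extra worker was started.
bool DispatchAndWait(
   ManualResetEvent &pickedUp,
   std::chrono::milliseconds timeout,
   std::size_t processingThreads,
   std::size_t activeThreads,
   std::size_t maxThreads,
   const std::function<void()> &startWorkerThread)
{
   assert(processingThreads <= activeThreads);
   bool threadStarted = false;
   while (!pickedUp.Wait(timeout))
   {
      if (!threadStarted &&
          processingThreads == activeThreads &&
          activeThreads < maxThreads)
      {
         startWorkerThread();
         threadStarted = true;
      }
   }
   return threadStarted;
}
```

As in the original loop, the caller blocks until the event is signalled, so this function is only suitable for a thread that is allowed to wait.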
The problem with this piece of work item dispatch code is that it doesn't fulfill our requirement of being able to dispatch a work item to the pool in a non-blocking fashion. To achieve that, we add another level of indirection, and another IO Completion Port.
To ensure that users wishing to dispatch a work item to the thread pool can do so without blocking, we implement the user level dispatch function as follows:
void CThreadPool::Dispatch(
   ULONG_PTR completionKey,
   DWORD dwNumBytes /*= 0*/,
   OVERLAPPED *pOverlapped /*= 0*/)
{
   if (completionKey == 0)
   {
      throw CException(_T("CThreadPool::Dispatch()"), _T("0 is an invalid value for completionKey"));
   }
   m_dispatchPort.PostStatus(completionKey, dwNumBytes, pOverlapped);
}
The restriction on 0 valued completion keys is unfortunate but allows us to shut down the thread pool's
dispatch thread by posting a 0 to its completion port. The thread pool now has two IO Completion Ports. The
dispatch port is serviced by a single maintenance thread which executes the HandleDispatch()
method to dispatch work items to the worker threads. Users dispatch without blocking and the maintenance thread
dispatches in a blocking manner so that it can expand the thread pool when it needs to. The work item
port is serviced by a variable number of threads. We've seen how we know when we need to expand the
number of threads; now we'll look at how we reduce the number of threads when the workload is low.
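The two-stage arrangement can be illustrated with a portable sketch in which a mutex-protected queue stands in for the dispatch completion port. `KeyQueue` and `ForwardUntilShutdown()` are hypothetical names invented for this example, and forwarding to the work port is reduced to recording the key. Posting never waits for a worker, and a key of 0 stops the maintenance thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in for the dispatch port: Post() returns immediately, Get() blocks.
class KeyQueue
{
public:
   void Post(std::uintptr_t key)
   {
      {
         std::lock_guard<std::mutex> lock(m_mutex);
         m_keys.push(key);
      }
      m_cond.notify_one();
   }
   std::uintptr_t Get()
   {
      std::unique_lock<std::mutex> lock(m_mutex);
      m_cond.wait(lock, [this] { return !m_keys.empty(); });
      const std::uintptr_t key = m_keys.front();
      m_keys.pop();
      return key;
   }
private:
   std::mutex m_mutex;
   std::condition_variable m_cond;
   std::queue<std::uintptr_t> m_keys;
};

// Posts each key without blocking, then posts 0 to stop the maintenance
// thread. Returns the keys in the order the maintenance thread handled them.
std::vector<std::uintptr_t> ForwardUntilShutdown(
   const std::vector<std::uintptr_t> &keys)
{
   KeyQueue dispatchQueue;
   std::vector<std::uintptr_t> forwarded;

   std::thread maintenance([&dispatchQueue, &forwarded] {
      for (;;)
      {
         const std::uintptr_t key = dispatchQueue.Get();
         if (key == 0)
         {
            break;                  // 0 is reserved as the shutdown signal
         }
         forwarded.push_back(key);  // the blocking dispatch would happen here
      }
   });

   for (const std::uintptr_t key : keys)
   {
      assert(key != 0);             // user keys must be non-zero
      dispatchQueue.Post(key);
   }
   dispatchQueue.Post(0);
   maintenance.join();
   return forwarded;
}
```

Because a single maintenance thread drains the queue, work items are handed on in the order in which they were posted.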
Often work items come in batches: the thread pool gets busy, expands, services all of the work items and then becomes less busy. At this point the pool contains threads which aren't being used but which are still consuming resources. These dormant threads can be safely shut down as the pool can expand again as load increases. The question is, how do we decide when to shut down some threads?
The maintenance thread that handles our blocking dispatch also handles checking for dormant threads. Every so often (at a configurable interval) the maintenance thread uses an algorithm to determine if it should shut some threads down. The current algorithm is as follows:
void CThreadPool::HandleDormantThreads()
{
   if ((size_t)m_activeThreads > m_minThreads)
   {
      const size_t dormantThreads = m_activeThreads - m_processingThreads;
      if (dormantThreads > m_maxDormantThreads)
      {
         const size_t threadsToShutdown = (dormantThreads - m_maxDormantThreads) / 2 + 1;
         StopWorkerThreads(threadsToShutdown);
      }
   }
}
If we have more threads than the minimum number we're allowed to have, we find out how many threads aren't currently processing work items. If that number is more than the number of dormant threads that we're allowed to have, we shut down half of the excess (rounding down) plus one. Stopping worker threads is a simple case of posting an IO completion key of 0 to the work port for each worker thread that we want to shut down.
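To make the arithmetic concrete, the same rule can be written as a free function and evaluated for a few pool states. `ThreadsToShutdown()` is a hypothetical helper for illustration and is not part of the thread pool class.

```cpp
#include <cassert>
#include <cstddef>

// Returns how many worker threads one maintenance pass would ask to stop.
// Mirrors the rule above: nothing happens unless the pool is above its
// minimum size and has more dormant threads than it is allowed to keep.
std::size_t ThreadsToShutdown(
   std::size_t activeThreads,      // threads currently in the pool
   std::size_t processingThreads,  // threads busy with a work item
   std::size_t minThreads,
   std::size_t maxDormantThreads)
{
   assert(processingThreads <= activeThreads);
   if (activeThreads <= minThreads)
   {
      return 0;
   }
   const std::size_t dormantThreads = activeThreads - processingThreads;
   if (dormantThreads <= maxDormantThreads)
   {
      return 0;
   }
   return (dormantThreads - maxDormantThreads) / 2 + 1;
}
```

With a minimum of 5 threads and at most 5 dormant threads, a fully idle pool of 10 threads is asked to stop 3 on the first pass (leaving 7) and 2 on the next (leaving 5), after which the rule stops firing.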
We now have a thread pool that fulfills our requirements of automatic expansion and contraction
depending upon load and non-blocking dispatch for users. The remaining thing to do is allow the derived
class to provide its own WorkerThread class to do the work. The worker thread class must implement the
following interface:
virtual bool Initialise();
virtual void Process(
   ULONG_PTR completionKey,
   DWORD dwNumBytes,
   OVERLAPPED *pOverlapped) = 0;
virtual void Shutdown();
Initialise() is called when the worker thread is first created, Shutdown() is called when
the thread is terminating and Process() is called for each work item.
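The lifecycle can be illustrated with a portable sketch. Everything here is hypothetical: `WorkerThreadBase`, `LoggingWorker` and `RunWorker()` are invented for the example, standard types replace `ULONG_PTR`, `DWORD` and `OVERLAPPED *`, and skipping the work when `Initialise()` returns false is an assumption about how a pool might treat that result.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Portable stand-in for the worker thread interface described above.
class WorkerThreadBase
{
public:
   virtual ~WorkerThreadBase() {}
   virtual bool Initialise() { return true; }
   virtual void Process(
      std::uintptr_t completionKey,
      unsigned long numBytes,
      void *pOverlapped) = 0;
   virtual void Shutdown() {}
};

// Example worker that records each call so the order can be inspected.
class LoggingWorker : public WorkerThreadBase
{
public:
   std::vector<std::string> log;

   bool Initialise() override
   {
      log.push_back("init");
      return true;
   }
   void Process(std::uintptr_t completionKey, unsigned long, void *) override
   {
      assert(completionKey != 0);   // 0 is reserved as the stop signal
      log.push_back("process " + std::to_string(completionKey));
   }
   void Shutdown() override
   {
      log.push_back("shutdown");
   }
};

// Drives one worker the way a pool thread might: a key of 0 means "stop".
void RunWorker(WorkerThreadBase &worker, const std::vector<std::uintptr_t> &keys)
{
   if (!worker.Initialise())
   {
      return;   // assumption: a failed initialisation means no work is done
   }
   for (const std::uintptr_t key : keys)
   {
      if (key == 0)
      {
         break;
      }
      worker.Process(key, 0, nullptr);
   }
   worker.Shutdown();
}
```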
Now that we have a suitable thread pool we can integrate it with our fake POP3 socket server so that the actual processing of commands can occur in the business logic pool whilst the IO pool is left to get on with the IO operations. We can also move socket closure off to the business logic pool so that we don't block the IO threads with a lingering socket close.
The first thing we need to do is create and configure our thread pool. Then we can pass a reference to it to our socket server class, which in turn passes that reference on to our IO threads.
CThreadPool pool(
5, // initial number of threads to create
5, // minimum number of threads to keep in the pool
10, // maximum number of threads in the pool
5, // maximum number of "dormant" threads
5000, // pool maintenance period (millis)
100, // dispatch timeout (millis)
10000); // dispatch timeout for when pool is at max threads
pool.Start();
CSocketServer server(
INADDR_ANY, // address to listen on
5001, // port to listen on
10, // max number of sockets to keep in the pool
10, // max number of buffers to keep in the pool
1024, // buffer size
pool);
server.Start();
When our socket server has a complete, distinct, message to process it can dispatch it to the thread pool for processing, rather than processing it on one of its IO threads.
void CSocketServer::ProcessCommand(
   CSocketServer::Socket *pSocket,
   CIOBuffer *pBuffer)
{
   // take a reference on each object on behalf of the business logic thread
   pSocket->AddRef();
   pBuffer->AddRef();
   m_pool.Dispatch(reinterpret_cast<ULONG_PTR>(pSocket), 0, pBuffer->GetAsOverlapped());
}
Since we're passing the socket and IO buffer to another thread we have to increment their reference counts so that they don't get cleared up from underneath us. Over in our business logic thread we can finally process the message, and then release the references we took on the socket and IO buffer.
void CThreadPoolWorkerThread::Process(
   ULONG_PTR completionKey,
   DWORD operation,
   OVERLAPPED *pOverlapped)
{
   Socket *pSocket = reinterpret_cast<Socket *>(completionKey);
   CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);
   ProcessMessage(pSocket, pBuffer);
   // release the references taken in ProcessCommand()
   pSocket->Release();
   pBuffer->Release();
}
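The reason for the extra AddRef() calls can be seen in a minimal sketch of an intrusively reference counted object. `RefCounted` is a hypothetical class written for this illustration; it assumes, as the text implies, that an object starts with one reference and destroys itself when the last reference is released. The `destroyed` flag exists only so that the example can observe destruction.

```cpp
#include <atomic>
#include <cassert>

class RefCounted
{
public:
   explicit RefCounted(bool &destroyed)
      : m_ref(1), m_destroyed(destroyed)
   {
   }
   void AddRef()
   {
      m_ref.fetch_add(1, std::memory_order_relaxed);
   }
   void Release()
   {
      const long previous = m_ref.fetch_sub(1, std::memory_order_acq_rel);
      assert(previous > 0);
      if (previous == 1)
      {
         delete this;   // last reference gone
      }
   }
private:
   ~RefCounted() { m_destroyed = true; }   // only Release() may destroy
   std::atomic<long> m_ref;
   bool &m_destroyed;
};
```

If the IO thread takes a reference before handing the object over and later drops its own, the object stays alive until the business logic thread calls Release() as well.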
Since the socket class marshals all IO requests back to the IO thread pool we can safely make read and write requests from our business logic thread even though the thread may be terminated before the IO requests complete.
The final thing that our server may need to do is associate some internal server state with a
particular socket connection. The Socket class makes this particularly easy as it provides
the following member functions:
void *GetUserPtr() const;
void SetUserPtr(void *pData);
unsigned long GetUserData() const;
void SetUserData(unsigned long data);
These provide access to a single void * user data pointer which is stored in the
Socket. The common usage pattern for this user data is as follows. When the connection is
established the socket server is notified by OnConnectionEstablished(). The server can
allocate a new per-connection data structure and associate it with the socket passed to
OnConnectionEstablished() by calling SetUserPtr(). In subsequent
read and write completions the pointer to the per-connection user data structure can be extracted with
GetUserPtr(). When the connection is terminated the server is notified by
OnConnectionClosed() and the per-connection user data can be retrieved and deleted.
Although there are two versions of the user data access functions, one for a void * and
one for an unsigned long, there is only a single storage location. The two versions are
merely for convenience and to reduce casting if the user data required is simply an index into an
internal server structure rather than a pointer.
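One way such a pair of accessors could share a single storage location is sketched below. This is an illustration only and not necessarily how the Socket class does it: `OpaqueUserDataSketch` is a hypothetical name and the pointer-sized integer member is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

class OpaqueUserDataSketch
{
public:
   void *GetUserPtr() const
   {
      return reinterpret_cast<void *>(m_data);
   }
   void SetUserPtr(void *pData)
   {
      m_data = reinterpret_cast<std::uintptr_t>(pData);
   }
   unsigned long GetUserData() const
   {
      // Only meaningful if the stored value fits in an unsigned long.
      assert(m_data <= std::numeric_limits<unsigned long>::max());
      return static_cast<unsigned long>(m_data);
   }
   void SetUserData(unsigned long data)
   {
      m_data = data;
   }
private:
   std::uintptr_t m_data = 0;   // the single storage location
};
```

Because both setters write the same member, storing a pointer overwrites a previously stored index and vice versa, so a connection should use one form or the other.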
The example server marshals the OnConnectionEstablished() and
OnConnectionClosed() calls across to the business logic thread pool and maintains some
fairly trivial per-connection user data there. The data we maintain is the address of the client
connection (obtained from the buffer passed into OnConnectionEstablished()) and the number
of messages that have been processed on this particular connection.
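The per-connection pattern might look roughly like the following portable sketch. `FakeSocket`, `PerConnectionData` and the free functions are hypothetical stand-ins for the example server's Socket class and notification handlers, and the client address is passed in as a string rather than extracted from a buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Minimal stand-in for the user pointer part of the Socket class.
class FakeSocket
{
public:
   void *GetUserPtr() const { return m_pUserData; }
   void SetUserPtr(void *pData) { m_pUserData = pData; }
private:
   void *m_pUserData = nullptr;
};

struct PerConnectionData
{
   explicit PerConnectionData(const std::string &clientAddress)
      : address(clientAddress), messagesProcessed(0)
   {
   }
   std::string address;
   std::size_t messagesProcessed;
};

// Allocate the per-connection state and attach it to the socket.
void OnConnectionEstablished(FakeSocket &socket, const std::string &address)
{
   socket.SetUserPtr(new PerConnectionData(address));
}

// Look the state up again whenever a message is processed.
void OnMessage(FakeSocket &socket)
{
   PerConnectionData *pData =
      static_cast<PerConnectionData *>(socket.GetUserPtr());
   assert(pData != nullptr);
   ++pData->messagesProcessed;
}

// Retrieve and delete the state; returns the final message count.
std::size_t OnConnectionClosed(FakeSocket &socket)
{
   PerConnectionData *pData =
      static_cast<PerConnectionData *>(socket.GetUserPtr());
   assert(pData != nullptr);
   const std::size_t count = pData->messagesProcessed;
   delete pData;
   socket.SetUserPtr(nullptr);
   return count;
}
```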
The shell of a POP3 server which performs its business logic processing in a separate
thread pool to
its IO can be downloaded from
here.
The server has a call to
::Sleep() within its message processing code so that the processing takes some time and
blocks. Notice how the IO on other connections is unaffected by this, and, if you want, add a similar
call to the server we developed at the end of the last article and compare the behavior.
As with the other examples, simply telnet to localhost 5001 to test the server. The server runs until a named event is set and then shuts down. The very simple Server Shutdown program, available here, provides the off switch.
[...] CSocket to protect from resource leaks when creating the listening socket. Refactored the Socket and CIOBuffer classes so that common list management code is now in CNodeList and common user data code is now in COpaqueUserData.
[...] ReuseAddress() during the creation of the listening socket as it is not required. Thanks to Alun Jones for pointing this out to me. SocketServer can now be set to ensure read and write packet sequences.
[...] CSocketServer::ProcessDataStream(). We were reusing the buffer when we shouldn't have been. Code was fine up until the changes on 30th June and is fine again now. Thanks to an anonymous CodeProject reader for pointing this out to me.
Last Updated: 17 Aug 2002. Editor: Chris Maunder
Copyright 2002 by Len Holgate. Everything else Copyright © CodeProject, 1999-2010