The following source was built using Visual Studio 6.0 SP5 and Visual Studio .Net. You need to have
a version of the Microsoft Platform SDK installed.
Note that the debug builds of the code waste a lot of CPU cycles due to the debug trace output.
It's only worth profiling the release builds.
Overview
"How do you handle the problem of multiple pending WSARecv() calls?" is a common question on
the Winsock newsgroups. It seems that everyone knows that it's often a good idea to have more than
one outstanding read waiting on a socket and everyone's equally aware that sometimes code doesn't work
right when you do that. This article explains the potential problems with multiple pending recvs and
provides a solution within the reusable server framework that we've been developing over the last few
articles.
That's out of order
There is a subtle issue when using IO completion ports with multiple threads. Although operations
using the IO completion port will always complete in the order that they were submitted, thread scheduling
issues may mean that the actual work associated with the completion is processed in an undefined order.
For example, if we were to submit three WSARecv requests on a socket then they are
guaranteed to complete in the order that we submitted them. However, if we have two threads servicing the IO completion
port, two of the completions could be processed simultaneously. If the thread processing the 'first'
WSARecv completion is interrupted, the second may be completely processed before the first.
This is even more likely to occur on machines with multiple processors, where the two threads may really
be executing simultaneously, but it's possible on single processor boxes too. As always, this is the kind
of subtle problem that probably won't show its face until you release the software to production...
The example above is easy to avoid: simply don't have multiple WSARecv requests outstanding
on a single socket. This is what we have done so far in the example servers developed in the previous articles.
However, this reduces performance; it's always more efficient to have a receive pending when the data
actually arrives on the wire than it is to post a receive after the data has already arrived. Having
multiple WSARecv calls outstanding ensures that there's always a call pending. What's more,
the problem isn't limited to having multiple WSARecvs. In our server framework we marshal all
socket IO calls from the user's threads into our IO thread pool using the IO completion port. This means
that there is the potential for a user thread to issue multiple consecutive writes to the socket and for
them to be executed in an undefined order.
In our example servers so far, code like this:
void CSocketServer::ReadCompleted(
   Socket *pSocket,
   CIOBuffer *pBuffer)
{
   pSocket->Write(pBuffer);
   pSocket->Write(pResponse1);
   pSocket->Write(pResponse2);
}
is potentially unsafe because the writes don't occur synchronously on the user's thread; they are posted
to the IO completion port and occur in the IO thread pool.
Preserving the order of IO completion operations is relatively straightforward. As you'll remember, the
overlapped structure passed to all calls that use IO completion ports represents 'per call' data. We can,
and do, extend the overlapped structure to include our own 'per call' data by using the
CIOBuffer class.
If we add a sequence number to the CIOBuffer we can set the sequence number to the 'next' value in
the user's thread and then make sure we process the buffers in order in the IO thread pool. This concept
applies to any IO completion port operation and each distinct operation requires its own sequence number.
For our server framework that means that our Socket class must now maintain independent sequence numbers
for read and write requests.
The sequence number management code inside the Socket's Write method could be something like this:
pBuffer->SetSequenceNumber(m_writeSequenceNumber++);
To ensure that the sequence numbers actually represent the order in which the operations are submitted,
the setting of the sequence number and the submission of the operation must form a single atomic operation. For
our socket writes this isn't a problem, as we only guarantee the order of writes that are performed on a single thread.
For socket reads we need to ensure that the allocation of the sequence number and the call to
WSARecv() occur without another thread having a chance to perform a read at the same time. This
involves using a critical section to lock access to the socket during the sequence number allocation and
WSARecv() call. Failure to lock in this area can lead to the actual order in which the
WSARecv() calls are made failing to match the order of the sequence numbers allocated.
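As a rough illustration of that locking, here is a minimal portable sketch. It is not the framework's code: SequencedSocket, PostRead() and ReadsPostedInSequence() are hypothetical names, std::mutex stands in for the critical section, and recording the number in a vector stands in for the WSARecv() call.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for the Socket class. Posting a read here only
// records the sequence number; in the real server it would call WSARecv().
class SequencedSocket
{
public:
    // Allocates the next read sequence number and posts the read while
    // holding the lock, so no other thread can post a read in between.
    std::size_t PostRead()
    {
        std::lock_guard<std::mutex> lock(m_lock);
        const std::size_t sequenceNumber = m_readSequenceNumber++;
        m_postedReads.push_back(sequenceNumber);    // stands in for WSARecv()
        return sequenceNumber;
    }

    // True if the reads were posted in exactly the order of their numbers.
    bool ReadsPostedInSequence()
    {
        std::lock_guard<std::mutex> lock(m_lock);
        for (std::size_t i = 0; i < m_postedReads.size(); ++i)
        {
            if (m_postedReads[i] != i)
            {
                return false;
            }
        }
        return true;
    }

private:
    std::mutex m_lock;
    std::size_t m_readSequenceNumber = 0;
    std::vector<std::size_t> m_postedReads;
};
```

The point of the sketch is the scope of the lock: it covers both the increment and the post. If the lock were released between the two, a second thread could allocate a later number and still post its read first.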
Orderly processing
The code to ensure that the IO completions are handled in order is a little more complex. For each distinct
IO operation we need to keep track of the next sequence number that we can process. When a call to
GetQueuedCompletionStatus() returns we need to compare the sequence number in the request with
the next sequence number that we can process. If these numbers match then we can process the request. If they
don't then the request cannot be processed at this time. If an IO operation cannot be processed it should be
stored for later processing. The storage of the out of sequence request needs to be keyed on the sequence number. When an IO thread finds that it can't process the current request it should add the current request
to the store and see if there's a request in the store that can be processed. When a request is processed
the last thing that the IO thread should do is atomically increment the value representing the next sequence
number to process and check to see if there's an IO request in the store that can be processed.
The above strategy handles the situation where multiple IO requests complete concurrently. Only one
thread can be processing an IO request that meets the criteria of being the next one to process, all
other threads will simply add their requests to the store. When the thread that's processing a request finishes processing it can check to see if there are other requests in the store that can now be processed.
If a thread needs to store its IO request then it can do so and then check for a request that can be processed
in an atomic operation.
It's actually more complex to read about than it is to look at. The code to process an operation
in order might look like this:
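pBuffer = pSocket->m_outOfSequenceWrites.GetNext(pBuffer);
while(pBuffer)
{
   DWORD dwFlags = 0;
   DWORD dwSendNumBytes = 0;
   if (SOCKET_ERROR == ::WSASend(
      pSocket->m_socket,
      pBuffer->GetWSABUF(),
      1,
      &dwSendNumBytes,
      dwFlags,
      pBuffer,
      NULL))
   {
      // Error handling omitted. Note that WSA_IO_PENDING from
      // ::WSAGetLastError() is not a failure for an overlapped send.
   }
   // Move on to the next buffer in sequence, if it has already arrived.
   pBuffer = pSocket->m_outOfSequenceWrites.ProcessAndGetNext();
}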
The store itself needs to map sequence numbers to CIOBuffers. The obvious choice of data
structure is a std::map<> though your performance requirements and profiling may
dictate a different choice. GetNext() takes a buffer, compares its sequence number with the
next one we can process and either returns the buffer or adds the buffer to the map and then checks the
map to see if the first buffer in the map is the one we can process. Remember that the map stores its
elements in order of their keys and that we're using the sequence number as the key, so
m_list.begin() refers to the element in the map with the lowest sequence number. If
this function returns null then we're still waiting for the 'next' buffer to arrive.
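CIOBuffer *CIOBuffer::InOrderBufferList::GetNext(
   CIOBuffer *pBuffer)
{
   CCriticalSection::Owner lock(m_criticalSection);
   if (m_next == pBuffer->GetSequenceNumber())
   {
      // This is the buffer we're waiting for; process it straight away.
      return pBuffer;
   }
   // Out of sequence; store it, keyed on its sequence number.
   BufferSequence::value_type value(pBuffer->GetSequenceNumber(), pBuffer);
   std::pair<BufferSequence::iterator, bool> result = m_list.insert(value);
   if (result.second == false)
   {
      // A buffer with this sequence number is already stored; handling omitted.
   }
   // The map is ordered by key, so begin() has the lowest sequence number.
   CIOBuffer *pNext = 0;
   BufferSequence::iterator it;
   it = m_list.begin();
   if (it != m_list.end())
   {
      if (it->first == m_next)
      {
         pNext = it->second;
         m_list.erase(it);
      }
   }
   return pNext;
}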
After processing a buffer the thread can check to see if there's another buffer that it can handle. It needs to increase the last processed value and perform the check atomically, hence the locking.
CIOBuffer *CIOBuffer::InOrderBufferList::ProcessAndGetNext()
{
   CCriticalSection::Owner lock(m_criticalSection);
   ::InterlockedIncrement(&m_next);
   CIOBuffer *pNext = 0;
   BufferSequence::iterator it;
   it = m_list.begin();
   if (it != m_list.end())
   {
      if (it->first == m_next)
      {
         pNext = it->second;
         m_list.erase(it);
      }
   }
   return pNext;
}
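To see the two functions working together outside the server, here is a small portable sketch of the same idea. It is an illustration under assumptions rather than the framework's class: Packet, InOrderPacketList and ProcessInOrder() are hypothetical names, std::mutex replaces the critical section, and a Packet with a payload string replaces the CIOBuffer.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a CIOBuffer carrying a sequence number.
struct Packet
{
    std::size_t sequenceNumber;
    std::string payload;
};

class InOrderPacketList
{
public:
    // Returns the packet to process now, or nullptr if the next one in
    // sequence has not arrived yet (the packet passed in is then stored).
    Packet *GetNext(Packet *pPacket)
    {
        std::lock_guard<std::mutex> lock(m_lock);
        if (pPacket->sequenceNumber == m_next)
        {
            return pPacket;
        }
        m_store.insert(std::make_pair(pPacket->sequenceNumber, pPacket));
        return TakeIfNext();
    }

    // Call after processing a packet: advances the sequence and returns the
    // stored packet that is now next, if there is one.
    Packet *ProcessAndGetNext()
    {
        std::lock_guard<std::mutex> lock(m_lock);
        ++m_next;
        return TakeIfNext();
    }

private:
    // Caller must hold m_lock. The map is ordered by sequence number, so
    // only the first element can possibly be the next one to process.
    Packet *TakeIfNext()
    {
        std::map<std::size_t, Packet *>::iterator it = m_store.begin();
        if (it == m_store.end() || it->first != m_next)
        {
            return nullptr;
        }
        Packet *pNext = it->second;
        m_store.erase(it);
        return pNext;
    }

    std::mutex m_lock;
    std::size_t m_next = 0;
    std::map<std::size_t, Packet *> m_store;
};

// Feeds packets in the given arrival order and returns the payloads joined
// in the order in which they were actually processed.
inline std::string ProcessInOrder(std::vector<Packet> &arrivals)
{
    InOrderPacketList list;
    std::string processed;
    for (std::size_t i = 0; i < arrivals.size(); ++i)
    {
        Packet *pPacket = list.GetNext(&arrivals[i]);
        while (pPacket)
        {
            processed += pPacket->payload;
            pPacket = list.ProcessAndGetNext();
        }
    }
    return processed;
}
```

If the packets numbered 2, 0 and 1 arrive in that order, the first is stored, the second is processed immediately, and processing the third releases the stored one, so the payloads come out in sequence order.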
Handling reads
If the CIOBuffer used by every write that occurs contains a sequence number then similar
code could be used to ensure that completed read requests are processed in the correct order. However,
there's little point in this code being placed in the server framework as different users of the framework
may require different functionality. The CSocketServer derived class could use the
CIOBuffer::InOrderBufferList class to maintain processing order or it could simply
dispatch the read completions to another IO completion port to pass them across to
a business logic thread pool.
In this case it's the code in the business logic thread pool that actually processes the data, and
the order should be maintained there. It may even need to do both: ensuring packet order in the
CSocketServer class itself so that it can successfully break the byte stream into messages,
and then dispatching the messages to the business logic thread pool and ensuring that these complete
messages are also processed in the correct order.
Locking granularity
Each Socket must now keep track of independent read and write sequence numbers and maintain a map of
out of sequence write requests. Manipulation of the map and associated next sequence number counter must
be protected. We use a critical section to protect this code. Be aware that allocating a critical section for each
Socket connection is potentially resource intensive. Instead we could choose to trade locking granularity for
lower resource usage. The CSocketServer class already has a critical section that it uses to protect its lists
of Sockets; we could pass a reference to this critical section to each Socket rather than have them
create their own critical section. The problem with doing this is that we serialise every Socket's map access.
The work performed inside the critical section is small, but a better solution might be to create a critical
section for every X sockets, where X is a value that is determined by profiling your application.
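One way to share locks between groups of sockets is a fixed pool of locks indexed by a socket identifier. The following is only a sketch of that idea, not something the framework provides: SharedLockPool and LockFor() are hypothetical names, the numeric socket id is an assumption, and std::mutex stands in for the critical section.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical pool of locks shared between sockets. Socket N uses lock
// N % LockCount, so only LockCount locks exist however many sockets do.
template <std::size_t LockCount>
class SharedLockPool
{
    static_assert(LockCount > 0, "the pool needs at least one lock");

public:
    // Returns the lock that protects the given socket's sequencing data.
    std::mutex &LockFor(std::size_t socketId)
    {
        return m_locks[socketId % LockCount];
    }

private:
    std::array<std::mutex, LockCount> m_locks;
};
```

With S sockets and LockCount locks, each lock is shared by roughly S / LockCount sockets, which is the X from the text. A pool of one lock behaves like the single shared critical section, and a larger pool moves towards one lock per socket.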
Only paying for what you use
Including sequence numbers in all buffers used for sending and receiving and ensuring the writes are
processed in order adds a little overhead to the work of the IO threads. If you are sure that your server
doesn't require this functionality, perhaps because you know that due to your protocol design there will
only ever be a single read or write request pending, you can opt not to include this functionality by
passing false as the useSequenceNumbers flag in the CSocketServer's constructor.
Enabling read or write sequence numbers independently is left as an exercise for the reader.
An example
To demonstrate the concept of ensuring the ordering of multiple reads and writes we've come up with a
rather contrived example. The packet echo server that we developed in the
previous article has been changed as
follows:
- It now does its work in a business logic thread pool so that we can demonstrate maintaining receive
order in both the socket server and the business logic thread pool when the socket server's worker
thread isn't doing the processing itself.
- It works with larger packets; we use a two byte packet header rather than a one byte header.
This two byte header represents the length of the packet using the following format: packetLength =
byte1 + (byte2 * 256). The length of the packet includes the two byte header.
- When the client initially connects it posts a configurable number of reads. As each read completes it
posts a new read so that it maintains the number of outstanding reads.
- It processes the reads in order, and due to the fact that we now have multiple reads outstanding,
CSocketServer::ProcessDataStream() has changed so that when more data is required we don't
simply reissue a read to read more data into the same buffer.
- It echoes the packet back to the client in pieces by issuing multiple write requests.
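The two byte length header described in the list above can be sketched as follows. This is a portable illustration, not the server's ProcessDataStream() code: Bytes, MakePacket(), DecodePacketLength() and ExtractPackets() are hypothetical names, and the sketch assumes the whole byte stream has already been reassembled in order into a single vector.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

typedef std::vector<unsigned char> Bytes;

const std::size_t headerSize = 2;

// packetLength = byte1 + (byte2 * 256), and it includes the two header bytes.
// The caller must supply at least headerSize bytes.
inline std::size_t DecodePacketLength(const Bytes &data)
{
    return data[0] + (static_cast<std::size_t>(data[1]) * 256);
}

// Builds a packet: the two byte length header followed by the payload.
// Assumes the payload is short enough for the length to fit in two bytes.
inline Bytes MakePacket(const std::string &payload)
{
    const std::size_t packetLength = payload.size() + headerSize;
    Bytes packet;
    packet.push_back(static_cast<unsigned char>(packetLength % 256));
    packet.push_back(static_cast<unsigned char>(packetLength / 256));
    packet.insert(packet.end(), payload.begin(), payload.end());
    return packet;
}

// Removes every complete packet from the front of 'stream' and returns the
// payloads. A trailing partial packet is left in 'stream' for later.
inline std::vector<std::string> ExtractPackets(Bytes &stream)
{
    std::vector<std::string> payloads;
    while (stream.size() >= headerSize)
    {
        const std::size_t packetLength = DecodePacketLength(stream);
        if (packetLength < headerSize || stream.size() < packetLength)
        {
            // Either more data is needed or the length is malformed; a real
            // server would treat the malformed case as a protocol error.
            break;
        }
        payloads.push_back(std::string(
            stream.begin() + headerSize,
            stream.begin() + packetLength));
        stream.erase(stream.begin(), stream.begin() + packetLength);
    }
    return payloads;
}
```

For example, a 300 byte payload gives a packet length of 302, which is encoded as byte1 = 46 and byte2 = 1, since 46 + (1 * 256) = 302.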
The large packet echo server is available for download here in
SocketServer6.zip. Testing using telnet is possible, though more complex; you may find it easier to use
the test harness that we develop here to test it. As with the previous examples, the
server runs until a named event is set and then shuts down. The very simple Server Shutdown program,
available here,
provides an off switch for the server.
Although both the server and thread pool classes are configurable as to whether they use sequence
numbers to maintain packet order these settings can only be set in one way for the server to actually
work in the way that the test harness expects. All packet ordering flags must be set to true. The purpose
of the flags is so that you can turn off the various sequencing required and see the effect on the test.
It's not intended that the server can run reliably in any other configuration.
CThreadPool pool(
5, 5, 10, 5, 5000, 100, 10000, 20, true, true);
pool.Start();
CSocketServer server(
INADDR_ANY, 5001, 10, 10, 1024, pool,
65536, true, true, true);
The configuration flags can be adjusted to witness the following effects:
- The configuration shown above ensures that packets into the read completion method are processed
in sequence - this maintains the validity of the incoming packet data; packets into the worker thread
are maintained in sequence - this maintains the order of echoing the actual packets; and the sequence
of write calls is maintained - this maintains the validity of the outgoing packet data.
- If the number of reads to post (1) is reduced to 1 then there is no need to maintain the read
completion sequencing ((4) can be set to false) as long as the read completion method doesn't issue another
read until it has completed the processing of the current one ((6) should be set to false).
- If the business logic thread pool doesn't attempt to maintain packet ordering ((2) set to false)
then the test will likely report sequence number mismatches - as the packets are echoed out of sequence,
and response != message errors as multiple threads in the business logic thread pool attempt to write
fragments of the message to the socket in an unsynchronised manner and thus interleave sections of
different messages.
- If (2) is left set to false but (3) is also set to false then the test will only fail with sequence
number mismatches as the threads are now echoing their packets in a single write so the data in the
individual packets can't be corrupted by being interleaved with sections of other packets.
- Unfortunately it's currently impossible to simply turn off write packet ordering at the socket
server (option (5)) as doing so also turns off read packet sequencing and so makes it impossible to get
valid data to the business logic thread so that it can echo it and we can witness the writes being
performed out of sequence. If you're interested in seeing this in action then you can hack at the
socket server code.
Revision history
- 15th July 2002 - Initial revision.
- 12th August 2002 - Removed the race condition in socket closure - Thanks to David McConnell for pointing this out.
Derived class can receive connection reset and connection error notifications. Socket provides a means to determine if
send/receive are connected. Dispatch to the thread pool now uses shared enums rather than hard coded constant values.
General code cleaning and lint issues. Adjusted the code and article so that each socket has its own critical section
and the resource utilisation optimisation is suggested, rather than imposed. Fixed a bug whereby the critical section
that is used to protect the per socket data was owned by the worker thread rather than the per socket data.
Other articles in the series
- A reusable socket server class
- Business logic processing in a socket server
- Speeding up socket server connections with AcceptEx
- Handling multiple pending socket read and write operations
- Testing socket servers with C# and .Net
- A high performance TCP/IP socket server COM component for VB