License: The Code Project Open License (CPOL)
Handling multiple pending socket read and write operations
By Len Holgate
This article explains the potential problems with having multiple pending recv calls on a single socket.
VC6, VC7, Win2K, WinXP, MFC, Dev
The following source was built using Visual Studio 6.0 SP5 and Visual Studio .Net. You need to have a version of the Microsoft Platform SDK installed.
Note that the debug builds of the code waste a lot of CPU cycles due to the debug trace output. It's only worth profiling the release builds.
"How do you handle the problem of multiple pending WSARecv() calls?" is a common question on
the Winsock news groups. It seems that everyone knows that it's often a good idea to have more than
one outstanding read waiting on a socket and everyone's equally aware that sometimes code doesn't work
right when you do that. This article explains the potential problems with multiple pending recvs and
provides a solution within the reusable server framework that we've been developing over the last few
articles.
There is a subtle issue when using IO completion ports with multiple threads. Although operations
using the IO completion port will always complete in the order that they were submitted, thread scheduling
issues may mean that the actual work associated with the completion is processed in an undefined order.
For example, if we were to submit three WSARecv requests on a socket then they are
guaranteed to complete in the order that we submitted them. However, if we have two threads servicing the IO completion
port, two of the completions could be processed simultaneously. If the thread processing the 'first'
WSARecv completion is interrupted, the second may be completely processed before the first.
This is even more likely to occur on machines with multiple processors where the two threads may really
be executing simultaneously, but it's possible on single processor boxes too. As always, this is the kind
of subtle problem that probably won't show its face until you release the software to production...
The example above is easy to avoid: simply don't have multiple WSARecv requests outstanding
on a single socket. This is what we have done so far in the example servers developed in the previous articles.
However, this reduces performance; it's always more efficient to have a receive pending when the data
actually arrives on the wire than it is to post a receive after the data has already arrived. Having
multiple WSARecv calls outstanding ensures that there's always a call pending. What's more,
the problem isn't limited to having multiple WSARecvs. In our server framework we marshal all
socket IO calls from the user's threads into our IO thread pool using the IO completion port. This means
that a user thread can issue multiple consecutive writes to the socket and have
them executed in an undefined order.
In our example servers so far, code like this:
void CSocketServer::ReadCompleted(
    Socket *pSocket,
    CIOBuffer *pBuffer)
{
    // do stuff...

    pSocket->Write(pBuffer);     // echo the command
    pSocket->Write(pResponse1);  // send part 1 response
    pSocket->Write(pResponse2);  // send part 2 response
}
is potentially unsafe because the writes don't occur synchronously on the user's thread; they are posted to the IO completion port and occur in the IO thread pool.
Preserving the order of IO completion operations is relatively straightforward. As you'll remember, the
overlapped structure passed to all calls that use IO completion ports represents 'per call' data. We can,
and do, extend the overlapped structure to include our own 'per call' data by using the
CIOBuffer class.
If we add a sequence number to the CIOBuffer we can set the sequence number to the 'next' value in
the user's thread and then make sure we process the buffers in order in the IO thread pool. This concept
applies to any IO completion port operation and each distinct operation requires its own sequence number.
For our server framework that means that our Socket class must now maintain independent sequence numbers
for read and write requests.
The sequence number management code inside the Socket's Write method could be something like this:
pBuffer->SetSequenceNumber(m_writeSequenceNumber++);
To ensure that the sequence numbers actually represent the order in which the operations are submitted,
setting the sequence number and submitting the operation must together form an atomic operation. For
our socket writes this isn't a problem, as we only guarantee the order of writes that are performed on a single thread.
For socket reads we need to ensure that the allocation of the sequence number and the call to
WSARecv() occur without another thread having a chance to perform a read at the same time. This
involves using a critical section to lock access to the socket during the sequence number allocation and
WSARecv() call. Failure to lock in this area can lead to the actual order in which the
WSARecv() calls are made failing to match the order of the sequence numbers allocated.
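The locking requirement for reads can be sketched in portable C++ as follows. This is only an illustration under stated assumptions: SequencedReader, PostRead() and the m_submitted vector are hypothetical names that do not come from the framework, std::mutex stands in for the critical section, and appending to a vector stands in for the real WSARecv() call.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a socket; not a class from the article's framework.
class SequencedReader
{
public:
    // Allocates the next read sequence number and "submits" the read while
    // holding the lock, so no other thread can get in between the two steps.
    unsigned long PostRead()
    {
        std::lock_guard<std::mutex> lock(m_lock);
        const unsigned long sequence = m_readSequenceNumber++;
        m_submitted.push_back(sequence);  // assumption: stands in for WSARecv()
        return sequence;
    }

    // Number of reads submitted so far.
    std::size_t SubmittedCount() const
    {
        std::lock_guard<std::mutex> lock(m_lock);
        return m_submitted.size();
    }

    // True if the reads were submitted in exactly sequence-number order.
    bool SubmittedInOrder() const
    {
        std::lock_guard<std::mutex> lock(m_lock);
        for (std::size_t i = 0; i < m_submitted.size(); ++i)
        {
            if (m_submitted[i] != i)
            {
                return false;
            }
        }
        return true;
    }

private:
    mutable std::mutex m_lock;
    unsigned long m_readSequenceNumber = 0;
    std::vector<unsigned long> m_submitted;
};
```

Because the increment and the submission share one lock, any number of threads can call PostRead() and the submission order will still match the sequence numbers. Moving the push_back outside the lock would reintroduce the mismatch described above.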
The code to ensure that the IO completions are handled in order is a little more complex. For each distinct
IO operation we need to keep track of the next sequence number that we can process. When a call to
GetQueuedCompletionStatus() returns we need to compare the sequence number in the request with
the next sequence number that we can process. If these numbers match then we can process the request. If they
don't then the request cannot be processed at this time. If an IO operation cannot be processed it should be
stored for later processing. The storage of the out of sequence request needs to be keyed on the sequence number. When an IO thread finds that it can't process the current request it should add the current request
to the store and see if there's a request in the store that can be processed. When a request is processed
the last thing that the IO thread should do is atomically increment the value representing the next sequence
number to process and check to see if there's an IO request in the store that can be processed.
The above strategy handles the situation where multiple IO requests complete concurrently. Only one thread can be processing an IO request that meets the criteria of being the next one to process; all other threads will simply add their requests to the store. When the thread that's processing a request finishes processing it can check to see if there are other requests in the store that can now be processed. If a thread needs to store its IO request then it can do so and then check for a request that can be processed in an atomic operation.
It's actually more complex to read about than it is to look at. The code to process an operation in order might look like this:
pBuffer = pSocket->m_outOfSequenceWrites.GetNext(pBuffer);

while (pBuffer)
{
    DWORD dwFlags = 0;
    DWORD dwSendNumBytes = 0;

    if (SOCKET_ERROR == ::WSASend(
        pSocket->m_socket,
        pBuffer->GetWSABUF(),
        1,
        &dwSendNumBytes,
        dwFlags,
        pBuffer,
        NULL))
    {
        // handle errors etc.
    }

    pBuffer = pSocket->m_outOfSequenceWrites.ProcessAndGetNext();
}
The store itself needs to map sequence numbers to CIOBuffers. The obvious choice of data
structure is a std::map<> though your performance requirements and profiling may
dictate a different choice. GetNext() takes a buffer, compares its sequence number with the
next one we can process and either returns the buffer or adds the buffer to the map and then checks the
map to see if the first buffer in the map is the one we can process. Remember that the map stores its
elements in order of their keys and that we're using the sequence number as the key, so
m_list.begin() refers to the element in the map with the lowest sequence number. If
this function returns null then we're still waiting for the 'next' buffer to arrive.
CIOBuffer *CIOBuffer::InOrderBufferList::GetNext(
    CIOBuffer *pBuffer)
{
    CCriticalSection::Owner lock(m_criticalSection);

    // This buffer is the next one in sequence, so process it straight away.
    if (m_next == pBuffer->GetSequenceNumber())
    {
        return pBuffer;
    }

    // Out of sequence: store it, keyed on its sequence number.
    BufferSequence::value_type value(pBuffer->GetSequenceNumber(), pBuffer);

    std::pair<BufferSequence::iterator, bool> result = m_list.insert(value);

    if (result.second == false)
    {
        // handle error, element already in map
    }

    // See if the lowest numbered stored buffer is the one we can process.
    CIOBuffer *pNext = 0;

    BufferSequence::iterator it = m_list.begin();

    if (it != m_list.end())
    {
        if (it->first == m_next)
        {
            pNext = it->second;
            m_list.erase(it);
        }
    }

    return pNext;
}
After processing a buffer the thread can check to see if there's another buffer that it can handle. It needs to increase the last processed value and perform the check atomically, hence the locking.
CIOBuffer *CIOBuffer::InOrderBufferList::ProcessAndGetNext()
{
    CCriticalSection::Owner lock(m_criticalSection);

    ::InterlockedIncrement(&m_next);

    CIOBuffer *pNext = 0;

    BufferSequence::iterator it;

    it = m_list.begin();

    if (it != m_list.end())
    {
        if (it->first == m_next)
        {
            pNext = it->second;
            m_list.erase(it);
        }
    }

    return pNext;
}
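To see the two functions working together without a Windows build, the same idea can be sketched in portable C++. This is an illustration only, not the framework's code: Buffer, InOrderStore and ProcessInArrivalOrder() are hypothetical names, std::mutex replaces the critical section, and "processing" a buffer just means recording its payload.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for CIOBuffer: a sequence number and a payload.
struct Buffer
{
    unsigned long sequence;
    std::string data;
};

class InOrderStore
{
public:
    // Returns the buffer to process now, or nullptr if the next buffer in
    // sequence has not arrived yet (the supplied buffer is stored for later).
    Buffer *GetNext(Buffer *pBuffer)
    {
        std::lock_guard<std::mutex> lock(m_lock);
        if (pBuffer->sequence == m_next)
        {
            return pBuffer;
        }
        m_pending.insert(std::make_pair(pBuffer->sequence, pBuffer));
        return TakeIfNext();
    }

    // Call after processing a buffer: advances the counter and returns the
    // lowest stored buffer if it has become the next one in sequence.
    Buffer *ProcessAndGetNext()
    {
        std::lock_guard<std::mutex> lock(m_lock);
        ++m_next;
        return TakeIfNext();
    }

private:
    // Caller must hold m_lock.
    Buffer *TakeIfNext()
    {
        std::map<unsigned long, Buffer *>::iterator it = m_pending.begin();
        if (it == m_pending.end() || it->first != m_next)
        {
            return nullptr;
        }
        Buffer *pNext = it->second;
        m_pending.erase(it);
        return pNext;
    }

    std::mutex m_lock;
    unsigned long m_next = 0;
    std::map<unsigned long, Buffer *> m_pending;
};

// Feeds buffers to the store in arrival order and returns the payloads in
// the order in which they were actually processed.
inline std::vector<std::string> ProcessInArrivalOrder(
    InOrderStore &store,
    std::vector<Buffer> &arrivals)
{
    std::vector<std::string> processed;
    for (Buffer &arrival : arrivals)
    {
        Buffer *pBuffer = store.GetNext(&arrival);
        while (pBuffer)
        {
            processed.push_back(pBuffer->data);
            pBuffer = store.ProcessAndGetNext();
        }
    }
    return processed;
}
```

If buffers arrive with sequence numbers 2, 0, 1, 3, buffer 2 is held in the map until 0 and 1 have been processed, so the payloads come out in sequence order regardless of arrival order.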
If the CIOBuffer used by every read that occurs contains a sequence number then similar
code could be used to ensure that completed read requests are processed in the correct order. However,
there's little point in this code being placed in the server framework as different users of the framework
may require different functionality. The CSocketServer derived class could use the
CIOBuffer::InOrderBufferList class to maintain processing order or it could simply
dispatch the read completions to another IO completion port to pass them across to
a business logic thread pool.
In this case it's the code in the business logic thread pool that actually processes the data and
the order should be maintained there. It may even need to do both: ensuring packet order in the
CSocketServer class itself so that it can successfully break the byte stream into messages,
and then dispatching the messages to the business logic thread pool and ensuring that these complete
messages are also processed in the correct order.
Each Socket must now keep track of independent read and write sequence numbers and maintain a map of
out of sequence write requests. Manipulation of the map and associated next sequence number counter must
be protected. We use a critical section to protect this code. Be aware that allocating a critical section for each
Socket connection is potentially resource intensive. Instead we could choose to trade locking granularity for
resource usage. The CSocketServer class already has a critical section that it uses to protect its lists
of Sockets; we could pass a reference to this critical section to each Socket rather than have them
create their own critical section. The problem with doing this is that we serialise every Socket's map access.
The work performed inside the critical section is small, but a better solution might be to create a critical
section for every X sockets where X is a value that is determined by profiling your application.
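One way to share locks between groups of sockets is a fixed pool of locks indexed by a socket identifier. The sketch below is only an assumption about how this might be done: StripedLocks, IndexFor() and LockFor() are hypothetical names, std::mutex stands in for the critical section, and the modulo mapping is just one possible way to assign sockets to locks.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical helper: shares a fixed pool of locks among many sockets, so
// that sockets mapped to different locks do not serialise on each other.
template <std::size_t StripeCount>
class StripedLocks
{
public:
    // Maps a socket identifier onto the index of one of the shared locks.
    std::size_t IndexFor(std::size_t socketId) const
    {
        return socketId % StripeCount;
    }

    // Returns the lock that protects the given socket's map and counter.
    std::mutex &LockFor(std::size_t socketId)
    {
        return m_locks[IndexFor(socketId)];
    }

private:
    std::array<std::mutex, StripeCount> m_locks;
};
```

A Socket would then take `std::lock_guard<std::mutex> guard(locks.LockFor(id));` around its map access. The number of locks plays the role of the tuning value mentioned above and would be chosen by profiling.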
Including sequence numbers in all buffers used for sending and receiving and ensuring the writes are
processed in order adds a little overhead to the work of the IO threads. If you are sure that your server
doesn't require this functionality, perhaps because you know that due to your protocol design there will
only ever be a single read or write request pending, you can opt not to include this functionality by
passing false as the useSequenceNumbers flag in the CSocketServer's constructor.
Enabling read or write sequence numbers independently is left as an exercise for the reader.
To demonstrate the concept of ensuring the ordering of multiple reads and writes we've come up with a rather contrived example. The packet echo server that we developed in the previous article has been changed as follows:
CSocketServer::ProcessDataStream() has changed so that when more data is required we don't simply reissue a read to read more data into the same buffer.
The large packet echo server is available for download here in SocketServer6.zip. Testing using telnet is possible, though more complex; you may find it easier to use the test harness that we develop here to test it. As with the previous examples, the server runs until a named event is set and then shuts down. The very simple Server Shutdown program, available here, provides an off switch for the server.
Although both the server and thread pool classes are configurable as to whether they use sequence numbers to maintain packet order, there is only one combination of settings with which the server works in the way that the test harness expects: all packet ordering flags must be set to true. The purpose of the flags is so that you can turn off the various sequencing required and see the effect on the test. It's not intended that the server can run reliably in any other configuration.
CThreadPool pool(
    5,       // initial number of threads to create
    5,       // minimum number of threads to keep in the pool
    10,      // maximum number of threads in the pool
    5,       // maximum number of "dormant" threads
    5000,    // pool maintenance period (millis)
    100,     // dispatch timeout (millis)
    10000,   // dispatch timeout for when pool is at max threads
    20,      // (1) number of reads to post
    true,    // (2) maintain packet order with sequence numbers
    true);   // (3) echo packets with multiple writes

pool.Start();

CSocketServer server(
    INADDR_ANY,  // address to listen on
    5001,        // port to listen on
    10,          // max number of sockets to keep in the pool
    10,          // max number of buffers to keep in the pool
    1024,        // buffer size
    pool,
    65536,       // max message size
    true,        // (4) maintain read packet order with sequence numbers
    true,        // (5) maintain write packet order with sequence numbers
    true);       // (6) issue a new read before we've completely processed
                 //     this one
The configuration flags can be adjusted to witness the following effects:
Last Updated: 17 Aug 2002. Editor: Chris Maunder
Copyright 2002 by Len Holgate. Everything else Copyright © CodeProject, 1999-2009