A C++ Websocket server for realtime interaction with Web clients






4.81/5 (23 votes)
A Websocket protocol implementation atop the ush Framework real time library plus a demo example featuring four types of communication workflows between the HTML5 web client and the server.
- Download source code - 347 KB
- Download Push Framework - 187 KB
- Download Web Client - 117 KB
- Download Websocket Protocol - 22 KB
- Download Websocket Server - 9.66 KB
Table of contents
Introduction
The introduction of the Websocket protocol marks an interesting milestone in the evolution of the web. Finally, it is possible for a web page to open a full duplex connection with a remote server and asynchronously receive data without having to poll for it. This opens the door for plenty of ideas to be easily doable by implementing a web front-end that gets deployed to multiple types of devises and a custom server side application able to handle thousands of simultaneously connected clients and which can be deployed on one low-cost server machine.
In this article, a Websocket server application is developed and we showcase its interaction with a webpage. The solution is based on a real time communication library that was previously published at CodeProject: Push Framework. The protocol layer is devised in an independent library project that can be easily reused by developers.
Protocol Extension Layer
The solution presented in this article is based on Push Framework which provides a foundation for creating real time servers able to manage large numbers of simultaneously connected clients. Push Framework is protocol independent: It is up to us to give protocol details and information by making concrete implementation of the following "abstract classes":
IncomingPacket
: What's the prototype for incoming messages, i.e., messages sent by the client and which the server needs to react to?OutgoingPacket
: What's the prototype for outgoing messages? Most protocols are symmetric, so it should be the same asIncomingPacket
.Protocol
: How incoming packets are deserialized and how outgoing packets are serialized so they get sent through the network.
To make PushFramework::Protocol
a concrete class, the following virtual methods must be overridden:
encodeOutgoingPacket
: it takes anOutgoingPacket
and encodes it.frameOutgoingPacket
: it takes an encodedOutgoingPacket
and inserts it into the output socket buffer.tryDeframeIncomingPacket
: it provides a reference to the received data. These should be examined and anIncomingPacket
object may be returned.decodeIncomingPacket
: iftryDeframeIncomingPacket
succeeds in making anIncomingPacket
, this function should decode its content.
These methods are requested internally by PF at serialization and de-serialization times and they provide enough abstraction to the majority of protocols.
Dealing with the Websocket protocol, it should be understood that the encoding/decoding part is a separate implementation: This is because,
the spec is more of a framing protocol. It details how the payload is encapsulated into a frame along with header information
so that it is transmitted into the network. But it does not impose how the payload is "encoded". So the methods that are really relevant to Websocket
are frameOutgoingPacket
and tryDeframeIncomingPacket
. In our example, we do not do a big job in the encoding stage. Developers might find it
suitable to modify this, by adding a JSON layer, for example.
The spec., however, talks about two communication stages in the protocol, which leads us to creating two types of data structures:
- A handshake message: when a connection is accepted at the transport layer, a handshake stage where some negotiation is made begins.
- A websocket data message: this will represent the data messages that are exchanged once the handshake stage is accomplished.
The framing code should distinguish between the two stages.
int WebsocketProtocol::tryDeframeIncomingPacket( PushFramework::DataBuffer& buffer,
PushFramework::IncomingPacket*& pPacket, int& serviceId,
unsigned int& nExtractedBytes, ConnectionContext* pContext )
{
if (buffer.GetDataSize() == 0)
return Protocol::eIncompletePacket;
WebsocketConnectionContext* pCxt = (WebsocketConnectionContext*) pContext;
if (pCxt->GetStage() == WebsocketConnectionContext::HandshakeStage)
{
WebsocketHandshakeMessage* pMessage =
new WebsocketHandshakeMessage(buffer.GetBuffer(), buffer.GetDataSize());
serviceId = 0;
nExtractedBytes = buffer.GetDataSize();
pPacket = pMessage;
return Protocol::Success;
}
//In the other cases, we should expect a data message :
int nMinExpectedSize = 6;
if (buffer.GetDataSize() < nMinExpectedSize)
return Protocol::eIncompletePacket;
BYTE payloadFlags = buffer.getAt(0);
if (payloadFlags != 129)
return Protocol::eUndefinedFailure;
BYTE basicSize = buffer.getAt(1) & 0x7F;
unsigned __int64 payloadSize;
int masksOffset;
if (basicSize <= 125)
{
payloadSize = basicSize;
masksOffset = 2;
}
else if (basicSize == 126)
{
nMinExpectedSize += 2;
if (buffer.GetDataSize() < nMinExpectedSize)
return Protocol::eIncompletePacket;
payloadSize = ntohs( *(u_short*) (buffer.GetBuffer() + 2) );
masksOffset = 4;
}
else if (basicSize == 127)
{
nMinExpectedSize += 8;
if (buffer.GetDataSize() < nMinExpectedSize)
return Protocol::eIncompletePacket;
payloadSize = ntohl( *(u_long*) (buffer.GetBuffer() + 2) );
masksOffset = 10;
}
else
return Protocol::eUndefinedFailure;
nMinExpectedSize += payloadSize;
if (buffer.GetDataSize() < nMinExpectedSize)
return Protocol::eIncompletePacket;
BYTE masks[4];
memcpy(masks, buffer.GetBuffer() + masksOffset, 4);
char* payload = new char[payloadSize + 1];
memcpy(payload, buffer.GetBuffer() + masksOffset + 4, payloadSize);
for (unsigned __int64 i = 0; i < payloadSize; i++) {
payload[i] = (payload[i] ^ masks[i%4]);
}
payload[payloadSize] = '\0';
WebsocketDataMessage* pMessage = new WebsocketDataMessage(payload);
serviceId = 1;
nExtractedBytes = nMinExpectedSize;
pPacket = pMessage;
delete payload;
return Protocol::Success;
}
The Websocket Server
In WebsocketServer, we instantiate a main object derived from PushFramework::Server
, initialize it by describing a Protocol object, a service object,
and a ClientFactory
object, then we start it by calling the ::Start
member function.
When this function is called, many resources are put in place:
- A listening thread
- A pool of threads (IO Workers) to service IO events
- A main thread to manage the overall server structures
- A number of "streaming threads", these will stream out data in broadcast queues to subscribers
The protocol object provided should be derived from the WebsocketProtocol
class designed in the separate DLL project.
As for the ClientFactory
subclass, it should manage the lifecycle of connected clients. Particularly, it decides on
the transition of when a newly accepted connection (PhysicalConnection
) is transformed into a
legitimate client (LogicalConnection
).
In our case, this transition is dependent on two validations: handshake validation as described by
the Websocket protocol, and
a login validation where we just require that clients send a unique pseudonym.
int WebsocketClientFactory::onFirstRequest( IncomingPacket& _request,
ConnectionContext* pConnectionContext, LogicalConnection*& lpClient,
OutgoingPacket*& lpPacket )
{
//received messages belong to a physical connection
//that still did not transform into a logical connection :
//understand in which stage we are :
WebsocketConnectionContext* pCxt = (WebsocketConnectionContext*) pConnectionContext;
if (pCxt->GetStage() == WebsocketConnectionContext::HandshakeStage)
{
WebsocketHandshakeMessage& request = (WebsocketHandshakeMessage&) _request;
if (!request.Parse())
{
return ClientFactory::RefuseAndClose;
}
WebsocketHandshakeMessage *pResponse = new WebsocketHandshakeMessage();
if (WebsocketProtocol::ProcessHandshake(request, *pResponse))
{
lpPacket = pResponse;
pCxt->SetStage(WebsocketConnectionContext::LoginStage);
}
return ClientFactory::RefuseRequest;
// Will not close the connection, but we still wait
// for login message to create a logical client.
}
if (pCxt->GetStage() == WebsocketConnectionContext::LoginStage)
{
WebsocketDataMessage& request = (WebsocketDataMessage&) _request;
WebsocketClient* pClient = new WebsocketClient(request.GetArg1());
lpClient = pClient;
WebsocketDataMessage *pResponse = new WebsocketDataMessage(LoginCommunication);
pResponse->SetArguments("Welcome " + request.GetArg1());
lpPacket = pResponse;
pCxt->SetStage(WebsocketConnectionContext::ConnectedStage);
return ClientFactory::CreateClient;
}
//Impossible to come here.
}
The server business code is organized into "Service" classes. Each is bound to a particular type of request:
WebsocketServer server;
server.registerService(EchoCommunication, new EchoService, "echo");
server.registerService(Routedcommunication, new RoutedCommunicationService, "routed");
server.registerService(GroupCommunication, new GroupCommunicationService, "grouped");
server.registerService(StreamedCommunication, new StreamedCommunicationService, "streamed");
Let's see the source code for two of them:
void RoutedCommunicationService::handle( LogicalConnection* pClient, IncomingPacket* pRequest )
{
WebsocketDataMessage& request = (WebsocketDataMessage&)(*pRequest);
WebsocketClient& client = (WebsocketClient&) (*pClient);
LogicalConnection* pRecipient = FindClient(request.GetArg1().c_str());
if (pRecipient)
{
WebsocketDataMessage response(Routedcommunication);
response.SetArguments(client.getKey(), request.GetArg2());
pRecipient->PushPacket(&response);
}
}
For the forth situation, all the servers care about is handling the subscribe, unsubscribe requests:
void StreamedCommunicationService::handle( LogicalConnection* pClient, IncomingPacket* pRequest )
{
WebsocketDataMessage& request = (WebsocketDataMessage&)(*pRequest);
WebsocketClient& client = (WebsocketClient&) (*pClient);
string opType = request.GetArg1();
if (opType == "subscribe")
{
broadcastManager.SubscribeConnectionToQueue(client.getKey(), "streamingQueue");
}
if (opType == "unsubscribe")
{
broadcastManager.UnsubscribeConnectionFromQueue(client.getKey(), "streamingQueue");
}
}
In fact, PF already has a publish/subscribe mechanism, so that we just care about setting up the queues, subscribe clients to these, and publish messages. Message senders do not know about receivers nor are receivers aware about senders. Available data is streamed continuously to those who are interested in it.
The Client
Our web page displays four tabs, in each tab we can trigger one type of operation:
- Echo tab: a message is sent and all the server does is echo it back to the client.
- Routed Communication: a message is sent to a particular client, the server takes care of routing it to its destination.
- Group Communication: a message is sent to the server so it is published to a broadcast queue. We can remotely subscribe to the queue, to begin receiving all content.
- Streamed Communication: allows subscription/unsubscription to a broadcast queue of which content is published automatically. A server thread will do this publishing, so we can experience real time data in the client.
To login, the client enters a pseudonym then clicks "Connect". The server then replies back.
You can test the different types of communication workflows like echo communication and streamed communication where you get a real time stream of messages automatically created by the server and sent to the webpage: