Click here to Skip to main content
Click here to Skip to main content
Technical Blog

Tagged as

TPL-based TCP server with cancellation support

, 19 Jul 2013 CPOL
Rate this:
Please Sign up or sign in to vote.
TPL-based TCP server with cancellation support.

When running a TCP server, there are two points where you have to either block and wait or use an asynchronous Begin/End method pair:

  • When listening for incoming connections
  • When reading from connected socket

Neither of these options supports cancellation. It seems the only way to stop the server is to periodically poll TcpListener.Pending and TcpClient.Available while checking CancellationToken.IsCancellationRequested.

This sample demonstrates a simple TCP server wrapped as a Task. The task supports cancellation: both the listening thread and all client threads are cancelled by a single CancellationToken.

The task calls your callback function and passes it the list of lines received from the client. You can write your data back to the client and close the connection when you are done. Like so:

// A CancellationToken cannot be cancelled directly: the CancellationTokenSource
// that owns it hands out the token and is what requests cancellation later.
_cancellationSource = new CancellationTokenSource();

_server = TcpServer.Run(80, _cancellationSource.Token, (lines, client, cancellation) =>
    {
        // Keep answering while there is work to do and the server is not being stopped.
        while (DoStuff(lines) && !cancellation.IsCancellationRequested)
        {
            client.GetStream().WriteStuff();
        }

        // Closing the client here ends the server-side receive loop for this connection.
        client.Close();
    });

...

// Request shutdown, then block until the listener and all client handlers have finished.
_cancellationSource.Cancel();
_server.Wait();

This is the server code:

/// <summary>
/// Minimal line-oriented TCP server that runs as a <see cref="Task"/> and is stopped
/// through a single <see cref="CancellationToken"/>. Both the accept loop and the
/// per-client receive loops poll instead of blocking so that they can observe the token.
/// </summary>
public static class TcpServer
{
    // Pause between polls when there is nothing to accept or read, in milliseconds.
    private const int SleepTimeout = 50;

    /// <summary>
    /// Starts listening on <paramref name="port"/> and invokes
    /// <paramref name="onLineReceived"/> whenever a client has completed at least one new line.
    /// </summary>
    /// <param name="port">TCP port to listen on (all local addresses).</param>
    /// <param name="cancellation">Token that stops the listener and every client loop.</param>
    /// <param name="onLineReceived">
    /// Callback receiving all complete lines received from the client so far (line
    /// terminators removed), the client itself and the cancellation token. Text that has
    /// not yet been terminated by a newline is held back until its newline arrives.
    /// The callback may close the client, which ends that client's receive loop.
    /// </param>
    /// <returns>A task that completes once the listener and all client handlers have stopped.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="onLineReceived"/> is null.</exception>
    public static Task Run(int port, CancellationToken cancellation, 
         Action<List<string>, 
         TcpClient, CancellationToken> onLineReceived)
    {
        if (onLineReceived == null)
        {
            throw new ArgumentNullException("onLineReceived");
        }

        return Accept(port, cancellation, (client, token) =>
            {
                var receivedLines = new List<string>();

                // Text of the line currently being received (no newline seen yet).
                var pending = new StringBuilder();

                // A stateful decoder keeps multi-byte UTF-8 sequences intact when they
                // are split across two reads.
                var decoder = Encoding.UTF8.GetDecoder();

                try
                {
                    while (!token.IsCancellationRequested && client.Connected)
                    {
                        var available = client.Available;

                        if (available == 0)
                        {
                            // Connected only reflects the last I/O operation, so an idle
                            // connection closed by the peer must be detected explicitly.
                            if (IsClosedByPeer(client.Client))
                            {
                                break;
                            }

                            Thread.Sleep(SleepTimeout);
                            continue;
                        }

                        var buffer = new byte[available];
                        var read = client.GetStream().Read(buffer, 0, available);

                        // Zero bytes from Read means the remote side shut the connection down.
                        if (read == 0)
                        {
                            break;
                        }

                        // Decode only the bytes actually read; Read may return fewer than requested.
                        var chars = new char[decoder.GetCharCount(buffer, 0, read)];
                        var charCount = decoder.GetChars(buffer, 0, read, chars, 0);
                        var newData = new string(chars, 0, charCount);

                        if (AppendLines(receivedLines, pending, newData))
                        {
                            onLineReceived(receivedLines, client, token);
                        }
                    }
                }
                finally
                {
                    // Release the connection even when the callback or a read throws.
                    client.Close();
                }
            });
    }

    /// <summary>
    /// Runs the accept loop on a task and starts one child task per accepted client.
    /// The returned task waits for all child tasks and then stops the listener.
    /// </summary>
    private static Task Accept(int port, CancellationToken cancellation, 
         Action<TcpClient, CancellationToken> onClientAccepted)
    {
        // Touched by the accept loop, by child-task continuations and by the final
        // continuation, so every access is guarded by a lock on the list itself.
        var childTasks = new List<Task>();
        var listener = new TcpListener(IPAddress.Any, port);
        listener.Server.SetSocketOption(SocketOptionLevel.Socket, 
                 SocketOptionName.ReuseAddress, 1);
        listener.Start();

        return Task.Factory.StartNew(() =>
            {
                while (!cancellation.IsCancellationRequested)
                {
                    // AcceptTcpClient blocks, so only call it when a connection is waiting.
                    if (!listener.Pending())
                    {
                        Thread.Sleep(SleepTimeout);
                        continue;
                    }

                    var client = listener.AcceptTcpClient();

                    var childTask = new Task(() => onClientAccepted(client, cancellation));

                    lock (childTasks)
                    {
                        childTasks.Add(childTask);
                    }

                    childTask.ContinueWith(t =>
                        {
                            lock (childTasks)
                            {
                                childTasks.Remove(t);
                            }
                        });
                    childTask.Start();
                }
            }, cancellation).ContinueWith(t =>
                {
                    try
                    {
                        Task[] remaining;

                        lock (childTasks)
                        {
                            remaining = childTasks.ToArray();
                        }

                        Task.WaitAll(remaining);
                    }
                    finally
                    {
                        // Stop listening even if a client handler faulted and WaitAll threw.
                        listener.Stop();
                    }
                });
    }

    /// <summary>
    /// Returns true when the socket is readable but has no data to read, which is how a
    /// connection closed, reset or terminated by the remote side shows up.
    /// </summary>
    private static bool IsClosedByPeer(Socket socket)
    {
        return socket.Poll(0, SelectMode.SelectRead) && socket.Available == 0;
    }

    /// <summary>
    /// Splits <paramref name="newData"/> at '\n', appending every completed line to
    /// <paramref name="lines"/>. Text after the last newline stays in
    /// <paramref name="pending"/> and is prepended to the next chunk's first line.
    /// A single '\r' directly before the newline is removed, including when it arrived
    /// at the end of an earlier chunk.
    /// </summary>
    /// <returns>True when at least one line was completed by this chunk.</returns>
    private static bool AppendLines(List<string> lines, StringBuilder pending, string newData)
    {
        var completed = false;
        var start = 0;

        for (var i = 0; i < newData.Length; i++)
        {
            if (newData[i] != '\n')
            {
                continue;
            }

            pending.Append(newData, start, i - start);

            if (pending.Length > 0 && pending[pending.Length - 1] == '\r')
            {
                pending.Length--;
            }

            lines.Add(pending.ToString());
            pending.Length = 0;
            completed = true;
            start = i + 1;
        }

        // Keep the unterminated tail for the next chunk instead of touching the list.
        pending.Append(newData, start, newData.Length - start);
        return completed;
    }
}

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

m_kramar

Australia Australia
No Biography provided

Comments and Discussions

 
-- There are no messages in this forum --
| Advertise | Privacy | Terms of Use | Mobile
Web01 | 2.8.150327.1 | Last Updated 19 Jul 2013
Article Copyright 2013 by m_kramar
Everything else Copyright © CodeProject, 1999-2015
Layout: fixed | fluid