Full Duplex Asynchronous Read/Write with Named Pipes

28 Mar 2017, CPOL, 6 min read
Revisiting an old tech for some simple inter-process communication

Introduction

I needed a simple way to communicate between multiple applications running locally on the same computer -- basically, the "server" application spawns "client" applications and needs to communicate with the child apps, directing them to perform certain tasks and return their results.

There are many ways to communicate between apps nowadays -- web sockets, HTTP, WCF, memory mapped files, even plain files, to name a few.  I ended up selecting named pipes because they:

  • Are a simple approach with little overhead.
  • Don't require opening custom ports.
  • Don't require complex configuration like WCF.
  • Are a bit higher level than memory mapped files.

I figured there must be a GitHub (or other) library out there already, and I was surprised that for something as "old tech" as named pipes, while I found various examples and demo code, nothing was neatly packaged into a library with some basic (and good, I guess, that's the real qualifier) functionality.  Eventually I came across a post by Eric Frazer where he was looking for a full duplex asynchronous named pipe implementation, and ended up answering his own question with some code.

The code presented here is based on his code, though I've refactored it quite a bit in places and added a demo and some unit tests around it.

So what you'll learn here is how to implement full duplex named pipe communication, and even though it's old technology, there are times when you just want something brain-dead simple!  And of course the core named pipe client/server code is implemented in a library separate from the demo app.

The Demo

The demo app guides you through creating the named pipe server, instantiating clients, and sending text messages between each client/server connection.  While the demo runs in a single application process space, it would of course work as separate client applications.

Start the Server

To begin the demo, the only action you can take is to click on the Start button that instantiates the server.

Create a Client

Once you've started the server (you can only do that once), the Create Client button is enabled.  Click on it.  (The reason for this step by step process is that it makes it easier to debug into the demo and see what each step is doing.)

Send Messages!

Now that you've created a client, you can send messages between the client and server.  Note that this is a full duplex connection.

Create Another Client

You can create additional clients by clicking on the Create Client button again (do this too much and you'll run out of screen space!) and send messages between the server and the new client.  Again, notice that this is full duplex and each client-server connection is a direct channel as opposed to broadcast behavior.  (Well, broadcast behavior is really just a change in how the server would manage sending messages to each of its clients.)

Behind the Scenes

Next, let's look at what's going on.

Overall Architecture

The BasicPipe abstract class is the base class for both ClientPipe and ServerPipe and, among other things, implements three events: DataReceived, PipeConnected and PipeClosed.  DataReceived passes in a PipeEventArgs.

The BasicPipe Core Implementation

The base class implements common methods for both the server and client. 

public abstract class BasicPipe
{
  public event EventHandler<PipeEventArgs> DataReceived;
  public event EventHandler<EventArgs> PipeClosed;
  public event EventHandler<EventArgs> PipeConnected;

  protected PipeStream pipeStream;
  protected Action<BasicPipe> asyncReaderStart;

  public BasicPipe()
  {
  }

  public void Close()
  {
    pipeStream.WaitForPipeDrain();
    pipeStream.Close();
    pipeStream.Dispose();
    pipeStream = null;
  }

  public void Flush()
  {
    pipeStream.Flush();
  }

  protected void Connected()
  {
    PipeConnected?.Invoke(this, EventArgs.Empty);
  }

  protected void StartByteReaderAsync(Action<byte[]> packetReceived)
  {
    int intSize = sizeof(int);
    byte[] bDataLength = new byte[intSize];

    pipeStream.ReadAsync(bDataLength, 0, intSize).ContinueWith(t =>
    {
      int len = t.Result;

      if (len == 0)
      {
        PipeClosed?.Invoke(this, EventArgs.Empty);
      }
      else
      {
        int dataLength = BitConverter.ToInt32(bDataLength, 0);
        byte[] data = new byte[dataLength];

        pipeStream.ReadAsync(data, 0, dataLength).ContinueWith(t2 =>
        {
          len = t2.Result;

          if (len == 0)
          {
            PipeClosed?.Invoke(this, EventArgs.Empty);
          }
          else
          {
            packetReceived(data);
            StartByteReaderAsync(packetReceived);
          }
        });
      }
    });
  }
}

Note that the reader/writer is asynchronous, with the writer (shown in the Writers section below) implemented as an awaitable Task by taking advantage of System.IO.Stream.WriteAsync. The receiver is of course asynchronous, so care must be taken to marshal the DataReceived event onto the appropriate thread, for example, the UI thread.

Buffers

One of the low-level issues is that named pipes don't have a concept of the amount of data being sent, so that's something that you have to manage.  Creating a receive buffer of fixed size is wasteful if the buffer is bigger than the data being sent, and conversely susceptible to overruns when the data being sent is larger than the allocated size.

This is one of the refactorings that I ended up doing to Eric's code -- always sending the length of the data as the initial portion of the packet.  (Your application is responsible for serializing the data into a string or byte array; these are the only two supported methods of communication.)
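
To make the framing concrete, here is a minimal sketch of the wire layout, assuming a 4-byte length prefix as produced by BitConverter.GetBytes(int).  The FrameSketch class and its Frame/Unframe methods are hypothetical names used for illustration only; they are not part of the library.

```csharp
using System;
using System.Text;

// Hypothetical helper, for illustration only (not part of the article's library).
public static class FrameSketch
{
    // Builds [4-byte payload length][payload].
    public static byte[] Frame(byte[] payload)
    {
        byte[] prefix = BitConverter.GetBytes(payload.Length);
        byte[] frame = new byte[prefix.Length + payload.Length];
        Buffer.BlockCopy(prefix, 0, frame, 0, prefix.Length);
        Buffer.BlockCopy(payload, 0, frame, prefix.Length, payload.Length);
        return frame;
    }

    // Splits a complete frame back into its payload.
    public static byte[] Unframe(byte[] frame)
    {
        int length = BitConverter.ToInt32(frame, 0);
        byte[] payload = new byte[length];
        Buffer.BlockCopy(frame, sizeof(int), payload, 0, length);
        return payload;
    }
}
```

For example, BitConverter.ToString(FrameSketch.Frame(Encoding.UTF8.GetBytes("Hi"))) gives 02-00-00-00-48-69 on a little-endian machine: four bytes of length followed by the two bytes of the text.  Because BitConverter uses the machine's byte order, both ends are assumed to share it, which holds for processes on the same computer.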

Writers

Two writers are implemented:

public Task WriteString(string str)
{
  return WriteBytes(Encoding.UTF8.GetBytes(str));
}

public Task WriteBytes(byte[] bytes)
{
  var blength = BitConverter.GetBytes(bytes.Length);
  var bfull = blength.Concat(bytes).ToArray();

  return pipeStream.WriteAsync(bfull, 0, bfull.Length);
}

The idea here is that you can write either a raw byte array or a string (for example, a JSON or XML serialized packet), but it's important to know that the receiver has to be in agreement with what it is expecting.

Readers

As stated above, the writer and receiver have to be in agreement as to what is being sent -- strings, byte arrays, whatever else that isn't implemented. 

/// <summary>
/// Reads an array of bytes, where the first [n] bytes (based on the server's intsize) indicates the number of bytes to read
/// to complete the packet.
/// </summary>
public void StartByteReaderAsync()
{
  StartByteReaderAsync((b) =>
    DataReceived?.Invoke(this, new PipeEventArgs(b, b.Length)));
}

/// <summary>
/// Reads an array of bytes, where the first [n] bytes (based on the server's intsize) indicates the number of bytes to read
/// to complete the packet, and invokes the DataReceived event with a string converted from UTF8 of the byte array.
/// </summary>
public void StartStringReaderAsync()
{
  StartByteReaderAsync((b) =>
  {
    string str = Encoding.UTF8.GetString(b).TrimEnd('\0');
    DataReceived?.Invoke(this, new PipeEventArgs(str));
  });
}

As the above code illustrates, both byte[] and string readers call the common StartByteReaderAsync method but implement different Action<byte[]> handlers.
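
One general point about reading from any Stream: ReadAsync may return fewer bytes than requested, so a reader that needs a whole frame typically loops until it has them all.  The following is a minimal async/await sketch of that idea, not the library's implementation.  FramedReader, ReadExactAsync, ReadFrameAsync and the maxLength cap are hypothetical names introduced here for illustration.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only (not part of the article's library).
public static class FramedReader
{
    // Reads exactly 'count' bytes, or returns null if the stream ends first.
    public static async Task<byte[]> ReadExactAsync(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int offset = 0;

        while (offset < count)
        {
            int n = await stream.ReadAsync(buffer, offset, count - offset);
            if (n == 0) return null;   // other side closed the stream
            offset += n;
        }

        return buffer;
    }

    // Reads one [4-byte length][payload] frame; returns null when the stream is closed.
    // maxLength is an assumed sanity limit so a corrupt prefix cannot trigger a huge allocation.
    public static async Task<byte[]> ReadFrameAsync(Stream stream, int maxLength = 1024 * 1024)
    {
        byte[] header = await ReadExactAsync(stream, sizeof(int));
        if (header == null) return null;

        int length = BitConverter.ToInt32(header, 0);
        if (length < 0 || length > maxLength)
            throw new InvalidDataException("Unreasonable frame length: " + length);

        return await ReadExactAsync(stream, length);
    }
}
```

A caller would await ReadFrameAsync in a loop and treat a null result as "pipe closed".  Note that in this sketch a zero-length payload comes back as an empty array rather than being mistaken for a closed pipe.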

If you want to implement your own parsing of the raw byte[] packet, use:

public void StartReaderAsync(Action<byte[]> handler = null)
{
  StartByteReaderAsync((b) =>
  {
    DataReceived?.Invoke(this, new PipeEventArgs(b, b.Length));
    handler?.Invoke(b);
  });
}

Here you really have two options -- do you want to process the raw byte[] packet using the DataReceived event or with an optional handler?  It's up to you.

The ServerPipe Class

public class ServerPipe : BasicPipe
{
  protected NamedPipeServerStream serverPipeStream;
  protected string PipeName { get; set; }

  public ServerPipe(string pipeName, Action<BasicPipe> asyncReaderStart)
  {
    this.asyncReaderStart = asyncReaderStart;
    PipeName = pipeName;

    serverPipeStream = new NamedPipeServerStream(
    pipeName,
    PipeDirection.InOut,
    NamedPipeServerStream.MaxAllowedServerInstances,
    PipeTransmissionMode.Message,
    PipeOptions.Asynchronous);

    pipeStream = serverPipeStream;
    serverPipeStream.BeginWaitForConnection(new AsyncCallback(Connected), null);
  }

  protected void Connected(IAsyncResult ar)
  {
    // Complete the pending wait first, so the event only fires for an established connection.
    serverPipeStream.EndWaitForConnection(ar);
    Connected();
    asyncReaderStart(this);
  }
}

When you instantiate the server pipe, you have to tell it what reader to use.  For example, this tells the server to use the string reader behavior:

ServerPipe serverPipe = new ServerPipe("Test", p => p.StartStringReaderAsync());

The ClientPipe Class

public class ClientPipe : BasicPipe
{
  protected NamedPipeClientStream clientPipeStream;

  public ClientPipe(string serverName, string pipeName, Action<BasicPipe> asyncReaderStart)
  {
    this.asyncReaderStart = asyncReaderStart;
    clientPipeStream = new NamedPipeClientStream(serverName, pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
    pipeStream = clientPipeStream;
  }

  public void Connect()
  {
    clientPipeStream.Connect();
    Connected();
    asyncReaderStart(this);
  }
}

Similarly, when you instantiate the client, you'll need to tell the client what reader to use as well.  As mentioned before, the server and client need to use the same read/write "protocol", otherwise Bad Things will happen.

The client is instantiated like this:

ClientPipe clientPipe = new ClientPipe(".", "Test", p=>p.StartStringReaderAsync());
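
To see the full duplex behavior in isolation, here is a minimal stand-alone sketch that uses the framework's NamedPipeServerStream and NamedPipeClientStream directly rather than the ServerPipe and ClientPipe classes.  DuplexSketch and its helper methods are hypothetical names, and Byte transmission mode is an assumption made here because the length prefix already delimits each message.

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-alone sketch; it does not use the article's ServerPipe/ClientPipe classes.
public static class DuplexSketch
{
    // Writes [4-byte length][UTF-8 payload] in a single call.
    static Task WriteFrameAsync(Stream stream, string text)
    {
        byte[] payload = Encoding.UTF8.GetBytes(text);
        byte[] frame = new byte[sizeof(int) + payload.Length];
        BitConverter.GetBytes(payload.Length).CopyTo(frame, 0);
        payload.CopyTo(frame, sizeof(int));
        return stream.WriteAsync(frame, 0, frame.Length);
    }

    // Loops until 'count' bytes have arrived, since a single read may return fewer.
    static async Task<byte[]> ReadExactAsync(Stream stream, int count)
    {
        byte[] buffer = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int n = await stream.ReadAsync(buffer, offset, count - offset);
            if (n == 0) throw new EndOfStreamException("Pipe closed before the frame was complete.");
            offset += n;
        }
        return buffer;
    }

    static async Task<string> ReadFrameAsync(Stream stream)
    {
        byte[] header = await ReadExactAsync(stream, sizeof(int));
        byte[] payload = await ReadExactAsync(stream, BitConverter.ToInt32(header, 0));
        return Encoding.UTF8.GetString(payload);
    }

    // Returns { what the server received, what the client received }.
    public static async Task<string[]> RunAsync(string pipeName)
    {
        using (var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1,
                   PipeTransmissionMode.Byte, PipeOptions.Asynchronous))
        using (var client = new NamedPipeClientStream(".", pipeName,
                   PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            Task accepting = server.WaitForConnectionAsync();
            await client.ConnectAsync();
            await accepting;

            // Both ends have a read pending while both ends write: that is the full duplex part.
            Task<string> serverRead = ReadFrameAsync(server);
            Task<string> clientRead = ReadFrameAsync(client);
            await Task.WhenAll(WriteFrameAsync(client, "ping"), WriteFrameAsync(server, "pong"));

            return new[] { await serverRead, await clientRead };
        }
    }
}
```

Calling DuplexSketch.RunAsync with a unique pipe name should return "ping" as the server's received text and "pong" as the client's, with both directions in flight over the same connection.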

Marshalling Events in the Demo

The demo illustrates how the asynchronous read events are marshaled onto the UI thread.  This is a very common behavior when I write desktop apps, so BeginInvoke (and Invoke) is implemented as an extension method in my library toolbox:

public static class WinFormExtensionMethods
{
  /// <summary>
  /// Asynchronous invoke on application thread. Returns immediately when called from another thread; if already on the UI thread, the action runs synchronously.
  /// </summary>
  public static void BeginInvoke(this Control control, Action action)
  {
    if (control.InvokeRequired)
    {
      // Queue the action on the UI thread without waiting for it to complete.
      control.BeginInvoke((Delegate)action);
    }
    else
    {
      action();
    }
  }

  /// <summary>
  /// Synchronous invoke on application thread. Will not return until action is completed.
  /// </summary>
  public static void Invoke(this Control control, Action action)
  {
    if (control.InvokeRequired)
    {
      // We want a synchronous call here!!!!
      control.Invoke((Delegate)action);
    }
    else
    {
      action();
    }
  }
}

Server Events

In the demo, this places the data received from the client into the appropriate TextBox.

private ServerPipe CreateServer()
{
  int serverIdx = serverPipes.Count;
  ServerPipe serverPipe = new ServerPipe("Test", p => p.StartStringReaderAsync());
  serverPipes.Add(serverPipe);

  serverPipe.DataReceived += (sndr, args) =>
    this.BeginInvoke(() =>
      tbServerReceivers[serverIdx].Text = args.String);

  serverPipe.PipeConnected += (sndr, args) =>
    this.BeginInvoke(() =>
    {
      CreateServerUI();
      btnServerSend[serverIdx].Enabled = true;
      CreateServer();
    });

  return serverPipe;
}

Notice how the server's Send button is enabled after the client has connected, and also notice how CreateServer is recursively called to set up the next listener.
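
The "one listener per client" idea can also be written as a loop instead of a recursive call from the PipeConnected handler.  The following is a minimal sketch under that assumption, using NamedPipeServerStream directly.  AcceptLoopSketch, AcceptAsync and the handleClient callback are hypothetical names, and the fixed clientCount exists only so the sketch terminates; a real server would more likely loop until cancelled.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Threading.Tasks;

// Hypothetical sketch of an accept loop; not part of the article's library or demo.
public static class AcceptLoopSketch
{
    // Accepts 'clientCount' connections on the same pipe name, one server instance per client.
    public static async Task AcceptAsync(string pipeName, int clientCount,
        Func<NamedPipeServerStream, Task> handleClient)
    {
        var sessions = new List<Task>();

        for (int i = 0; i < clientCount; i++)
        {
            // Each connection gets its own server-side stream for the shared pipe name.
            var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut,
                NamedPipeServerStream.MaxAllowedServerInstances,
                PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

            await server.WaitForConnectionAsync();

            // Do not await the session here, so the loop can go on to listen for the next client.
            sessions.Add(RunSessionAsync(server, handleClient));
        }

        await Task.WhenAll(sessions);
    }

    static async Task RunSessionAsync(NamedPipeServerStream server,
        Func<NamedPipeServerStream, Task> handleClient)
    {
        using (server)
        {
            await handleClient(server);
        }
    }
}
```

Each accepted stream is a private channel to one client, which matches the direct (non-broadcast) behavior seen in the demo.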

Client Events

In the demo, as with the server, this places the data received from the server into the appropriate TextBox.  Note also that the client's Send button is enabled after the client connects:

private void CreateClient()
{
  int clientIdx = clientPipes.Count;
  ClientPipe clientPipe = new ClientPipe(".", "Test", p=>p.StartStringReaderAsync());
  clientPipes.Add(clientPipe);

  CreateClientUI();

  clientPipe.DataReceived += (sndr, args) =>
    this.BeginInvoke(() =>
      tbClientReceivers[clientIdx].Text = args.String);

  clientPipe.PipeConnected += (sndr, args) =>
    this.BeginInvoke(() =>
      btnClientSend[clientIdx].Enabled = true);

  clientPipe.Connect();
}

The Rest

The rest of the demo is really just management of the client/server groupboxes, which are implemented as user controls and added to the form when a new client connection is created:

protected void CreateServerControls(int n)
{
  ServerConnection cc = new ServerConnection();
  Button btnSend = (Button)cc.Controls.Find("btnSendToClient", true)[0];
  btnSend.Click += btnSendToClient_Click;
  btnSend.Tag = n;
  btnServerSend.Add(btnSend);
  tbServerSenders.Add((TextBox)cc.Controls.Find("tbServerSend", true)[0]);
  tbServerReceivers.Add((TextBox)cc.Controls.Find("tbServerReceived", true)[0]);
  cc.Location = new Point(gbServer.Location.X - 3, gbServer.Location.Y + (gbServer.Size.Height + 10) * n);
  Controls.Add(cc);
}

protected void CreateClientControls(int n)
{
  ClientConnection cc = new ClientConnection();
  Button btnSend = (Button)cc.Controls.Find("btnSendToServer", true)[0];
  btnSend.Click += btnSendToServer_Click;
  btnSend.Tag = n;
  btnClientSend.Add(btnSend);
  tbClientSenders.Add((TextBox)cc.Controls.Find("tbClientSend", true)[0]);
  tbClientReceivers.Add((TextBox)cc.Controls.Find("tbClientReceived", true)[0]);
  cc.Location = new Point(gbClient.Location.X - 3, gbClient.Location.Y + (gbClient.Size.Height + 10) * n);
  Controls.Add(cc);
}

Conclusion

So, there you have it -- hopefully a simple yet extensible library that implements asynchronous full duplex named pipe communication.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect Interacx
United States
Blog: https://marcclifton.wordpress.com/
Home Page: http://www.marcclifton.com
Research: http://www.higherorderprogramming.com/
GitHub: https://github.com/cliftonm

All my life I have been passionate about architecture / software design, as this is the cornerstone to a maintainable and extensible application. As such, I have enjoyed exploring some crazy ideas and discovering that they are not so crazy after all. I also love writing about my ideas and seeing the community response. As a consultant, I've enjoyed working in a wide range of industries such as aerospace, boatyard management, remote sensing, emergency services / data management, and casino operations. I've done a variety of pro-bono work for non-profit organizations related to nature conservancy, drug recovery and women's health.
