Click here to Skip to main content
13,146,315 members (43,415 online)
Click here to Skip to main content
Add your own
alternative version

Stats

14.7K views
575 downloads
55 bookmarked
Posted 28 Mar 2017

Full Duplex Asynchronous Read/Write with Named Pipes

, 28 Mar 2017
Rate this:
Please Sign up or sign in to vote.
Revisting an old tech for some simple inter-process communication

Introduction

I needed a simple way to communicate between multiple applications running locally on the same computer -- basically, the "server" application spawns "client" applications and needs to communicate with the child apps, directing them to perform certain tasks and return their results.

There's a lot of ways to communicate between apps nowadays -- web sockets, HTTP, WCF, memory mapped files, even using files, to name a few.  I ended up selecting named pipes because it's a:

  • Simple approach with little overhead.
  • Doesn't require opening custom ports.
  • Doesn't require complex configuration like WCF.
  • Bit more higher level than memory mapped files.

I figured there must be a GitHub (or other) library out there already, and I was surprised that for something as "old tech" has named pipes, while I found various examples and demo code, nothing was neatly packaged into a library with some basic (and good, I guess, that's the real qualifier) functionality.  Eventually I came across a post by Eric Frazer where he was looking for a full duplex asynchronous named pipe implementation, and ended up answering his own question with some code.  See here.

The code presented here is based on his code though I've refactored it quite a bit in some cases as well as implemented a demo and some unit tests around it. 

So what you'll learn here is how to implement full duplex named pipe communication, and even though it's old technology, there are times when you just want something brain-dead simple!  And of course the core named pipe client/server code is implemented in a library separate from the demo app.

The Demo

The demo app guides you through creating the name pipe server and instantiating clients and sending text messages between each client/server connection.  While the demo runs in a single application process space, it would of course work as separate client applications.

Start the Server

To begin the demo, the only action you can take is to click on the Start button that instantiates the server.

Create a Client

Once you've started the server (you can only do that once), the Create Client is enabled.  Click on it.  (The reason for this step by step process is that it makes it easier to debug into the demo and see what each step is doing.)

Send Messages!

Now that you've created a client, you can send messages between the client and server.  Note that this is a full duplex connection.

Create Another Client

You can create additional clients by clicking on the Create Client again (do this too much and you'll run out of screen space!) and send messages between the server and the new client.  Again, notice that this is full duplex and each client-server connection is a direct channel as opposed to broadcast behavior.  (Well, broadcast behavior is really just a change in how the server would manage sending messages to each of it's clients.)

Behind the Scenes

Let's look at what's going on next.

Overall Architecture

The BasicPipe abstract class is the base class for both ClientPipe and ServerPipe, and among other things, implements three events, DataReceived, PipeConnected and PipeClosedDataReceived will pass in a PipeEventArgs.

The BasicPipe Core Implementation

The base class implements common methods for both the server and client. 

public abstract class BasicPipe
{
  public event EventHandler<PipeEventArgs> DataReceived;
  public event EventHandler<EventArgs> PipeClosed;
  public event EventHandler<EventArgs> PipeConnected;

  protected PipeStream pipeStream;
  protected Action<BasicPipe> asyncReaderStart;

  public BasicPipe()
  {
  }

  public void Close()
  {
    pipeStream.WaitForPipeDrain();
    pipeStream.Close();
    pipeStream.Dispose();
    pipeStream = null;
  }

  public void Flush()
  {
    pipeStream.Flush();
  }

  protected void Connected()
  {
    PipeConnected?.Invoke(this, EventArgs.Empty);
  }

  protected void StartByteReaderAsync(Action<byte[]> packetReceived)
  {
    int intSize = sizeof(int);
    byte[] bDataLength = new byte[intSize];

    pipeStream.ReadAsync(bDataLength, 0, intSize).ContinueWith(t =>
    {
      int len = t.Result;

      if (len == 0)
      {
        PipeClosed?.Invoke(this, EventArgs.Empty);
      }
      else
      {
        int dataLength = BitConverter.ToInt32(bDataLength, 0);
        byte[] data = new byte[dataLength];

        pipeStream.ReadAsync(data, 0, dataLength).ContinueWith(t2 =>
        {
          len = t2.Result;

          if (len == 0)
          {
            PipeClosed?.Invoke(this, EventArgs.Empty);
          }
          else
          {
            packetReceived(data);
            StartByteReaderAsync(packetReceived);
          }
        });
      }
    });
  }
}

Note that the reader/writer is asynchronous, with the writer implemented as awaitable an Task by taking advantage of System.IO.Stream.WriteAsync. The receiver is of course asynchronous, so care must be taken to marshal the DataReceived event onto the appropriate thread, for example, the UI thread.

Buffers

One of the low-level issues is that named pipes don't have a concept of the amount of data being sent, so that's something that you have to manage.  Creating a receive buffer of fixed size is wasteful if the buffer is bigger than the data being sent, and conversely susceptible to overruns when the data being sent is larger than allocated size.

This is one of the refactorings that I ended up doing to Eric's code -- always sending the length of the data as the initial portion of the packet (you're application is responsible for serializing the data into a string or byte stream, these are the only two supported methods of communication.)

Writers

Two writers are implemented:

public Task WriteString(string str)
{
  return WriteBytes(Encoding.UTF8.GetBytes(str));
}

public Task WriteBytes(byte[] bytes)
{
  var blength = BitConverter.GetBytes(bytes.Length);
  var bfull = blength.Concat(bytes).ToArray();

  return pipeStream.WriteAsync(bfull, 0, bfull.Length);
}

The idea here is that you can write both a raw byte array or a string (for example, a JSON or XML serialized packet), but it's important to know that on the receive side, the receiver has to be in agreement with what it is expecting.

Readers

As stated above, the writer and receiver have to be in agreement as to what is being sent -- strings, byte arrays, whatever else that isn't implemented. 

/// <summary>
/// Reads an array of bytes, where the first [n] bytes (based on the server's intsize) indicates the number of bytes to read
/// to complete the packet.
/// </summary>
public void StartByteReaderAsync()
{
  StartByteReaderAsync((b) =>
    DataReceived?.Invoke(this, new PipeEventArgs(b, b.Length)));
}

/// <summary>
/// Reads an array of bytes, where the first [n] bytes (based on the server's intsize) indicates the number of bytes to read
/// to complete the packet, and invokes the DataReceived event with a string converted from UTF8 of the byte array.
/// </summary>
public void StartStringReaderAsync()
{
  StartByteReaderAsync((b) =>
  {
    string str = Encoding.UTF8.GetString(b).TrimEnd('\0');
    DataReceived?.Invoke(this, new PipeEventArgs(str));
  });
}

As the above code illustrates, both byte[] and string readers call the common StartByteReaderAsync method but implement different Action<byte[]> handlers.

If you want to implement your own parsing of the raw byte[] packet, use:

public void StartReaderAsync(Action<byte[]> handler = null)
{
  StartByteReaderAsync((b) =>
  {
    DataReceived?.Invoke(this, new PipeEventArgs(b, b.Length));
    handler?.Invoke(b);
  });
}

Here you really two options -- do you want to process the raw byte[] packet using DataReceived event or with an optional handler?  It's up to you.

The ServerPipe Class

public class ServerPipe : BasicPipe
{
  protected NamedPipeServerStream serverPipeStream;
  protected string PipeName { get; set; }

  public ServerPipe(string pipeName, Action<BasicPipe> asyncReaderStart)
  {
    this.asyncReaderStart = asyncReaderStart;
    PipeName = pipeName;

    serverPipeStream = new NamedPipeServerStream(
    pipeName,
    PipeDirection.InOut,
    NamedPipeServerStream.MaxAllowedServerInstances,
    PipeTransmissionMode.Message,
    PipeOptions.Asynchronous);

    pipeStream = serverPipeStream;
    serverPipeStream.BeginWaitForConnection(new AsyncCallback(Connected), null);
  }

  protected void Connected(IAsyncResult ar)
  {
    Connected();
    serverPipeStream.EndWaitForConnection(ar);
    asyncReaderStart(this);
  }
}

When you instantiate the server pipe, you have to tell it how it what reader to use.  For example, this tells the server to use the string reader behavior:

ServerPipe serverPipe = new ServerPipe("Test", p => p.StartStringReaderAsync());

The ClientPipe Class

public class ClientPipe : BasicPipe
{
  protected NamedPipeClientStream clientPipeStream;

  public ClientPipe(string serverName, string pipeName, Action<BasicPipe> asyncReaderStart)
  {
    this.asyncReaderStart = asyncReaderStart;
    clientPipeStream = new NamedPipeClientStream(serverName, pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
    pipeStream = clientPipeStream;
  }

  public void Connect()
  {
    clientPipeStream.Connect();
    Connected();
    asyncReaderStart(this);
  }
}

Similarly, when you instantiate the client, you'll need to tell the client what reader to use as well.  As mentioned before, the server and client need to use the same read/write "protocol", otherwise Bad Things will happen.

The client is instantiated like this:

ClientPipe clientPipe = new ClientPipe(".", "Test", p=>p.StartStringReaderAsync());

Marshalling Events in the Demo

The demo illustrates how the asynchronous read events are marshaled onto the UI thread.  This is a very common behavior when I write desktop apps, so BeginInvoke (and Invoke) is implemented as an extension method in my library toolbox:

public static class WinFormExtensionMethods
{
  /// <summary>
  /// Asynchronous invoke on application thread. Will return immediately unless invocation is not required.
  /// </summary>
  public static void BeginInvoke(this Control control, Action action)
  {
    if (control.InvokeRequired)
    {
      // We want a synchronous call here!!!!
      control.BeginInvoke((Delegate)action);
    }
    else
    {
      action();
    }
  }

  /// <summary>
  /// Synchronous invoke on application thread. Will not return until action is completed.
  /// </summary>
  public static void Invoke(this Control control, Action action)
  {
    if (control.InvokeRequired)
    {
      // We want a synchronous call here!!!!
      control.Invoke((Delegate)action);
    }
    else
    {
      action();
    }
  }
}

Server Events

In the demo, this places the data received from the client into the appropriate TextBox.

private ServerPipe CreateServer()
{
  int serverIdx = serverPipes.Count;
  ServerPipe serverPipe = new ServerPipe("Test", p => p.StartStringReaderAsync());
  serverPipes.Add(serverPipe);

  serverPipe.DataReceived += (sndr, args) =>
    this.BeginInvoke(() =>
      tbServerReceivers[serverIdx].Text = args.String);

  serverPipe.PipeConnected += (sndr, args) =>
    this.BeginInvoke(() =>
    {
      CreateServerUI();
      btnServerSend[serverIdx].Enabled = true;
      CreateServer();
    });

  return serverPipe;
}

Notice how the server's Send button is enabled after the client has connected, and also notice how CreateServer is recursively called to set up the next listener.

Client Events

In the demo, as with the server, this places the data received from the server into the appropriate TextBox.  Note also that the client's Send button is enabled after the client connects:

private void CreateClient()
{
  int clientIdx = clientPipes.Count;
  ClientPipe clientPipe = new ClientPipe(".", "Test", p=>p.StartStringReaderAsync());
  clientPipes.Add(clientPipe);

  CreateClientUI();

  clientPipe.DataReceived += (sndr, args) =>
    this.BeginInvoke(() =>
      tbClientReceivers[clientIdx].Text = args.String);

  clientPipe.PipeConnected += (sndr, args) =>
    this.BeginInvoke(() =>
      btnClientSend[clientIdx].Enabled = true);

  clientPipe.Connect();
}

The Rest

The rest of the demo is really just management of the client/server groupboxes, which are implemented as user controls and added to the form when a new client connection is created:

protected void CreateServerControls(int n)
{
  ServerConnection cc = new ServerConnection();
  Button btnSend = (Button)cc.Controls.Find("btnSendToClient", true)[0];
  btnSend.Click += btnSendToClient_Click;
  btnSend.Tag = n;
  btnServerSend.Add(btnSend);
  tbServerSenders.Add((TextBox)cc.Controls.Find("tbServerSend", true)[0]);
  tbServerReceivers.Add((TextBox)cc.Controls.Find("tbServerReceived", true)[0]);
  cc.Location = new Point(gbServer.Location.X - 3, gbServer.Location.Y + (gbServer.Size.Height + 10) * n);
  Controls.Add(cc);
}

protected void CreateClientControls(int n)
{
  ClientConnection cc = new ClientConnection();
  Button btnSend = (Button)cc.Controls.Find("btnSendToServer", true)[0];
  btnSend.Click += btnSendToServer_Click;
  btnSend.Tag = n;
  btnClientSend.Add(btnSend);
  tbClientSenders.Add((TextBox)cc.Controls.Find("tbClientSend", true)[0]);
  tbClientReceivers.Add((TextBox)cc.Controls.Find("tbClientReceived", true)[0]);
  cc.Location = new Point(gbClient.Location.X - 3, gbClient.Location.Y + (gbClient.Size.Height + 10) * n);
  Controls.Add(cc);
}

Conclusion

So, there you have it -- hopefully a simple yet extensible library that implements asynchronous full duplex named pipe communication.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Marc Clifton
United States United States
Marc is the creator of two open source projects, MyXaml, a declarative (XML) instantiation engine and the Advanced Unit Testing framework, and Interacx, a commercial n-tier RAD application suite.  Visit his website, www.marcclifton.com, where you will find many of his articles and his blog.

Marc lives in Philmont, NY.

You may also be interested in...

Pro
Pro

Comments and Discussions

 
QuestionComplie error Pin
awagner6440422hrs 48mins ago
memberawagner6440422hrs 48mins ago 
QuestionSystem.OutOfMemoryException in StartByteReaderAsync Pin
DeadSix1720-Jul-17 20:35
memberDeadSix1720-Jul-17 20:35 
Question+5 Question Pin
Kevin Marois29-Jun-17 6:18
professionalKevin Marois29-Jun-17 6:18 
Question+5 and a few questions Pin
BillWoodruff24-Jun-17 19:48
mvpBillWoodruff24-Jun-17 19:48 
QuestionVery nice (as usual) Pin
Garth J Lancaster30-Mar-17 17:03
mvpGarth J Lancaster30-Mar-17 17:03 
AnswerRe: Very nice (as usual) Pin
Marc Clifton6-Apr-17 3:22
protectorMarc Clifton6-Apr-17 3:22 
BugCompile errors Pin
ravikodukula28-Mar-17 23:48
memberravikodukula28-Mar-17 23:48 
GeneralRe: Compile errors Pin
Marc Clifton29-Mar-17 2:18
protectorMarc Clifton29-Mar-17 2:18 
QuestionMissing Images? Pin
Marc Clifton28-Mar-17 4:01
protectorMarc Clifton28-Mar-17 4:01 
AnswerRe: Missing Images? Pin
Steve Messer28-Mar-17 8:59
memberSteve Messer28-Mar-17 8:59 
GeneralRe: Missing Images? Pin
Marc Clifton28-Mar-17 11:12
protectorMarc Clifton28-Mar-17 11:12 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

Permalink | Advertise | Privacy | Terms of Use | Mobile
Web02 | 2.8.170915.1 | Last Updated 28 Mar 2017
Article Copyright 2017 by Marc Clifton
Everything else Copyright © CodeProject, 1999-2017
Layout: fixed | fluid