
Threads and Pipes in Console Apps

By , 10 Jul 2008
 

Introduction

Like many of my projects, this one started out because of a question in microsoft.public.vc.mfc. Someone had written a filter program that ran a child process, converted the child's stderr and stdout streams to different representations, and wrote the result to the filter's own stdout handle.

Unfortunately, the program was a really bad example of how to do this. Anonymous pipes cannot be used asynchronously (they do not support overlapped I/O), so a blocking ReadFile on one pipe means that output arriving on the other pipe is not seen until that read completes. The poster's solution was to call ::PeekNamedPipe to see whether there was any data to read and, if there was, to issue a ReadFile to read it. The problem was that this ran in an infinite loop that polled both pipes on every iteration. Because it could never block waiting for input, it ran continuously, using close to 100% of the CPU time and accomplishing nothing with most of it.

So the real problem was how to write a program that would block when there was nothing to do, yet receive data from either stderr or stdout as soon as it became available.

The answer was obvious: threads.

The problem with threads is that far too many people suffer from klostiphobia, the morbid fear of threads, and so never think to use threads when they are appropriate, or even necessary. In this case, they were necessary.

So, I decided to write a program to demonstrate how threads could be used for this purpose.

Most of my articles on threading use GUI apps, so this one shows how threading can be used in a console app.

The Test Program

I wrote a little test program, called tester, which simply generates a random number and, based on that number, writes a line either to stdout (containing the line number and the imaginative text "output") or to stderr (containing the line number and the equally imaginative text "error"). It then sleeps for a random interval of 200 to 699 ms before writing the next line.

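#include <windows.h>   // Sleep
#include <stdio.h>
#include <stdlib.h>    // rand
#include <tchar.h>     // _tmain, _TCHAR, _ftprintf, _T

int _tmain(int argc, _TCHAR* argv[])
   {
    for(int i = 0; i < 100; i++)
       { /* test */
        int r = rand();
        if(r & 0x100)
           { /* stderr */
            _ftprintf(stderr, _T("%4d: error\n"), i);
            fflush(stderr);
           } /* stderr */
        else
           { /* stdout */
            _ftprintf(stdout, _T("%4d: output\n"), i);
            fflush(stdout);   // a piped stdout is fully buffered by the CRT
           } /* stdout */

        int w = rand() % 500;
        Sleep(200 + w);       // wait 200..699 ms between lines
       } /* test */
    return 0;
   }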

The output from running this program by itself is shown below:

Generating Decorated Console Output

The poster's goal was to wrap each line in HTML markup so that, together with suitable style definitions in the page, stdout and stderr lines would be displayed in visibly different forms.

I generalized this to support writing directly to a console window. So, when run under my piper program, the lines to stdout and stderr display differently, as shown below:

Generating HTML Output

However, if I add the -html option to the command line, I get HTML commands wrapped around them, such as:

To make this display work, I had to add the following declarations by hand to the <head>...</head> region of the HTML file:

<style type="text/css">
    p.stdout {margin-top:0; margin-bottom:0; color:green}
    p.stderr {margin-top:0; margin-bottom:0; font-weight:bold; color:red}
</style>

The Program

The structure of the program is to create two threads: one to handle the child's stdout and one to handle its stderr. The main thread gathers the data from the two worker threads and displays it. When both worker threads have terminated, the main thread exits.

The question was what mechanisms to implement to do this. I naturally fell back on my favorite inter-thread queuing mechanism, the I/O Completion Port. I/O Completion Ports are cool, and I describe several techniques in my accompanying article on them. This is just another generalization of that mechanism, with some interesting twists and features to illustrate their generality.

I used VS.NET 2003 to build this program, creating it as a console application with ATL support so that I could use the CString data type. This is not really possible in VS6: in VS.NET, several important classes were moved out of MFC into ATL, making them usable in console apps and other non-MFC contexts.

_tmain(): Command Line Handling

The command line handling is fairly straightforward, but to give some elegance to the solution, I created a CommandLine class that held all the parameters.

class CommandLine {
    public:
       CommandLine() { HTML = FALSE; IsUnicode = FALSE; program = NULL; }
       BOOL HTML;
       BOOL IsUnicode;
       LPTSTR program;
    }; // class CommandLine

I wanted to be able to support Unicode pipes, so I added an option to treat the incoming data as Unicode data. The HTML member tells whether to use console decoration or generate the HTML I illustrated above.

int _tmain(int argc, _TCHAR* argv[])
   {
    //****************************************************************
    // Argument processing
    //****************************************************************
    if(argc == 1)
       { /* usage */
        CString module;
        LPTSTR p = module.GetBuffer(MAX_PATH);
        ::GetModuleFileName(NULL, p, MAX_PATH);
        module.ReleaseBuffer();
        int n = module.ReverseFind(_T('\\'));
        if(n < 0)
           n = 0;
        module = module.Mid(n + 1);
        n = module.ReverseFind(_T('.'));
        if(n < 0)
           n = module.GetLength();
        module = module.Left(n);
        _ftprintf(stderr, _T("Usage:\n%s [-u] [-html] command\n"), module);
        return Result::INVALID_ARGUMENT;
       } /* usage */

The above code may look a little odd, but I learned years ago in writing console apps that users will rename them. So, if I hardwire the name of the executable into the program, the "usage" message will show the wrong program name. This code merely extracts the file name and displays it properly as part of the usage message.
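The same name-extraction logic can be written in portable standard C++. This is a minimal sketch, not code from piper: the helper name ProgramNameFromPath is hypothetical, it works on narrow std::string paths rather than CString, and it also accepts '/' as a directory separator.

```cpp
#include <string>

// Hypothetical helper, a portable analogue of the usage-message code above:
// returns the bare program name from a full module path, so the usage text
// stays correct even if the user renames the executable.
std::string ProgramNameFromPath(const std::string& path)
{
    // Drop the directory part (accept both '\\' and '/' as separators).
    std::string::size_type slash = path.find_last_of("\\/");
    std::string name = (slash == std::string::npos) ? path : path.substr(slash + 1);

    // Drop the extension, if any (".exe" on Windows).
    std::string::size_type dot = name.rfind('.');
    if (dot != std::string::npos)
        name.erase(dot);
    return name;
}
```

For example, a copy renamed to C:\tools\mypiper.exe reports itself as "mypiper".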

Because this is a console app, I wanted unique exit codes for many of the error returns, but I didn't want to have to assign values to them by hand, so I defined them as an enumeration inside a class, Result, shown below.

    CommandLine cmd;

    for(int i = 1; i < argc; i++)
       { /* scan args */
        CString arg = argv[i];
        if(arg[0] == _T('-'))
           { /* option */
            if(arg == _T("-u"))
               { /* unicode */
                cmd.IsUnicode = TRUE;
                continue;
               } /* unicode */
            if(arg == _T("-html"))
               { /* html */
                cmd.HTML = TRUE;
                continue;
               } /* html */
            _ftprintf(stderr, _T("Unrecognized option \"%s\"\n"), arg);
            return Result::INVALID_ARGUMENT;
           } /* option */

        if(cmd.program != NULL)
           { /* two files */
            _ftprintf(stderr, 
                      _T("Two command directives given:\n  [1] \"%s\"\n  [2] \"%s\"\n"),
                      cmd.program,    // the earlier command, not the CommandLine object
                      (LPCTSTR)arg);
            return Result::TWO_COMMANDS;
           } /* two files */
        cmd.program = argv[i];
       } /* scan args */

    if(cmd.program == NULL)
       { /* no args */
        _ftprintf(stderr, _T("need program to run\n"));
        return Result::NO_PROGRAM;
       } /* no args */

Class Result

The result types are defined by the class:

/****************************************************************************
*                                class Result
****************************************************************************/

class Result {
    public:
       typedef enum { SUCCESS = 0,
                      NO_PROGRAM,
                      INVALID_ARGUMENT,
                      TWO_COMMANDS,
                      IOCP_FAILED,
                      IOCP_ERROR,
                      THREAD_FAILURE,
                      STDOUT_CREATION_FAILED,
                      STDERR_CREATION_FAILED,
                      CREATEPROCESS_FAILED
       } Type;
};

Class SmartHandle

One of the problems in many resource allocation schemes, such as opening handles, is that you have to make sure everything is closed properly when you leave a subroutine. There are existing smart-handle classes, but I wanted to illustrate how easy it is to write one. So I wrote the class SmartHandle, which closes the handle when the variable goes out of scope.

/****************************************************************************
*                              class SmartHandle
****************************************************************************/

class SmartHandle {
    public:
       SmartHandle() { handle = NULL; }
       SmartHandle(HANDLE h) { handle = h; }
       virtual ~SmartHandle() { if(handle != NULL) ::CloseHandle(handle); }
    public:
       operator HANDLE() { return handle; }
       operator LPHANDLE() { return & handle; }
       bool operator==(HANDLE h) { return handle == h; }
       SmartHandle & operator=(HANDLE h) { Close(); handle = h; return *this; } // release any handle already held
    public:
       void Close() { if(handle != NULL) ::CloseHandle(handle); handle = NULL; }
    protected:
       HANDLE handle;
};

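There is one limitation on this class: you must not call ::CloseHandle explicitly on a handle it holds; use the SmartHandle::Close method instead. Otherwise the destructor closes the same handle value a second time. Under a debugger, closing an invalid handle raises an exception, and worse, the value may by then have been reused for an unrelated handle, which would be closed instead.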
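A SmartHandle can also be copied by accident, and both copies would then close the same handle. One way to rule that out is to make the wrapper move-only. The following is a sketch in C++11, not a drop-in replacement for SmartHandle: the names ScopedHandle, StdioTraits and ScopedFile are hypothetical, and it is demonstrated with C stdio streams so that it compiles anywhere. Copying is deleted, ownership can be transferred with std::move, and Close() resets the stored value, so the destructor never closes the same resource twice.

```cpp
#include <cstdio>
#include <utility>

// Hypothetical move-only RAII wrapper in the spirit of SmartHandle.
// Traits must provide value_type, Invalid() (the "no resource" value)
// and Close(value_type) to release a valid resource.
template <typename Traits>
class ScopedHandle {
public:
    typedef typename Traits::value_type value_type;

    ScopedHandle() : h_(Traits::Invalid()) {}
    explicit ScopedHandle(value_type h) : h_(h) {}
    ~ScopedHandle() { Close(); }

    // Copying would lead to a double close, so it is forbidden...
    ScopedHandle(const ScopedHandle&) = delete;
    ScopedHandle& operator=(const ScopedHandle&) = delete;

    // ...but ownership can be transferred explicitly.
    ScopedHandle(ScopedHandle&& other) noexcept : h_(other.Release()) {}
    ScopedHandle& operator=(ScopedHandle&& other) noexcept {
        if (this != &other) { Close(); h_ = other.Release(); }
        return *this;
    }

    value_type Get() const { return h_; }
    bool IsValid() const { return h_ != Traits::Invalid(); }

    // Close now; the destructor then has nothing left to do.
    void Close() {
        if (IsValid()) { Traits::Close(h_); h_ = Traits::Invalid(); }
    }

    // Give up ownership without closing.
    value_type Release() { value_type h = h_; h_ = Traits::Invalid(); return h; }

private:
    value_type h_;
};

// Example traits for C stdio streams (illustration only).
struct StdioTraits {
    typedef std::FILE* value_type;
    static value_type Invalid() { return nullptr; }
    static void Close(value_type f) { std::fclose(f); }
};
typedef ScopedHandle<StdioTraits> ScopedFile;
```

For Win32 handles, a traits class would supply HANDLE as the value type, ::CloseHandle as the closer, and NULL or INVALID_HANDLE_VALUE as the invalid value, depending on which one the creating API returns on failure.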

_tmain(): Create I/O Completion Port

The following code creates the I/O Completion Port:

    //****************************************************************
    // Create the I/O Completion Port queue
    //****************************************************************
    SmartHandle iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

    if(iocp == NULL)
       { /* failed iocp */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("CreateIoCompletionPort failed, error %s\n"), 
                  ErrorString(err));
        return Result::IOCP_FAILED;
       } /* failed iocp */

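Passing INVALID_HANDLE_VALUE as the file handle creates an I/O Completion Port that is not associated with any file handle; here the port serves purely as a thread-safe queue for passing messages between threads.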

ErrorString

The ErrorString function is very simple, and is a stripped-down version of my more general ErrorString function suitable for this simple program.

/****************************************************************************
*                                 ErrorString
* Inputs:
*       UINT err: Error code
* Result: CString
*       Result
****************************************************************************/

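CString ErrorString(UINT err)
   {
    LPTSTR msg = NULL;
    DWORD len = ::FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |
                                FORMAT_MESSAGE_FROM_SYSTEM |
                                FORMAT_MESSAGE_IGNORE_INSERTS,
                                NULL,
                                err,
                                0,
                                (LPTSTR) &msg,
                                0,
                                NULL);
    if(len == 0)
       { /* no message text available */
        CString result;
        result.Format(_T("%u"), err);   // fall back to the numeric code
        return result;
       } /* no message text available */
    CString result = msg;
    ::LocalFree(msg);
    result.TrimRight();                 // system messages end with "\r\n"
    return result;
   }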
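If C++11 or later is available, the standard library can produce the same kind of text. This sketch (the name ErrorText is hypothetical) uses std::system_category().message(); with current MSVC libraries that category describes Win32 error codes such as those returned by ::GetLastError(), while on POSIX systems it describes errno values. Trailing whitespace is trimmed so the message can be embedded in the middle of a line.

```cpp
#include <string>
#include <system_error>

// Hypothetical portable alternative to ErrorString.
std::string ErrorText(int err)
{
    std::string msg = std::system_category().message(err);
    // Strip any trailing "\r\n", tabs or spaces.
    std::string::size_type end = msg.find_last_not_of(" \t\r\n");
    msg.erase(end == std::string::npos ? 0 : end + 1);
    return msg;
}
```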

_tmain(): Create Pipes

    //****************************************************************
    // Create the pipes to route to the child process
    //****************************************************************
    SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), NULL, TRUE};
    SmartHandle stdout_read;
    SmartHandle stdout_write;
    SmartHandle stderr_read;
    SmartHandle stderr_write;


    static const UINT PIPE_BUFFER_SIZE = 32;

    if(!::CreatePipe((LPHANDLE)stdout_read, (LPHANDLE)stdout_write, &sa, PIPE_BUFFER_SIZE))
       { /* failed stdout */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("stdout pipe failure: %s\n"), ErrorString(err));
        return Result::STDOUT_CREATION_FAILED;
       } /* failed stdout */

    if(!::CreatePipe((LPHANDLE)stderr_read, (LPHANDLE)stderr_write, &sa, PIPE_BUFFER_SIZE))
       { /* failed stderr */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("stderr pipe failure: %s\n"), ErrorString(err));
        return Result::STDERR_CREATION_FAILED;
       } /* failed stderr */

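This creates four handles representing the read sides and write sides of the stdout and stderr pipes. Because the SECURITY_ATTRIBUTES structure sets bInheritHandle to TRUE, all four handles are inheritable. Only the write sides actually need to be inherited by the child; the read sides could be made non-inheritable with ::SetHandleInformation(h, HANDLE_FLAG_INHERIT, 0) so that the child does not receive copies of them.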

_tmain(): Create Process

    //****************************************************************
    // Create the child process
    //****************************************************************
    STARTUPINFO startup = {sizeof(STARTUPINFO)};
    startup.dwFlags = STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW;
    startup.wShowWindow = SW_HIDE;
    startup.hStdOutput = stdout_write;
    startup.hStdError = stderr_write;

    PROCESS_INFORMATION procinfo;

    if(!::CreateProcess(NULL, cmd.program, NULL, NULL, 
                        TRUE, CREATE_NEW_CONSOLE, NULL, NULL, &startup, &procinfo))
       { /* failed */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("CreateProcess failed for \"%s\": %s\n"), 
                  cmd.program, ErrorString(err));
        return Result::CREATEPROCESS_FAILED;
       } /* failed */

    ::CloseHandle(procinfo.hProcess);   // handle will never be needed
    ::CloseHandle(procinfo.hThread);    // handle will never be needed

    stdout_write.Close();               // Close our end of the pipe
    stderr_write.Close();               // Close our end of the pipe

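After CreateProcess succeeds, the process and thread handles it returns are no longer needed, so they are closed. Because these handles are not held in SmartHandle objects, they are closed explicitly with ::CloseHandle. This process's handles to the write sides of the stdout and stderr pipes are also closed. That step is essential: ReadFile on an anonymous pipe reports a broken pipe only when every handle to the write end has been closed, so if the parent kept its copies open, the reader threads would never see the end of the stream, even after the child exited.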

_tmain(): Thread Creation

To create the threads, I will use _beginthreadex. But there's a problem I have to solve first. In VS.NET 2003, a console app is configured by default with the "single threaded" C runtime library, on the assumption that it is not going to be multithreaded. To create threads, I have to reconfigure the build to use the "multithreaded" C runtime library. Select the project in Solution Explorer, right-click on it, and ask for Properties. The screen below will be displayed:

Select the configuration you want (in this case, the Debug configuration), select the C/C++ property, select Code Generation, go to the Runtime Library option, and drop it down. Select the appropriate Multi-threaded library. For the Release configuration, this would be the Multi-threaded runtime library; for the Debug configuration, it would be the Multi-threaded Debug library. Alternatively, you may choose to use the DLL versions of the C runtime library.

    //****************************************************************
    // Create the threads to handle the pipes
    //****************************************************************
    unsigned id;

    //----------------
    // stdout
    //----------------
    SmartHandle stdoutThread = (HANDLE)_beginthreadex(NULL, 0, reader, 
                new ThreadParms(stdout_read, SourceFlags::StdOut, 
                iocp, cmd.IsUnicode), 0, &id);
    if(stdoutThread == NULL)
       { /* thread create failed */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("Thread creation for stdout failed, error %s\n"), 
                  ErrorString(err));
        return Result::THREAD_FAILURE;
       } /* thread create failed */        

    stdoutThread.Close(); // handle will never be used

    //----------------
    // stderr
    //----------------
    SmartHandle stderrThread = (HANDLE)_beginthreadex(NULL, 0, reader, 
                new ThreadParms(stderr_read, SourceFlags::StdErr, 
                iocp, cmd.IsUnicode), 0, &id);
    if(stderrThread == NULL)
       { /* thread create failed */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("Thread creation for stderr failed, error %s\n"), 
                  ErrorString(err));
        return Result::THREAD_FAILURE;
       } /* thread create failed */        

    stderrThread.Close(); // handle will never be used

To pass parameters to each thread, I use the ThreadParms class. It packages up the HANDLE of the stream to read, a flag that tells the main thread which stream a notification came from, the handle of the I/O Completion Port, and a Boolean flag that indicates whether the child's pipe carries Unicode data. If either thread creation fails, _tmain simply returns. Because every handle still open at that point is held in a SmartHandle object, the SmartHandle::~SmartHandle destructor closes it implicitly, so we don't need to keep track of the handles on our own.

Class ThreadParms

/****************************************************************************
*                              class ThreadParms
****************************************************************************/

class ThreadParms {
    public:
       ThreadParms(HANDLE h, SourceFlags::FlagType f, HANDLE io, BOOL uni) {
          stream = h;
          flags = f;
          iocp = io;
          IsUnicode = uni;
       }
    public:
       HANDLE stream;
       SourceFlags::FlagType flags;
       HANDLE iocp;
       BOOL IsUnicode;
};

The I/O Completion Port Protocol

I am going to use the I/O Completion Port for inter-thread communication from the worker threads to the main thread. To do this, I use ::PostQueuedCompletionStatus, which lets me pass three values: a DWORD, a ULONG_PTR, and a pointer. The pointer is nominally an LPOVERLAPPED, but because the system hands packets posted with ::PostQueuedCompletionStatus to ::GetQueuedCompletionStatus without interpreting them, it can be any pointer of our choosing.

I could have chosen a variety of encodings, and I chose the one below. There is no particular reason to prefer it over other designs, such as packaging everything up in an object pointed to by the pointer parameter. Any combination not shown below is an error.

NumberOfBytesTransferred   CompletionKey         Overlapped                  Meaning
(DWORD)                    (ULONG_PTR)           (LPOVERLAPPED)
SourceFlags::StdOut        0                     (LPOVERLAPPED)(CString *)   stdout line to display
SourceFlags::StdErr        0                     (LPOVERLAPPED)(CString *)   stderr line to display
0                          SourceFlags::StdOut   NULL                        stdout has terminated
0                          SourceFlags::StdErr   NULL                        stderr has terminated
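The main loop shown later trusts this encoding. As a sketch of how the table could be checked mechanically, the following standard C++ function classifies a <bytes, key, pointer> triple and treats any combination not listed above as an error. PacketKind and ClassifyPacket are illustrative names; only the SourceFlags values come from the program.

```cpp
#include <cstdint>

// SourceFlags values as defined in the program.
struct SourceFlags { enum FlagType { None = 0, StdOut = 1, StdErr = 2 }; };

// Hypothetical decoder for the completion-packet encoding in the table.
enum PacketKind { DataLine, StreamTerminated, ProtocolError };

PacketKind ClassifyPacket(std::uint32_t bytes, std::uintptr_t key, const void* ptr)
{
    bool bytesIsFlag = (bytes == SourceFlags::StdOut || bytes == SourceFlags::StdErr);
    bool keyIsFlag = (key == SourceFlags::StdOut || key == SourceFlags::StdErr);

    if (bytesIsFlag && key == 0 && ptr != nullptr)
        return DataLine;          // <flag, 0, CString*>: a line to display
    if (bytes == 0 && keyIsFlag && ptr == nullptr)
        return StreamTerminated;  // <0, flag, NULL>: that stream has ended
    return ProtocolError;         // any other combination is an error
}
```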

Class SourceFlags

The flags are specified in the SourceFlags class:

/****************************************************************************
*                              class SourceFlags
****************************************************************************/

class SourceFlags {
    public:
       typedef enum { None = 0, StdOut=1, StdErr=2 } FlagType;
    }; // class SourceFlags

_tmain(): Receive Thread Messages

    //****************************************************************
    // Run the loop until both stdout and stderr are broken
    //****************************************************************

    SourceFlags::FlagType broken = SourceFlags::None;

    Result::Type result = Result::SUCCESS;
    
    while(broken != (SourceFlags::StdOut | SourceFlags::StdErr))
       { /* watch pipes */
        OVERLAPPED * ovl;
        DWORD bytesRead;
        ULONG_PTR key;

        //----------------------------------------------------------------
        // bytesRead: the item flag
        //          SourceFlags::StdOut for stdout data
        //          SourceFlags::StdErr for stderr data
        // key: the termination flag
        //          SourceFlags::StdOut when stdout breaks
        //          SourceFlags::StdErr when stderr breaks
        // Note: the <bytesRead, key> pair will either be of the form
        //              <flag_*, 0>   for data notification
        //              <0, flag_*>   for termination notification
        // ovl: (LPOVERLAPPED)(CString *)
        //----------------------------------------------------------------
        BOOL ok = ::GetQueuedCompletionStatus(iocp, &bytesRead, 
                                         &key, &ovl, INFINITE);

        if(!ok)
           { /* failed */
            DWORD err = ::GetLastError();
            result = Result::IOCP_ERROR;
            _ftprintf(stderr, _T("GetQueuedCompletionStatus failed, error %s\n"), 
                      ErrorString(err));
            break;
           } /* failed */

        broken = (SourceFlags::FlagType)(broken | (int)key);
        if(key != 0)
           continue;  // termination notifications contain no data

        CString * s = (CString *)ovl;

        WriteToOutput(*s, (SourceFlags::FlagType)bytesRead, cmd);

        delete s;
       } /* watch pipes */

The trick here is that as each thread finishes, it sends a termination notification. When both threads have sent their termination notification, the main thread will exit the loop, and then the program will terminate.
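The pattern of two producers, one consumer and a termination bit per stream does not depend on I/O Completion Ports. To show it in isolation, here is a portable sketch that swaps in a std::mutex and std::condition_variable queue for the port. PacketQueue, Packet and RunPipeline are hypothetical names, and the producers post synthetic lines instead of reading pipes. As in the article's loop, Get() blocks without consuming CPU time, and the consumer stops only after both termination packets have arrived.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Swapped-in technique: a mutex + condition-variable queue standing in for
// the I/O Completion Port. Post() plays the role of
// ::PostQueuedCompletionStatus and Get() that of ::GetQueuedCompletionStatus.
enum { kStdOut = 1, kStdErr = 2 };  // same values as SourceFlags

struct Packet {
    int flag;          // stream that produced the line (0 for termination)
    int key;           // stream that terminated (0 for data)
    std::string line;  // carried by value, so the queue owns the text
};

class PacketQueue {
public:
    void Post(Packet p) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(p));
        }
        cv_.notify_one();
    }
    // Blocks, without consuming CPU time, until a packet is available.
    Packet Get() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        Packet p = std::move(q_.front());
        q_.pop();
        return p;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<Packet> q_;
};

// Hypothetical driver: each producer thread posts `count` lines and then a
// termination packet; the consumer runs until both streams are "broken".
std::vector<std::string> RunPipeline(int count)
{
    PacketQueue queue;
    auto producer = [&queue, count](int flag) {
        const char* text = (flag == kStdOut) ? "output " : "error ";
        for (int i = 0; i < count; ++i)
            queue.Post(Packet{flag, 0, text + std::to_string(i)});
        queue.Post(Packet{0, flag, std::string()});  // termination notice
    };
    std::thread out(producer, static_cast<int>(kStdOut));
    std::thread err(producer, static_cast<int>(kStdErr));

    std::vector<std::string> shown;
    int broken = 0;
    while (broken != (kStdOut | kStdErr)) {
        Packet p = queue.Get();
        broken |= p.key;
        if (p.key != 0)
            continue;  // termination packets carry no data
        shown.push_back(p.line);
    }
    out.join();
    err.join();
    return shown;
}
```

One design difference: Packet carries the string by value, so the queue owns the text and no new/delete handoff is needed. The IOCP version has to pass a heap pointer because a completion packet can only carry a DWORD, a ULONG_PTR and a pointer.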

_tmain(): Cleanup

    //****************************************************************
    // Cleanup
    //****************************************************************
    stdout_read.Close();
    stderr_read.Close();

    return result;
   }  // _tmain

WriteToOutput

This function takes a reference to the string, the flag that indicates its source, and a reference to the command-line options structure (which determines, for example, the output format). Its job is to "wrap" the string in whatever context is required to make it display correctly. If the output is going to a console, I set the text attributes of the console buffer so the line is displayed in the right color; if it is going to HTML (the -html command-line option), I put the right kind of markup around it. There isn't anything really deep going on here.

/****************************************************************************
*                                WriteToOutput
* Inputs:
*       const CString & s: String to write
*       SourceFlags::FlagType flag: Flag to indicate source, StdOut or StdErr
*       CommandLine & cmd: Command line options
* Result: void
*       
* Effect: 
*       Writes the string to the output stream
* Notes:
*       
****************************************************************************/

void WriteToOutput(const CString & s, SourceFlags::FlagType flag, CommandLine & cmd)
    {
     if(cmd.HTML)
        { /* HTML */
         CString classname;
         switch(flag)
            { /* decode */
             case SourceFlags::StdOut:
                classname = _T("stdout");
                break;
             case SourceFlags::StdErr:
                classname = _T("stderr");
                break;
            } /* decode */
         // <p class="..."> is what the p.stdout / p.stderr style rules match
         _ftprintf(stdout, _T("<p class=\"%s\">%s</p>\n"), (LPCTSTR)classname, (LPCTSTR)ToHTML(s));
        } /* HTML */
     else
        { /* console */
         HANDLE console = ::GetStdHandle(STD_OUTPUT_HANDLE);

         CONSOLE_SCREEN_BUFFER_INFO info;
         ::GetConsoleScreenBufferInfo(console, &info);

         switch(flag)
            { /* decode */
             case SourceFlags::StdOut:
                ::SetConsoleTextAttribute(console, FOREGROUND_INTENSITY | FOREGROUND_GREEN);
                break;
             case SourceFlags::StdErr:
                ::SetConsoleTextAttribute(console, FOREGROUND_INTENSITY | FOREGROUND_RED);
                break;
            } /* decode */
         _fputts(s, stdout);
         _fputts(_T("\n"), stdout);   // text mode already expands \n to \r\n
         fflush(stdout);              // write the text before restoring the color
         ::SetConsoleTextAttribute(console, info.wAttributes);
        } /* console */
    } // WriteToOutput
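On terminals that understand ANSI/VT escape sequences (including Windows 10 and later consoles once ENABLE_VIRTUAL_TERMINAL_PROCESSING has been set with ::SetConsoleMode), the same decoration can be done by embedding color codes in the text instead of changing console attributes. This is a swapped-in technique, not what piper does, and Decorate is a hypothetical name. Note that "\x1b[0m" resets to the terminal's default colors rather than restoring previously saved attributes.

```cpp
#include <string>

// Swapped-in technique: ANSI escape sequences instead of
// ::SetConsoleTextAttribute. Bright green for stdout and bright red for
// stderr mirror FOREGROUND_INTENSITY | FOREGROUND_GREEN / FOREGROUND_RED.
enum SourceFlag { kStdOut = 1, kStdErr = 2 };

std::string Decorate(SourceFlag flag, const std::string& line)
{
    const char* color = (flag == kStdErr) ? "\x1b[1;31m"   // bright red
                                          : "\x1b[1;32m";  // bright green
    // "\x1b[0m" resets all attributes, standing in for restoring
    // info.wAttributes in WriteToOutput.
    return color + line + "\x1b[0m\n";
}
```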

ToHTML: HTML Conversion

Because the output might contain characters such as '<', '>', or '&', which have meaning in HTML as metacharacters for formatting, it is necessary to convert such characters so they are not going to cause a conflict with the HTML rendering engine. The translations are:

Character   Translation
&           &amp;
<           &lt;
>           &gt;

/****************************************************************************
*                                   ToHTML
* Inputs:
*       const CString & s:
* Result: CString
*       Modified string with <, > and & replaced with HTML escapes
****************************************************************************/

CString ToHTML(const CString & s)
    {
     CString t = s;
     t.Replace(_T("&"), _T("&amp;")); // this must be the first one
     t.Replace(_T("<"), _T("&lt;"));
     t.Replace(_T(">"), _T("&gt;"));
     return t;
    } // ToHTML
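The order dependency noted in the comment on the first Replace can be avoided by escaping in a single pass. This standard C++ sketch (EscapeHtml is a hypothetical name) examines each input character exactly once, so an '&' introduced by a replacement is never escaped again and the order of the cases does not matter.

```cpp
#include <string>

// Single-pass HTML escaping of the three metacharacters handled by ToHTML.
std::string EscapeHtml(const std::string& s)
{
    std::string out;
    out.reserve(s.size());
    for (char c : s) {
        switch (c) {
        case '&': out += "&amp;"; break;
        case '<': out += "&lt;";  break;
        case '>': out += "&gt;";  break;
        default:  out += c;       break;
        }
    }
    return out;
}
```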

Reader: Top-level Thread Function

This is the top-level thread function, and it is coded very simply: as long as the pipe delivers data, it sends a notification to the main thread for each complete line. When the pipe breaks, it flushes any pending partial line and finally sends a notification that the thread has terminated.

/****************************************************************************
*                                   reader
* Inputs:
*       LPVOID p: (LPVOID)(ThreadParms *) Thread parameters
* Result: UINT
*       irrelevant, 0, always
* Effect: 
*       Parses the data from the stream and emits it as a sequence of lines
****************************************************************************/

UINT __stdcall reader(LPVOID p)
    {
     ThreadParms * parms = (ThreadParms *)p;

     PipeReader pipe(parms->stream, parms->IsUnicode);
     
     CString Prefix;

     while(TRUE)
        { /* processing loop */      
         if(!pipe.Read())
            { /* failed stream */
             break;
            } /* failed stream */

         FormatAndOutput(pipe.GetString(), Prefix, parms);
        } /* processing loop */

     if(!Prefix.IsEmpty())
        { /* write out last line */
         CString text(_T("\r\n"));
         FormatAndOutput(text, Prefix, parms);
        } /* write out last line */

     ::PostQueuedCompletionStatus(parms->iocp, 0, parms->flags, NULL);
     delete parms;   // allocated with new in _tmain; this thread is its only user
     return 0;
    } // reader

FormatAndOutput: Line Splitter

This function simply splits up the packet that comes back (which can contain several lines) and sends each complete line to the main thread for subsequent formatting and display. Any partial line left over is placed in the Prefix variable and is concatenated to the front of the data that arrives on the next iteration. As a consequence, if the pipe stream ends with characters that are not followed by a line terminator, they remain in Prefix. They must be "flushed" by calling the function one last time with a newline sequence, but only if there is a remaining partial line. That is the code at the end of the reader thread.

/****************************************************************************
*                               FormatAndOutput
* Inputs:
*       CString text: data to show
*       CString & prefix: Prefix for data--leftover partial line from last call
*       ThreadParms * parms: Parameters for the thread
* Result: void
*       
* Effect: 
*       Parses the data into lines. Retains any partial line (not terminated
*       by a newline sequence) in the Prefix for the next call
* Notes:
*       To force the last partial line out, it must be called with a newline
*       string as text.
*
*       The text is allocated in this thread, on the heap, and must be
*       disposed of by the recipient.
****************************************************************************/

void FormatAndOutput(CString text, CString & prefix, ThreadParms * parms)
    {
     text = prefix + text;

     while(TRUE)
        { /* break into lines */
         int n = text.Find(_T("\r\n"));
         if(n < 0)
            { /* done */
             prefix = text;
             return;
            } /* done */
         CString * s = new CString(text.Left(n));
         
         ::PostQueuedCompletionStatus(parms->iocp, (DWORD)parms->flags, 0, (LPOVERLAPPED)s);
         text = text.Mid(n+2);
        } /* break into lines */
    } // FormatAndOutput
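The splitting logic is easy to test in isolation. This portable sketch (SplitLines is a hypothetical name) keeps the same contract as FormatAndOutput (an unterminated tail stays in prefix) but appends complete lines to a vector instead of posting heap-allocated CStrings, and it scans with a start index rather than repeatedly copying the remainder with Mid.

```cpp
#include <string>
#include <vector>

// Splits prefix + chunk at "\r\n"; complete lines go to `lines`,
// and any unterminated tail is kept in `prefix` for the next call.
void SplitLines(const std::string& chunk, std::string& prefix,
                std::vector<std::string>& lines)
{
    std::string text = prefix + chunk;
    std::string::size_type start = 0;
    for (;;) {
        std::string::size_type n = text.find("\r\n", start);
        if (n == std::string::npos) {
            prefix = text.substr(start);  // keep the partial line
            return;
        }
        lines.push_back(text.substr(start, n - start));
        start = n + 2;  // skip the "\r\n"
    }
}
```

Because the carried-over prefix is prepended before searching, a "\r\n" split across two reads is still recognized, and a final call with "\r\n" flushes the last partial line, as the reader thread does.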

The important feature here is that the string which is passed across the thread boundary is allocated from the heap, and disposed of by the recipient.

Class PipeReader

This is a class I adapted from another application I did. It reads from the pipe and, when the pipe is expected to carry Unicode data, handles the details of what happens when a read returns an odd number of bytes, that is, when a two-byte character is split across two reads. Otherwise, there's nothing really deep going on.

/****************************************************************************
*                              class PipeReader
****************************************************************************/

class PipeReader {
    protected:
       static const UINT MAX_BUFFER = 1024;

    public:
       PipeReader(HANDLE str, BOOL uni) { Init(); stream = str; IsUnicode = uni; }
       //****************************************************************
       //                       PipeReader::GetString
       // Result: CString
       //       The most recent string read by the Read method
       // Notes:
       //       This must be done before the next call on Read or the
       //       buffer will be overwritten
       //       Unicode reception is fully supported only in Unicode builds
       //****************************************************************
       CString GetString() { 
           if(IsUnicode) 
               return CString((LPCWSTR)buffer); 
           else 
               return CString((LPCSTR)buffer); }
       //****************************************************************
       //                         PipeReader::Read
       // Result: BOOL
       //       TRUE if there is data in the buffer
       //       FALSE if the pipe has broken
       // Effect:
       //       Reads data from the pipe, and makes it available for
       //       the next GetString call
       // Notes:
       //       If the pipe is being treated as Unicode and an odd number
       //       of bytes has been read, retain the last byte and prepend it
       //       to the next ReadFile buffer
       //****************************************************************
       BOOL Read() {
          DWORD start = Offset;   // 1 if a byte was carried over from the last read
          if(start == 1)
             buffer[0] = reread;

          // Leave room for the NUL terminator: two bytes for Unicode, one for ANSI
          DWORD room = (DWORD)(MAX_BUFFER - start -
                               (IsUnicode ? sizeof(WCHAR) : sizeof(char)));
          if(!::ReadFile(stream, &buffer[start], room, &bytesRead, NULL))
             return FALSE;

          DWORD total = start + bytesRead;    // valid bytes now in buffer

          if(IsUnicode)
             { /* unicode pipe */
              if(total & 1)
                 { /* odd bytes read */
                  reread = buffer[total - 1]; // keep the odd byte for the next read
                  total--;                    // and remove it from this buffer
                  Offset = 1;                 // offset for next read
                 } /* odd bytes read */
              else
                 { /* even bytes read */
                  Offset = 0; // offset for next read
                 } /* even bytes read */

              buffer[total] = 0;
              buffer[total + 1] = 0; // create Unicode NUL
             } /* unicode pipe */
          else
             { /* ANSI pipe */
              buffer[total] = '\0';
             } /* ANSI pipe */
         return TRUE;
       } // PipeReader::Read

    protected:
       void Init() { stream = NULL; Offset = 0; IsUnicode = FALSE; }
       BOOL IsUnicode;
       HANDLE stream;
    protected:
       BYTE buffer[MAX_BUFFER];
       DWORD Offset;
       BYTE reread;
       DWORD bytesRead;
}; // class PipeReader
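The odd-byte carry is the subtle part of PipeReader. This portable sketch (Utf16Reassembler is a hypothetical name) isolates it: the carried byte is prepended to the next chunk, and only whole two-byte units are returned. It assumes little-endian UTF-16, which is what Windows programs write, and it does not try to rejoin surrogate pairs split across reads.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Reassembles little-endian UTF-16 code units from arbitrarily sized chunks.
class Utf16Reassembler {
public:
    // Returns the complete code units available after adding this chunk.
    std::u16string Feed(const unsigned char* data, std::size_t size) {
        std::vector<unsigned char> bytes;
        if (haveCarry_)
            bytes.push_back(carry_);  // odd byte left over from last time
        bytes.insert(bytes.end(), data, data + size);

        haveCarry_ = (bytes.size() % 2) != 0;
        if (haveCarry_) {
            carry_ = bytes.back();    // hold it back for the next chunk
            bytes.pop_back();
        }

        std::u16string out;
        for (std::size_t i = 0; i < bytes.size(); i += 2)
            out.push_back(static_cast<char16_t>(bytes[i] | (bytes[i + 1] << 8)));
        return out;
    }
private:
    bool haveCarry_ = false;
    unsigned char carry_ = 0;
};
```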

Summary: Use Threads

Whenever you find yourself polling because you can't afford to block the thread that is polling, consider instead using secondary threads that can block. When there is nothing to do, this application consumes zero CPU time.

As for programming style, note that there is not a single global variable in any of this code. None are needed, none are used. There is no explicit synchronization; all synchronization is implicit in the use of ::GetQueuedCompletionStatus and the corresponding ::PostQueuedCompletionStatus calls. The best synchronization is no synchronization. This code is a combination of the "positive handoff" model (wherein responsibility for an object is handed off from one thread to another) and the "central manager" model (the output stream, whether console or HTML stream, is managed by a single thread).

History

10 Jul 2008 - Small bugs fixed in ErrorString and ToHTML

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Joseph M. Newcomer
United States
Member
No Biography provided


Comments and Discussions

 
No advantage over PeekNamedPipe() (Member 4289613, 18 Jul '08 - 0:24)
When using PeekNamedPipe() inside a separate thread with something similar to Sleep(50) between calls to PeekNamedPipe(), we have exactly the same result without 100% CPU usage. So it is the same, but does not need all the code you wrote.
By the way, I was hoping that you had really solved the problem of synchronous reading from a busy external process. But it is not solved. If you try to run this sample program:
 
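#include <math.h>
#include <stdio.h>

void do_something();

int main(int argc, char* argv[])
{
    int i = 10;
    for ( ; i <= 100; i += 10 )
    {
        do_something();
        printf( "%d%%\n", i );   // i is an int, so %d rather than %u
    }
    return 0;
}

void do_something()
{
    int i = 0;
    for ( ; i < 1000000; i++ )
    {
        sin( i/3.14 );   // result discarded; this just burns CPU time
    }
}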
 
you will see all output at once after the program exits.
 
In your example, you use fflush(), but we both know that external tools such as math-calculation programs usually do not call fflush() in their code, so we see all the output at the end of the calculations, not while they are in progress.
This is the real problem, and I have not seen a solution yet...
Re: No advantage over PeekNamedPipe() (Joseph M. Newcomer, 18 Jul '08 - 6:23)
Putting gratuitous Sleep() calls in a thread reduces CPU utilization, but it also limits throughput, and I see no particular reason to limit the throughput. It also means that if there is no output, the thread runs anyway, polling. So I don't understand how a trivial metric such as "CPU Utilization" is a measure of the quality of the program; I limit the CPU utilization by not doing anything if there is nothing to do.
 
I don't follow the sentence about fflush() at all. It doesn't even make sense. What is an "external tool"? Your example shows a long loop that computes the sine of an angle, which just seems to be a complicated way to waste time. But the sine routine is not an "external tool", it is a "library call", and since at no point are you doing anything that generates output, the comment about seeing *all* the output at the end doesn't even correlate to the example code you show.
 
Perhaps you are confusing the application programmer doing a flush operation with the library doing one internally, but I fail to see what any of this has to do with the example I showed. But I really can't make semantic sense of that sentence at all. And as for what the "real" problem is, given I can't parse the previous sentence and apply any semantic meaning to it, I don't know what the "real" problem actually is.

Re: No advantage over PeekNamedPipe() (Member 4289613, 18 Jul '08 - 9:09)
Well, that's what I mean:
1. take the source code I've submitted, and build it as a console application.
2. run it in standard console (inside cmd.exe).
Here is the console's output:
(pause)
10%
(pause)
20%
(pause)
30%
...
100%
 
I.e. each percent value is shown after some pause - after do_something() is executed.
 
3. Now run the same console application through your piper.
Here is the piper's console's output:
(very long pause)
10%
20%
30%
...
100%
 
I.e. all percent values are shown at once - after the console application is executed. You don't see each percent value after each do_something() is executed, because the pipe does not read anything from the application's output while the application is "busy".
This is what I mean when talking about external application (i.e. about our console application which executes do_something()).
As I wrote, I had hoped that you had solved this problem by using asynchronous reading from the pipe.
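The behavior described in this exchange is consistent with C stdio buffering inside the child rather than with the pipe itself. The C standard specifies that stdout is fully buffered if and only if it can be determined not to refer to an interactive device, so when the child's stdout is a pipe, printf output can sit in the child's own buffer until the buffer fills or the program exits. Because that buffer lives in the child process, neither a reader thread nor PeekNamedPipe in the parent can see the data earlier. A minimal sketch of a child-side change, assuming you are able to modify the child, is to turn off buffering with setvbuf before the first output. Calling fflush(stdout) after each line works too. Line buffering (_IOLBF) is not a good choice here, since Microsoft's setvbuf documentation notes that on Win32 it behaves the same as full buffering. RunProgressDemo is a hypothetical name; do_something is the busy loop from the comment above.

```cpp
#include <cmath>
#include <cstdio>

// Busy work from the comment above; the volatile sink keeps the loop from
// being optimized away.
double do_something() {
    volatile double sink = 0.0;
    for (int i = 0; i < 1000000; i++)
        sink = sink + std::sin(i / 3.14);
    return sink;
}

// Returns 0 on success. setvbuf must be called before any other I/O on stdout.
int RunProgressDemo() {
    // Unbuffered stdout: each printf is handed to the OS immediately, even
    // though the C runtime would otherwise fully buffer a non-interactive stream.
    if (std::setvbuf(stdout, nullptr, _IONBF, 0) != 0)
        return 1;
    for (int i = 10; i <= 100; i += 10) {
        do_something();
        std::printf("%d%%\n", i);
        // Alternative to setvbuf: call std::fflush(stdout) here instead.
    }
    return 0;
}
```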
Non-locking designs (supercat9, 10 Jul '08 - 14:26)
It seems to me that many data structures can be implemented reasonably easily using Interlocked.CompareExchange rather than locking primitives. This avoids problems that can occur if the execution of a thread that holds a resource gets stuck (e.g. because of priority inversion or other problems). I'm glad to see someone's using non-locking methods; I wonder why they're not more common.
 
BTW, I wonder what the best algorithm would be for using CompareExchange to build a class that supports true atomic assignments (i.e. not just an atomic read followed by an atomic write, but an assignment that guarantees that it is not possible for another thread to modify both variables between the read and the write). I figured out a way to do it that requires neither locks nor memory allocations during execution (it does require passing in a pointer to some temporary objects in shared memory, but those objects may be reused on subsequent calls). It seems somewhat absurdly complicated, though--I wonder if there's an easier method?
Re: Non-locking designs (Joseph M. Newcomer, 10 Jul '08 - 14:57)
Certain designs in fact *are* implemented using InterlockedCompareExchange. However, these are extremely limited mechanisms. Note that my approach to "non-locking" often involves "locking below the level where the programmer has to reason about it" such as PostMessage, PostThreadMessage, and PostQueuedCompletionStatus.
 
Lock-free algorithms often involve what is called "speculative computation", such as you will find in OpenMP; there is an "atomic" *= operation, but what it does is a very ordinary computation followed by an I.C.E. (InterlockedCompareExchange) to validate that the operand has not changed since the computation began, e.g.,
A *= B;
is
L: fetch A ("original_A")
make copy of A
multiply copy of A by B
I.C.E. A == original_A, exchange A with product if same
if(failed to exchange) goto L;
 
Studies have shown that except in high-collision situations this is actually the fastest way to accomplish this without locks, even if there are several retries (there are some white papers on the Intel site concerning this)
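The speculative A *= B sequence above maps directly onto a compare-and-swap retry loop. Here is a minimal portable sketch using C++11 std::atomic, where compare_exchange_weak plays the role of InterlockedCompareExchange. The function name AtomicMultiply is hypothetical.

```cpp
#include <atomic>

// Speculative read-modify-write: compute on a private copy, then publish the
// result only if A still holds the value the computation started from.
long long AtomicMultiply(std::atomic<long long>& a, long long b) {
    long long original = a.load();        // L: fetch A ("original_A")
    long long product;
    do {
        product = original * b;          // multiply the private copy by B
        // Exchange only if A == original. On failure, compare_exchange_weak
        // stores A's current value into original, and the loop retries.
    } while (!a.compare_exchange_weak(original, product));
    return product;
}
```

compare_exchange_weak may also fail spuriously; the loop simply retries, so the result is the same as with the strong form, and the weak form is usually preferred inside a retry loop.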
 
Most lock-free algorithms I've seen are either very subtle but absolute, or involve speculative computation with I.C.E.
 
Note that some locking primitives, such as spin locks (kernel), queued spin locks (kernel) and CRITICAL_SECTION (user), all tend to use I.C.E., and all other primitives (mutex, semaphore, event, etc.) use I.C.E. at their lowest levels. Queued spin locks are interesting because they implement a FIFO queue with a spin lock: they (a) reduce memory contention on the bus by spinning on a local stack copy, which is essential for NUMA-architecture performance, and (b) unlike normal spin locks, guarantee FIFO order (no starvation of a thread).
joe
Re: Non-locking designs (supercat9, 10 Jul '08 - 16:07)
Joseph M. Newcomer wrote:
Studies have shown that except in high-collision situations this is actually the fastest way to accomplish this without locks, even if there are several retries (there's some white papers on the Intel site concerning this)

 
That's what I would expect. I see a lot of code that locks short critical sections rather than using speculative execution. Such code will probably work fine most of the time, but things like priority inversion can wreak havoc on a system.
 
It's been ages since I studied operating systems, but I would expect that major high-end systems (e.g. credit-card processing) would support revocable lock requests. Unlike the spinlock situation, a task would wait in line to receive a resource, but unlike the conventional-locking case, a task that got waylaid wouldn't hold up the rest of the system (computation would have to be performed in such a way that the system could unwind it if it needed to take control from the wayward task). I wonder whether it would make sense to incorporate such a feature at the OS level?
 
BTW, do you know of any good algorithms for implementing an atomic-assignment class? It seems an interesting puzzle. It's pretty easy to formulate a solution that 'almost' works, but any simple approaches I can figure end up with some odd wrinkles in tricky cases. The only solid solution I can figure is to have the objects share a queue of pending requests. Each request includes a sequence number. Actual data items are stored in reference objects that contain the actual data along with a sequence number. The queue is constructed such that items will remain on it until such time as somebody completely finishes processing them. This means multiple threads may try to process the same item from the queue, but sequence numbers can be used to ensure that only the first such request will be honored. Rather a nasty approach, but I think it should always be correct. Can you offer a better approach?
Re: Non-locking designs (Joseph M. Newcomer, 10 Jul '08 - 19:06)
Priority inversion matters only when there is a sufficiently long computation that the priority inversion can matter. If a low-priority thread locks a lock for 100ns, it probably will have no detectable impact on the system. Spin locks tend to lock very short sequences (often < 10ns) so there is rarely a problem.
 
Revokable locks pose problems in that there is no well-defined way to break a lock once it is set. The most common problem is deadlock detection, and that can be handled by using locks with timeouts (e.g., a mutex with WaitForSingleObject). The usual situation is that you are locking a sequence (for simplicity, take the short case A->B) but some other thread has locked B->A, violating the canonical order rules. But if the protocol is
lock A
modify A
lock B
modify B
unlock B
potential further modifications of A
unlock A
 
then the "lock B" can have a timeout, so the protocol is
 
lock A
modify A
lock B (with timeout)
   failed:  restore A
            unlock A
            exponential time backoff (e.g., see Ethernet collision detection)
            retry from "lock A"
   success: modify B
            unlock B
            potential further modifications of A
            unlock A
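The timeout-and-backoff protocol above can be sketched with std::timed_mutex, whose try_lock_for stands in for waiting on a mutex with a timeout via WaitForSingleObject. The sketch assumes that "modify A" can be undone by restoring a saved copy, and it adds random jitter to the backoff (as Ethernet does) so that two threads colliding on A->B and B->A do not retry in lockstep. Guarded, Transfer, and the timing constants are all hypothetical.

```cpp
#include <chrono>
#include <mutex>
#include <random>
#include <thread>

// Hypothetical resource: a value protected by a lock that supports timeouts.
struct Guarded {
    std::timed_mutex lock;
    int value = 0;
};

// Moves 'amount' from a to b using the lock-A, try-lock-B-with-timeout protocol.
// Returns the number of attempts it took.
int Transfer(Guarded& a, Guarded& b, int amount) {
    using namespace std::chrono;
    thread_local std::mt19937 rng(std::random_device{}());
    std::uniform_int_distribution<int> jitter(0, 50);   // random extra microseconds
    microseconds backoff(50);                            // doubled after each failure
    for (int attempt = 1;; ++attempt) {
        a.lock.lock();                                   // lock A
        const int savedA = a.value;
        a.value -= amount;                               // modify A
        if (b.lock.try_lock_for(milliseconds(10))) {     // lock B, with a timeout
            b.value += amount;                           // success: modify B
            b.lock.unlock();                             // unlock B
            a.lock.unlock();                             // unlock A
            return attempt;
        }
        a.value = savedA;                                // failed: restore A
        a.lock.unlock();                                 // unlock A
        std::this_thread::sleep_for(backoff + microseconds(jitter(rng)));
        if (backoff < milliseconds(10))
            backoff *= 2;                                // exponential backoff, capped
    }
}
```

Releasing A before backing off is the point of the protocol: whichever thread holds B can then acquire A and finish, breaking the potential deadlock.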
 
Atomic assignment to scalar values which are DWORD-aligned is enforced by the hardware, and therefore nothing special is required.
 
Assignments to compound values cannot be made atomic by the hardware; that's why we have locking mechanisms.
 
A queue in which items remain until processed sounds like a singularly bad design; I wouldn't even consider it as a functional example of coding.

Re: Non-locking designs (supercat9, 11 Jul '08 - 8:11)
My suggested design for a revocable lock (most typical in a database or similar application) would be to have the thread that acquires the lock do speculative execution, and then end by giving the OS the lock along with a small list of everything that needs to be done to finalize the transaction. If the lock had been kept, the transaction would be completed. If the lock had been revoked, the transaction would be discarded. If desired, the holder of the lock could periodically check to ensure that it was still held and abort the transaction early if not.
 
Algorithmically, the situation would be somewhat analogous to spinlock algorithms, except that the request to complete the transaction could support multiple atomic writes (the OS would confirm that the writes could be made, then make the lock irrevocable, then perform the writes, then release the lock).
 
As to whether assignments are atomic, I suppose that depends what is meant by "assignment". So far as I'm aware, every single-processor system guarantees that reads and writes of operands that are no bigger than the internal data bus will be atomic; most multi-processor systems offer the same guarantee. On some systems, it is possible to perform a statement "x=y" and be certain that even if x and y are both writable by processes (or in an interrupt, or whatever) it would not be possible for both x and y to be written elsewhere between the read of y and the write of x. On some systems, the normal way of doing the assignment would be atomic; on others, there exists an instruction to do the atomic write but it would not generally be used for normal assignments.
 
Just as it is useful to talk of a processor having (or not having) features for e.g. an atomic increment, I think it is relevant to talk about whether assignments (a read and write, with no other data manipulation) can be done atomically. IIRC, on the 80386 a MOVS instruction was atomic; if a page fault occurred on the destination operand, the source operand would be reread. I don't think MOVS can be made atomic on a multi-processor system.
 
It's possible to synthesize an atomic-assignment class using CompareExchange spinlocks, but I don't know how best to do it. The queue I described would only accumulate data if multiple assignment requests occurred simultaneously. To avoid waiting, every object would have to be able to read items from the queue and process them. An object would give up processing items from the queue once its own item had been processed.
 
Using a centralized queue is a bit dodgy, but the solution should operate correctly without busy-waits in all cases. I'm curious how a better solution would work.
Re: Non-locking designs (Joseph M. Newcomer, 11 Jul '08 - 8:39)
The problem with "providing the OS with a list of everything that needs to be done" is that you have to provide the address of a rollback callback routine, since there is no possible way you can embody every possible way to undo in a "list" of things to do. This could be done simply as a wait operation WaitForRevokableLock(lock, timeout, callback) or something like that. When the callback completes, the lock is released. Multiprocessor systems guarantee that an assignment is atomic (I know of no exceptions to this; otherwise it would require allowing two processors to write the same memory location in the same memory cycle, something that is nearly impossible to implement). For x=y, this cannot be made atomic, because the fetch is separate from the store; only the assignment itself (the store) is atomic.
 
MOVS was never atomic, not even on an 8088, let alone a 386. Neither the fetch nor the store is protected in any way. Remember that under DMA, there was always a second "processor" that could access the memory.
 
One reason that hardware favors simple approaches to synchronization (no hardware-based revokable locks, etc.) is that there is no reason to build incredibly complex and specialized mechanisms in hardware when these mechanisms can be built in software. This tends to be the engineering criterion for general-purpose programmable architectures. Pipelined architectures, data-flow machines, and other esoteric architectures have approached this in other ways, and one of the most important was to design algorithms that partition for concurrency without requiring synchronization at all.
Re: Non-locking designs [modified] (supercat9, 11 Jul '08 - 9:06)
MOVS was never atomic, not even on an 8088, let alone a 386. Neither the fetch nor the store is protected in any way. Remember that under DMA, there was always a second "processor" that could access the memory.
 
Good point. On a system which included an 80386 and some DMA hardware that would write to memory, a MOVS would probably not have been atomic (the 80386 probably allows DMA operations mid-instruction, though some older processors like the Z80 and 6502 did not, IIRC). On a system without such hardware, MOVS would be atomic. In a system where it is known that there will be no DMA accesses to a region of memory, MOVS operations on such a region are atomic. A number of smaller microcontrollers do have atomic memory-to-memory move instructions, with nothing that could interfere with their atomic operation.
 
BTW, I just thought of a simple approach for an atomic-assignment class if one is allowed to allocate a new object with each read or write operation and two for each assignment. The root object contains a reference to a holding object which has the data and a counter. For a simple write operation, create a new object, latch the root object's pointer, fill in the data fields, and increment the counter by one (compared to the one in the latched object). CompareExchange the root object's pointer to point to the new one; if it fails, loop back to latching the old object pointer. For a read, create a new object, latch the root object's pointer, copy the data from the latched one, increment the counter, and again do the CompareExchange.
 
Then to do an atomic assignment, create objects for the read and write, then combine the read and write operations, looping back to the start of the read if either operation fails.
 
The only problem I see is that the approach requires allocating new objects for each operation performed. I can't see any way to safely use an object pool, though perhaps there is a way.
 
Edit: hmm... another problem: livelock. Read operations in and of themselves do not constitute useful work. A failure on the ICE for the write operation doesn't necessarily mean the data needs to be reread. If I relatch the values for the write and then see that the read operand hasn't been written (this would imply separate counters for writes and reads) there's no need to "reread" it. Thus, the only way the work accomplished by a read can be undone is for another process to manage a successful write.
 
modified on Friday, July 11, 2008 3:16 PM
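The allocate-per-operation scheme in this comment is essentially copy-on-write publication through a compare-and-swap on a single root pointer. The following is a minimal sketch of the single-cell update only, under two stated assumptions. First, the data lives in a holder object that is never modified after it is published. Second, superseded holders are not freed while the container is alive; they are pushed onto a lock-free retired list and deleted in the destructor. That sidesteps the allocation and reclamation concern the comment raises rather than solving it, and the sketch does not address the two-cell atomic assignment the thread is ultimately after. Reads here are a plain pointer load, a simplification of the comment's read-with-CAS. VersionedCell and Update are hypothetical names.

```cpp
#include <atomic>
#include <cstdint>
#include <utility>

// Hypothetical copy-on-write cell: readers see a consistent snapshot of all
// fields of T because a holder is never modified after it is published.
template <typename T>
class VersionedCell {
    struct Holder {
        T data;
        std::uint64_t counter;     // incremented by every successful update
        Holder* nextRetired;       // link for the retired list
    };
public:
    explicit VersionedCell(T initial)
        : root_(new Holder{std::move(initial), 0, nullptr}) {}
    ~VersionedCell() {
        delete root_.load();
        for (Holder* h = retired_.load(); h != nullptr;) {
            Holder* next = h->nextRetired;
            delete h;
            h = next;
        }
    }
    VersionedCell(const VersionedCell&) = delete;
    VersionedCell& operator=(const VersionedCell&) = delete;

    // Consistent snapshot of the data plus the counter it was published with.
    std::pair<T, std::uint64_t> Read() const {
        const Holder* h = root_.load();
        return {h->data, h->counter};
    }

    // Applies fn to a private copy and publishes it with a CAS on the root
    // pointer, rebuilding the copy from the new root if another thread won.
    template <typename Fn>
    void Update(Fn fn) {
        Holder* latched = root_.load();
        Holder* fresh = new Holder{latched->data, latched->counter + 1, nullptr};
        fn(fresh->data);
        while (!root_.compare_exchange_weak(latched, fresh)) {
            fresh->data = latched->data;        // latched now holds the current root
            fresh->counter = latched->counter + 1;
            fn(fresh->data);
        }
        Retire(latched);   // superseded version, kept alive until destruction
    }
private:
    void Retire(Holder* h) {   // lock-free push onto the retired list
        h->nextRetired = retired_.load();
        while (!retired_.compare_exchange_weak(h->nextRetired, h)) {}
    }
    std::atomic<Holder*> root_;
    std::atomic<Holder*> retired_{nullptr};
};
```

Because fn may run more than once when the CAS loses a race, it must be free of side effects other than modifying its argument.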

Re: Non-locking designs (Joseph M. Newcomer, 11 Jul '08 - 9:28)
No, MOVS was never atomic. There was a fetch of a value, and a store of a value. These were two separate operations. If a page fault occurred during the store, the fetch was redone, but by that time the fetched value could have changed.
 
Whether or not DMA or similar mechanisms could occur depends on the processor design and where it is willing to do bus grants during the bus transactions.
 
Allocation of objects is quite expensive, and it is not clear from a performance viewpoint that anything that involves doing allocations in this fashion would produce acceptable performance, and it also doesn't guarantee atomicity at more than a scalar value level. Unfortunately, nearly all interesting tasks require multiple value manipulation.
Re: Non-locking designs (supercat9, 11 Jul '08 - 10:35)
A move operation can be regarded as atomic if it is not possible for the following sequence of events to take place in sequence:
  1. The source value is latched
  2. The source operand is changed
  3. The destination operand is read or changed
  4. The latched value is written to the destination
The only read operation of any consequence is the one that latches the value written. If a page fault during a MOVS causes the read to be repeated, and if there is no hardware DMA that can overwrite the source operand, the operation is atomic. The only way the MOVS can have any effect at all is if it successfully reads the source and writes the destination without any intervening instructions.
 
I'm not clear what you mean when you say that atomicity would only be guaranteed at a scalar value level. The target data object could contain any number of fields, and they would all be updated atomically. The only limitation on the amount of data it could contain would be that as it became bigger, more time would be wasted on retries.
 
I agree that having to allocate new objects would be icky. It would seem like it shouldn't be necessary, but I haven't yet figured out a safe way to avoid it. An interesting puzzle.
 
BTW, with regard to priority inversion, I would think that while limiting the amount of code in a critical section could minimize the likelihood of priority inversion occurring, it would not protect against adverse effects if it does occur. In some situations, low-priority threads may go over a second without executing any instructions. A critical section may only be five instructions long, but if a low-priority thread loses its quantum during those five instructions and doesn't execute anything else for a second or two, that could severely disrupt a realtime-priority thread.
One minor correction - ErrorString leaks (erangi, 9 Jul '08 - 20:47)
This has absolutely nothing to do with the actual problem whose solution is demonstrated above, but still -
When calling FormatMessage with FORMAT_MESSAGE_ALLOCATE_BUFFER, a buffer is allocated and assigned to lpBuffer. As per MSDN, this buffer should be freed by calling LocalFree. In ErrorString, msg gets that buffer, which is then assigned to the temp CString that is returned. This means the CString will contain the right message, but the buffer pointed to by msg will leak.
 
In this project this leak will have no practical effect. However, in case someone likes ErrorString and copies it into a different project, it had better be fixed.
Re: One minor correction - ErrorString leaks (Joseph M. Newcomer, 9 Jul '08 - 22:05)
You are right; I had commented that this was a "stripped-down" version of my more general solution, and I seemed to have been a bit too brutal in the strip-down. In the full version, I assign the LPTSTR to the CString, LocalFree the LPTSTR, and return the CString. This was clearly stripped down far too late at night...
 
Good catch, I'll fix the master article and tell the CodeProject folks to fix this one.
Very nice! (Harold Bamford, 1 Jul '08 - 9:44)
Nice to see you posting here again! And this code will definitely get squirreled away for later use.
 
Good job.
Subtitle from another article. :) (Leo Davidson, 1 Jul '08 - 4:06)
I think the subtitle from another article was accidentally pasted into this article.
 
Below the main article title at the top of the page (and also in the RSS feed), it says:
 
"When a dynamically-resizable dialog is created, you may find it desirable to have controls resize or rearrange themselves dynamically in response to these changes. This project shows some simple techniques to use when the arrangements are not too complex. It also incorporates two other projects, the..."
 
OT: strange syntax (petr3, 30 Jun '08 - 22:51)
first: sorry for OT...
Q: what is this notation for:
 
"class CommandLine">class CommandLine
{
   //something
};
?
I mean the string in front of the class keyword.
 
tnx.
Re: OT: strange syntax (Joseph M. Newcomer, 1 Jul '08 - 5:00)
I suspect it is an error in your browser's rendering of the HTML. I certainly don't see it in IE. It is probably part of
 
class CommandLine
 
joe

Re: OT: strange syntax (petr3, 1 Jul '08 - 6:46)
you could be right. ATM the code is displayed correctly in my Firefox.
Funny part is if you try to compile the code I pasted in my previous message it succeeds.


Last Updated 10 Jul 2008
Article Copyright 2008 by Joseph M. Newcomer
Everything else Copyright © CodeProject, 1999-2013