Click here to Skip to main content
15,889,839 members
Articles / Desktop Programming / MFC

AxPipe - Multi-Threaded Binary Stream C++ Class Library

Rate me:
Please Sign up or sign in to vote.
3.71/5 (7 votes)
14 Jan 20042 min read 61.4K   1.1K   17  
A small and efficient collection of classes and templates to create multi-stage multi-threaded data processing pipe lines
/*! \file
    \brief Implementation of AxPipe::CFilter, pull-style filters of different kinds

    @(#) $Id: CFilter.cpp,v 1.2 2004/01/01 20:05:53 svante Exp $

    AxPipe - Binary Stream Framework

    Copyright (C) 2003 Svante Seleborg/Axon Data, All rights reserved.

    This program is free software; you can redistribute it and/or modify it under the terms
    of the GNU General Public License as published by the Free Software Foundation;
    either version 2 of the License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
    without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with this program;
    if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
    Boston, MA 02111-1307 USA

    The author may be reached at mailto:axpipe@axondata.se and http://axpipe.sourceforge.net

    Why is this framework released as GPL and not LGPL? See http://www.gnu.org/philosophy/why-not-lgpl.html

----
\verbatim
    CFilter.cpp                     Implementation of CFilter, pull-style filters of different kinds

    E-mail                          YYYY-MM-DD              Reason
    axpipe@axondata.se              2003-12-01              Initial
\endverbatim
*/
#include "stdafx.h"

namespace AxPipe {
    /// \brief Helper static member function to send as StartProc to the CCoContext m_ctxFilter.
    ///
    /// It's only function is to get us 'back into the class', by calling CoStartFilter().
    /// \param pvThis A pointer to 'this' CFilter.
    void
    CFilter::CoFilter(void *pvThis) {
        ((CFilter *)pvThis)->CoStartFilter(pvThis);
        // Should never get here!
        ASSCHK(false, _T("CFilter::CoFilter unexpected fiber exit"));
    }

    /// \brief Initialize member variables.
    CFilter::CFilter() {
        m_ctxFilter.Init(this, CoFilter, this);
        m_ctxWork.Init(this, NULL, NULL);   // The Work context starts as the 'current'
        m_fFirstWork = true;                // Ensure that we know when Work() is called first time.
    }

    CFilter::~CFilter() {
        m_ctxFilter.Stop();
    }

    /// \brief Overriden Out() to handle switching to Filter co-routine context.
    ///
    /// A CFilter derived Out() may be called with zero-length and NULL pSeg from OutClose() and
    /// OutFlush(), as this is how those conditions are signalled to the InFilter() via Read().
    /// \param pSeg The segment to send to the filter.
    void
    CFilter::Out(CSeg *pSeg) {
        if (!m_fIsOpen) {
            SetError(ERROR_CODE_NOTOPEN, ERROR_MSG_NOTOPEN);
            return;
        }

        m_pSeg = pSeg;
        // Switch to Read() and InFilter()
        OutputDebugString(_T("CFilter::Out(CSeg *pSeg) m_ctxFilter.Go()\n"));
        m_ctxFilter.Go();
    }

    /// \brief Prepare for processing.
    ///
    /// Filters by default do nothing on Open() request, this is called
    /// by Work() in the worker thread upon reception of the open
    /// in band signal.
    ///
    /// If overriden in derived classes, CFilter::OutOpen() must also be
    /// called, to ensure proper co-routine context initialization. Normally
    /// this will not be overridden for CFilter derived classes.
    ///
    /// \return true to cause propagation of Open() - the default here is false.
    bool
    CFilter::OutOpen() {
        OutputDebugString(_T("CFilter::OutOpen()\n"));
        if (m_fFirstWork) {
            // If first time, ensure we are executing in Work coroutine context now
            OutputDebugString(_T("CFilter::OutOpen() m_ctxWork.Go()\n"));
            m_ctxWork.Go();                 // This only initializes the context...
            m_fFirstWork = false;           // ...we get here immediately
        }
        // Default for filters is to not propagate
        return false;
    }
    
    /// \brief Send a NULL segment close signal to InFilter() and Read().
    ///
    /// If overriden in derived classes, CFilter::OutClose() must also be
    /// called, to ensure signalling to InFilter() / Read() . Normally
    /// this will not be overridden for CFilter derived classes.
    /// \return true to cause propagation of Close() - the default here is false.
    bool
    CFilter::OutClose() {
        Out(NULL);
        // Default for filters is to not propagate
        return false;
    }
    
    /// \brief Send flush-request as a zero-length segment to Read()
    /// \return true to cause propagation of Flus() - the default here is true.
    bool
    CFilter::OutFlush() {
        Out(new CSeg);
        return true;
    }

    /// \brief Send the m_pSeg segment to the Filter
    ///
    /// This override of the default adds the stopping of the filter context
    /// upon reception of a plug signal.
    void
    CFilter::Work() {
        CPipe::Work();
    }

    /// \brief The start in-class of the filter co-routine context.
    /// We get here when we have the first data segment ready for the InFilter()
    /// and we're opened by the previous.
    /// \param pvThis A pointer to 'this' CFilter (not really necessary).
    void
    CFilter::CoStartFilter(void *pvThis) {
        // The filter may be called multiple times. It should exit when receiving
        // a eSegTypeClose segment, ready to be called again when more arrives.
        do {
            OutputDebugStringF(_T("CFilter::CoStartFilter(void *pvThis) InFilter(), this=%p\n"), this);
            InFilter();                     // Shold return when eof/empty is signalled.

            // Drive the sender until we either get a valid data segment, or we're killed
            do {
                OutputDebugStringF(_T("CFilter::CoStartFilter waiting for data, this=%p\n"), this);
                m_ctxWork.Go();
            } while (m_pSeg == NULL || m_pSeg->Len() == 0);
            OutputDebugStringF(_T("CFilter::CoStartFilter found data, this=%p\n"), this);
        } while (true);
        // Never get here!
    }

    /// \brief Get a segment, call from InFilter().
    ///
    /// Get a valid, zero-length or NULL segment for data, flush and
    /// close respectively.
    /// \return A memory segment, or zero-length or NULL (not an error).
    CSeg *
    CFilter::Read() {
        // We may already have a segment waiting, at first call.
        if (!m_pSeg) {
            m_ctxWork.Go();
        }
        // m_pSeg can be valid, zero-length or NULL here. Nothing else.
        CSeg *pSeg = m_pSeg;
        m_pSeg = NULL;
        return pSeg;
    }
    /// \brief Helper routine to get next segment.
    ///
    /// m_pSeg can only be valid, zero-length or NULL here.
    /// If we already have a valid segment in m_pSeg, we don't
    /// get a new one.
    /// \return true if we return with a segment ready to use in m_pSeg.
    bool
    CFilterByte::GetNextSeg() {
        // Release if empty.
        if (m_pSeg && !m_pSeg->Len()) {
            m_pSeg->Release();
            m_pSeg = NULL;
        }
        if (!m_pSeg) {
            m_ctxWork.Go();
        }
        return m_pSeg != NULL;              // Success as long as we get a segment, but it might be zero-len
    }
    /// \brief Read a byte from the stream.
    /// \return A byte as an int, or -1 on eos or error
    int
    CFilterByte::ReadByte() {
        do {
            if (!GetNextSeg()) {
                return -1;
            }
        } while (!m_pSeg->Len());       // Ignore flush requests, just wait for data.

        // Now we now we have at least one byte.
        unsigned char c = *m_pSeg->PtrRd();
        m_pSeg->Drop(1);
        return c;
    }

    /// \brief Errror catcher, can't call Read() from CFilterByte derived.
    CSeg *
    CFilterByte::Read() {
        SetError(ERROR_CODE_GENERIC, ERROR_MSG_GENERIC, _T("Attempt to call CFilterByte::Read()"));
        return NULL;
    }

    /// \brief Skip bytes in stream.
    /// \param cb Number of bytes to skip
    /// \return Number of bytes not skipped because stream ended prematurely.
    size_t
    CFilterByte::Skip(size_t cb) {
        while (cb) {
            if (!GetNextSeg()) {
                break;
            }
            size_t cbChunk = m_pSeg->Len();
            if (cbChunk > cb) {
                cbChunk = cb;
            }
            m_pSeg->Drop(cbChunk);
            cb -= cbChunk;
        }
        return cb;
    }

    /// \brief Attempt to get a segment of a requested size.
    ///
    /// Always get the amount requested if possible. Return less than requested if
    /// EOS is detected. Can return NULL. Cannot return zero-length
    /// For this type of filter, honoring and handling flush-requests
    /// do not really make sense. Use a regular CFilter or CPipe if you need do do
    /// that. Asking for zero bytes means that we'll take what we get, right now we
    /// don't care about how much we get.
    /// \param cb The number of bytes we want in the returned segment.
    /// \return A segment with the request number of bytes, or less if eos, or NULL if no data at all.
    CSeg *
    CFilterBlock::ReadBlock(size_t cb) {
        // Zero means take what we get
        if (!cb) {
            while (GetNextSeg()) {
                if (m_pSeg->Len()) {
                    CSeg *pSeg = m_pSeg;
                    m_pSeg = NULL;
                    return pSeg;
                }
                m_pSeg->Release();
                m_pSeg = NULL;
            }
            return NULL;                    // No data - this is shown with NULL
        }
        // If no buffered data - we must get more in any case, so let's do it.
        if (!GetNextSeg()) {
            return NULL;                    // No data to get.
        }
        // This is a slight optimization to try to keep chunks in the original
        // segment as much as possible.
        // If the buffer contains the proper number of bytes already, clone it,
        // drop the bytes off the original, and set the length of the copy
        // returned.
        if (m_pSeg->Len() >= cb) {
            CSeg *pSeg = m_pSeg->Clone();
            m_pSeg->Drop(cb);
            pSeg->Len(cb);
            return pSeg;
        }

        // Now we know we must merge two or more buffers. Let's allocate a segment to return.
        CSeg *pSeg = new CSeg(cb);
        if (!pSeg) {
            SetError(ERROR_CODE_GENERIC, ERROR_MSG_GENERIC, _T("Out of memory"));
            return NULL;
        }

        size_t cbSeg;
        pSeg->Len(cbSeg = 0);           // Set the length of valid data in the segment to zero.
        // We also know at this point that we have valid data in the buffer.
        do {
            size_t cbChunk = m_pSeg->Len();
            if (cbChunk > cb - cbSeg) {
                cbChunk = cb - cbSeg;
            }
            memcpy(&pSeg->PtrWr()[cbSeg], m_pSeg->PtrRd(), cbChunk);
            m_pSeg->Drop(cbChunk);
            pSeg->Len(cbSeg += cbChunk);// Update the length of valid data in the segment.

            // If we've gotten all we need...
            if (cbSeg == cb) {
                return pSeg;
            }

            if (!GetNextSeg()) {
                return pSeg;        // Return what data we have.
            }
        } while (true);
        // Can't get here!
    }
};

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here


Written By
Web Developer Axantum Software AB
Sweden Sweden
I've been working with all aspects of software development since 1979 - from compiler construction to management. Currently I'm an independent consultant mostly specializing in computer security. Please see my homepage for contact details.

I speak C like a native, and have a pretty good grasp of C++. The most recent five years C# has been the main development language. Traditionally Unix has been the dominating environment, but currently the scales have tipped over to Windows, due to market demands but I'm equally at home developing in both environments.

When I'm not coding I'm usually sitting on one of my 4 bikes, indoors or outdoors, on the road or in the woods.

Comments and Discussions