Click here to Skip to main content
15,892,517 members
Articles / Programming Languages / C++

RCF - Interprocess Communication for C++

Rate me:
Please Sign up or sign in to vote.
4.94/5 (147 votes)
25 Oct 2011CPOL20 min read 4.6M   8.4K   331  
A server/client IPC framework, using the C++ preprocessor as an IDL compiler.
//*****************************************************************************
// RCF - Remote Call Framework
// Copyright (c) 2005. All rights reserved.
// Developed by Jarl Lindrud.
// Contact: jlindrud@hotmail.com .
//*****************************************************************************

#include <RCF/TcpAsioClientTransport.hpp>

#include <boost/bind.hpp>

#include <RCF/ByteOrdering.hpp>
#include <RCF/InitDeinit.hpp>
#include <RCF/TcpAsioSynchronizedSocket.hpp>
#include <RCF/TcpEndpoint.hpp>
#include <RCF/TimedBsdSockets.hpp> // timedConnect()

namespace RCF {
    // TcpAsioClientTransport

    TcpAsioClientTransport::TcpAsioClientTransport(const std::string &ip, int port, DemuxerPtr demuxerPtr/* = DemuxerPtr()*/) :
        mDemuxerPtr(demuxerPtr),// ? demuxerPtr : getSharedDemuxerPtr() ),
        mSynchronizedSocketPtr( new SynchronizedSocket(mDemuxerPtr, getSharedReadWriteMutexPtr())),
        mIp(ip),
        mPort(port),
        mEndpoint(),
        mEndTimeMs(),
        mStatePtr(new State(*mDemuxerPtr)),
        mTimer(mStatePtr->mTimer),
        mBytesTransferred(mStatePtr->mBytesTransferred),
        mTimeoutFlag(mStatePtr->mTimeoutFlag),
        mError(mStatePtr->mError),
        mEventMutex(mStatePtr->mEventMutex),
        mEvent(mStatePtr->mEvent)
    {
    }

    TcpAsioClientTransport::TcpAsioClientTransport(SynchronizedSocketPtr synchronizedSocketPtr, DemuxerPtr demuxerPtr/* = DemuxerPtr()*/) :
        mDemuxerPtr(demuxerPtr),// ? demuxerPtr : getSharedDemuxerPtr() ),
        mSynchronizedSocketPtr(synchronizedSocketPtr),
        mIp(),
        mPort(),
        mEndpoint(),
        mEndTimeMs(),
        mStatePtr(new State(*mDemuxerPtr)),
        mTimer(mStatePtr->mTimer),
        mBytesTransferred(mStatePtr->mBytesTransferred),
        mTimeoutFlag(mStatePtr->mTimeoutFlag),
        mError(mStatePtr->mError),
        mEventMutex(mStatePtr->mEventMutex),
        mEvent(mStatePtr->mEvent)
    {
    }

    TcpAsioClientTransport::TcpAsioClientTransport(const TcpAsioClientTransport &rhs) :
        mDemuxerPtr(rhs.mDemuxerPtr),
        mSynchronizedSocketPtr( new SynchronizedSocket(mDemuxerPtr, getSharedReadWriteMutexPtr())),
        mIp(rhs.mIp),
        mPort(rhs.mPort),
        mEndpoint(rhs.mEndpoint),
        mEndTimeMs(),
        mStatePtr(new State(*mDemuxerPtr)),
        mTimer(mStatePtr->mTimer),
        mBytesTransferred(mStatePtr->mBytesTransferred),
        mTimeoutFlag(mStatePtr->mTimeoutFlag),
        mError(mStatePtr->mError),
        mEventMutex(mStatePtr->mEventMutex),
        mEvent(mStatePtr->mEvent)
    {
    }

    TcpAsioClientTransport::~TcpAsioClientTransport()
    {
        close();
    }

    // this function exists because borland c++ doesn't seem to find the comparison operator
    // for asio::ipv4::tcp::endpoint
    bool isClear(const asio::ip::tcp::endpoint &endpoint)
    {
        asio::ip::tcp::endpoint endpoint0;
        return 
            endpoint.address() == endpoint0.address() && 
            endpoint.port() == endpoint0.port();
    }

    ClientTransportAutoPtr TcpAsioClientTransport::clone() const
    {
        if (mSynchronizedSocketPtr.get() && mIp == "" && isClear(mEndpoint))
        {
            mSynchronizedSocketPtr->getRemoteEndpoint(mEndpoint);
            mIp = mEndpoint.address().to_string();
            mPort = mEndpoint.port();
        }
        return ClientTransportAutoPtr( new TcpAsioClientTransport(*this) );
    }

    EndpointPtr TcpAsioClientTransport::getEndpointPtr() const
    {
        if (mSynchronizedSocketPtr.get() && mIp == "" && isClear(mEndpoint))
        {
            mSynchronizedSocketPtr->getRemoteEndpoint(mEndpoint);
            mIp = mEndpoint.address().to_string();
            mPort = mEndpoint.port();
        }
        return EndpointPtr( new TcpEndpoint(mIp, mPort) );
    }

    int TcpAsioClientTransport::send(const std::string &data, unsigned int timeoutMs)
    {
        mEndTimeMs = getCurrentTimeMs() + timeoutMs;

        // connect
        if (!isConnected())
        {
            int ret = connect();
            if (ret != 1)
            {
                return ret;
            }
        }

        // encode message length
        BOOST_STATIC_ASSERT(sizeof(unsigned int) == 4);
        mWriteBuffer.resize(4+data.length());
        *(unsigned int *) &mWriteBuffer[0] = static_cast<unsigned int>(data.length());
        RCF::machineToNetworkOrder(&mWriteBuffer[0], 4, 1);
        memcpy(&mWriteBuffer[4], data.c_str(), data.length());

        // send message
        write(&mWriteBuffer[0], mWriteBuffer.size());
        return errFromAsioErr();
    }

    int TcpAsioClientTransport::receive(std::string &data, unsigned int timeoutMs)
    {
        // read message length
        BOOST_STATIC_ASSERT(sizeof(unsigned int) == 4);
        mReadBuffer.resize(4);
        std::size_t bytesTransferred = read(&mReadBuffer[0], mReadBuffer.size());

        if (bytesTransferred == 4)
        {
            // read message
            networkToMachineOrder(&mReadBuffer[0], 4, 1);
            unsigned int length = * (unsigned int *) &mReadBuffer[0];
            if (length == 0 || length > getMaxMessageLength())
            {
                RCF_THROW(ClientTransportException, "bad message length")(length)(getMaxMessageLength());
            }
            mReadBuffer.resize(length);
            bytesTransferred = read(&mReadBuffer[0], mReadBuffer.size());
            if (bytesTransferred == length)
            {
                data.assign(&mReadBuffer[0], mReadBuffer.size());
                return 1;
            }
        }

        return errFromAsioErr();
    }

    bool TcpAsioClientTransport::isConnected()
    {
        return mSynchronizedSocketPtr->isConnected();
    }

    void TcpAsioClientTransport::close()
    {
        if (mSynchronizedSocketPtr)
        {
            mSynchronizedSocketPtr->close();
        }
    }
    
    void TcpAsioClientTransport::onFilteredReadWriteCompletion(std::size_t bytesTransferred, int error)
    {
        mBytesTransferred = bytesTransferred;
        mError = error;
    }

    void TcpAsioClientTransport::setTransportFilters(const std::vector<FilterPtr> &filters)
    {
        mTransportFilters.assign(filters.begin(), filters.end());

        RCF::connectFilters(
            mTransportFilters, 
            boost::bind(&TcpAsioClientTransport::readSingle, this, _1, _2),
            boost::bind(&TcpAsioClientTransport::writeSingle, this, _1, _2),
            boost::bind(&TcpAsioClientTransport::onFilteredReadWriteCompletion, this, _1, _2));

    }

    TcpAsioClientTransport::SynchronizedSocketPtr TcpAsioClientTransport::releaseSynchronizedSocketPtr()
    {
        SynchronizedSocketPtr synchronizedSocketPtr(mSynchronizedSocketPtr);
        mSynchronizedSocketPtr.reset();
        return synchronizedSocketPtr;
    }

    TcpAsioClientTransport::SynchronizedSocketPtr TcpAsioClientTransport::getSynchronizedSocketPtr()
    {
        return mSynchronizedSocketPtr;
    }

    int TcpAsioClientTransport::connect()
    {
        mSynchronizedSocketPtr->close();
        mSynchronizedSocketPtr->open();

        int fd = static_cast<int>(mSynchronizedSocketPtr->impl());
        unsigned long ul_addr = 0; // in network order
        int port = 0; // in host order
        
        if (isClear(mEndpoint))
        {
            hostent *hostDesc = ::gethostbyname( mIp.c_str() );
            if (hostDesc)
            {
                char *szIp = ::inet_ntoa( * (in_addr*) hostDesc->h_addr_list[0]);
                ul_addr = ::inet_addr(szIp);
                port = mPort;
            }
        }
        else
        {
            asio::ip::address address = mEndpoint.address();
            ul_addr = htonl(address.to_v4().to_ulong());
            port = mEndpoint.port();
        }
  
        sockaddr_in remoteAddr;
        memset(&remoteAddr, 0, sizeof(remoteAddr));
        remoteAddr.sin_family = AF_INET;
        remoteAddr.sin_addr.s_addr = ul_addr;
        //remoteAddr.sin_port = ::htons(port); // the :: seems to screw up gcc (!?!)
        remoteAddr.sin_port = htons(port); // the :: seems to screw up gcc (!?!)

        unsigned int timeoutMs = generateTimeoutMs(mEndTimeMs);
        int ret = timedConnect(timeoutMs, fd, (sockaddr*) &remoteAddr, sizeof(remoteAddr)); // TODO: error checking and reporting
        return ret == 0 ? 1 : -1;
    }

    bool waitUntilFdReadable(int fd, unsigned int timeoutMs)
    {
        fd_set fdSet;
        FD_ZERO(&fdSet);
        FD_SET(fd, &fdSet);
        timeval timeout;
        timeout.tv_sec = timeoutMs/1000;
        timeout.tv_usec = 1000*(timeoutMs%1000);
        int ret = ::select(fd+1, &fdSet, NULL, NULL, &timeout);
        return ret == 0 ? false : true; // return true for timeout (ret == 0), otherwise false
    }

    void TcpAsioClientTransport::readSingle(char *buffer, std::size_t bufferLen)
    {
        mError = asio::error();
        mBytesTransferred = 0;
        mTimeoutFlag = false;
        int fd = static_cast<int>(mSynchronizedSocketPtr->impl());
        unsigned int timeoutMs = generateTimeoutMs(mEndTimeMs);
        if (waitUntilFdReadable(fd, timeoutMs))
        {
            mError = asio::error();
            std::size_t ret = mSynchronizedSocketPtr->read(buffer, bufferLen, mError);
            if (ret > 0)
            {
                mTransportFilters.empty() ? 
                    onFilteredReadWriteCompletion(ret, 0) :
                    mTransportFilters.back()->onReadWriteCompleted(ret, 0);
                return;
            }
        }
        else
        {
            mTimeoutFlag = true;
        }

        mTransportFilters.empty() ? 
            onFilteredReadWriteCompletion(0, -1) :
            mTransportFilters.back()->onReadWriteCompleted(0, -1);
    }

    void TcpAsioClientTransport::writeSingle(const char *buffer, std::size_t bufferLen)
    {
        mError = asio::error();
        std::size_t bytesWritten = mSynchronizedSocketPtr->write(buffer, bufferLen, mError);
        if (mError)
        {
            mSynchronizedSocketPtr->setCloseFlag();
        }
        int err = mError ? -1 : 0;
        mTransportFilters.empty() ? 
            onFilteredReadWriteCompletion(bytesWritten, err) :
            mTransportFilters.back()->onReadWriteCompleted(bytesWritten, err);
    }

    std::size_t TcpAsioClientTransport::read(char *buffer, std::size_t bufferLen)
    {
        std::size_t offset = 0;
        while (!mError && !mTimeoutFlag && offset < bufferLen)
        {
            mTransportFilters.empty() ?
                readSingle(buffer+offset, bufferLen-offset) :
                mTransportFilters.front()->read(buffer+offset, bufferLen-offset);
                
            offset += (!mError) ? mBytesTransferred : 0;
        }
        RCF_ASSERT(offset <= bufferLen);
        return offset;
    }

    std::size_t TcpAsioClientTransport::write(const char *buffer, std::size_t bufferLen)
    {
        std::size_t offset = 0;
        while (!mError && !mTimeoutFlag && offset < bufferLen)
        {
            mTransportFilters.empty() ?
                writeSingle(buffer+offset, bufferLen-offset) :
                mTransportFilters.front()->write(buffer+offset, bufferLen-offset);
            offset += (!mError) ? mBytesTransferred : 0;
        }
        RCF_ASSERT(offset <= bufferLen);
        return offset;
    }

    int TcpAsioClientTransport::errFromAsioErr()
    {
        if (mTimeoutFlag)
        {
            return -2;
        }
        else if (mError && mError.code() == asio::error::connection_reset)
        {
            return 0;
        }
        else if (mError)
        {
            RCF_TRACE("asio error: ")(mError.code());
            return -1;
        }
        else
        {
            return 1;
        }
    }
    void TcpAsioClientTransport::init()
    {
        deinit();
        spReadWriteMutexPtr = new ReadWriteMutexPtr( new ReadWriteMutex(WriterPriority));
        spDemuxerPtr = new DemuxerPtr( new Demuxer );
    }

    void TcpAsioClientTransport::deinit()
    {
        delete spReadWriteMutexPtr;
        spReadWriteMutexPtr = NULL;

        delete spDemuxerPtr;
        spDemuxerPtr = NULL;
    }

    ReadWriteMutexPtr TcpAsioClientTransport::getSharedReadWriteMutexPtr()
    {
        return *spReadWriteMutexPtr;
    }

    DemuxerPtr TcpAsioClientTransport::getSharedDemuxerPtr()
    {
        return *spDemuxerPtr;
    }

    DemuxerPtr *            TcpAsioClientTransport::spDemuxerPtr;
    ReadWriteMutexPtr *     TcpAsioClientTransport::spReadWriteMutexPtr;

    RCF_ON_INIT_DEINIT(TcpAsioClientTransport::init(), TcpAsioClientTransport::deinit())

} // namespace RCF

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia Australia
Software developer, from Sweden and now living in Canberra, Australia, working on distributed C++ applications. When he is not programming, Jarl enjoys skiing and playing table tennis. He derives immense satisfaction from referring to himself in third person.

Comments and Discussions