Click here to Skip to main content
15,886,873 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
I wrote a very simple chat server using boost::asio. Everything works as it should. As I just redid the TCPSession class to use "strand", there was an error in the functionality of the class: The server accepts a connection and immediately drops it, because I get the error "Bad file descriptor" in the function "async_read". Can't understand why.

The problem is in class TCPSession as class TCPServer and function main remain unchanged for both cases.

Can anybody help me to resolve this problem?
Thank you in advance for any help or indication of where I made an error.

What I have tried:

#include <boost/asio.hpp>
#include <queue>
#include <unordered_set>
#include <iostream>

namespace io = boost::asio;
using tcp = io::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::placeholders;

using message_handler = std::function<void (std::string)>;
using error_handler = std::function<void ()>;

// this class works as I want: the client connects to server (I get the message "One client came in") 
// and he can send messages until he close the connection (then I get the message "One client went out")
class TCPSession : public std::enable_shared_from_this<TCPSession>
{
public:
    TCPSession(tcp::socket&& socket)
    : socket_(std::move(socket))
    {
    }

    void start(message_handler&& on_message, error_handler&& on_error)
    {
        this->on_message_ = std::move(on_message);
        this->on_error_ = std::move(on_error);
        async_read();
    }

    void post(std::string const& message)
    {
        bool idle = outgoing_.empty();
        outgoing_.push(message);

        if(idle)
        {
            async_write();
        }
    }

private:

    void async_read()
    {
        io::async_read_until(socket_, streambuf_, "\n", std::bind(&TCPSession::on_read, shared_from_this(), _1, _2));
    }

    void on_read(error_code error, std::size_t bytes_transferred)
    {
        if(!error)
        {
            std::stringstream message;
            message << socket_.remote_endpoint(error) << ": " << std::istream(&streambuf_).rdbuf();

            std::string strr = message.str();
            std::cout << "Received message from client: " << strr << std::endl;

            streambuf_.consume(bytes_transferred);
            on_message_(message.str());
            async_read();
        }
        else
        {
            socket_.close(error);
            on_error_();
        }
    }

    void async_write()
    {
        io::async_write(socket_, io::buffer(outgoing_.front()), std::bind(&TCPSession::on_write, shared_from_this(), _1, _2));
    }

    void on_write(error_code error, std::size_t bytes_transferred)
    {
        if(!error)
        {
            outgoing_.pop();

            if(!outgoing_.empty())
            {
                async_write();
            }
        }
        else
        {
            socket_.close(error);
            on_error_();
        }
    }

    tcp::socket socket_;
    io::streambuf streambuf_;
    std::queue<std::string> outgoing_;
    message_handler on_message_;
    error_handler on_error_;
};

// this class works not the way I want: the client connects and disconnects straightaway 
// (I get two messages "One client came in" and "One client went out")
class TCPSession : public std::enable_shared_from_this<TCPSession>
{
public:
    TCPSession(io::io_context& io_context)
    : socket_(io_context)
    , read  (io_context)
    , write (io_context)
    {
    }

    void start(message_handler&& on_message, error_handler&& on_error)
    {
        this->on_message_ = std::move(on_message);
        this->on_error_ = std::move(on_error);
        async_read();
    }

    void post(std::string const& message)
    {
        bool idle = outgoing_.empty();
        outgoing_.push(message);

        if(idle)
        {
            async_write();
        }
    }

private:

    void async_read()
    {
        // here I get the error "Bad file descriptor". But can't understand why
        io::async_read(socket_, streambuf_, io::bind_executor(read, 
            [&](error_code error, std::size_t bytes_transferred)
            {
                if (!error)
                {
                    std::istream is(&streambuf_);
                    std::string message(std::istreambuf_iterator<char>(is), {});

                    std::cout << "Received message from client: " << message << std::endl;
                    on_message_(message);  // Process the received message

                    streambuf_.consume(bytes_transferred);
                    async_read();
                }
                else
                {
                    socket_.close(error);
                    on_error_();
                }
            }));
    }

    void async_write()
    {
        auto self = shared_from_this();  // Keep session object alive until write completed
        io::async_write(socket_, io::buffer(outgoing_.front()), io::bind_executor(write, 
            [self, this](error_code error, std::size_t /*bytes_transferred*/)
            {
                if(!error)
                {
                    outgoing_.pop();

                    if(!outgoing_.empty())
                    {
                        async_write();
                    }
                }
                else
                {
                    socket_.close(error);
                    on_error_();
                }
            }));
    }

    tcp::socket socket_;
    io::streambuf streambuf_;
    std::queue<std::string> outgoing_;
    message_handler on_message_;
    error_handler on_error_;
    io::io_context::strand read;
    io::io_context::strand write;
};


class TCPServer
{
public:

    TCPServer(io::io_context& io_context, std::uint16_t port)
    : io_context_(io_context)
    , acceptor_  (io_context, tcp::endpoint(tcp::v4(), port))
    {
    }

    void async_accept()
    {
        socket_.emplace(io_context_);

        acceptor_.async_accept(*socket_, [&] (error_code error)
        {
            auto client = std::make_shared<TCPSession>(io_context_);

            clients_.insert(client);
            std::cout << "One client came in\n";

            client->start( std::bind(&TCPServer::post, this, _1),
                            [&, weak = std::weak_ptr(client)]
                            {
                                if(auto shared = weak.lock(); shared && clients_.erase(shared))
                                {
                                    std::cout << "One client went out\n";
                                }
                            }
                        );

            async_accept();
        });
    }

    void post(std::string const& message)
    {
        for(auto& client : clients_)
        {
            client->post(message);
        }
    }

private:

    io::io_context& io_context_;
    tcp::acceptor acceptor_;
    std::optional<tcp::socket> socket_;
    std::unordered_set<std::shared_ptr<TCPSession>> clients_;
};

int main()
{
    io::io_context io_context;
    TCPServer srv(io_context, 15001);
    srv.async_accept();
    io_context.run();
    return 0;
}
Posted
Updated 7-Nov-23 1:25am
v2

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900