Here is my test code for main; it's pretty simple.
// Seed the shared outgoing queue with five one-line test messages.
void AddData(std::shared_ptr<std::queue<std::string>> stores)
{
    for (const char* msg : {"A\n", "B\n", "C\n", "D\n", "E\n"})
        stores->push(msg);
}
int main()
{
std::shared_ptr<std::queue<std::string>> stores = std::make_shared<std::queue<std::string>>();
AddData(stores);
boost::asio::io_context io_context;
ServerTools::Server srv(io_context, 15004);
std::shared_ptr<ServerTools::session> sess = srv.accept_one();
sess->SetStoreForServer(stores);
sess->StartAsyncRead();
sess->StartAsyncSend();
io_context.run();
}
Here are the functions for asynchronous reading and asynchronous writing.
// Sends the message at the front of the queue, then schedules the next send.
//
// FIX(review): the original copied the front message into a local
// `std::string data` and passed asio::buffer(data) to async_write; that local
// is destroyed as soon as this function returns, while the write is still in
// flight — async_write requires the buffer to remain valid until the
// completion handler runs, so this was a dangling-buffer bug. We now bind a
// reference to the queue's front element, which lives until pop() in the
// handler.
void ServerTools::session::StartAsyncSend()
{
    if (!p_messageStore->empty())
    {
        // Reference, not copy: the element stays alive in the queue until pop().
        const std::string& data = p_messageStore->front();
        boost::asio::async_write(socket, boost::asio::buffer(data),
            [this](boost::system::error_code ec, std::size_t bytes_transferred)
            {
                if (!ec)
                {
                    std::cout << "Sending " << bytes_transferred << " bytes" << '\n';
                    this->p_messageStore->pop();
                }
                else
                {
                    std::cerr << "Mistake answer: " << ec.message() << std::endl;
                }
                // FIXME(review): sleeping inside a completion handler blocks
                // the thread running io_context.run(), starving every other
                // pending operation — this is exactly why reads stall while
                // writes are active. The proper fix is an asio::steady_timer
                // member with async_wait, which requires changing the class
                // definition (not visible in this block).
                std::this_thread::sleep_for(std::chrono::seconds(3));
                StartAsyncSend();
            });
    }
}
// Arms one asynchronous read of exactly 100 bytes, re-arming itself on success.
//
// FIX(review): the original allocated `std::vector<char> buffer` on the
// stack; it was destroyed when this function returned while async_read was
// still writing into it — undefined behavior. The buffer is now owned by a
// shared_ptr captured in the handler, so it outlives the operation.
// FIX(review): the original re-armed unconditionally, so on error/EOF it
// spun forever re-issuing reads that fail immediately; we now re-arm only on
// success. Also added the missing space in the "Received N bytes" log line.
void ServerTools::session::StartAsyncRead()
{
    auto buffer = std::make_shared<std::vector<char>>(100);
    boost::asio::async_read(socket, boost::asio::buffer(*buffer),
        [this, buffer](const boost::system::error_code& ec, std::size_t size_transferred)
        {
            if (!ec)
            {
                std::cout << "Received " << size_transferred << " bytes" << std::endl;
                StartAsyncRead();
            }
        });
}
If I run only asynchronous reading, messages are read without problems or delays; if I run only asynchronous writing, there are no problems either. But if I run both, the socket write operation takes up most of the time, and messages are read much less frequently. I understand that code is usually organized differently in real applications, but what is the reason for this behavior? Are there ways to fix it, or is the only correct option to explicitly sequence the asynchronous reads and writes?
That code looks almost normal, except there are bugs: you call sleep_for inside a completion handler. That blocks the thread running io_context, which is what prevents the execution context from making any progress while you are writing messages. Assuming you use a suitable container for the message store (e.g. std::deque), this pattern is 100% fine and should work as you expect. The only important things are to keep every buffer alive for the full duration of its asynchronous operation and never to block inside a handler.
Q. And reading messages will occur much less frequently
That completely and only depends on the traffic patterns of your application
I have put together a fixed, complete demo below for you to check — see the inline notes.
Live On Coliru
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
namespace ServerTools {
using namespace std::chrono_literals;
using asio::ip::tcp;
using boost::system::error_code;
using Message = std::string;
using MessageQueue = std::deque<Message>;
// One accepted TCP connection. enable_shared_from_this lets the completion
// handlers capture shared_from_this(), keeping the Session alive while any
// asynchronous operation is still in flight.
class Session : public std::enable_shared_from_this<Session> {
public:
// Takes ownership of an already-connected socket.
Session(tcp::socket socket) : socket_(std::move(socket)) {}
// Begin full-duplex operation: store the shared outgoing queue, then arm
// the continuous read loop and the timer-paced send loop.
void Start(std::shared_ptr<MessageQueue> store) {
SetStoreForServer(std::move(store));
StartAsyncRead();
StartAsyncSend();
}
private:
void StartAsyncSend();
void StartAsyncRead();
void SetStoreForServer(std::shared_ptr<MessageQueue> store) { outgoing_ = std::move(store); }
tcp::socket socket_;
// Paces sends with an async 3s wait (see StartAsyncSend) instead of a
// blocking sleep, so the io_context keeps servicing reads in between.
asio::steady_timer timer_{socket_.get_executor()};
Message incoming_; // grows until '\n' arrives (see StartAsyncRead)
std::shared_ptr<MessageQueue> outgoing_; // shared with the producer in main
};
class Server {
public:
Server(asio::any_io_executor ex, uint16_t port) : acceptor(ex, {{}, port}) {}
std::shared_ptr<Session> accept_one() { return std::make_shared<Session>(acceptor.accept()); }
private:
tcp::acceptor acceptor;
};
// Writes the front message, then arms a 3-second timer before recursing.
// The front element is handed to async_write by reference: it stays alive in
// the deque until pop_front() runs in the completion handler, so the buffer
// is valid for the whole operation. shared_from_this() keeps the session
// alive while the write and the timer wait are pending.
void Session::StartAsyncSend() {
    if (outgoing_->empty())
        return;

    auto const& next = outgoing_->front(); // no copy; must outlive the write
    async_write(socket_, asio::buffer(next),
                [this, self = shared_from_this()](error_code ec, size_t n) {
                    if (ec) {
                        std::cerr << "Mistake answer: " << ec.message() << std::endl;
                        return;
                    }
                    std::cout << "Sending " << n << " bytes" << std::endl;
                    outgoing_->pop_front();
                    timer_.expires_after(3s);
                    timer_.async_wait([this, self = shared_from_this()](error_code e) {
                        if (!e)
                            StartAsyncSend();
                    });
                });
}
// Reads until the next '\n' into incoming_ (a growable dynamic buffer),
// logs what arrived, consumes the completed line, and re-arms on success.
void Session::StartAsyncRead() {
    auto on_line = [this, self = shared_from_this()](error_code ec, size_t n) {
        std::cout << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
        if (n) {
            std::string_view line(incoming_.data(), n - 1); // drop the '\n'
            std::cout << "Message: " << quoted(line) << std::endl;
        }
        if (ec)
            return; // stop the read loop on error/EOF
        incoming_.erase(0, n);
        StartAsyncRead();
    };
    async_read_until(socket_, asio::dynamic_buffer(incoming_), '\n', std::move(on_line));
}
// Seed the shared outgoing queue with five one-line test messages.
void AddData(std::shared_ptr<MessageQueue> stores) {
    static constexpr const char* kMessages[] = {"A\n", "B\n", "C\n", "D\n", "E\n"};
    for (auto const* m : kMessages)
        stores->push_back(m);
}
} // namespace ServerTools
int main() {
auto store = std::make_shared<ServerTools::MessageQueue>();
ServerTools::AddData(store);
asio::io_context io_context;
ServerTools::Server srv{io_context.get_executor(), 15004};
srv.accept_one()->Start(store);
io_context.run();
}
With a live demo:

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With