首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用boost to客户端实现发送心跳消息

如何使用boost to客户端实现发送心跳消息
EN

Stack Overflow用户
提问于 2021-11-30 19:27:05
回答 1查看 857关注 0票数 1

我需要实现定期向第三方服务器发送心跳消息。服务器使用这些心跳消息来确定客户机是否仍然连接到它。当接收到心跳消息时,服务器不会响应它们,而只是简单地注意到它们。如果服务器在一定时间内(20秒)内没有接收到下一个心跳消息,那么它将终止连接。

这里也提出了类似的话题。塞赫给出了一个示例,说明如何以异步模式从boost tcp服务器向客户端发送心跳消息。这个例子很好用。

然而,当我尝试使用websocket (boost猛兽)实现这样的东西时,我几乎立即得到了错误:

代码语言:javascript
运行
复制
// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);

下面是一个示例代码来说明这个问题。

服务器(取自这里中的示例,稍加更改):

代码语言:javascript
运行
复制
namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

//------------------------------------------------------------------------------

// Report a failure
void
fail(beast::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}

// Echoes back all received WebSocket messages
class session : public std::enable_shared_from_this<session>
{
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer buffer_;

public:
    // Take ownership of the socket
    explicit
    session(tcp::socket&& socket)
            : ws_(std::move(socket))
    {
    }

    // Get on the correct executor
    void
    run()
    {
        // We need to be executing within a strand to perform async operations
        // on the I/O objects in this session. Although not strictly necessary
        // for single-threaded contexts, this example code is written to be
        // thread-safe by default.
        net::dispatch(ws_.get_executor(),
                      beast::bind_front_handler(
                              &session::on_run,
                              shared_from_this()));
    }

    // Start the asynchronous operation
    void
    on_run()
    {
        // Set suggested timeout settings for the websocket
        ws_.set_option(
                websocket::stream_base::timeout::suggested(
                        beast::role_type::server));

        // Set a decorator to change the Server of the handshake
        ws_.set_option(websocket::stream_base::decorator(
                [](websocket::response_type& res)
                {
                    res.set(http::field::server,
                            std::string(BOOST_BEAST_VERSION_STRING) +
                            " websocket-server-async");
                }));
        // Accept the websocket handshake
        ws_.async_accept(
                beast::bind_front_handler(
                        &session::on_accept,
                        shared_from_this()));
    }

    void
    on_accept(beast::error_code ec)
    {
        if(ec)
            return fail(ec, "accept");

        // Read a message
        do_read();
    }

    void
    do_read()
    {
        // Read a message into our buffer
        ws_.async_read(
                buffer_,
                beast::bind_front_handler(
                        &session::on_read,
                        shared_from_this()));
    }

    void
    on_read(
            beast::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        // This indicates that the session was closed
        if(ec == websocket::error::closed)
            return;

        if(ec)
            fail(ec, "read");

        std::string message = boost::beast::buffers_to_string(buffer_.data());
        if (message != std::string("ZZZ"))
        {
            // Echo the message
            ws_.text(ws_.got_text());
            ws_.async_write(
                    buffer_.data(),
                    beast::bind_front_handler(
                            &session::on_write,
                            shared_from_this()));
        }

    }

    void
    on_write(
            beast::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "write");

        // Clear the buffer
        buffer_.consume(buffer_.size());

        // Do another read
        do_read();
    }
};

//------------------------------------------------------------------------------

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
{
    net::io_context& ioc_;
    tcp::acceptor acceptor_;

public:
    listener(
            net::io_context& ioc,
            tcp::endpoint endpoint)
            : ioc_(ioc)
            , acceptor_(ioc)
    {
        beast::error_code ec;

        // Open the acceptor
        acceptor_.open(endpoint.protocol(), ec);
        if(ec)
        {
            fail(ec, "open");
            return;
        }

        // Allow address reuse
        acceptor_.set_option(net::socket_base::reuse_address(true), ec);
        if(ec)
        {
            fail(ec, "set_option");
            return;
        }

        // Bind to the server address
        acceptor_.bind(endpoint, ec);
        if(ec)
        {
            fail(ec, "bind");
            return;
        }

        // Start listening for connections
        acceptor_.listen(
                net::socket_base::max_listen_connections, ec);
        if(ec)
        {
            fail(ec, "listen");
            return;
        }
    }

    // Start accepting incoming connections
    void
    run()
    {
        do_accept();
    }

private:
    void
    do_accept()
    {
        // The new connection gets its own strand
        acceptor_.async_accept(
                net::make_strand(ioc_),
                beast::bind_front_handler(
                        &listener::on_accept,
                        shared_from_this()));
    }

    void
    on_accept(beast::error_code ec, tcp::socket socket)
    {
        if(ec)
        {
            fail(ec, "accept");
        }
        else
        {
            // Create the session and run it
            std::make_shared<session>(std::move(socket))->run();
        }

        // Accept another connection
        do_accept();
    }
};

//------------------------------------------------------------------------------

int main(int argc, char* argv[])
{
    // Check command line arguments.
    if (argc != 4)
    {
        std::cerr <<
                  "Usage: websocket-server-async <address> <port> <threads>\n" <<
                  "Example:\n" <<
                  "    websocket-server-async 0.0.0.0 8080 1\n";
        return EXIT_FAILURE;
    }
    auto const address = net::ip::make_address(argv[1]);
    auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
    auto const threads = std::max<int>(1, std::atoi(argv[3]));

    // The io_context is required for all I/O
    net::io_context ioc{threads};

    // Create and launch a listening port
    std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();

    // Run the I/O service on the requested number of threads
    std::vector<std::thread> v;
    v.reserve(threads - 1);
    for(auto i = threads - 1; i > 0; --i)
        v.emplace_back(
                [&ioc]
                {
                    ioc.run();
                });
    ioc.run();

    return EXIT_SUCCESS;
}

异步boost猛兽客户端(以来自这里的一个示例为基础):

代码语言:javascript
运行
复制
namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

//------------------------------------------------------------------------------

// Report a failure
void
fail(beast::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}

// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
    boost::asio::ip::tcp::socket socket_;
    tcp::resolver resolver_;
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer buffer_;
    std::string host_;
    std::string text_;
    const std::string hbmsg = "ZZZ";
    boost::asio::high_resolution_timer hbtimer { socket_.get_executor() };

public:
    // Resolver and socket require an io_context

    explicit
    session(net::io_context& ioc)
            : socket_ (net::make_strand(ioc)),
              resolver_(net::make_strand(ioc)),
              ws_(net::make_strand(ioc))
    {
    }

    //explicit
    //session(boost::asio::ip::tcp::socket&& s) : socket_(std::move(s)), ws_(std::move(socket_)) {}

    // Start the asynchronous operation
    void
    run(
            char const* host,
            char const* port,
            char const* text)
    {
        // Save these for later
        host_ = host;
        text_ = text;

        // Look up the domain name

        resolver_.async_resolve(
                host,
                port,
                beast::bind_front_handler(
                        &session::on_resolve,
                        shared_from_this()));
    }

    void
    on_resolve(
            beast::error_code ec,
            tcp::resolver::results_type results)
    {
        if(ec)
            return fail(ec, "resolve");

        // Set the timeout for the operation
        beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

        // Make the connection on the IP address we get from a lookup
        beast::get_lowest_layer(ws_).async_connect(
                results,
                beast::bind_front_handler(
                        &session::on_connect,
                        shared_from_this()));
    }

    void
    on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type)
    {
        if(ec)
            return fail(ec, "connect");

        // Turn off the timeout on the tcp_stream, because
        // the websocket stream has its own timeout system.
        beast::get_lowest_layer(ws_).expires_never();

        // Set suggested timeout settings for the websocket
        ws_.set_option(
                websocket::stream_base::timeout::suggested(
                        beast::role_type::client));

        // Set a decorator to change the User-Agent of the handshake
        ws_.set_option(websocket::stream_base::decorator(
                [](websocket::request_type& req)
                {
                    req.set(http::field::user_agent,
                            std::string(BOOST_BEAST_VERSION_STRING) +
                            " websocket-client-async");
                }));

        // Perform the websocket handshake
        ws_.async_handshake(host_, "/",
                            beast::bind_front_handler(
                                    &session::on_handshake,
                                    shared_from_this()));
    }

    void
    on_handshake(beast::error_code ec)
    {
        if(ec)
            return fail(ec, "handshake");

        // Send the message
        /*
        ws_.async_write(
                net::buffer(text_),
                beast::bind_front_handler(
                        &session::on_write,
                        shared_from_this()));
        */
        hb_wait();
        req_loop();

    }

    void hb_wait(boost::beast::error_code ec = {}) {
        if(ec)
            return fail(ec, "hb_wait");

        hbtimer.expires_from_now(std::chrono::milliseconds(1000));
        hbtimer.async_wait([this](boost::system::error_code ec) { hb_send(ec); });
    }

    void hb_send(boost::beast::error_code ec) {
        if(ec)
            return fail(ec, "hb_send");

        ws_.async_write(boost::asio::buffer(hbmsg), [this](boost::system::error_code ec, size_t) { hb_wait(ec); });
    }

    void req_loop(boost::beast::error_code ec = {}, std::size_t bytes_transferred = 0) {
        if(ec)
            return fail(ec, "req_loop");

        ws_.async_write(
                net::buffer(text_),
                beast::bind_front_handler(
                        &session::on_write,
                        shared_from_this()));
    }


    void
    on_write(
            beast::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "write");

        // Read a message into our buffer
        ws_.async_read(
                buffer_,
                beast::bind_front_handler(
                        &session::on_read,
                        shared_from_this()));
    }

    void
    on_read(
            beast::error_code ec,
            std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
            return fail(ec, "read");

        std::string message = boost::beast::buffers_to_string(buffer_.data());
        std::cout << beast::make_printable(buffer_.data()) << std::endl;
        buffer_.consume(buffer_.size());

        ws_.async_write(
                boost::asio::buffer(message),
                beast::bind_front_handler(
                        &session::req_loop,
                        shared_from_this()));

        //ws_.async_write(boost::asio::buffer(message), [this](boost::beast::error_code ec, size_t) { req_loop(ec); });



    }

};

//------------------------------------------------------------------------------

int main(int argc, char** argv)
{
    // Check command line arguments.
    if(argc != 4)
    {
        std::cerr <<
                  "Usage: websocket-client-async <host> <port> <text>\n" <<
                  "Example:\n" <<
                  "    websocket-client-async echo.websocket.org 80 \"Hello, world!\"\n";
        return EXIT_FAILURE;
    }
    auto const host = argv[1];
    auto const port = argv[2];
    auto const text = argv[3];

    // The io_context is required for all I/O
    net::io_context ioc;
    boost::asio::ip::tcp::socket sock(ioc);
    // Launch the asynchronous operation
    std::make_shared<session>(ioc)->run(host, port, text);

    // Run the I/O service. The call will return when
    // the socket is closed.
    ioc.run();

    return EXIT_SUCCESS;
}

如何解决异步模式下在boost野兽中发送心跳消息的问题?

原则上这是可能的吗?对不起我的英语不好。

EN

回答 1

Stack Overflow用户

发布于 2022-04-18 08:00:51

boost可以做这样的工作,我现在也在用boost编写心跳网络套接字。下面是我的一些实现细节和其他问题,这些问题已经在本期中解决了。

通常,我遇到类似的错误,断言信息说:

您正在尝试同时发出两个相同的异步I/O操作,而无需等待第一个操作完成。例如,尝试同时调用两个async_read_some。只允许每个I/O类型的挂起调用(读和写)。

这意味着您可以有意或无意地连续调用同一个async_xxxx。

下面是boost线程异步操作文档,上面写着:

与常规的Boost.Asio套接字一样,流并不是线程安全的。根据Asio文档,调用者负责使用隐式或显式字符串同步套接字上的操作()。websocket流异步接口支持下列操作之一同时处于活动状态:

  • async_readasync_read_some
  • async_writeasync_write_some
  • async_pingasync_pong
  • async_close

以下代码会产生未定义的行为,因为程序试图同时执行两次读取:

代码语言:javascript
运行
复制
ws.async_read(b, [](error_code, std::size_t){});
ws.async_read(b, [](error_code, std::size_t){});

例如,您可以在短时间内进行两次async_connect编辑,然后这些async_connect处理程序在遵循代码流的同时调用自己的async_writeasync_read

希望这能有所帮助。如果在解释中有什么问题或错误,请指出。谢谢。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70174970

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档