首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何通过boost asio实现真正的异步客户端

如何通过boost asio实现真正的异步客户端
EN

Stack Overflow用户
提问于 2015-11-02 14:41:18
回答 1查看 498关注 0票数 0

我需要编写一个动态库,它应该导出三个函数:

代码语言:javascript
复制
bool init_sender(const char* ip_addr, int port);
void cleanup_sender();
void send_command(const char* cmd, int len);

init_sender应该同步连接到服务器,并根据是否成功返回true / false

cleanup_sender应该等待所有命令完成,然后返回。

send_command应该异步地向服务器发送指定的命令,并尽可能快地返回。

因此,我编写了以下代码:

代码语言:javascript
复制
boost::asio::io_service                         g_io_service;
std::unique_ptr<boost::asio::io_service::work>  g_work;
boost::asio::ip::tcp::socket                    g_sock(g_io_service);
boost::thread                                   g_io_service_th;

void io_service_processor()
{
  g_io_service.run();
}

bool __stdcall init_sender(const char* ip_addr, int port)
{
  try
  {
    g_work = std::make_unique<boost::asio::io_service::work>(g_io_service);
    boost::asio::ip::tcp::resolver resolver(g_io_service);
    boost::asio::connect(g_sock, resolver.resolve({ ip_addr, std::to_string(port) }));
    g_io_service_th = boost::thread(io_service_processor);
    return true;
  }
  catch (const std::exception& ex)
  {
    return false;
  }
}

void __stdcall cleanup_sender()
{
  g_work.reset();
  if (g_io_service_th.joinable())
  {
    g_io_service_th.join();
  }
}

void async_write_cb(
  const boost::system::error_code& error,
  std::size_t bytes_transferred)
{
  // TODO: implement
}

void __stdcall send_command(const char* cmd, int len)
{
  boost::asio::async_write(g_sock, boost::asio::buffer(cmd, len), async_write_cb);
}

据我从boost asio文档中了解,通过async_write函数调用发布的所有命令都将从一个线程(在我的示例中包含run函数调用-- g_io_service_th )执行。我说的对吗?如果是这样的话,对我来说,这似乎并不是完全异步的。我能做些什么来改变这种行为,同时从多个线程发送多个命令呢?我应该像这样创建boost::thread_group

代码语言:javascript
复制
for (int i = 0; i < pool_size; ++i)
{
  _thread_group.create_thread(boost::bind(&boost::asio::io_service::run, &_io_service));                             
}

或者还有别的办法吗?

EN

Stack Overflow用户

回答已采纳

发布于 2015-11-02 14:56:32

你在问一些问题,还有很多东西要学。可能最重要的理解是如何使用工作对象。

编辑:引用async_write限制:write/overload1.html

引用文件的话:

此操作是以对流的async_write_some函数的零调用或多次调用来实现的,称为组合操作。程序必须确保在此操作完成之前,流不执行任何其他写操作(例如async_write、流的async_write_some函数或执行写入的任何其他组合操作)。

asio线程代码应该如下所示:

代码语言:javascript
复制
#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <thread>


struct service_loop
{
    using io_service = boost::asio::io_service;

    io_service& get_io_service() {
        return _io_service;
    }

    service_loop(size_t threads = 1)
    : _strand(_io_service)
    , _work(_io_service)
    , _socket(_io_service)
    {
        for(size_t i = 0 ; i < threads ; ++i)
            add_thread();
    }

    ~service_loop() {
        stop();
    }

    // adding buffered sequential writes...

    void write(const char* data, size_t length)
    {
        _strand.dispatch([this, v = std::vector<char>(data, data + length)] {
            _write_buffer.insert(std::end(_write_buffer), v.begin(), v.end());
            check_write();
        });

    }
private:
    std::vector<char> _write_buffer;
    bool _writing;

    void check_write()
    {
        if (!_writing and !_write_buffer.empty()) {
            auto pv = std::make_shared<std::vector<char>>(std::move(_write_buffer));
            _writing = true;
            _write_buffer.clear();
            boost::asio::async_write(_socket,
                                     boost::asio::buffer(*pv),
                                     [this, pv] (const boost::system::error_code& ec, size_t written) {
                                         _strand.dispatch(std::bind(&service_loop::handle_write,
                                                                    this,
                                                                    ec,
                                                                    written));
                                     });
        }
    }

    void handle_write(const boost::system::error_code& ec, size_t written)
    {
        _writing = false;
        if (ec) {
            // handle error somehow
        }
        else {
            check_write();
        }
    }

private:
    io_service _io_service;
    io_service::strand _strand;
    io_service::work _work;
    std::vector<std::thread> _threads;
    boost::asio::ip::tcp::socket _socket;

    void add_thread()
    {
        _threads.emplace_back(std::bind(&service_loop::run_thread, this));
    }

    void stop()
    {
        _io_service.stop();
        for(auto& t : _threads) {
            if(t.joinable()) t.join();
        }
    }

    void run_thread()
    {
        while(!_io_service.stopped())
        {
            try {
                _io_service.run();
            }
            catch(const std::exception& e) {
                // report exceptions here
            }
        }
    }
};


using namespace std;

auto main() -> int
{
    service_loop sl;
    sl.write("hello", 5);
    sl.write(" world", 6);
    std::this_thread::sleep_for(std::chrono::seconds(10));

    return 0;
}
票数 1
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33480163

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档