前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >C++ 异步编程之协程代码实践

C++ 异步编程之协程代码实践

作者头像
巫山老妖
发布2024-08-12 15:40:05
1070
发布2024-08-12 15:40:05
举报
文章被收录于专栏:小巫技术博客

引言

异步编程是实际开发当中不可或缺的一部分,尤其是在处理 I/O 操作、网络请求、用户界面响应等需要高并发场景时。进程和线程我们做研发的可能了解的比较多,虽然协程的概念很早就出现了,但语言层面上支持相对比较晚,直到C++ 20才正式被引入。本文分享一下笔者在工程上使用协程的一些实践和思考总结。

进程 vs 线程 vs 协程

用一个表格对比下进程、线程和协程之间的区别:

特征

进程

线程

协程

定义

独立的执行环境,拥有自己的地址空间。

进程内的执行单元,共享进程的资源。

轻量级的“线程”,不由操作系统直接管理,而是由应用程序控制。

资源消耗

高,每个进程都有独立的内存等资源。

较低,线程之间共享内存和资源。

最低,协程共享线程资源,切换开销小。

创建和销毁成本

高,涉及到操作系统的资源分配和回收。

中等,比进程轻量,但仍需操作系统管理。

非常低,由程序语言或框架层面控制。

控制复杂度

高,需要操作系统参与调度和同步。

中等,线程之间的同步和通信需要细致处理。

低,通常在一个线程内,同步和通信更简单。

并发性能

中等,进程间通信(IPC)开销较大。

高,线程之间切换和通信相对高效。

高,协程切换开销非常小,适合高并发场景。

应用场景

适合需要独立资源和保护的应用。

适合需要并行处理和资源共享的应用。

适合IO密集型和高并发的应用。

每种技术都有其适用的场景:

  • 进程:适合于需要独立运行和资源隔离的大型应用程序,如服务器的不同服务组件。
  • 线程:适合于需要并行处理任务并共享内存资源的场景,如多核处理器上的并行计算。
  • 协程:特别适合处理高并发的I/O密集型任务,如现代Web服务器和网络应用。

笔者主要是从事应用开发,进程一般情况下用的比较少,只有在需要实现跨进程通信的时候才会涉及到。线程就用得比较多,通常会使用线程池来管理,进而减少创建和销毁带来的开销。协程因为非常轻量,日常业务开发当中,比如发起网络请求、I/O操作和简单的异步操作,可以用同步的方式写异步代码,也能更便捷的控制协程的生命周期,不受系统管理,能给研发带来更多的灵活性。

Boost.Asio 异步模型

Boost.Asio 简介

Boost.Asio是一个用于C++的跨平台库,它提供了一组用于处理异步输入/输出(I/O)的工具和组件。它是Boost库的一部分,一个非常流行的C++库集合,旨在提供可移植且高质量的通用组件。 Boost.Asio主要用于网络和低级硬件交互,支持TCP、UDP、串行端口等协议。它不仅限于网络编程,也可以用于构建任何需要异步I/O操作的应用程序,比如文件处理、定时器等。异步I/O是指启动一个I/O操作后,不需要等待其完成即可继续执行其他任务。这对于需要高性能和响应性能的应用程序非常有用,因为它可以帮助你有效地使用系统资源,防止应用程序在等待I/O操作完成时空闲。 Boost.Asio提供了一个强大的异步模型,通过使用回调函数、绑定器和协程等技术,使得编写异步代码更加直观和简洁。此外,它也有同步操作的支持,使得用户可以根据需要选择最适合自己的编程风格。

图引自:https://think-async.com/Asio/

因为C++在语言层面需要将编译器升级至C++20才支持协程,包括关键字co_awaitco_returnco_yield. 我们的项目工程使用了Boost.Asio库可以在不支持C++20的环境中也可以使用协程,相比之下它提供了向后的兼容性。C++20 协程提供了一种更为现代和符合直觉的方式来处理异步代码,允许开发者以类似同步代码的方式编写异步逻辑,这极大简化了代码的复杂性。

Post vs CoSpawn

在 Boost.Asio 中,postco_spawn 是两种常用的处理异步操作的方法。post 用于将任务异步地提交到执行器(如 io_context)上执行,而 co_spawn 则是用于启动协程,使得异步代码的写法更接近同步代码的风格。

示例代码:

代码语言:javascript
复制
#include <boost/asio.hpp>
#include <boost/asio/experimental/co_spawn.hpp>
#include <boost/asio/experimental/detached.hpp>
#include <iostream>
#include <chrono>

namespace asio = boost::asio;
using namespace std::chrono_literals;

asio::awaitable<void> async_print(const std::string& message) {
    co_await asio::this_coro::executor.sleep_for(1s);
    std::cout << message << std::endl;
}

int main() {
    asio::io_context io_context;

    // 使用 post 提交一个简单的任务
    asio::post(io_context, []() {
        std::cout << "Hello from post!\n";
    });

    // 使用 co_spawn 启动一个协程
    asio::experimental::co_spawn(io_context, async_print("Hello from coroutine!"), asio::experimental::detached);

    // 运行 io_context 直到所有作业完成
    io_context.run();

    return 0;
}

在这个例子中:

  • async_print 是一个协程函数,它等待 1 秒钟然后打印一条消息。这个函数返回 asio::awaitable<void>,表明它是一个异步协程。
  • post 函数用于提交一个 lambda 函数到 io_context。此 lambda 函数直接打印一条消息。
  • co_spawn 函数用于在 io_context 的执行器上启动 async_print 协程。第三个参数 asio::experimental::detached 表示协程的完成是“分离”的,即不需要等待协程完成。

协程的一些代码实践

针对Boost.Asio协程实现的封装

以下的一些代码有针对Boost.Asio库中关于协程相关的封装,比如:

简化命名空间声明和变量定义

代码语言:javascript
复制
namespace asio = boost::asio;
using error_code = boost::system::error_code;

template <typename T>
using awaitable = boost::asio::awaitable<T>;
constexpr cross::comm::StrictDetachedType detached;        // default use strict detached, instead of asio::detached
constexpr cross::comm::TolerantDetachedType tol_detached;  // tolerant, like asio::detached, but with exception logging
using boost::asio::use_awaitable;
using boost::asio::experimental::awaitable_operators::operator&&;
using boost::asio::experimental::awaitable_operators::operator||;
using await_token_t = asio::as_tuple_t<asio::use_awaitable_t<>>;
constexpr await_token_t await_token;
  • 简化代码中对Boost.Asio和错误码的引用
  • 模版别名定义,简化boost::asio::awaitable的协程返回类型声明
  • 引入user_awaitable以及逻辑与和逻辑或操作符,允许在协程中组合多个异步操作
  • 定义便于协程支持的异步操作,返回元组的结果的await_token

async_signal.h

代码语言:javascript
复制
#ifndef CROSS_COMM_ASYNC_SIGNAL_H
#define CROSS_COMM_ASYNC_SIGNAL_H

#include <optional>

#include "boost/asio/any_io_executor.hpp"
#include "boost/asio/deferred.hpp"
#include "boost/asio/experimental/parallel_group.hpp"
#include "boost/asio/post.hpp"
#include "boost/asio/steady_timer.hpp"
#include "boost/signals2/signal.hpp"
#include "boost/smart_ptr/local_shared_ptr.hpp"

namespace cross::comm {

template <typename CompletionToken, typename... SigArgs>
auto AsyncWaitSignal(boost::asio::any_io_executor ex, boost::signals2::signal<void(SigArgs...)> *sig,
                     CompletionToken &&token) {
  return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, SigArgs...)>(
      [](auto handler, auto ex, auto sig) mutable {
        auto sl = boost::asio::get_associated_cancellation_slot(handler);
        auto wrapper = std::make_shared<std::pair<decltype(handler), bool>>(std::move(handler), false);
        auto conn = sig->connect_extended([wrapper, ex](const auto &conn, SigArgs &&...args) mutable {
          // maybe in another thread in the callback
          conn.disconnect();
          boost::asio::post(ex,
                            [wrapper = std::move(wrapper),
                             args = std::make_tuple(boost::system::error_code{}, std::forward<SigArgs>(args)...)]() {
                              // now in ex's thread
                              if (wrapper->second) return;
                              wrapper->second = true;
                              std::apply(wrapper->first, std::move(args));
                            });
        });
        if (sl.is_connected()) {
          sl.assign([conn = std::move(conn), ex,
                     weak_wrapper = std::weak_ptr<typename decltype(wrapper)::element_type>(wrapper)](
                        boost::asio::cancellation_type_t) {
            auto wrapper = weak_wrapper.lock();  // acquire shared_ptr of handler before conn disconnect
            conn.disconnect();
            if (wrapper) {
              boost::asio::post(ex, [wrapper = std::move(wrapper)]() {
                // now in ex's thread
                if (wrapper->second) return;
                wrapper->second = true;
                std::tuple<boost::system::error_code, SigArgs...> canceled_args;
                std::get<0>(canceled_args) = boost::asio::error::operation_aborted;
                std::apply(wrapper->first, std::move(canceled_args));
              });
            }
          });
        }
      },
      token, std::move(ex), sig);
}

template <typename CompletionToken, typename... SigArgs>
auto AsyncWaitSignalWithTimeout(boost::asio::any_io_executor ex, boost::signals2::signal<void(SigArgs...)> *sig,
                                std::chrono::milliseconds timeout, CompletionToken &&token) {
  return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, SigArgs...)>(
      [](auto handler, auto ex, auto sig, auto timeout) mutable {
        auto sl = boost::asio::get_associated_cancellation_slot(handler);
        boost::local_shared_ptr<boost::asio::steady_timer> timer(new boost::asio::steady_timer(ex, timeout));
        boost::asio::experimental::make_parallel_group(timer->async_wait(boost::asio::deferred),
                                                       AsyncWaitSignal(ex, sig, boost::asio::deferred))
            .async_wait(boost::asio::experimental::wait_for_one(),
                        [timer, handler = std::move(handler)](
                            std::array<std::size_t, 2> completion_order, boost::system::error_code ec1,
                            boost::system::error_code ec2, SigArgs &&...args) mutable {
                          if (completion_order[0] == 0 && !ec1) {
                            std::tuple<boost::system::error_code, SigArgs...> timeout_args;
                            std::get<0>(timeout_args) = boost::asio::error::timed_out;
                            std::apply(handler, std::move(timeout_args));
                            return;
                          }
                          std::apply(handler, std::make_tuple(ec2, std::forward<SigArgs>(args)...));
                        });
        if (sl.is_connected()) {
          sl.assign([timer](boost::asio::cancellation_type_t) { timer->cancel(); });
        }
      },
      token, std::move(ex), sig, timeout);
}

// callback style, handler MUST be copyable, so coroutine is not suitable for this
template <typename Handler, typename... SigArgs>
boost::signals2::connection AsyncConnectSignal(boost::asio::any_io_executor ex,
                                               boost::signals2::signal<void(SigArgs...)> *sig, Handler &&handler) {
  return sig->connect_extended(
      [ex = std::move(ex), handler = std::move(handler)](const auto &conn, SigArgs &&...args) mutable {
        boost::asio::post(ex, [handler, conn, args = std::make_tuple(std::forward<SigArgs>(args)...)]() {
          if (conn.connected()) std::apply(handler, std::move(args));
        });
      });
}

}  // namespace cross::comm

#endif  // CROSS_COMM_ASYNC_SIGNAL_H

AsyncWaitSignal

这个模板函数用于异步等待信号的触发。它接收一个执行器(ex)、一个信号对象指针(sig)和一个完成令牌(token)。函数内部使用boost::asio::async_initiate来包装异步操作。

  • 内部逻辑:
    • 连接到信号,当信号触发时,使用boost::asio::post将回调函数发布到指定的执行器上执行。这确保了回调是在正确的上下文中执行。
    • 使用了std::shared_ptr来管理回调中的状态,确保在异步环境中安全地使用。
    • 支持取消操作,如果与异步操作关联的取消槽被触发,则断开信号连接,并通过执行器发布一个表示操作被取消的回调。

AsyncWaitSignalWithTimeout

这个函数在AsyncWaitSignal的基础上增加了超时机制。如果在指定的时间内信号没有被触发,则触发超时处理逻辑。

  • 内部逻辑:
    • 创建一个steady_timer,并与信号等待操作并行启动。
    • 使用boost::asio::experimental::make_parallel_group来组合定时器和信号等待操作,这允许同时等待两个异步操作。
    • 使用async_wait等待两个操作中的任意一个完成。根据完成的操作类型(定时器或信号),调用相应的处理逻辑。

AsyncConnectSignal

这个函数用于将用户定义的回调连接到一个信号。

  • 内部逻辑:
    • 使用信号的connect_extended方法注册回调。
    • 回调中使用boost::asio::post确保回调在正确的执行器上执行。
    • 检查连接状态,确保在信号仍然连接时执行用户的处理逻辑。

实现一个协程方法

定义一个协程方法,使用awaitable 来声明协程或异步的返回类型。

代码语言:javascript
复制
awaitable<void> mock_pay(std::string auth_code) {
  auto [ec, out_trade_no] = co_await PayRequest::SimulateMchPay(auth_code, 1);
  if (ec) {
    LOG_E("sim mch pay fail, ec: {} out_trade_no: {}", ec, out_trade_no);
  } else {
    LOG_I("sim mch pay out_trade_no: {}", out_trade_no);
  }
  co_return;
}

使用同步的代码风格写异步代码:

代码语言:javascript
复制
 co_await mock_pay(auth_code);

解析一下:

  • co_await:一元运算符,语义是挂起协程,并将程序控制权返回给调用者。
  • awaitable: 支持co_await运算符的类型,表示可等待对象。
  • co_return:用于从协程返回值,并标志着协程的结束。这与传统的 return 语句类似,但它是专为协程设计的,确保在返回值之前正确地清理和挂起协程状态。

实现一个timer等待

代码语言:javascript
复制
  asio::steady_timer timeout(Threads::MainThread()->Executor(), std::chrono::seconds(2));
  co_await timeout.async_wait(await_token);

解析一下:

  • asio::steady_timer 是 Boost.Asio 提供的一个用于精确计时的类。
  • Threads::MainThread()->Executor() 获取了主线程的执行器(Executor)。这个执行器是处理异步事件的上下文。
  • std::chrono::seconds(2) 指定定时器在两秒后激活。
  • async_wait 是一个异步操作,当定时器达到指定的时间后,它被触发。
  • await_token 是一个用于控制异步等待行为的对象。在 Boost.Asio 的 C++20 协程支持中,通常使用一种称为 use_awaitable 的特殊对象作为 await_token

实现等待一个超时异步信号

代码语言:javascript
复制
 auto [ec, result] = co_await comm::AsyncWaitSignalWithTimeout(
      this_thread::Executor(), SystemInterface::Instance()->NetworkScanWifiCompletedSig(), std::chrono::seconds(10),
      await_token);

以上这段代码将异步信号处理和超时逻辑封装到AsyncWaitSignalWithTimeout方法中,开发者实现相应的信号处理逻辑和传递超时参数即可,代码非常简洁易读。等待异步信号处理的在日常开发中应用非常常见,按传统的实现方式会非常繁琐,使用协程之后就变得容易。这里涉及关于信号的实现,由于篇幅有限,这里后续有机会继续补充相应的实践,更详细的参考:

https://www.boost.org/doc/libs/1_84_0/doc/html/signals2.html

实现类似Promise的并发模型

代码语言:javascript
复制
 auto results = co_await (ShorkLinkQueryPayResult(auth_code) || AsyncWaitNetworkPushMessage(auth_code));

这段代码展示的是使用 ||逻辑或操作符来实现多任务并行请求,等待两个异步任务,只要一个成功即视为成功。

ShorkLinkQueryPayResult代码示例:

这段代码展示的是通过短连接来实现查询支付结果,通过while循环结合timer实现一个定时轮询查单逻辑。可以看到每一次轮询都会调用co_await来等待异步任务FetchPayResult结果,也是通过协程的方式来处理网络请求。

AsyncWaitNetworkPushMessage代码示例:

这段代码展示的通过异步信号带超时时间,等待后台推送消息来实现推拉结合的方式查询支付结果。

除了逻辑或||,自然也可以通过逻辑与&&来实现等待多个异步任务的执行结果。对应的其实类似JavaScript中Promise机制中的Promise.all()Promise.any(),使用async/await语法糖即能实现类似的效果。

总结

本文介绍了协程的基本概念和用法,通过使用Boost.Asio框架实现了高效的协程封装,使用同步的方式编写异步代码带来的简洁性和代码健壮性,无需处理复杂的状态扭转,让开发更好的关注业务代码的实现,用更低的成本实现复杂的并发任务。笔者提供了我们在工程中常见的协程使用案例,比如:

  • 使用awaitable来声明一个协程方法
  • 使用asio::steady_timer来实现定时逻辑
  • 使用boost::signals2::signalpost 方法来实现复杂的异步信号处理
  • 使用 &&|| 来实现类似Promise机制中的并发任务模型

通过以上的实践,基本可以满足90%以上业务开发当中的的异步编程场景,未来也可以继续深入学习异步编程的本质,探索更加高效和优雅的实现方式。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-08-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 巫山老妖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • 进程 vs 线程 vs 协程
  • Boost.Asio 异步模型
    • Boost.Asio 简介
      • Post vs CoSpawn
      • 协程的一些代码实践
        • 针对Boost.Asio协程实现的封装
          • 实现一个协程方法
            • 实现一个timer等待
              • 实现等待一个超时异步信号
                • 实现类似Promise的并发模型
                • 总结
                相关产品与服务
                GPU 云服务器
                GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于生成式AI,自动驾驶,深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档