对C++ Coroutine feature的后续展开。

前言

本文仅供学习探讨之用,如果侵犯了您的权益请联系我删除。

工具

  1. Visual Studio Code

基本介绍

协程这玩意我觉得其实没必要多做什么介绍,现在流行的语言基本上都有这玩意,网上也有很多相关的文章。如果你还没听过的话我觉得你可以先谷歌或百度查查相关资料。

暗中观察

C++推出协程特性都已经两年了。虽说刚出的时候就造了个异步任务的轮子,不过一直没什么机会用,后来也就扔一边了。

最近想起又拿出来试了试,发现有点难用。加上我想写篇文章记录一下一些设计思路,索性就直接重构了一下,并增加一些新的功能。

进入正题,因为本期代码比较长所以本文中应该不会出现完整的代码,只会把基本写法和一些关键的设计部分拿出来解释说明。

至于完整的代码实现与简单测试都已上传至我的Github仓库中,如果你感兴趣可以自行跳转查看。

仓库地址https://github.com/Bzi-Han/coroutine

编译器支持

使用本协程库的一些限制条件

MSVC: 19.29.30145^

GCC: 12.2^

CLANG: 15.0.0^

如果你的编译器版本低于以上版本的请自行测试。

基础协程

一个基础的协程是由一个类或结构体包含着一个公开的名字固定为promise_type的结构体或类构成的,只要你的promise_type必要的成员都是公开的就行。

必要的成员有以下几个

  1. get_return_object()用于返回实际的协程对象。
  2. initial_suspend()协程对象创建后的初始暂停点。
  3. final_suspend()协程执行完的最终暂停点。
  4. return_value()return_void()用于接收协程的返回值。
  5. unhandled_exception()用于处理协程中抛出的未在内部处理的异常。

可选的成员有以下几个

  1. yield_value()用于接收co_yield的值。
  2. await_transform()operator co_await用于对co_await语法的重载。
  3. get_return_object_on_allocation_failure()用于接管协程分配内存失败时默认抛出std::bad_alloc异常的操作。

接下来我们来看以下一个最简单的协程该怎么写,上代码

struct task_promise_type;

struct task : std::coroutine_handle<task_promise_type>
{
    using promise_type = task_promise_type;
};

struct task_promise_type
{
    task get_return_object() { return {task::from_promise(*this)}; }

    std::suspend_always initial_suspend() noexcept { return {}; }

    std::suspend_always final_suspend() noexcept { return {}; }

    void return_void() {}

    void unhandled_exception() {}
};

这里task继承std::coroutine_handle属于偷懒写法,由父类来接收协程句柄并使用父类的接口进行管理,实际在get_return_object()时协程的句柄可有你自己自行分配。

例如不继承std::coroutine_handle并在task的构造函数中接收协程句柄并存入私有成员等待后续使用,然后在析构函数中去销毁协程(handle.destroy())就可实现通过RAII自动管理协程的生命周期,后面介绍库的时候会说到。

需要了解的是initial_suspendfinal_suspend的返回值均为一个awaitable

awaitable决定了当前协程后续的流程走向,这里使用的std::suspend_always为语言提供的默认awaitable之一,此外还有个std::suspend_never表示不暂停。

awaitable也可以自己实现,与promise_type相似,其必须包含以下成员

  1. await_ready()用于决定当前协程是否暂停。
  2. await_suspend()如当前协程决定暂停,则会进入此函数。
  3. await_resume()当前协程准备继续执行时会进入此函数,其返回值就是co_await的返回值。

那么怎么用

task test01()
{
    std::cout << "黑之契约者给我整破防了" << std::endl;
    co_await std::suspend_always{};
    std::cout << "继续破防" << std::endl;
}

int main()
{
    auto coro01 = test01();

    coro01.resume();
    std::cout << "先缓缓" << std::endl;
    coro01.resume();
    coro01.destroy();

    return 0;
}

黑之契约者给我整破防了
先缓缓
继续破防

还是很简单的,接下看一下可选的三个成员怎么使用

添加一个结构体normal_function_transform用于将普通的无参函数转成支持我们协程执行流程的awaitable

并修改promise_type添加我们需要的三个成员函数

template <typename runable_t, typename return_t>
struct normal_function_transform
{
    runable_t runable;
    return_t result;

    constexpr bool await_ready() const noexcept { return false; }
    constexpr void await_suspend(std::coroutine_handle<> resumePoint) noexcept
    {
        result = runable();
        // resumePoint.resume()
        //   这里可以直接在这里释放继续执行当前协程,也可以等待外部调度。
        //   如果在这里继续执行当前协程则不会交出调度权。
    }
    constexpr return_t await_resume() const noexcept
    {
        return result;
    }
};

template <typename runable_t>
struct normal_function_transform<runable_t, void>
{
    runable_t runable;

    constexpr bool await_ready() const noexcept { return false; }
    constexpr void await_suspend(std::coroutine_handle<> resumePoint) const noexcept
    {
        runable();
    }
    constexpr void await_resume() const noexcept {}
};

struct task_promise_type
{
    task get_return_object() { return {task::from_promise(*this)}; }

    std::suspend_always initial_suspend() noexcept { return {}; }

    std::suspend_always final_suspend() noexcept { return {}; }

    void return_void() {}

    void unhandled_exception() {}

    std::suspend_always yield_value(const std::string_view &data)
    {
        std::cout << "[task_promise_type::yield_value] yield value: " << data << std::endl;
        return {};
    }

    template <typename runable_t>
    auto await_transform(runable_t runable)
    {
        return normal_function_transform<runable_t, std::invoke_result_t<runable_t>>{runable};
    }

    static task get_return_object_on_allocation_failure()
    {
        std::cout << "[task_promise_type::get_return_object_on_allocation_failure] boom" << std::endl;
        return {nullptr};
    }
};

然后我们来看一下使用的代码以及效果

int test00()
{
    std::cout << "[test00] test00 called" << std::endl;
    return 233;
}

task test01()
{
    std::cout << "[test01] 黑之契约者给我整破防了" << std::endl;
    auto result = co_await test00;
    std::cout << "[test01] co_await result: " << result << std::endl;
    std::cout << "[test01] 继续破防" << std::endl;
    co_yield "提交一个值并交出调度权";
    std::cout << "[test01] 继续结束" << std::endl;
}

int main()
{
    auto coro01 = test01();

    coro01.resume();
    std::cout << "[main] 先缓缓" << std::endl;
    coro01.resume();
    std::cout << "[main] 又回来了" << std::endl;
    coro01(); //coro01.resume();
    coro01.destroy();

    return 0;
}

[test01] 黑之契约者给我整破防了
[test00] test00 called
[main] 先缓缓
[test01] co_await result: 233
[test01] 继续破防
[task_promise_type::yield_value] yield value: 提交一个值并交出调度权
[main] 又回来了
[test01] 继续结束

那么到这里我觉得基础的协程也差不多了,接下来就来看一下这个库的协程与调度器的一些设计。

协程任务设计

基础结构

TaskBase

这个类中包含了4个static类型的成员,全部为thread_local属性。

  1. m_taskQueue 通用任务的执行队列。
  2. m_delayQueue 延迟任务的执行队列。
  3. m_resumeQueue 保存子协程运行结束的返回点队列。
  4. m_finallyQueue 保存当前协程运行结束后co_finally语法所执行的协程。

task_promise_type_base

final_suspend

这个函数有两个分支,一个是开启co_finally语法的分支,另一个是没开启的分支。

未开启co_finally语法的执行流程

  • 检测当前协程在m_resumeQueue中是否存在返回点,如果存在则将返回点放入m_taskQueue中,并将m_resumeQueue中的返回点抹除。

开启co_finally语法的执行流程

  1. 检测当前协程在m_resumeQueue中是否存在返回点,如果存在则将返回点取出,并将m_resumeQueue中的返回点抹除。
  2. 通过取出的返回点判断当前协程是不是finally协程,如果是则将返回点放入m_taskQueue并返回。
  3. 如果不是则检测当前协程是否存在finally协程,如果存在则将所有finally协程放入m_taskQueue并将返回点放入m_resumeQueue中。
  4. 如果当前协程不存在finally协程,则将取出的返回点放入m_taskQueue中。
yield_value

这个函数有一个接收yield_t类型的默认函数与一个接收延迟任务类型参数的重载。

默认函数yield_value(const std::conditional_t<std::is_same_v<void, yield_t>, uint8_t, yield_t> &result)的执行流程

  1. 将提交上来的result存入预定的std::vector中。
  2. 将自身协程放入m_taskQueue中等待再次调度。

重载函数yield_value(deferrable &&defer)的执行流程

  • 根据传进的参数构造延迟任务并放入m_delayQueue中等待调度。

task_promise_type

这个类继承于task_promise_type_base,有一个默认入口与一个类型偏特化入口。

默认入口接收任何类型的return_t

偏特化入口只接收void类型的return_t

这样做的目的是用于实现无返回值promise_type与有返回值promise_type的同时兼容。

Task

这个类继承于TaskBase,是实际协程的实现类,实际协程调度时操作的就是当前Task继承于TaskBase的4个任务队列。

该类使用using aliaspromise_type进行定义并使用私有成员std::coroutine_handle<promise_type> m_coroutineHandlepromise_type初始化的协程句柄进行保存。

co_delay语法

co_delay语法的实现为

#define co_delay co_yield coroutine::task_implement::deferrable{} =

其中deferrable为一个用来标识和存储延迟任务的结构体,他的定义如下

struct deferrable
{
    void *task = nullptr;
    std::chrono::milliseconds timeout;

    deferrable &&operator=(size_t timeout)
    {
        this->timeout = std::chrono::milliseconds{timeout};

        return std::move(*this);
    }
};

可以看到它实际上是使用co_yield语法对当前协程提交了一个deferrable,而延迟执行的毫秒数为co_delay后面跟的立即数。如:co_delay 233;的延迟时间为233毫秒啊。

co_switch语法

co_switch语法的实现为

#define co_switch co_yield {}

co_delay一样,不过提交的是一个initialize list,实际走的是一个uint8_t类型的0

co_finally语法

它的使用方法为

co_finally
{
    // write any code...
};

co_finally语法的实现为

#define _COROMAKELAMBDA(x, y) x##y
#define COROMAKELAMBDA(x, y) _COROMAKELAMBDA(x, y)
#define co_finally coroutine::task_implement::__CoroFinally COROMAKELAMBDA(lambda, __COUNTER__) = [&]() -> coroutine::task_implement::Task<void, void>

这里COROMAKELAMBDA宏用于实现随机的变量名,其实际名为lambda+序号,序号由编译器提供的宏__COUNTER__提供,它的值是它使用的第几次次数的值,从0开始。

__CoroFinally类的定义如下

template <typename coro_t>
class __CoroFinally
{
public:
    __CoroFinally(coro_t coro)
        : m_coro(std::move(coro))
    {
    }
    ~__CoroFinally()
    {
        TaskBase::m_finallyQueue.push({m_coro(), false});
    }

private:
    coro_t m_coro;
};

可以看到,这里实际上是声明了一个名为lambda+x__CoroFinally类型的局部变量。

在初始化的时候传入一个返回值和提交值都为void的一个不完整的lambda表达式,
其使用用户所编写的代码块进行补全,变成一个完整的lambda表达式。

在当前协程运行完走出作用域时,由于RAII的作用__CoroFinally会将自身持有的协程放入m_finallyQueue中,然后等待调度器的执行并完成finally所要执行的工作。

调度器设计

基于上面设计的结构,在编写调度器时我们只需要关注与维护TaskBase的4个静态成员即可。

单线程调度器

对于单线程调度器,我们只需要在调用run函数时检测m_taskQueuem_delayQueue是否为空,如果不为空则将协程取出并执行清空即可。

普通任务与延迟任务的调度优先级取决于个人,这里写的流程为

  1. 检测m_delayQueuem_taskQueue是否为空,如果都为空则跳出死循环并结束此次的运行。
  2. 如果m_delayQueue不为空,取出头部任务并判断是否已超时,如果已超时则执行取出的任务并从m_delayQueue中抹除。
  3. 检测m_taskQueue是否为空,如果为空则使用continue回到死循环的头部开始执行。
  4. 如果m_taskQeueu不为空,取出头部的任务执行,并从m_taskQueue中抹除。

多线程调度器

先来看一下多线程调度器的成员与任务的转移流程

using channel_t = std::promise<std::pair<void *, void *>> *;
using future_t = std::future<std::pair<void *, void *>>;

constexpr static size_t m_minTasks = 2, m_maxTasks = 3; // 本地任务队列任务数量的最大值与最小值
static size_t m_maxWokers;                              // 最大工作线程数量
static bool m_work;                                     // 用于控制线程池的开关
static std::mutex m_globalMutex, m_localMutex;          // 队列锁
static std::vector<std::thread> m_workers;              // 工作线程池
static std::queue<void *> m_globalQueue;                // 全局任务队列,给调度器添加任务时会添加到此队列中
static std::queue<channel_t> m_channels;                // 用于偷取其他工作线程中的本地任务的通信通道

当我们给调度器添加一个任务时,首先会被添加到m_globalQueue全局任务队列中,然后每个工作线程会在m_globalQueue中获取属于自己的任务。

获取到的任务会添加到当前线程的本地队列中,也就是TaskBase的4个静态成员。

当有空闲线程出现时,它会往m_channels中添加自己接收其他线程”施舍”的任务的promise,如果其他线程的本地队列过长,就会将自己的头部任务交给其他在空闲中的线程来执行,自己则执行下一个任务。

run函数的执行流程

  • 检测当前工作线程数量是否大于m_maxWokers,如果还不足则添加工作线程至可以执行本次m_globalQueue队列中任务数量的工作线程数量(最多不超过上限)。

worker函数的执行流程

由于基础流程与单线程调度器基本一致,所以我们只讲不同的部分。

  1. 检测m_globalQueue是否为不为空且m_taskQueue本地任务队列存在的任务数量是否小于m_minTasks
  2. 如果小于这个数量则从m_globalQueue获取一个任务并放入m_taskQueue中。
  3. m_globalQueuem_taskQueue都为空,而m_work不为false时,创建一个promise并放入m_channels中。
  4. 每隔1s检测一次promise是否已经准备就绪,若已经准备就绪(已经偷到别的线程的本地任务),则从中取出偷取的到的任务并加入m_taskQueue中。

至此,对于这个协程库的介绍就完了。

结语

这篇文章咕咕咕了好久

qiqi

那就这样了,有缘再见~