对C++ Coroutine feature的后续展开。
前言
本文仅供学习探讨之用,如果侵犯了您的权益请联系我删除。
工具
基本介绍
协程这玩意我觉得其实没必要多做什么介绍,现在流行的语言基本上都有这玩意,网上也有很多相关的文章。如果你还没听过的话我觉得你可以先谷歌或百度查查相关资料。

从C++推出协程特性都已经两年了。虽说刚出的时候就造了个异步任务的轮子,不过一直没什么机会用,后来也就扔一边了。
最近想起又拿出来试了试,发现有点难用。加上我想写篇文章记录一下一些设计思路,索性就直接重构了一下,并增加一些新的功能。
进入正题,因为本期代码比较长所以本文中应该不会出现完整的代码,只会把基本写法和一些关键的设计部分拿出来解释说明。
至于完整的代码实现与简单测试都已上传至我的Github仓库中,如果你感兴趣可以自行跳转查看。
仓库地址:https://github.com/Bzi-Han/coroutine
编译器支持
使用本协程库的一些限制条件
MSVC: 19.29.30145^
GCC: 12.2^
CLANG: 15.0.0^
如果你的编译器版本低于以上版本的请自行测试。
基础协程
一个基础的协程是由一个类或结构体包含着一个公开的名字固定为promise_type的结构体或类构成的,只要你的promise_type必要的成员都是公开的就行。
必要的成员有以下几个
get_return_object()用于返回实际的协程对象。initial_suspend()协程对象创建后的初始暂停点。final_suspend()协程执行完的最终暂停点。return_value()或return_void()用于接收协程的返回值。unhandled_exception()用于处理协程中抛出的未在内部处理的异常。
可选的成员有以下几个
yield_value()用于接收co_yield的值。await_transform()或operator co_await用于对co_await语法的重载。get_return_object_on_allocation_failure()用于接管协程分配内存失败时默认抛出std::bad_alloc异常的操作。
接下来我们来看以下一个最简单的协程该怎么写,上代码
struct task_promise_type;
struct task : std::coroutine_handle<task_promise_type>
{
using promise_type = task_promise_type;
};
struct task_promise_type
{
task get_return_object() { return {task::from_promise(*this)}; }
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
这里task继承std::coroutine_handle属于偷懒写法,由父类来接收协程句柄并使用父类的接口进行管理,实际在get_return_object()时协程的句柄可有你自己自行分配。
例如不继承std::coroutine_handle并在task的构造函数中接收协程句柄并存入私有成员等待后续使用,然后在析构函数中去销毁协程(handle.destroy())就可实现通过RAII自动管理协程的生命周期,后面介绍库的时候会说到。
需要了解的是initial_suspend与final_suspend的返回值均为一个awaitable。
awaitable决定了当前协程后续的流程走向,这里使用的std::suspend_always为语言提供的默认awaitable之一,此外还有个std::suspend_never表示不暂停。
awaitable也可以自己实现,与promise_type相似,其必须包含以下成员
await_ready()用于决定当前协程是否暂停。await_suspend()如当前协程决定暂停,则会进入此函数。await_resume()当前协程准备继续执行时会进入此函数,其返回值就是co_await的返回值。
那么怎么用
task test01()
{
std::cout << "黑之契约者给我整破防了" << std::endl;
co_await std::suspend_always{};
std::cout << "继续破防" << std::endl;
}
int main()
{
auto coro01 = test01();
coro01.resume();
std::cout << "先缓缓" << std::endl;
coro01.resume();
coro01.destroy();
return 0;
}
黑之契约者给我整破防了
先缓缓
继续破防
还是很简单的,接下看一下可选的三个成员怎么使用
添加一个结构体normal_function_transform用于将普通的无参函数转成支持我们协程执行流程的awaitable
并修改promise_type添加我们需要的三个成员函数
template <typename runable_t, typename return_t>
struct normal_function_transform
{
runable_t runable;
return_t result;
constexpr bool await_ready() const noexcept { return false; }
constexpr void await_suspend(std::coroutine_handle<> resumePoint) noexcept
{
result = runable();
// resumePoint.resume()
// 这里可以直接在这里释放继续执行当前协程,也可以等待外部调度。
// 如果在这里继续执行当前协程则不会交出调度权。
}
constexpr return_t await_resume() const noexcept
{
return result;
}
};
template <typename runable_t>
struct normal_function_transform<runable_t, void>
{
runable_t runable;
constexpr bool await_ready() const noexcept { return false; }
constexpr void await_suspend(std::coroutine_handle<> resumePoint) const noexcept
{
runable();
}
constexpr void await_resume() const noexcept {}
};
struct task_promise_type
{
task get_return_object() { return {task::from_promise(*this)}; }
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
std::suspend_always yield_value(const std::string_view &data)
{
std::cout << "[task_promise_type::yield_value] yield value: " << data << std::endl;
return {};
}
template <typename runable_t>
auto await_transform(runable_t runable)
{
return normal_function_transform<runable_t, std::invoke_result_t<runable_t>>{runable};
}
static task get_return_object_on_allocation_failure()
{
std::cout << "[task_promise_type::get_return_object_on_allocation_failure] boom" << std::endl;
return {nullptr};
}
};
然后我们来看一下使用的代码以及效果
int test00()
{
std::cout << "[test00] test00 called" << std::endl;
return 233;
}
task test01()
{
std::cout << "[test01] 黑之契约者给我整破防了" << std::endl;
auto result = co_await test00;
std::cout << "[test01] co_await result: " << result << std::endl;
std::cout << "[test01] 继续破防" << std::endl;
co_yield "提交一个值并交出调度权";
std::cout << "[test01] 继续结束" << std::endl;
}
int main()
{
auto coro01 = test01();
coro01.resume();
std::cout << "[main] 先缓缓" << std::endl;
coro01.resume();
std::cout << "[main] 又回来了" << std::endl;
coro01(); //coro01.resume();
coro01.destroy();
return 0;
}
[test01] 黑之契约者给我整破防了
[test00] test00 called
[main] 先缓缓
[test01] co_await result: 233
[test01] 继续破防
[task_promise_type::yield_value] yield value: 提交一个值并交出调度权
[main] 又回来了
[test01] 继续结束
那么到这里我觉得基础的协程也差不多了,接下来就来看一下这个库的协程与调度器的一些设计。
协程任务设计
基础结构
TaskBase
这个类中包含了4个static类型的成员,全部为thread_local属性。
m_taskQueue通用任务的执行队列。m_delayQueue延迟任务的执行队列。m_resumeQueue保存子协程运行结束的返回点队列。m_finallyQueue保存当前协程运行结束后co_finally语法所执行的协程。
task_promise_type_base
final_suspend
这个函数有两个分支,一个是开启co_finally语法的分支,另一个是没开启的分支。
未开启co_finally语法的执行流程
- 检测当前协程在
m_resumeQueue中是否存在返回点,如果存在则将返回点放入m_taskQueue中,并将m_resumeQueue中的返回点抹除。
开启co_finally语法的执行流程
- 检测当前协程在
m_resumeQueue中是否存在返回点,如果存在则将返回点取出,并将m_resumeQueue中的返回点抹除。 - 通过取出的返回点判断当前协程是不是
finally协程,如果是则将返回点放入m_taskQueue并返回。 - 如果不是则检测当前协程是否存在
finally协程,如果存在则将所有finally协程放入m_taskQueue并将返回点放入m_resumeQueue中。 - 如果当前协程不存在
finally协程,则将取出的返回点放入m_taskQueue中。
yield_value
这个函数有一个接收yield_t类型的默认函数与一个接收延迟任务类型参数的重载。
默认函数yield_value(const std::conditional_t<std::is_same_v<void, yield_t>, uint8_t, yield_t> &result)的执行流程
- 将提交上来的
result存入预定的std::vector中。 - 将自身协程放入
m_taskQueue中等待再次调度。
重载函数yield_value(deferrable &&defer)的执行流程
- 根据传进的参数构造延迟任务并放入
m_delayQueue中等待调度。
task_promise_type
这个类继承于task_promise_type_base,有一个默认入口与一个类型偏特化入口。
默认入口接收任何类型的return_t。
偏特化入口只接收void类型的return_t。
这样做的目的是用于实现无返回值promise_type与有返回值promise_type的同时兼容。
Task
这个类继承于TaskBase,是实际协程的实现类,实际协程调度时操作的就是当前Task继承于TaskBase的4个任务队列。
该类使用using alias对promise_type进行定义并使用私有成员std::coroutine_handle<promise_type> m_coroutineHandle对promise_type初始化的协程句柄进行保存。
co_delay语法
co_delay语法的实现为
#define co_delay co_yield coroutine::task_implement::deferrable{} =
其中deferrable为一个用来标识和存储延迟任务的结构体,他的定义如下
struct deferrable
{
void *task = nullptr;
std::chrono::milliseconds timeout;
deferrable &&operator=(size_t timeout)
{
this->timeout = std::chrono::milliseconds{timeout};
return std::move(*this);
}
};
可以看到它实际上是使用co_yield语法对当前协程提交了一个deferrable,而延迟执行的毫秒数为co_delay后面跟的立即数。如:co_delay 233;的延迟时间为233毫秒啊。
co_switch语法
co_switch语法的实现为
#define co_switch co_yield {}
与co_delay一样,不过提交的是一个initialize list,实际走的是一个uint8_t类型的0。
co_finally语法
它的使用方法为
co_finally
{
// write any code...
};
co_finally语法的实现为
#define _COROMAKELAMBDA(x, y) x##y
#define COROMAKELAMBDA(x, y) _COROMAKELAMBDA(x, y)
#define co_finally coroutine::task_implement::__CoroFinally COROMAKELAMBDA(lambda, __COUNTER__) = [&]() -> coroutine::task_implement::Task<void, void>
这里COROMAKELAMBDA宏用于实现随机的变量名,其实际名为lambda+序号,序号由编译器提供的宏__COUNTER__提供,它的值是它使用的第几次次数的值,从0开始。
__CoroFinally类的定义如下
template <typename coro_t>
class __CoroFinally
{
public:
__CoroFinally(coro_t coro)
: m_coro(std::move(coro))
{
}
~__CoroFinally()
{
TaskBase::m_finallyQueue.push({m_coro(), false});
}
private:
coro_t m_coro;
};
可以看到,这里实际上是声明了一个名为lambda+x的__CoroFinally类型的局部变量。
在初始化的时候传入一个返回值和提交值都为void的一个不完整的lambda表达式,
其使用用户所编写的代码块进行补全,变成一个完整的lambda表达式。
在当前协程运行完走出作用域时,由于RAII的作用__CoroFinally会将自身持有的协程放入m_finallyQueue中,然后等待调度器的执行并完成finally所要执行的工作。
调度器设计
基于上面设计的结构,在编写调度器时我们只需要关注与维护TaskBase的4个静态成员即可。
单线程调度器
对于单线程调度器,我们只需要在调用run函数时检测m_taskQueue与m_delayQueue是否为空,如果不为空则将协程取出并执行清空即可。
普通任务与延迟任务的调度优先级取决于个人,这里写的流程为
- 检测
m_delayQueue与m_taskQueue是否为空,如果都为空则跳出死循环并结束此次的运行。 - 如果
m_delayQueue不为空,取出头部任务并判断是否已超时,如果已超时则执行取出的任务并从m_delayQueue中抹除。 - 检测
m_taskQueue是否为空,如果为空则使用continue回到死循环的头部开始执行。 - 如果
m_taskQeueu不为空,取出头部的任务执行,并从m_taskQueue中抹除。
多线程调度器
先来看一下多线程调度器的成员与任务的转移流程
using channel_t = std::promise<std::pair<void *, void *>> *;
using future_t = std::future<std::pair<void *, void *>>;
constexpr static size_t m_minTasks = 2, m_maxTasks = 3; // 本地任务队列任务数量的最大值与最小值
static size_t m_maxWokers; // 最大工作线程数量
static bool m_work; // 用于控制线程池的开关
static std::mutex m_globalMutex, m_localMutex; // 队列锁
static std::vector<std::thread> m_workers; // 工作线程池
static std::queue<void *> m_globalQueue; // 全局任务队列,给调度器添加任务时会添加到此队列中
static std::queue<channel_t> m_channels; // 用于偷取其他工作线程中的本地任务的通信通道
当我们给调度器添加一个任务时,首先会被添加到m_globalQueue全局任务队列中,然后每个工作线程会在m_globalQueue中获取属于自己的任务。
获取到的任务会添加到当前线程的本地队列中,也就是TaskBase的4个静态成员。
当有空闲线程出现时,它会往m_channels中添加自己接收其他线程”施舍”的任务的promise,如果其他线程的本地队列过长,就会将自己的头部任务交给其他在空闲中的线程来执行,自己则执行下一个任务。
run函数的执行流程
- 检测当前工作线程数量是否大于
m_maxWokers,如果还不足则添加工作线程至可以执行本次m_globalQueue队列中任务数量的工作线程数量(最多不超过上限)。
worker函数的执行流程
由于基础流程与单线程调度器基本一致,所以我们只讲不同的部分。
- 检测
m_globalQueue是否为不为空且m_taskQueue本地任务队列存在的任务数量是否小于m_minTasks。 - 如果小于这个数量则从
m_globalQueue获取一个任务并放入m_taskQueue中。 - 当
m_globalQueue与m_taskQueue都为空,而m_work不为false时,创建一个promise并放入m_channels中。 - 每隔1s检测一次
promise是否已经准备就绪,若已经准备就绪(已经偷到别的线程的本地任务),则从中取出偷取的到的任务并加入m_taskQueue中。
至此,对于这个协程库的介绍就完了。
结语
这篇文章咕咕咕了好久

那就这样了,有缘再见~