对C++ Coroutine feature的后续展开。
前言
本文仅供学习探讨之用,如果侵犯了您的权益请联系我删除。
工具
基本介绍
协程这玩意我觉得其实没必要多做什么介绍,现在流行的语言基本上都有这玩意,网上也有很多相关的文章。如果你还没听过的话我觉得你可以先谷歌或百度查查相关资料。
从C++
推出协程特性都已经两年了。虽说刚出的时候就造了个异步任务的轮子,不过一直没什么机会用,后来也就扔一边了。
最近想起又拿出来试了试,发现有点难用。加上我想写篇文章记录一下一些设计思路,索性就直接重构了一下,并增加一些新的功能。
进入正题,因为本期代码比较长所以本文中应该不会出现完整的代码,只会把基本写法和一些关键的设计部分拿出来解释说明。
至于完整的代码实现与简单测试都已上传至我的Github仓库中,如果你感兴趣可以自行跳转查看。
仓库地址:https://github.com/Bzi-Han/coroutine
编译器支持
使用本协程库的一些限制条件
MSVC: 19.29.30145^
GCC: 12.2^
CLANG: 15.0.0^
如果你的编译器版本低于以上版本的请自行测试。
基础协程
一个基础的协程是由一个类或结构体包含着一个公开的名字固定为promise_type
的结构体或类构成的,只要你的promise_type
必要的成员都是公开的就行。
必要的成员有以下几个
get_return_object()
用于返回实际的协程对象。initial_suspend()
协程对象创建后的初始暂停点。final_suspend()
协程执行完的最终暂停点。return_value()
或return_void()
用于接收协程的返回值。unhandled_exception()
用于处理协程中抛出的未在内部处理的异常。
可选的成员有以下几个
yield_value()
用于接收co_yield
的值。await_transform()
或operator co_await
用于对co_await
语法的重载。get_return_object_on_allocation_failure()
用于接管协程分配内存失败时默认抛出std::bad_alloc
异常的操作。
接下来我们来看以下一个最简单的协程该怎么写,上代码
struct task_promise_type;
struct task : std::coroutine_handle<task_promise_type>
{
using promise_type = task_promise_type;
};
struct task_promise_type
{
task get_return_object() { return {task::from_promise(*this)}; }
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
这里task
继承std::coroutine_handle
属于偷懒写法,由父类来接收协程句柄并使用父类的接口进行管理,实际在get_return_object()
时协程的句柄可有你自己自行分配。
例如不继承std::coroutine_handle
并在task
的构造函数中接收协程句柄并存入私有成员等待后续使用,然后在析构函数中去销毁协程(handle.destroy()
)就可实现通过RAII
自动管理协程的生命周期,后面介绍库的时候会说到。
需要了解的是initial_suspend
与final_suspend
的返回值均为一个awaitable
。
awaitable
决定了当前协程后续的流程走向,这里使用的std::suspend_always
为语言提供的默认awaitable
之一,此外还有个std::suspend_never
表示不暂停。
awaitable
也可以自己实现,与promise_type
相似,其必须包含以下成员
await_ready()
用于决定当前协程是否暂停。await_suspend()
如当前协程决定暂停,则会进入此函数。await_resume()
当前协程准备继续执行时会进入此函数,其返回值就是co_await
的返回值。
那么怎么用
task test01()
{
std::cout << "黑之契约者给我整破防了" << std::endl;
co_await std::suspend_always{};
std::cout << "继续破防" << std::endl;
}
int main()
{
auto coro01 = test01();
coro01.resume();
std::cout << "先缓缓" << std::endl;
coro01.resume();
coro01.destroy();
return 0;
}
黑之契约者给我整破防了
先缓缓
继续破防
还是很简单的,接下看一下可选的三个成员怎么使用
添加一个结构体normal_function_transform
用于将普通的无参函数转成支持我们协程执行流程的awaitable
并修改promise_type
添加我们需要的三个成员函数
template <typename runable_t, typename return_t>
struct normal_function_transform
{
runable_t runable;
return_t result;
constexpr bool await_ready() const noexcept { return false; }
constexpr void await_suspend(std::coroutine_handle<> resumePoint) noexcept
{
result = runable();
// resumePoint.resume()
// 这里可以直接在这里释放继续执行当前协程,也可以等待外部调度。
// 如果在这里继续执行当前协程则不会交出调度权。
}
constexpr return_t await_resume() const noexcept
{
return result;
}
};
template <typename runable_t>
struct normal_function_transform<runable_t, void>
{
runable_t runable;
constexpr bool await_ready() const noexcept { return false; }
constexpr void await_suspend(std::coroutine_handle<> resumePoint) const noexcept
{
runable();
}
constexpr void await_resume() const noexcept {}
};
struct task_promise_type
{
task get_return_object() { return {task::from_promise(*this)}; }
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
std::suspend_always yield_value(const std::string_view &data)
{
std::cout << "[task_promise_type::yield_value] yield value: " << data << std::endl;
return {};
}
template <typename runable_t>
auto await_transform(runable_t runable)
{
return normal_function_transform<runable_t, std::invoke_result_t<runable_t>>{runable};
}
static task get_return_object_on_allocation_failure()
{
std::cout << "[task_promise_type::get_return_object_on_allocation_failure] boom" << std::endl;
return {nullptr};
}
};
然后我们来看一下使用的代码以及效果
int test00()
{
std::cout << "[test00] test00 called" << std::endl;
return 233;
}
task test01()
{
std::cout << "[test01] 黑之契约者给我整破防了" << std::endl;
auto result = co_await test00;
std::cout << "[test01] co_await result: " << result << std::endl;
std::cout << "[test01] 继续破防" << std::endl;
co_yield "提交一个值并交出调度权";
std::cout << "[test01] 继续结束" << std::endl;
}
int main()
{
auto coro01 = test01();
coro01.resume();
std::cout << "[main] 先缓缓" << std::endl;
coro01.resume();
std::cout << "[main] 又回来了" << std::endl;
coro01(); //coro01.resume();
coro01.destroy();
return 0;
}
[test01] 黑之契约者给我整破防了
[test00] test00 called
[main] 先缓缓
[test01] co_await result: 233
[test01] 继续破防
[task_promise_type::yield_value] yield value: 提交一个值并交出调度权
[main] 又回来了
[test01] 继续结束
那么到这里我觉得基础的协程也差不多了,接下来就来看一下这个库的协程与调度器的一些设计。
协程任务设计
基础结构
TaskBase
这个类中包含了4个static
类型的成员,全部为thread_local
属性。
m_taskQueue
通用任务的执行队列。m_delayQueue
延迟任务的执行队列。m_resumeQueue
保存子协程运行结束的返回点队列。m_finallyQueue
保存当前协程运行结束后co_finally
语法所执行的协程。
task_promise_type_base
final_suspend
这个函数有两个分支,一个是开启co_finally
语法的分支,另一个是没开启的分支。
未开启co_finally
语法的执行流程
- 检测当前协程在
m_resumeQueue
中是否存在返回点,如果存在则将返回点放入m_taskQueue
中,并将m_resumeQueue
中的返回点抹除。
开启co_finally
语法的执行流程
- 检测当前协程在
m_resumeQueue
中是否存在返回点,如果存在则将返回点取出,并将m_resumeQueue
中的返回点抹除。 - 通过取出的返回点判断当前协程是不是
finally
协程,如果是则将返回点放入m_taskQueue
并返回。 - 如果不是则检测当前协程是否存在
finally
协程,如果存在则将所有finally
协程放入m_taskQueue
并将返回点放入m_resumeQueue
中。 - 如果当前协程不存在
finally
协程,则将取出的返回点放入m_taskQueue
中。
yield_value
这个函数有一个接收yield_t
类型的默认函数与一个接收延迟任务类型参数的重载。
默认函数yield_value(const std::conditional_t<std::is_same_v<void, yield_t>, uint8_t, yield_t> &result)
的执行流程
- 将提交上来的
result
存入预定的std::vector
中。 - 将自身协程放入
m_taskQueue
中等待再次调度。
重载函数yield_value(deferrable &&defer)
的执行流程
- 根据传进的参数构造延迟任务并放入
m_delayQueue
中等待调度。
task_promise_type
这个类继承于task_promise_type_base
,有一个默认入口与一个类型偏特化入口。
默认入口接收任何类型的return_t
。
偏特化入口只接收void
类型的return_t
。
这样做的目的是用于实现无返回值promise_type
与有返回值promise_type
的同时兼容。
Task
这个类继承于TaskBase
,是实际协程的实现类,实际协程调度时操作的就是当前Task
继承于TaskBase
的4个任务队列。
该类使用using alias
对promise_type
进行定义并使用私有成员std::coroutine_handle<promise_type> m_coroutineHandle
对promise_type
初始化的协程句柄进行保存。
co_delay语法
co_delay
语法的实现为
#define co_delay co_yield coroutine::task_implement::deferrable{} =
其中deferrable
为一个用来标识和存储延迟任务的结构体,他的定义如下
struct deferrable
{
void *task = nullptr;
std::chrono::milliseconds timeout;
deferrable &&operator=(size_t timeout)
{
this->timeout = std::chrono::milliseconds{timeout};
return std::move(*this);
}
};
可以看到它实际上是使用co_yield
语法对当前协程提交了一个deferrable
,而延迟执行的毫秒数为co_delay
后面跟的立即数。如:co_delay 233;
的延迟时间为233毫秒啊。
co_switch语法
co_switch
语法的实现为
#define co_switch co_yield {}
与co_delay
一样,不过提交的是一个initialize list
,实际走的是一个uint8_t
类型的0
。
co_finally语法
它的使用方法为
co_finally
{
// write any code...
};
co_finally
语法的实现为
#define _COROMAKELAMBDA(x, y) x##y
#define COROMAKELAMBDA(x, y) _COROMAKELAMBDA(x, y)
#define co_finally coroutine::task_implement::__CoroFinally COROMAKELAMBDA(lambda, __COUNTER__) = [&]() -> coroutine::task_implement::Task<void, void>
这里COROMAKELAMBDA
宏用于实现随机的变量名,其实际名为lambda
+序号
,序号由编译器提供的宏__COUNTER__
提供,它的值是它使用的第几次次数的值,从0
开始。
__CoroFinally
类的定义如下
template <typename coro_t>
class __CoroFinally
{
public:
__CoroFinally(coro_t coro)
: m_coro(std::move(coro))
{
}
~__CoroFinally()
{
TaskBase::m_finallyQueue.push({m_coro(), false});
}
private:
coro_t m_coro;
};
可以看到,这里实际上是声明了一个名为lambda
+x
的__CoroFinally
类型的局部变量。
在初始化的时候传入一个返回值和提交值都为void
的一个不完整的lambda
表达式,
其使用用户所编写的代码块进行补全,变成一个完整的lambda
表达式。
在当前协程运行完走出作用域时,由于RAII
的作用__CoroFinally
会将自身持有的协程放入m_finallyQueue
中,然后等待调度器的执行并完成finally
所要执行的工作。
调度器设计
基于上面设计的结构,在编写调度器时我们只需要关注与维护TaskBase
的4个静态成员即可。
单线程调度器
对于单线程调度器,我们只需要在调用run
函数时检测m_taskQueue
与m_delayQueue
是否为空,如果不为空则将协程取出并执行清空即可。
普通任务与延迟任务的调度优先级取决于个人,这里写的流程为
- 检测
m_delayQueue
与m_taskQueue
是否为空,如果都为空则跳出死循环并结束此次的运行。 - 如果
m_delayQueue
不为空,取出头部任务并判断是否已超时,如果已超时则执行取出的任务并从m_delayQueue
中抹除。 - 检测
m_taskQueue
是否为空,如果为空则使用continue
回到死循环的头部开始执行。 - 如果
m_taskQeueu
不为空,取出头部的任务执行,并从m_taskQueue
中抹除。
多线程调度器
先来看一下多线程调度器的成员与任务的转移流程
using channel_t = std::promise<std::pair<void *, void *>> *;
using future_t = std::future<std::pair<void *, void *>>;
constexpr static size_t m_minTasks = 2, m_maxTasks = 3; // 本地任务队列任务数量的最大值与最小值
static size_t m_maxWokers; // 最大工作线程数量
static bool m_work; // 用于控制线程池的开关
static std::mutex m_globalMutex, m_localMutex; // 队列锁
static std::vector<std::thread> m_workers; // 工作线程池
static std::queue<void *> m_globalQueue; // 全局任务队列,给调度器添加任务时会添加到此队列中
static std::queue<channel_t> m_channels; // 用于偷取其他工作线程中的本地任务的通信通道
当我们给调度器添加一个任务时,首先会被添加到m_globalQueue
全局任务队列中,然后每个工作线程会在m_globalQueue
中获取属于自己的任务。
获取到的任务会添加到当前线程的本地队列中,也就是TaskBase
的4个静态成员。
当有空闲线程出现时,它会往m_channels
中添加自己接收其他线程”施舍”的任务的promise
,如果其他线程的本地队列过长,就会将自己的头部任务交给其他在空闲中的线程来执行,自己则执行下一个任务。
run
函数的执行流程
- 检测当前工作线程数量是否大于
m_maxWokers
,如果还不足则添加工作线程至可以执行本次m_globalQueue
队列中任务数量的工作线程数量(最多不超过上限)。
worker
函数的执行流程
由于基础流程与单线程调度器基本一致,所以我们只讲不同的部分。
- 检测
m_globalQueue
是否为不为空且m_taskQueue
本地任务队列存在的任务数量是否小于m_minTasks
。 - 如果小于这个数量则从
m_globalQueue
获取一个任务并放入m_taskQueue
中。 - 当
m_globalQueue
与m_taskQueue
都为空,而m_work
不为false
时,创建一个promise
并放入m_channels
中。 - 每隔1s检测一次
promise
是否已经准备就绪,若已经准备就绪(已经偷到别的线程的本地任务),则从中取出偷取的到的任务并加入m_taskQueue
中。
至此,对于这个协程库的介绍就完了。
结语
这篇文章咕咕咕了好久
那就这样了,有缘再见~