asio
的 c++20
协程的设计十分复杂,并且和目前大多数的协程库有着非常不同的设计思路,通常大多数 c++20
协程的设计都是通过协程本身提供的机制构建一个协程调用链机制。
通常来说,要实现一个 c++20
协程就是通过协程的 promise
对象存储 caller
协程(调用者)的句柄(或干脆将整个调用者协程的 promise
对象指针一股脑存储在当前被调用的子协程的 promise
中)。
然后在子协程退出时(会触发调用 final_suspend
), 在 final_suspend()
中调用 caller
协程的句柄然后 resume
让这个父协程得以继续往下执行。
通常来说,基于上述流程来实现协程库并不算太复杂,而 asio c++20
协程在设计上却大为不同,非常复杂,所以建议看这篇文章对着 asio
的源码来阅读。
它完全通过一个 pump
的机制来驱动协程的运行,要实现 pump
这么一个机制,asio
于是设计了一套类似 thread
的协程结构,称之为 awaitable_thread
,文档中的设计架构如下:
+------------------------------------+
| top_of_stack_ |
| V
+--------------+---+ +-----------------+
| | | |
| awaitable_thread |<---------------------------+ awaitable_frame |
| | attached_thread_ | |
+--------------+---+ (Set only when +---+-------------+
| frames are being |
| actively pumped | caller_
| by a thread, and |
| then only for V
| the top frame.) +-----------------+
| | |
| | awaitable_frame |
| | |
| +---+-------------+
| |
| | caller_
| :
| :
| |
| V
| +-----------------+
| bottom_of_stack_ | |
+------------------------------->| awaitable_frame |
| |
+-----------------+
在这个设计构架图中,awaitable_frame
则是协程调用链上的一个个 Promise
对象指针(这一点很重要),bottom_of_stack_
始终指向协程入口 awaitable_frame
,其中 awaitable_frame
之间通过 caller_
建立起链表关系,代码关系示例如:
class awaitable_frame_base {
awaitable_frame* caller_ = nullptr;
awaitable_thread* attached_thread_ = nullptr;
};
class awaitable_frame : awaitable_frame_base {
awaitable_frame_base* top_of_stack_;
};
有了这个调用链关系,实际上已经描述了整个设计的轮廓,但还远远不够,如 attached_thread_
它在这个结构中,始终处于调用栈顶的那个 awaitable_frame
当中,其它所有 awaitable_frame
都不得存储 attached_thread_
,也就是虽然每个 awaitable_frame
都有成员 attached_thread_
,但只有 top_of_stack_
指向的那个 awaitable_frame
才有资格存储 attached_thread_
,这个 attached_thread_
其实也不是什么特别的东西,它主要就是存储整个协程入口点的 awaitable
对象,成员变量名为 bottom_of_stack_
。
然后我们再来看 class awaitable
这个类型的一些关键点,它是一个标准的 awaiter
定义,下面代码省去一些无关的细节,主要抽出主体来分析
class awaitable {
public:
bool await_ready() const noexcept
{
return false;
}
template <class U>
void await_suspend(std::coroutine_handle<> h)
{
frame_->push_frame(&h.promise());
}
T await_resume()
{
return frame_->get();
}
private:
awaitable_frame* frame_;
};
class awaitable_thread {
public:
void launch()
{
bottom_of_stack_.frame_->top_of_stack_->attach_thread(this);
pump();
}
void pump()
{
do
bottom_of_stack_.frame_->top_of_stack_->resume();
while (bottom_of_stack_.frame_ && bottom_of_stack_.frame_->top_of_stack_);
}
private:
// 协程入口 awaitable 对象
awaitable<awaitable_thread_entry_point> bottom_of_stack_;
};
所以,可以看到,我们可以通过 attached_thread_
对象指针得到 bottom_of_stack_
,再通过 bottom_of_stack_
可以得到入口协程的 awaitable_frame
信息,同时在这个 awaitable_frame
信息中也可得到 top_of_stack_
信息,而 top_of_stack_.frame_
本身的 caller_
指针就只能是空了(若是顶层协程)。
有了上面信息,再来看一个 asio
协程的异步具体是如何驱动的,这是最关键的地方,若有 asio
协程代码如下:
net::awaitable<void> start_market_server()
{
auto executor = co_await net::this_coro::executor;
net::steady_timer timer(executor);
timer.expires_after(1s);
co_await timer.async_wait(net::use_awaitable);
}
这里重点分析 co_await timer.async_wait(net::use_awaitable);
这行代码的执行流程。
当然, 这是由 use_awaitable
和 async_result
类型通过 async_initiate
函数中使用 async_result
偏特化到指定的类型中(async_result
机制是 asio
定制 CompletionToken
的核心机制,这里不详谈),这里会匹配到一个如下定义的 async_result
类型,其中还有一个 static
的协程函数,代码示例如下(为了便于分析,同样去掉了很多无关信息)。
class async_result<use_awaitable_t>
{
typedef typename awaitable_handler<> handler_type;
typedef typename handler_type::awaitable_type return_type;
static handler_type* do_init(
auto* frame, Initiation& initiation,
use_awaitable_t u, InitArgs&... args)
{
(void)u;
handler_type handler(frame->detach_thread());
std::move(initiation)(std::move(handler), std::move(args)...);
return nullptr;
}
template <typename Initiation, typename... InitArgs>
static return_type initiate(Initiation initiation,
use_awaitable_t u, InitArgs... args)
{
co_await [&] (auto* frame)
{
return do_init(frame, initiation, u, args...);
};
for (;;) {} // Never reached.
}
};
重点在这个 initiate
协程中, 这个协程会根据 return_type
(推导后实际上也是 awaitable
类型) 构造一个 frame
,也就是 initiate
协程的帧,这个 frame
并存储在返回类型的 awaitable
类型的临时对象中, 然后由用户代码中的 co_await
调用起这个 initiate
协程。
当执行 co_await
在这个 initiate
协程上,就会在新构造的 awaitable
的 await_suspend
中执行 frame->push_frame
将 caller
保存到当前 frame
中的 caller_
成员中,同时将 attached_thread_
这个指针从 caller
中转移到当前 frame
中,同时还修改 entry_point
的 top_of_stack_
为当前 frame
(也就是 initiate
协程的帧),需要注意的是 entry_point
就是 bottom_of_stack_.frame_
, 也就是入口点协程,top_of_stack_
就是在入口点协程 frame
上的,即是:bottom_of_stack_.frame_->top_of_stack_
。
当上述流程执行完后,会返回到 caller
的 pump()
中循环当中,也就是 top_of_stack_->resume();
,紧接着执行循环条件判断
do
bottom_of_stack_.frame_->top_of_stack_->resume();
while (bottom_of_stack_.frame_ && bottom_of_stack_.frame_->top_of_stack_);
由于在前面保存了 top_of_stack_
为当前 frame
(即:async_result::initiate
协程),所以条件继续为 true
, 继续循环,执行当前 initiate
协程帧的 frame->resume()
,从而开始运行 initiate
这个协程函数。
在这个 initiate
协程函数中,co_await [&] (auto* frame)
将会被调用触发 await_transform(Function f)
调用,在这里面的 await_suspend
里设置 frame->resume_context_
这个临时变量 resume_context
指针,以便在 resume()
调用完成后可执行 [&] (auto* frame)
这个 lambda
函数,在这个 lambda
中, 参数 frame
指针是当前 initiate
协程的帧。
在 initiate
协程中,由于只 resume()
了一次,因此只会执行一次 co_await
,后面的代码则不再会有机会得到执行
template <typename Initiation, typename... InitArgs>
static return_type initiate(Initiation initiation,
use_awaitable_t u, InitArgs... args)
{
co_await [&] (auto* frame)
{
return do_init(frame, initiation, u, args...);
};
for (;;) {} // 永远不会执行到的地方
}
这就是这个代码的的奇怪之处,再回到上面 lambda
函数的执行流程中,do_init
中将当前 frame
的 attached_thread_
转移到 awaitable_handler
中(awaitable_handler
就是 attached_thread_
相同基类 awaitable_thread
的类型)的叫 handler
的对象中,然后通过 async_initiation
的异步启动函数对象发起实际异步,这样 awaitable_handler
就被携带在发起的异步 op
(通过 operation
类型擦除)中了,initiate
协程从而实现了和 async_initiation
中 Initiation
的桥接起来,使得用户实现 async_initiation
中的 Initiation
函数对象能在回调或协程等不同形式的 CompletionToken
下适配。
需要注意的是,在 lambda
中的代码:
handler_type handler(frame->detach_thread());
std::move(initiation)(std::move(handler), std::move(args)...);
第一行代码它会将 awaitable_thread
对象 move
复制到 handler
对象,在这个 move
操作中非常关键的是 move
构造函数
awaitable_thread(awaitable_thread&& other) noexcept
: bottom_of_stack_(std::move(other.bottom_of_stack_))
{
}
在这个 move
构造函数中,它会将之前的 awaitable_thread
对象中的 bottom_of_stack_
对象的 frame
通过 move
构造到当前 awaitable_thread
对象的 bottom_of_stack_
的 frame
,在这里的 awaitable_thread
也就是 awaitable_handler
对象,这里也将导致原来的 awaitable_thread
对象的 frame
变为空指针,这一点十分重要。
第二行 std::move(initiation)(std::move(handler), std::move(args)...);
即是 async_initiation
中 Initiation
对象,正是通过此 lambda
桥接到它,在 async_initiation
中 Initiation
真正发起异步操作。
在 lambda
执行完成后,await_transform(Function f)
调用就会返回,同时会返回到 initiate
协程上次 resume()
的地方,也就是 pump()
这个函数的循环当中,然后再判断循环条件时
while (bottom_of_stack_.frame_ && bottom_of_stack_.frame_->top_of_stack_);
此时,因为之前在 lambda
中经历了 move
构造,awaitable_thread::pump()
函数的当前 awaitable_thread
的 bottom_of_stack_.frame_
就被转移到了 awaitable_handler
对象的 bottom_of_stack_.frame_
中了,所以此时这里条件不成立因此退出 pump()
函数,执行流程又再次返回到 asio 的 io_context
事件循环当中。
当异步操作完成,事件循环再次触发 op
的回调,这个 op
通过类型恢复后,它就是前面说的 awaitable_handler
对象,事件循环 run
将调用 awaitable_handler
对象的 operator()
函数,下面拿其中一个版本来尝试说明
template <typename Arg0, typename Arg1>
void operator()(Arg0&& arg0, Arg1&& arg1)
{
this->frame()->attach_thread(this);
if constexpr (is_disposition<T0>::value)
{
if (arg0 == no_error)
this->frame()->return_value(std::forward<Arg1>(arg1));
else
this->frame()->set_disposition(std::forward<Arg0>(arg0));
}
else
{
this->frame()->return_values(std::forward<Arg0>(arg0),
std::forward<Arg1>(arg1));
}
this->frame()->clear_cancellation_slot();
this->frame()->pop_frame();
this->pump();
}
awaitable_handler
的 operator()(Arg0&& arg0, Arg1&& arg1)
函数,在这个函数当中,通过 attach_thread
到当前 this
也就是 awaitable_thread
指针(即 awaitable_handler
本身),并将上面 arg0, arg1
设置到当前 frame
当中(注意,当前 frame
也就是之前 initiate
协程的 frame
指针), 紧接着调用 pop_frame()
将原来存储在 frame
当中的 caller_
作为 top_of_stack_
,以及将 attached_thread_
指针转移到 caller_
中的 attached_thread_
指针上,并清空当前 frame
的 caller_
和 attached_thread_
成员,然后再执行 pump()
,在 pump()
调用中,实际上调用的是 caller_
的 resume()
, 从而将唤醒 caller_
协程,也就是用户代码中:
co_await timer.async_wait(net::use_awaitable);
此时,将会执行协程 awaitable
对象的 await_resume()
以获取返回值
T await_resume()
{
return awaitable(static_cast<awaitable&&>(*this)).frame_->get();
}
这里关键的一点就是协程返回值是存储在 initiate
协程的 frame_
指针上(也就是它的 promise
类型中的 result_
),await_resume()
的返回值就是整个异步协程调用的返回值,这里是实现了协程返回值的关键。
当整个 co_await
语句包括返回值都执行完成后,awaitable
这个临时对象,就会进入析构,此时才最终删除 initiate
协程的 promise
上所有信息,同时也包括 std::coroutine_handle
指针(coro_.destroy();
), 此时才总算圆满的将整个 co_await
一个 asio
协程流程执行完成。
通过上面的分析,可以看到 asio
协程的设计思路非常不一般,虽然十分复杂,却也非常精巧,而且并没有多余的开销实现,没有 new
,没有锁,也没有系统调用,仅是一些必要的逻辑语句,因此十分高效。
由于 asio
协程的复杂度极高,再次建议看这篇文章同时对着 asio
源码来阅读,这才能抽丝剥茧一般解惑 asio
协程的设计。