Rust future 浅析
学习 rust 异步编程时需要区分三个库 future, futures 和 tokio:
- future 是 rust 中对于异步行为的抽象,是标准库的一部分
- futures 是处理 future 的工具集,提供了 join! 之类方便的 api
- tokio 是 rust 的一个异步运行时,可以执行 future,也提供了异步读写等基础 future 能力(这类 future 也被称为 leaf future),是当前“事实上”的异步运行时标准库
本篇文章着眼于 future 的使用,重点关注
- 标准库中 future 的定义
- futures 提供的工具
- tokio 提供了哪些 leaf future
future
future.rs
标准库 future 中定义的 Future 是一个 trait
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
Future 中的 poll 方法是由运行时调用的,运行时会循环调用 poll,直到返回值为 Ready
如何执行一个 future?
- 在异步代码块中,“等待运行时调用 poll 得到 Ready\<Output>” 的过程用
.await语法来表达,每个 future 都可以调用这个方法
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
sleep(Duration::from_millis(100)).await;
println!("100 ms have elapsed");
}
这里使用了异步运行时 tokio 提供的异步时间响应能力,sleep 调用返回的正是一个 future。
- 在同步代码块中,我们是不能直接调用 poll 的,就像上一段说的,poll 是给运行时的,我们需要做的是启用一个运行时来等待 future 运行结束得到结果
use tokio::runtime::Runtime;
use tokio::time::{sleep, Duration};
// Create the runtime
let rt = Runtime::new().unwrap();
// Execute the future, blocking the current thread until completion
rt.block_on(async {
sleep(Duration::from_millis(100)).await;
println!("100 ms have elapsed");
});
当然,也可以用 rt.spawn 运行异步代码,不等待返回值结果
- 在一个 future 的 poll 实现中执行 future,这里就需要直接调用 future 的 poll 方法了。
为什么这里不能按照同步代码的方式处理了?“只缘身在此山中”。
放一个 stackoverflow 的例子 How to sleep in Future::poll()?
pin_project_lite::pin_project! {
struct MyDelay {
#[pin]
sleep: tokio::time::Sleep,
}
}
impl MyDelay {
fn new() -> Self {
Self {
sleep: tokio::time::sleep(Duration::from_secs(1)),
}
}
}
impl Future for MyDelay {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("poll");
let this = self.project();
this.sleep.poll(cx)
}
}
crate future
标准库 future 中还定义了其他的一些结构
-
into_future.rs
这也是一个 trait,功能其实和 future 挺像的,但是更适合用于 builder 的场景,用来把一些既有类型转化为 future
#[stable(feature = "into_future", since = "1.64.0")] pub trait IntoFuture { /// The output that the future will produce on completion. #[stable(feature = "into_future", since = "1.64.0")] type Output; /// Which kind of future are we turning this into? #[stable(feature = "into_future", since = "1.64.0")] type IntoFuture: Future<Output = Self::Output>; #[stable(feature = "into_future", since = "1.64.0")] #[lang = "into_future"] fn into_future(self) -> Self::IntoFuture; }已经为 Future 实现了 IntoFuture,功能也是简单的返回自己
-
join.rs
这个文件里实现了可以等待多个 future 完成的
join!join!的实现使用了类型递归以支持可变参数,然后遍历了所有 future 直到结果都变成了 Ready注意!!!
这里有个容易忽视的点:一个 Future trait 在 poll 结果返回了 Ready 之后是不一定能再调用 poll(可能触发未定义行为)。
所以 join.rs 中实现了
enum MaybeDone<F: Future>,它暂存 Future,并在 poll 完成后保存了 Ready 的返回过结果以保证可以继续 pollpub enum MaybeDone<F: Future> { Future(F), Done(F::Output), Taken, } impl<F: Future> Future for MaybeDone<F> { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // SAFETY: pinning in structural for `f` unsafe { // Do not mix match ergonomics with unsafe. match *self.as_mut().get_unchecked_mut() { MaybeDone::Future(ref mut f) => { let val = Pin::new_unchecked(f).poll(cx).ready()?; self.set(Self::Done(val)); } MaybeDone::Done(_) => {} MaybeDone::Taken => unreachable!(), } } Poll::Ready(()) } }这里的 Taken 是一个比较有意思的设计,应该是为了所有权的转移,结果只允许取一次,被取结果后就会被设置为 Taken
-
pending.rs
提供了一个永远都不会结束的
struct Pending<T>pub struct Pending<T> { _data: marker::PhantomData<fn() -> T>, } impl<T> Future for Pending<T> { type Output = T; fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> { Poll::Pending } }也不知道用在哪,可能是专门来为难运行时的吧
使用的时候一般是调用
pending函数来创建 -
ready.rs
这里和 pending 很像,定义了一个
struct Readypub struct Ready<T>(Option<T>); impl<T> Future for Ready<T> { type Output = T; #[inline] fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> { Poll::Ready(self.0.take().expect("`Ready` polled after completion")) } }注意,struct 中包裹的不是 T 而是
Option<T>这是因为 Future 的 poll 中的 self 类型为
Pin<&mut Self>也就是说如果 future 想调用 poll,必须在外面有一层 Pin 保证内存不会发生移动,而 Option 就是为了破解这个移动和不移动的钥匙
关于 future,pin 和 unpin,可以看这篇文章了解更多细节
同样的,提供了 ready 函数作为 maker
-
poll_fn.rs
Pending 和 Ready 都有了,不来个函数的 warpper?
其实严格说来,前两者是对结果封装的 future,而对函数封装的 future 显然更灵活。 只能说没有忘本吧,毕竟是 MozCaml
pub struct PollFn<F> { f: F, } impl<T, F> Future for PollFn<F> where F: FnMut(&mut Context<'_>) -> Poll<T>, { type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { // SAFETY: We are not moving out of the pinned field. (unsafe { &mut self.get_unchecked_mut().f })(cx) } }同样以函数作为入口
futures
future 也是有历史的,而历史就来自于 futures 库,这才是 rust 异步编程正统所在
更多细节请看这里
futures 库由多个子库组成,包括
- futures-core
- futures-io
- futures-sink
- futures-channel
- futures-task
- futures-macro
- futures-util
- futures-executor
futures-core
-
aotmic_waker.rs
这个文件中定义了一个单生产者多消费者的
struct AtomicWaker这是一个用于标记 future 可继续执行并唤醒 executor 的数据结构,支持处理重复唤醒这样涉及运行时的结构不在本文详述,下同
-
future.rs
- type BoxFuture: 动态 Future 的封装类型
- type LocalBoxFuture: 动态 Future,不支持线程间传递
- trait FusedFuture: 提供了
is_terminated方法用于判断 Future 是否已经结束,这种结束既可能是已经 Ready,也可能是某种原因取消丢弃 - TryFuture: 特指 Output 类型为
Result<T, E>的 Future,这是一类很常用的 Future,可以单独拿出来做些处理
-
stream.rs
这里定义了一个和 Future 同等重要的
trait Stream,它本质是异步化的Iterator<Item>pub trait Stream { /// Values yielded by the stream. type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; fn size_hint(&self) -> (usize, Option<usize>) { (0, None) } }它可以一直 poll_next,直到返回 None
类似 Future 还有其他相关定义:
- FusedStream: 一个添加了
is_terminated方法的 trait - TryStream: Item 类型为
Result<T, E>的 trait
- FusedStream: 一个添加了
futures-sink
如果说 stream 是 item generator,那么 sink 就是 item eater
注意,和 trait Future, trait Stream 不同, trait Sink<Item> 需要显示声明
Item,感觉风格有点脱节?
sink 需要实现的方法也有点多
pub trait Sink<Item> {
/// The type of value produced by the sink when an error occurs.
type Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
发送数据时使用 start_send ,检测可以继续发送使用 poll_ready ,检测之前发送的已经都发送成功了用
poll_flush ,结束调用 poll_close 清理资源
futures-io
定义了四个 trait:
trait AsyncRead, 需要实现poll_read和poll_read_vectoredtrait AsyncWrite, 需要实现poll_write,poll_write_vectored,poll_flush和poll_closetrait AsyncSeek, 需要实现poll_seektrait AsyncBufRead, 继承了trait AsyncRead, 另外还要求实现poll_fill_buf和consume
这四个 trait 也对应了同步的 std::io::{Read, Write, Seek, BufRead}
futures-channel
主要有几个结构
- Sender\<T>
- Receiver\<T>
- Lock\<T>,只有 try_lock 一个方法
- mpsc::Queue
- mpsc::SenderT>, mpsc::UnboundedSender\<T>
- mpsc::Receiver\<T>, mpsc::UnboundedReceiver\<T>
futures-task
除了封装 Future pointer 的几个结构 FutureObj, LocalFutureObj, 和
UnsafeFutureObj 之外,都是 waker 相关的数据结构,实现相关,暂不细究
futures-macro
都是比较复杂的宏实现,包括
join!: 功能和标准库中的join!类似,看着实现更复杂,不知道能力上有多少区别select!: 支持同时等待多个 future 完成并取得结果进行下一步操作,似乎是比 join 更高级的处理stream_select!: 更像是 stream merge 的功能,多个 stream 合并成为一个 stream
futures-util
这才是重头戏啊,包括上面提到的很多功能都是从这个模块暴露出来的!
-
abortable.rs
核心是实现了一个
struct Abortable<T>,这个结构体包裹的 future / stream 可以在 abort 行为过后让返回值都变成 None使用如下:
-
fns.rs
看着是定义了魔法函数
trait FnOnce1<A>和trait FnMut1<A>,然后为特定类型实现了一些魔法函数转换,有 Adapter 的感觉 -
never.rs
这是一个永远没有正常值的类型
pub type Never = core::convert::Infallible; -
unfold_state.rs
enum UnfoldState看着是一个结果的封装,遇到再看 -
async_await
- join_mod.rs, 暴露 macro
join!和try_join!,并提供文档 - pending.rs, 提供了
pending_once,顾名思义,只能 pending 一次 - poll.rs, 提供了一个
poll,返回的是 PollOnce,这家伙直接 Ready,但是返回值还是一个 Poll,有点没明白使用场景 - random.rs, 大概很多人都期望有异步 random 吧,实际这是个纯纯的同步计算,完全没异步余地
- select.rs, 暴露
select! - stream_select.rs, 暴露
stream_select!
- join_mod.rs, 暴露 macro
-
task
暴露 spawn 方法
为
trait Spawn实现trait SpawnExt,包括spawn和spawn_with_handle两个方法还有一个本地版本的
trait LocalSpawnExt -
lock
封装了同步原语
- Mutex, 就是常规的协程锁
- BiLock, 为两方争抢资源高度优化过的锁
-
compat
兼容 future 0.1
-
future
-
future/
- catch_unwind.rs, 是
std::panic::catch_unwind的协程化包装 - flatten.rs, 定义了
enum Flatten<Fut1, Fut2>看着像是带 Empty 的 Either - fuse.rs, 提供
struct Fuse的定义,有 is_terminated 方法 - map.rs, 为 future 提供 map 语义,通过函数指明下面要做的工作
- remote_handle.rs, 提供跨线程的 future 等待处理能力
- shared.rs, 跨线程引用 future 的能力
- mod.rs, 定义了核心的
trait FutureExt扩展了 future 的能力
fn map<U, F>(self, f: F) -> Map<Self, F> ⓘ where F: FnOnce(Self::Output) -> U, Self: Sized, {} fn map_into<U>(self) -> MapInto<Self, U> ⓘ where Self::Output: Into<U>, Self: Sized, {} fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> ⓘ where F: FnOnce(Self::Output) -> Fut, Fut: Future, Self: Sized, {} fn left_future<B>(self) -> Either<Self, B> ⓘ where B: Future<Output = Self::Output>, Self: Sized, {} fn right_future<A>(self) -> Either<A, Self> ⓘ where A: Future<Output = Self::Output>, Self: Sized, {} fn into_stream(self) -> IntoStream<Self> where Self: Sized, {} fn flatten(self) -> Flatten<Self> ⓘ where Self::Output: Future, Self: Sized, {} fn flatten_stream(self) -> FlattenStream<Self> where Self::Output: Stream, Self: Sized, {} fn fuse(self) -> Fuse<Self> ⓘ where Self: Sized, {} fn inspect<F>(self, f: F) -> Inspect<Self, F> ⓘ where F: FnOnce(&Self::Output), Self: Sized, {} fn catch_unwind(self) -> CatchUnwind<Self> ⓘ where Self: Sized + UnwindSafe, {} fn shared(self) -> Shared<Self> ⓘ where Self: Sized, Self::Output: Clone, {} fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>) where Self: Sized, {} fn boxed<'a>( self ) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a, Global>> where Self: Sized + Send + 'a, {} fn boxed_local<'a>( self ) -> Pin<Box<dyn Future<Output = Self::Output> + 'a, Global>> where Self: Sized + 'a, {} fn unit_error(self) -> UnitError<Self> ⓘ where Self: Sized, {} fn never_error(self) -> NeverError<Self> ⓘ where Self: Sized, {} fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> where Self: Unpin, {} fn now_or_never(self) -> Option<Self::Output> where Self: Sized, {}可以看到,基本是函数式链式处理那一套方法和一些类型转换相关方法
也就是说这个库的使用者认为 future 配合链式调用才是上策
- catch_unwind.rs, 是
-
TryFutureExt/
核心还是
trait TryFutureExt,它也实现了很多的方法,适合链式调用 -
./
fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)enum Either<A, B>fn join_all<I>(iter: I) -> JoinAll<I::Item>fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2>fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut>struct OptionFuture<F>, 用于处理 Option\<T> 转换的 Futurefn pending<T>() -> Pending<T>fn poll_fn<T, F>(f: F) -> PollFn<F>fn poll_immediate<F: Future>(f: F) -> PollImmediate<F>, 马上返回,没成功就返回 Nonefn ready<T>(t: T) -> Ready<T>fn select_all<I>(iter: I) -> SelectAll<I::Item>fn select_ok<I>(iter: I) -> SelectOk<I::Item>fn select<A, B>(future1: A, future2: B) -> Select<A, B>fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item>fn try_join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> TryJoin<Fut1, Fut2>fn try_maybe_done<Fut: TryFuture>(future: Fut) -> TryMaybeDone<Fut>fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B>
-
-
io
-
sink
-
stream