跳转至

Rust future 浅析

学习 rust 异步编程时需要区分三个库 future, futures 和 tokio:

  • future 是 rust 中对于异步行为的抽象,是标准库的一部分
  • futures 是处理 future 的工具集,提供了 join! 之类方便的 api
  • tokio 是 rust 的一个异步运行时,可以执行 future,也提供了异步读写等基础 future 能力(这类 future 也被称为 leaf future),是当前“事实上”的异步运行时标准库

本篇文章着眼于 future 的使用,重点关注

  • 标准库中 future 的定义
  • futures 提供的工具
  • tokio 提供了哪些 leaf future

future

future.rs

标准库 future 中定义的 Future 是一个 trait

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

Future 中的 poll 方法是由运行时调用的,运行时会循环调用 poll,直到返回值为 Ready

如何执行一个 future?

  1. 在异步代码块中,“等待运行时调用 poll 得到 Ready\<Output>” 的过程用 .await 语法来表达,每个 future 都可以调用这个方法
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    sleep(Duration::from_millis(100)).await;
    println!("100 ms have elapsed");
}

这里使用了异步运行时 tokio 提供的异步时间响应能力,sleep 调用返回的正是一个 future。

  1. 在同步代码块中,我们是不能直接调用 poll 的,就像上一段说的,poll 是给运行时的,我们需要做的是启用一个运行时来等待 future 运行结束得到结果
use tokio::runtime::Runtime;
use tokio::time::{sleep, Duration};

// Create the runtime
let rt = Runtime::new().unwrap();

// Execute the future, blocking the current thread until completion
rt.block_on(async {
    sleep(Duration::from_millis(100)).await;
    println!("100 ms have elapsed");
});

当然,也可以用 rt.spawn 运行异步代码,不等待返回值结果

  1. 在一个 future 的 poll 实现中执行 future,这里就需要直接调用 future 的 poll 方法了。

为什么这里不能按照同步代码的方式处理了?“只缘身在此山中”。

放一个 stackoverflow 的例子 How to sleep in Future::poll()?

pin_project_lite::pin_project! {
    struct MyDelay {
        #[pin]
        sleep: tokio::time::Sleep,
    }
}

impl MyDelay {
    fn new() -> Self {
        Self {
            sleep: tokio::time::sleep(Duration::from_secs(1)),
        }
    }
}

impl Future for MyDelay {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("poll");
        let this = self.project();
        this.sleep.poll(cx)
    }
}

crate future

标准库 future 中还定义了其他的一些结构

  1. into_future.rs

    这也是一个 trait,功能其实和 future 挺像的,但是更适合用于 builder 的场景,用来把一些既有类型转化为 future

    #[stable(feature = "into_future", since = "1.64.0")]
    pub trait IntoFuture {
        /// The output that the future will produce on completion.
        #[stable(feature = "into_future", since = "1.64.0")]
        type Output;
    
        /// Which kind of future are we turning this into?
        #[stable(feature = "into_future", since = "1.64.0")]
        type IntoFuture: Future<Output = Self::Output>;
    
            #[stable(feature = "into_future", since = "1.64.0")]
        #[lang = "into_future"]
        fn into_future(self) -> Self::IntoFuture;
    }
    

    已经为 Future 实现了 IntoFuture,功能也是简单的返回自己

    #[stable(feature = "into_future", since = "1.64.0")]
    impl<F: Future> IntoFuture for F {
        type Output = F::Output;
        type IntoFuture = F;
    
        fn into_future(self) -> Self::IntoFuture {
            self
        }
    }
    
  2. join.rs

    这个文件里实现了可以等待多个 future 完成的 join!

    join! 的实现使用了类型递归以支持可变参数,然后遍历了所有 future 直到结果都变成了 Ready

    注意!!!

    这里有个容易忽视的点:一个 Future trait 在 poll 结果返回了 Ready 之后是不一定能再调用 poll(可能触发未定义行为)。

    所以 join.rs 中实现了 enum MaybeDone<F: Future> ,它暂存 Future,并在 poll 完成后保存了 Ready 的返回过结果以保证可以继续 poll

    pub enum MaybeDone<F: Future> {
        Future(F),
        Done(F::Output),
        Taken,
    }
    
    impl<F: Future> Future for MaybeDone<F> {
        type Output = ();
    
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // SAFETY: pinning in structural for `f`
            unsafe {
                // Do not mix match ergonomics with unsafe.
                match *self.as_mut().get_unchecked_mut() {
                    MaybeDone::Future(ref mut f) => {
                        let val = Pin::new_unchecked(f).poll(cx).ready()?;
                        self.set(Self::Done(val));
                    }
                    MaybeDone::Done(_) => {}
                    MaybeDone::Taken => unreachable!(),
                }
            }
    
            Poll::Ready(())
        }
    }
    

    这里的 Taken 是一个比较有意思的设计,应该是为了所有权的转移,结果只允许取一次,被取结果后就会被设置为 Taken

    impl<F: Future> MaybeDone<F> {
        pub fn take_output(&mut self) -> Option<F::Output> {
            match *self {
                MaybeDone::Done(_) => match mem::replace(self, Self::Taken) {
                    MaybeDone::Done(val) => Some(val),
                    _ => unreachable!(),
                },
                _ => None,
            }
        }
    }
    
  3. pending.rs

    提供了一个永远都不会结束的 struct Pending<T>

    pub struct Pending<T> {
        _data: marker::PhantomData<fn() -> T>,
    }
    
    impl<T> Future for Pending<T> {
        type Output = T;
    
        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
            Poll::Pending
        }
    }
    

    也不知道用在哪,可能是专门来为难运行时的吧

    使用的时候一般是调用 pending 函数来创建

    pub fn pending<T>() -> Pending<T> {
        Pending { _data: marker::PhantomData }
    }
    
  4. ready.rs

    这里和 pending 很像,定义了一个 struct Ready

    pub struct Ready<T>(Option<T>);
    
    impl<T> Future for Ready<T> {
        type Output = T;
    
        #[inline]
        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
            Poll::Ready(self.0.take().expect("`Ready` polled after completion"))
        }
    }
    

    注意,struct 中包裹的不是 T 而是 Option<T>

    这是因为 Future 的 poll 中的 self 类型为 Pin<&mut Self>

    也就是说如果 future 想调用 poll,必须在外面有一层 Pin 保证内存不会发生移动,而 Option 就是为了破解这个移动和不移动的钥匙

    关于 future,pin 和 unpin,可以看这篇文章了解更多细节

    同样的,提供了 ready 函数作为 maker

    pub fn ready<T>(t: T) -> Ready<T> {
        Ready(Some(t))
    }
    
  5. poll_fn.rs

    Pending 和 Ready 都有了,不来个函数的 warpper?

    其实严格说来,前两者是对结果封装的 future,而对函数封装的 future 显然更灵活。 只能说没有忘本吧,毕竟是 MozCaml

    pub struct PollFn<F> {
        f: F,
    }
    
    impl<T, F> Future for PollFn<F>
    where
        F: FnMut(&mut Context<'_>) -> Poll<T>,
    {
        type Output = T;
    
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
            // SAFETY: We are not moving out of the pinned field.
            (unsafe { &mut self.get_unchecked_mut().f })(cx)
        }
    }
    

    同样以函数作为入口

    pub fn poll_fn<T, F>(f: F) -> PollFn<F>
    where
        F: FnMut(&mut Context<'_>) -> Poll<T>,
    {
        PollFn { f }
    }
    

futures

future 也是有历史的,而历史就来自于 futures 库,这才是 rust 异步编程正统所在

更多细节请看这里

futures 库由多个子库组成,包括

  • futures-core
  • futures-io
  • futures-sink
  • futures-channel
  • futures-task
  • futures-macro
  • futures-util
  • futures-executor

futures-core

  1. aotmic_waker.rs

    这个文件中定义了一个单生产者多消费者的 struct AtomicWaker 这是一个用于标记 future 可继续执行并唤醒 executor 的数据结构,支持处理重复唤醒

    这样涉及运行时的结构不在本文详述,下同

  2. future.rs

    • type BoxFuture: 动态 Future 的封装类型
    • type LocalBoxFuture: 动态 Future,不支持线程间传递
    • trait FusedFuture: 提供了 is_terminated 方法用于判断 Future 是否已经结束,这种结束既可能是已经 Ready,也可能是某种原因取消丢弃
    • TryFuture: 特指 Output 类型为 Result<T, E> 的 Future,这是一类很常用的 Future,可以单独拿出来做些处理
  3. stream.rs

    这里定义了一个和 Future 同等重要的 trait Stream ,它本质是异步化的 Iterator<Item>

    pub trait Stream {
        /// Values yielded by the stream.
        type Item;
    
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
    
        fn size_hint(&self) -> (usize, Option<usize>) {
            (0, None)
        }
    }
    

    它可以一直 poll_next,直到返回 None

    类似 Future 还有其他相关定义:

    • FusedStream: 一个添加了 is_terminated 方法的 trait
    • TryStream: Item 类型为 Result<T, E> 的 trait

futures-sink

如果说 stream 是 item generator,那么 sink 就是 item eater

注意,和 trait Future, trait Stream 不同, trait Sink<Item> 需要显示声明 Item,感觉风格有点脱节?

sink 需要实现的方法也有点多

pub trait Sink<Item> {
    /// The type of value produced by the sink when an error occurs.
    type Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

发送数据时使用 start_send ,检测可以继续发送使用 poll_ready ,检测之前发送的已经都发送成功了用 poll_flush ,结束调用 poll_close 清理资源

futures-io

定义了四个 trait:

  • trait AsyncRead, 需要实现 poll_readpoll_read_vectored
  • trait AsyncWrite, 需要实现 poll_write, poll_write_vectored, poll_flushpoll_close
  • trait AsyncSeek, 需要实现 poll_seek
  • trait AsyncBufRead, 继承了 trait AsyncRead, 另外还要求实现 poll_fill_bufconsume

这四个 trait 也对应了同步的 std::io::{Read, Write, Seek, BufRead}

futures-channel

主要有几个结构

  • Sender\<T>
  • Receiver\<T>
  • Lock\<T>,只有 try_lock 一个方法
  • mpsc::Queue
  • mpsc::SenderT>, mpsc::UnboundedSender\<T>
  • mpsc::Receiver\<T>, mpsc::UnboundedReceiver\<T>

futures-task

除了封装 Future pointer 的几个结构 FutureObj, LocalFutureObj, 和 UnsafeFutureObj 之外,都是 waker 相关的数据结构,实现相关,暂不细究

futures-macro

都是比较复杂的宏实现,包括

  • join!: 功能和标准库中的 join! 类似,看着实现更复杂,不知道能力上有多少区别
  • select!: 支持同时等待多个 future 完成并取得结果进行下一步操作,似乎是比 join 更高级的处理
  • stream_select!: 更像是 stream merge 的功能,多个 stream 合并成为一个 stream

futures-util

这才是重头戏啊,包括上面提到的很多功能都是从这个模块暴露出来的!

  1. abortable.rs

    核心是实现了一个 struct Abortable<T> ,这个结构体包裹的 future / stream 可以在 abort 行为过后让返回值都变成 None

    使用如下:

    use futures::future::{Abortable, AbortHandle};
    use futures::stream::{self, StreamExt};
    
    let (abort_handle, abort_registration) = AbortHandle::new_pair();
    let mut stream = Abortable::new(stream::iter(vec![1, 2, 3]), abort_registration);
    abort_handle.abort();
    assert_eq!(stream.next().await, None);
    
  2. fns.rs

    看着是定义了魔法函数 trait FnOnce1<A>trait FnMut1<A> ,然后为特定类型实现了一些魔法函数转换,有 Adapter 的感觉

  3. never.rs

    这是一个永远没有正常值的类型

    pub type Never = core::convert::Infallible;

  4. unfold_state.rs

    enum UnfoldState 看着是一个结果的封装,遇到再看

  5. async_await

    • join_mod.rs, 暴露 macro join!try_join! ,并提供文档
    • pending.rs, 提供了 pending_once ,顾名思义,只能 pending 一次
    • poll.rs, 提供了一个 poll ,返回的是 PollOnce,这家伙直接 Ready,但是返回值还是一个 Poll,有点没明白使用场景
    • random.rs, 大概很多人都期望有异步 random 吧,实际这是个纯纯的同步计算,完全没异步余地
    • select.rs, 暴露 select!
    • stream_select.rs, 暴露 stream_select!
  6. task

    暴露 spawn 方法

    trait Spawn 实现 trait SpawnExt ,包括 spawnspawn_with_handle 两个方法

    还有一个本地版本的 trait LocalSpawnExt

  7. lock

    封装了同步原语

    • Mutex, 就是常规的协程锁
    • BiLock, 为两方争抢资源高度优化过的锁
  8. compat

    兼容 future 0.1

  9. future

    1. future/

      • catch_unwind.rs, 是 std::panic::catch_unwind 的协程化包装
      • flatten.rs, 定义了 enum Flatten<Fut1, Fut2> 看着像是带 Empty 的 Either
      • fuse.rs, 提供 struct Fuse 的定义,有 is_terminated 方法
      • map.rs, 为 future 提供 map 语义,通过函数指明下面要做的工作
      • remote_handle.rs, 提供跨线程的 future 等待处理能力
      • shared.rs, 跨线程引用 future 的能力
      • mod.rs, 定义了核心的 trait FutureExt 扩展了 future 的能力
      fn map<U, F>(self, f: F) -> Map<Self, F> 
      where
          F: FnOnce(Self::Output) -> U,
          Self: Sized,
      {}
      
      fn map_into<U>(self) -> MapInto<Self, U> 
      where
          Self::Output: Into<U>,
          Self: Sized,
      {}
      
      fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> 
      where
          F: FnOnce(Self::Output) -> Fut,
          Fut: Future,
          Self: Sized,
      {}
      
      fn left_future<B>(self) -> Either<Self, B> 
      where
          B: Future<Output = Self::Output>,
          Self: Sized,
      {}
      
      fn right_future<A>(self) -> Either<A, Self> 
      where
          A: Future<Output = Self::Output>,
          Self: Sized,
      {}
      
      fn into_stream(self) -> IntoStream<Self>
      where
          Self: Sized,
      {}
      
      fn flatten(self) -> Flatten<Self> 
      where
          Self::Output: Future,
          Self: Sized,
      {}
      
      fn flatten_stream(self) -> FlattenStream<Self>
      where
          Self::Output: Stream,
          Self: Sized,
      {}
      
      fn fuse(self) -> Fuse<Self> 
      where
          Self: Sized,
      {}
      
      fn inspect<F>(self, f: F) -> Inspect<Self, F> 
      where
          F: FnOnce(&Self::Output),
          Self: Sized,
      {}
      
      fn catch_unwind(self) -> CatchUnwind<Self> 
      where
          Self: Sized + UnwindSafe,
      {}
      
      fn shared(self) -> Shared<Self> 
      where
          Self: Sized,
          Self::Output: Clone,
      {}
      
      fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)
      where
          Self: Sized,
      {}
      
      fn boxed<'a>(
          self
      ) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a, Global>>
      where
          Self: Sized + Send + 'a,
      {}
      
      fn boxed_local<'a>(
          self
      ) -> Pin<Box<dyn Future<Output = Self::Output> + 'a, Global>>
      where
          Self: Sized + 'a,
      {}
      
      fn unit_error(self) -> UnitError<Self> 
      where
          Self: Sized,
      {}
      
      fn never_error(self) -> NeverError<Self> 
      where
          Self: Sized,
      {}
      
      fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
      where
          Self: Unpin,
      {}
      
      fn now_or_never(self) -> Option<Self::Output>
      where
          Self: Sized,
      {}
      

      可以看到,基本是函数式链式处理那一套方法和一些类型转换相关方法

      也就是说这个库的使用者认为 future 配合链式调用才是上策

    2. TryFutureExt/

      核心还是 trait TryFutureExt ,它也实现了很多的方法,适合链式调用

    3. ./

      • fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)
      • enum Either<A, B>
      • fn join_all<I>(iter: I) -> JoinAll<I::Item>
      • fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2>
      • fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut>
      • struct OptionFuture<F>, 用于处理 Option\<T> 转换的 Future
      • fn pending<T>() -> Pending<T>
      • fn poll_fn<T, F>(f: F) -> PollFn<F>
      • fn poll_immediate<F: Future>(f: F) -> PollImmediate<F>, 马上返回,没成功就返回 None
      • fn ready<T>(t: T) -> Ready<T>
      • fn select_all<I>(iter: I) -> SelectAll<I::Item>
      • fn select_ok<I>(iter: I) -> SelectOk<I::Item>
      • fn select<A, B>(future1: A, future2: B) -> Select<A, B>
      • fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item>
      • fn try_join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> TryJoin<Fut1, Fut2>
      • fn try_maybe_done<Fut: TryFuture>(future: Fut) -> TryMaybeDone<Fut>
      • fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B>
  10. io

  11. sink

  12. stream