Merge pull request #929 from embassy-rs/futures-fixes

embassy-futures additions, prepare for crates.io release
This commit is contained in:
Dario Nieuwenhuis 2022-08-29 15:02:04 +02:00 committed by GitHub
commit 7542505cf9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 466 additions and 22 deletions

View file

@ -6,9 +6,12 @@ edition = "2021"
[package.metadata.embassy_docs]
src_base = "https://github.com/embassy-rs/embassy/blob/embassy-futures-v$VERSION/embassy-futures/src/"
src_base_git = "https://github.com/embassy-rs/embassy/blob/$COMMIT/embassy-futures/src/"
features = ["nightly"]
features = ["defmt"]
target = "thumbv7em-none-eabi"
[package.metadata.docs.rs]
features = ["defmt"]
[dependencies]
defmt = { version = "0.3", optional = true }
log = { version = "0.4.14", optional = true }

View file

@ -1,9 +1,28 @@
# embassy-futures
Utilities for working with futures:
An [Embassy](https://embassy.dev) project.
Utilities for working with futures, compatible with `no_std` and not using `alloc`. Optimized for code size,
ideal for embedded systems.
- Future combinators, like [`join`](join) and [`select`](select)
- Utilities to use `async` without a fully fledged executor: [`block_on`](block_on::block_on) and [`yield_now`](yield_now::yield_now).
## Interoperability
Futures from this crate can run on any executor.
## Minimum supported Rust version (MSRV)
Embassy is guaranteed to compile on the latest stable Rust version at the time of release. It might compile with older versions but that may change in any new patch release.
## License
This work is licensed under either of
- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or
<http://www.apache.org/licenses/LICENSE-2.0>)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or <http://opensource.org/licenses/MIT>)
at your option.
- [`select`](select::select) - waiting for one out of two futures to complete.
- [`select3`](select::select3) - waiting for one out of three futures to complete.
- [`select4`](select::select4) - waiting for one out of four futures to complete.
- [`select_all`](select::select_all) - waiting for one future in a list of futures to complete.
- [`yield_now`](yield_now::yield_now) - yielding the current task.

View file

@ -0,0 +1,33 @@
use core::future::Future;
use core::pin::Pin;
use core::ptr;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
static VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RawWaker::new(ptr::null(), &VTABLE), |_| {}, |_| {}, |_| {});
/// Run a future to completion using a busy loop.
///
/// This calls `.poll()` on the future in a busy loop, which blocks
/// the current thread at 100% cpu usage until the future is done. The
/// future's `Waker` mechanism is not used.
///
/// You can use this to run multiple futures concurrently with [`join`][crate::join].
///
/// It's suitable for systems with no or limited concurrency and without
/// strict requirements around power consumption. For more complex use
/// cases, prefer using a "real" executor like `embassy-executor`, which
/// supports multiple tasks, and putting the core to sleep when no task
/// needs to do work.
pub fn block_on<F: Future>(mut fut: F) -> F::Output {
// safety: we don't move the future after this line.
let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
let raw_waker = RawWaker::new(ptr::null(), &VTABLE);
let waker = unsafe { Waker::from_raw(raw_waker) };
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(res) = fut.as_mut().poll(&mut cx) {
return res;
}
}
}

322
embassy-futures/src/join.rs Normal file
View file

@ -0,0 +1,322 @@
//! Wait for multiple futures to complete.
use core::future::Future;
use core::mem::MaybeUninit;
use core::pin::Pin;
use core::task::{Context, Poll};
use core::{fmt, mem};
#[derive(Debug)]
enum MaybeDone<Fut: Future> {
/// A not-yet-completed future
Future(/* #[pin] */ Fut),
/// The output of the completed future
Done(Fut::Output),
/// The empty variant after the result of a [`MaybeDone`] has been
/// taken using the [`take_output`](MaybeDone::take_output) method.
Gone,
}
impl<Fut: Future> MaybeDone<Fut> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
let this = unsafe { self.get_unchecked_mut() };
match this {
Self::Future(fut) => match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Ready(res) => {
*this = Self::Done(res);
true
}
Poll::Pending => false,
},
_ => true,
}
}
fn take_output(&mut self) -> Fut::Output {
match &*self {
Self::Done(_) => {}
Self::Future(_) | Self::Gone => panic!("take_output when MaybeDone is not done."),
}
match mem::replace(self, Self::Gone) {
MaybeDone::Done(output) => output,
_ => unreachable!(),
}
}
}
impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
macro_rules! generate {
($(
$(#[$doc:meta])*
($Join:ident, <$($Fut:ident),*>),
)*) => ($(
$(#[$doc])*
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[allow(non_snake_case)]
pub struct $Join<$($Fut: Future),*> {
$(
$Fut: MaybeDone<$Fut>,
)*
}
impl<$($Fut),*> fmt::Debug for $Join<$($Fut),*>
where
$(
$Fut: Future + fmt::Debug,
$Fut::Output: fmt::Debug,
)*
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(stringify!($Join))
$(.field(stringify!($Fut), &self.$Fut))*
.finish()
}
}
impl<$($Fut: Future),*> $Join<$($Fut),*> {
#[allow(non_snake_case)]
fn new($($Fut: $Fut),*) -> Self {
Self {
$($Fut: MaybeDone::Future($Fut)),*
}
}
}
impl<$($Fut: Future),*> Future for $Join<$($Fut),*> {
type Output = ($($Fut::Output),*);
fn poll(
self: Pin<&mut Self>, cx: &mut Context<'_>
) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let mut all_done = true;
$(
all_done &= unsafe { Pin::new_unchecked(&mut this.$Fut) }.poll(cx);
)*
if all_done {
Poll::Ready(($(this.$Fut.take_output()), *))
} else {
Poll::Pending
}
}
}
)*)
}
generate! {
/// Future for the [`join`](join()) function.
(Join, <Fut1, Fut2>),
/// Future for the [`join3`] function.
(Join3, <Fut1, Fut2, Fut3>),
/// Future for the [`join4`] function.
(Join4, <Fut1, Fut2, Fut3, Fut4>),
/// Future for the [`join5`] function.
(Join5, <Fut1, Fut2, Fut3, Fut4, Fut5>),
}
/// Joins the result of two futures, waiting for them both to complete.
///
/// This function will return a new future which awaits both futures to
/// complete. The returned future will finish with a tuple of both results.
///
/// Note that this function consumes the passed futures and returns a
/// wrapped version of it.
///
/// # Examples
///
/// ```
/// # embassy_futures::block_on(async {
///
/// let a = async { 1 };
/// let b = async { 2 };
/// let pair = embassy_futures::join::join(a, b).await;
///
/// assert_eq!(pair, (1, 2));
/// # });
/// ```
pub fn join<Fut1, Fut2>(future1: Fut1, future2: Fut2) -> Join<Fut1, Fut2>
where
Fut1: Future,
Fut2: Future,
{
Join::new(future1, future2)
}
/// Joins the result of three futures, waiting for them all to complete.
///
/// This function will return a new future which awaits all futures to
/// complete. The returned future will finish with a tuple of all results.
///
/// Note that this function consumes the passed futures and returns a
/// wrapped version of it.
///
/// # Examples
///
/// ```
/// # embassy_futures::block_on(async {
///
/// let a = async { 1 };
/// let b = async { 2 };
/// let c = async { 3 };
/// let res = embassy_futures::join::join3(a, b, c).await;
///
/// assert_eq!(res, (1, 2, 3));
/// # });
/// ```
pub fn join3<Fut1, Fut2, Fut3>(future1: Fut1, future2: Fut2, future3: Fut3) -> Join3<Fut1, Fut2, Fut3>
where
Fut1: Future,
Fut2: Future,
Fut3: Future,
{
Join3::new(future1, future2, future3)
}
/// Joins the result of four futures, waiting for them all to complete.
///
/// This function will return a new future which awaits all futures to
/// complete. The returned future will finish with a tuple of all results.
///
/// Note that this function consumes the passed futures and returns a
/// wrapped version of it.
///
/// # Examples
///
/// ```
/// # embassy_futures::block_on(async {
///
/// let a = async { 1 };
/// let b = async { 2 };
/// let c = async { 3 };
/// let d = async { 4 };
/// let res = embassy_futures::join::join4(a, b, c, d).await;
///
/// assert_eq!(res, (1, 2, 3, 4));
/// # });
/// ```
pub fn join4<Fut1, Fut2, Fut3, Fut4>(
future1: Fut1,
future2: Fut2,
future3: Fut3,
future4: Fut4,
) -> Join4<Fut1, Fut2, Fut3, Fut4>
where
Fut1: Future,
Fut2: Future,
Fut3: Future,
Fut4: Future,
{
Join4::new(future1, future2, future3, future4)
}
/// Joins the result of five futures, waiting for them all to complete.
///
/// This function will return a new future which awaits all futures to
/// complete. The returned future will finish with a tuple of all results.
///
/// Note that this function consumes the passed futures and returns a
/// wrapped version of it.
///
/// # Examples
///
/// ```
/// # embassy_futures::block_on(async {
///
/// let a = async { 1 };
/// let b = async { 2 };
/// let c = async { 3 };
/// let d = async { 4 };
/// let e = async { 5 };
/// let res = embassy_futures::join::join5(a, b, c, d, e).await;
///
/// assert_eq!(res, (1, 2, 3, 4, 5));
/// # });
/// ```
pub fn join5<Fut1, Fut2, Fut3, Fut4, Fut5>(
future1: Fut1,
future2: Fut2,
future3: Fut3,
future4: Fut4,
future5: Fut5,
) -> Join5<Fut1, Fut2, Fut3, Fut4, Fut5>
where
Fut1: Future,
Fut2: Future,
Fut3: Future,
Fut4: Future,
Fut5: Future,
{
Join5::new(future1, future2, future3, future4, future5)
}
// =====================================================
/// Future for the [`join_array`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct JoinArray<Fut: Future, const N: usize> {
futures: [MaybeDone<Fut>; N],
}
impl<Fut: Future, const N: usize> fmt::Debug for JoinArray<Fut, N>
where
Fut: Future + fmt::Debug,
Fut::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JoinArray").field("futures", &self.futures).finish()
}
}
impl<Fut: Future, const N: usize> Future for JoinArray<Fut, N> {
type Output = [Fut::Output; N];
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let mut all_done = true;
for f in this.futures.iter_mut() {
all_done &= unsafe { Pin::new_unchecked(f) }.poll(cx);
}
if all_done {
let mut array: [MaybeUninit<Fut::Output>; N] = unsafe { MaybeUninit::uninit().assume_init() };
for i in 0..N {
array[i].write(this.futures[i].take_output());
}
Poll::Ready(unsafe { (&array as *const _ as *const [Fut::Output; N]).read() })
} else {
Poll::Pending
}
}
}
/// Joins the result of an array of futures, waiting for them all to complete.
///
/// This function will return a new future which awaits all futures to
/// complete. The returned future will finish with a tuple of all results.
///
/// Note that this function consumes the passed futures and returns a
/// wrapped version of it.
///
/// # Examples
///
/// ```
/// # embassy_futures::block_on(async {
///
/// async fn foo(n: u32) -> u32 { n }
/// let a = foo(1);
/// let b = foo(2);
/// let c = foo(3);
/// let res = embassy_futures::join::join_array([a, b, c]).await;
///
/// assert_eq!(res, [1, 2, 3]);
/// # });
/// ```
pub fn join_array<Fut: Future, const N: usize>(futures: [Fut; N]) -> JoinArray<Fut, N> {
JoinArray {
futures: futures.map(MaybeDone::Future),
}
}

View file

@ -5,8 +5,11 @@
// This mod MUST go first, so that the others see its macros.
pub(crate) mod fmt;
mod select;
mod block_on;
mod yield_now;
pub use select::*;
pub mod join;
pub mod select;
pub use block_on::*;
pub use yield_now::*;

View file

@ -1,9 +1,12 @@
//! Wait for the first of several futures to complete.
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
/// Result for [`select`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Either<A, B> {
/// First future finished first.
First(A),
@ -60,6 +63,7 @@ where
/// Result for [`select3`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Either3<A, B, C> {
/// First future finished first.
First(A),
@ -118,6 +122,7 @@ where
/// Result for [`select4`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Either4<A, B, C, D> {
/// First future finished first.
First(A),
@ -183,28 +188,70 @@ where
// ====================================================================
/// Future for the [`select_all`] function.
/// Future for the [`select_array`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SelectAll<Fut, const N: usize> {
pub struct SelectArray<Fut, const N: usize> {
inner: [Fut; N],
}
/// Creates a new future which will select over a list of futures.
/// Creates a new future which will select over an array of futures.
///
/// The returned future will wait for any future within `iter` to be ready. Upon
/// The returned future will wait for any future to be ready. Upon
/// completion the item resolved will be returned, along with the index of the
/// future that was ready.
///
/// # Panics
///
/// This function will panic if the array specified contains no items.
pub fn select_all<Fut: Future, const N: usize>(arr: [Fut; N]) -> SelectAll<Fut, N> {
assert!(N > 0);
SelectAll { inner: arr }
/// If the array is empty, the resulting future will be Pending forever.
pub fn select_array<Fut: Future, const N: usize>(arr: [Fut; N]) -> SelectArray<Fut, N> {
SelectArray { inner: arr }
}
impl<Fut: Future, const N: usize> Future for SelectAll<Fut, N> {
impl<Fut: Future, const N: usize> Future for SelectArray<Fut, N> {
type Output = (Fut::Output, usize);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Safety: Since `self` is pinned, `inner` cannot move. Since `inner` cannot move,
// its elements also cannot move. Therefore it is safe to access `inner` and pin
// references to the contained futures.
let item = unsafe {
self.get_unchecked_mut()
.inner
.iter_mut()
.enumerate()
.find_map(|(i, f)| match Pin::new_unchecked(f).poll(cx) {
Poll::Pending => None,
Poll::Ready(e) => Some((i, e)),
})
};
match item {
Some((idx, res)) => Poll::Ready((res, idx)),
None => Poll::Pending,
}
}
}
// ====================================================================
/// Future for the [`select_slice`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SelectSlice<'a, Fut> {
inner: &'a mut [Fut],
}
/// Creates a new future which will select over a slice of futures.
///
/// The returned future will wait for any future to be ready. Upon
/// completion the item resolved will be returned, along with the index of the
/// future that was ready.
///
/// If the slice is empty, the resulting future will be Pending forever.
pub fn select_slice<'a, Fut: Future>(slice: &'a mut [Fut]) -> SelectSlice<'a, Fut> {
SelectSlice { inner: slice }
}
impl<'a, Fut: Future> Future for SelectSlice<'a, Fut> {
type Output = (Fut::Output, usize);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -3,6 +3,23 @@ use core::pin::Pin;
use core::task::{Context, Poll};
/// Yield from the current task once, allowing other tasks to run.
///
/// This can be used to easily and quickly implement simple async primitives
/// without using wakers. The following snippet will wait for a condition to
/// hold, while still allowing other tasks to run concurrently (not monopolizing
/// the executor thread).
///
/// ```rust,no_run
/// while !some_condition() {
/// yield_now().await;
/// }
/// ```
///
/// The downside is this will spin in a busy loop, using 100% of the CPU, while
/// using wakers correctly would allow the CPU to sleep while waiting.
///
/// The internal implementation is: on first poll the future wakes itself and
/// returns `Poll::Pending`. On second poll, it returns `Poll::Ready`.
pub fn yield_now() -> impl Future<Output = ()> {
YieldNowFuture { yielded: false }
}

View file

@ -12,7 +12,7 @@ mod descriptor_reader;
pub mod driver;
pub mod types;
use embassy_futures::{select, Either};
use embassy_futures::select::{select, Either};
use heapless::Vec;
pub use self::builder::{Builder, Config};

View file

@ -8,7 +8,7 @@ use core::sync::atomic::{AtomicBool, Ordering};
use defmt::*;
use embassy_executor::Spawner;
use embassy_futures::{select, Either};
use embassy_futures::select::{select, Either};
use embassy_nrf::gpio::{Input, Pin, Pull};
use embassy_nrf::usb::{Driver, PowerUsb};
use embassy_nrf::{interrupt, pac};