736: executor: allow Send-spawning of tasks if their args are Send. r=Dirbaio a=Dirbaio

This allows send-spawning (spawning into an executor in another thread) tasks if their args are Send. Previously this would require the entire future to be Send.

--


When send-spawning a task, we construct the future in this thread, and effectively
"send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
send-spawning should require the future `F` to be `Send`.

The problem is this is more restrictive than needed. Once the future is executing,
it is never sent to another thread. It is only sent when spawning. It should be
enough for the task's arguments to be Send. (and in practice it's super easy to
accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)

Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
args are Send, it's OK to send a !Send future, as long as we do it before first polling it.


Co-authored-by: Dario Nieuwenhuis <dirbaio@dirbaio.net>
This commit is contained in:
bors[bot] 2022-04-27 12:21:25 +00:00 committed by GitHub
commit 9c283cd445
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 54 deletions

View file

@ -73,26 +73,10 @@ pub fn run(args: syn::AttributeArgs, f: syn::ItemFn) -> Result<TokenStream, Toke
// in the user's code.
#task_inner
#visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl ::core::future::Future + 'static> {
use ::core::future::Future;
use #embassy_path::executor::SpawnToken;
use #embassy_path::executor::raw::TaskStorage;
type Fut = impl Future + 'static;
#[allow(clippy::declare_interior_mutable_const)]
const NEW_TS: TaskStorage<Fut> = TaskStorage::new();
static POOL: [TaskStorage<Fut>; #pool_size] = [NEW_TS; #pool_size];
// Opaque type laundering, to obscure its origin!
// Workaround for "opaque type's hidden type cannot be another opaque type from the same scope"
// https://github.com/rust-lang/rust/issues/96406
fn launder_tait(token: SpawnToken<impl Future+'static>) -> SpawnToken<impl Future+'static> {
token
}
launder_tait(unsafe { TaskStorage::spawn_pool(&POOL, move || #task_inner_ident(#(#arg_names,)*)) })
#visibility fn #task_ident(#fargs) -> #embassy_path::executor::SpawnToken<impl Sized> {
type Fut = impl ::core::future::Future + 'static;
static POOL: #embassy_path::executor::raw::TaskPool<Fut, #pool_size> = #embassy_path::executor::raw::TaskPool::new();
unsafe { POOL._spawn_async_fn(move || #task_inner_ident(#(#arg_names,)*)) }
}
};

View file

@ -110,8 +110,8 @@ impl TaskHeader {
/// Raw storage in which a task can be spawned.
///
/// This struct holds the necessary memory to spawn one task whose future is `F`.
/// At a given time, the `Task` may be in spawned or not-spawned state. You may spawn it
/// with [`Task::spawn()`], which will fail if it is already spawned.
/// At a given time, the `TaskStorage` may be in spawned or not-spawned state. You
/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
///
/// A `TaskStorage` must live forever, it may not be deallocated even after the task has finished
/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
@ -121,7 +121,7 @@ impl TaskHeader {
/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`, etc.
// repr(C) is needed to guarantee that the Task is located at offset 0
// This makes it safe to cast between Task and Task pointers.
// This makes it safe to cast between TaskHeader and TaskStorage pointers.
#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
raw: TaskHeader,
@ -129,6 +129,9 @@ pub struct TaskStorage<F: Future + 'static> {
}
impl<F: Future + 'static> TaskStorage<F> {
#[cfg(feature = "nightly")]
const NEW: Self = Self::new();
/// Create a new TaskStorage, in not-spawned state.
#[cfg(feature = "nightly")]
pub const fn new() -> Self {
@ -147,22 +150,6 @@ impl<F: Future + 'static> TaskStorage<F> {
}
}
/// Try to spawn a task in a pool.
///
/// See [`Self::spawn()`] for details.
///
/// This will loop over the pool and spawn the task in the first storage that
/// is currently free. If none is free,
pub fn spawn_pool(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken<F> {
for task in pool {
if task.spawn_allocate() {
return unsafe { task.spawn_initialize(future) };
}
}
SpawnToken::new_failed()
}
/// Try to spawn the task.
///
/// The `future` closure constructs the future. It's only called if spawning is
@ -172,15 +159,15 @@ impl<F: Future + 'static> TaskStorage<F> {
///
/// This function will fail if the task is already spawned and has not finished running.
/// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
/// cause [`Executor::spawn()`] to return the error.
/// cause [`Spawner::spawn()`] to return the error.
///
/// Once the task has finished running, you may spawn it again. It is allowed to spawn it
/// on a different executor.
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> {
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
if self.spawn_allocate() {
unsafe { self.spawn_initialize(future) }
unsafe { SpawnToken::<F>::new(self.spawn_initialize(future)) }
} else {
SpawnToken::new_failed()
SpawnToken::<F>::new_failed()
}
}
@ -192,12 +179,11 @@ impl<F: Future + 'static> TaskStorage<F> {
.is_ok()
}
unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> SpawnToken<F> {
unsafe fn spawn_initialize(&'static self, future: impl FnOnce() -> F) -> NonNull<TaskHeader> {
// Initialize the task
self.raw.poll_fn.write(Self::poll);
self.future.write(future());
SpawnToken::new(NonNull::new_unchecked(&self.raw as *const TaskHeader as _))
NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader)
}
unsafe fn poll(p: NonNull<TaskHeader>) {
@ -222,6 +208,89 @@ impl<F: Future + 'static> TaskStorage<F> {
unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {}
/// Raw storage that can hold up to N tasks of the same type.
///
/// This is essentially a `[TaskStorage<F>; N]`.
#[cfg(feature = "nightly")]
pub struct TaskPool<F: Future + 'static, const N: usize> {
pool: [TaskStorage<F>; N],
}
#[cfg(feature = "nightly")]
impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
/// Create a new TaskPool, with all tasks in non-spawned state.
pub const fn new() -> Self {
Self {
pool: [TaskStorage::NEW; N],
}
}
/// Try to spawn a task in the pool.
///
/// See [`TaskStorage::spawn()`] for details.
///
/// This will loop over the pool and spawn the task in the first storage that
/// is currently free. If none is free, a "poisoned" SpawnToken is returned,
/// which will cause [`Spawner::spawn()`] to return the error.
pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
for task in &self.pool {
if task.spawn_allocate() {
return unsafe { SpawnToken::<F>::new(task.spawn_initialize(future)) };
}
}
SpawnToken::<F>::new_failed()
}
/// Like spawn(), but allows the task to be send-spawned if the args are Send even if
/// the future is !Send.
///
/// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
/// by the Embassy macros ONLY.
///
/// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
/// is an `async fn`, NOT a hand-written `Future`.
#[doc(hidden)]
pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
where
FutFn: FnOnce() -> F,
{
// When send-spawning a task, we construct the future in this thread, and effectively
// "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
// send-spawning should require the future `F` to be `Send`.
//
// The problem is this is more restrictive than needed. Once the future is executing,
// it is never sent to another thread. It is only sent when spawning. It should be
// enough for the task's arguments to be Send. (and in practice it's super easy to
// accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
//
// We can do it by sending the task args and constructing the future in the executor thread
// on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
// of the args.
//
// Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
// args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
//
// (Note: this is how the generators are implemented today, it's not officially guaranteed yet,
// but it's possible it'll be guaranteed in the future. See zulip thread:
// https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
//
// The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
// This is why we return `SpawnToken<FutFn>` below.
//
// This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
// by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
for task in &self.pool {
if task.spawn_allocate() {
return SpawnToken::<FutFn>::new(task.spawn_initialize(future));
}
}
SpawnToken::<FutFn>::new_failed()
}
}
/// Raw executor.
///
/// This is the core of the Embassy executor. It is low-level, requiring manual

View file

@ -12,17 +12,21 @@ use super::raw;
/// value is a `SpawnToken` that represents an instance of the task, ready to spawn. You must
/// then spawn it into an executor, typically with [`Spawner::spawn()`].
///
/// The generic parameter `S` determines whether the task can be spawned in executors
/// in other threads or not. If `S: Send`, it can, which allows spawning it into a [`SendSpawner`].
/// If not, it can't, so it can only be spawned into the current thread's executor, with [`Spawner`].
///
/// # Panics
///
/// Dropping a SpawnToken instance panics. You may not "abort" spawning a task in this way.
/// Once you've invoked a task function and obtained a SpawnToken, you *must* spawn it.
#[must_use = "Calling a task function does nothing on its own. You must spawn the returned SpawnToken, typically with Spawner::spawn()"]
pub struct SpawnToken<F> {
pub struct SpawnToken<S> {
raw_task: Option<NonNull<raw::TaskHeader>>,
phantom: PhantomData<*mut F>,
phantom: PhantomData<*mut S>,
}
impl<F> SpawnToken<F> {
impl<S> SpawnToken<S> {
pub(crate) unsafe fn new(raw_task: NonNull<raw::TaskHeader>) -> Self {
Self {
raw_task: Some(raw_task),
@ -38,7 +42,7 @@ impl<F> SpawnToken<F> {
}
}
impl<F> Drop for SpawnToken<F> {
impl<S> Drop for SpawnToken<S> {
fn drop(&mut self) {
// TODO deallocate the task instead.
panic!("SpawnToken instances may not be dropped. You must pass them to Spawner::spawn()")
@ -97,7 +101,7 @@ impl Spawner {
/// Spawn a task into an executor.
///
/// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
pub fn spawn<F>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> {
pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
let task = token.raw_task;
mem::forget(token);
@ -119,7 +123,7 @@ impl Spawner {
/// # Panics
///
/// Panics if the spawning fails.
pub fn must_spawn<F>(&self, token: SpawnToken<F>) {
pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
unwrap!(self.spawn(token));
}
@ -173,7 +177,7 @@ impl SendSpawner {
/// Spawn a task into an executor.
///
/// You obtain the `token` by calling a task function (i.e. one marked with `#[embassy::task]`).
pub fn spawn<F: Send>(&self, token: SpawnToken<F>) -> Result<(), SpawnError> {
pub fn spawn<S: Send>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
let header = token.raw_task;
mem::forget(token);
@ -191,7 +195,7 @@ impl SendSpawner {
/// # Panics
///
/// Panics if the spawning fails.
pub fn must_spawn<F: Send>(&self, token: SpawnToken<F>) {
pub fn must_spawn<S: Send>(&self, token: SpawnToken<S>) {
unwrap!(self.spawn(token));
}
}