executor: Allow TaskStorage to auto-implement Sync

This commit is contained in:
Grant Miller 2023-03-20 16:20:51 -05:00
parent b6663a013f
commit 41d558a5f4
4 changed files with 154 additions and 57 deletions

View file

@ -13,8 +13,8 @@ mod timer_queue;
pub(crate) mod util; pub(crate) mod util;
mod waker; mod waker;
use core::cell::Cell;
use core::future::Future; use core::future::Future;
use core::marker::PhantomData;
use core::mem; use core::mem;
use core::pin::Pin; use core::pin::Pin;
use core::ptr::NonNull; use core::ptr::NonNull;
@ -30,7 +30,7 @@ use embassy_time::Instant;
use rtos_trace::trace; use rtos_trace::trace;
use self::run_queue::{RunQueue, RunQueueItem}; use self::run_queue::{RunQueue, RunQueueItem};
use self::util::UninitCell; use self::util::{SyncUnsafeCell, UninitCell};
pub use self::waker::task_from_waker; pub use self::waker::task_from_waker;
use super::SpawnToken; use super::SpawnToken;
@ -46,11 +46,11 @@ pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
pub(crate) struct TaskHeader { pub(crate) struct TaskHeader {
pub(crate) state: AtomicU32, pub(crate) state: AtomicU32,
pub(crate) run_queue_item: RunQueueItem, pub(crate) run_queue_item: RunQueueItem,
pub(crate) executor: Cell<Option<&'static Executor>>, pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
poll_fn: Cell<Option<unsafe fn(TaskRef)>>, poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
pub(crate) expires_at: Cell<Instant>, pub(crate) expires_at: SyncUnsafeCell<Instant>,
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
pub(crate) timer_queue_item: timer_queue::TimerQueueItem, pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
} }
@ -61,6 +61,9 @@ pub struct TaskRef {
ptr: NonNull<TaskHeader>, ptr: NonNull<TaskHeader>,
} }
unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}
impl TaskRef { impl TaskRef {
fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self { fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
Self { Self {
@ -115,12 +118,12 @@ impl<F: Future + 'static> TaskStorage<F> {
raw: TaskHeader { raw: TaskHeader {
state: AtomicU32::new(0), state: AtomicU32::new(0),
run_queue_item: RunQueueItem::new(), run_queue_item: RunQueueItem::new(),
executor: Cell::new(None), executor: SyncUnsafeCell::new(None),
// Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss` // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
poll_fn: Cell::new(None), poll_fn: SyncUnsafeCell::new(None),
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
expires_at: Cell::new(Instant::from_ticks(0)), expires_at: SyncUnsafeCell::new(Instant::from_ticks(0)),
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
timer_queue_item: timer_queue::TimerQueueItem::new(), timer_queue_item: timer_queue::TimerQueueItem::new(),
}, },
@ -170,9 +173,15 @@ impl<F: Future + 'static> TaskStorage<F> {
// it's a noop for our waker. // it's a noop for our waker.
mem::forget(waker); mem::forget(waker);
} }
}
unsafe impl<F: Future + 'static> Sync for TaskStorage<F> {} #[doc(hidden)]
#[allow(dead_code)]
fn _assert_sync(self) {
fn assert_sync<T: Sync>(_: T) {}
assert_sync(self)
}
}
struct AvailableTask<F: Future + 'static> { struct AvailableTask<F: Future + 'static> {
task: &'static TaskStorage<F>, task: &'static TaskStorage<F>,
@ -279,29 +288,13 @@ impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
} }
} }
/// Raw executor. struct SignalCtx(*mut ());
/// unsafe impl Sync for SignalCtx {}
/// This is the core of the Embassy executor. It is low-level, requiring manual
/// handling of wakeups and task polling. If you can, prefer using one of the pub(crate) struct SyncExecutor {
/// [higher level executors](crate::Executor).
///
/// The raw executor leaves it up to you to handle wakeups and scheduling:
///
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
/// that "want to run").
/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
/// to do. You must arrange for `poll()` to be called as soon as possible.
///
/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
/// level, etc. It may be called synchronously from any `Executor` method call as well.
/// You must deal with this correctly.
///
/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
/// the requirement for `poll` to not be called reentrantly.
pub struct Executor {
run_queue: RunQueue, run_queue: RunQueue,
signal_fn: fn(*mut ()), signal_fn: fn(*mut ()),
signal_ctx: *mut (), signal_ctx: SignalCtx,
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
pub(crate) timer_queue: timer_queue::TimerQueue, pub(crate) timer_queue: timer_queue::TimerQueue,
@ -309,14 +302,8 @@ pub struct Executor {
alarm: AlarmHandle, alarm: AlarmHandle,
} }
impl Executor { impl SyncExecutor {
/// Create a new executor. pub(crate) fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
///
/// When the executor has work to do, it will call `signal_fn` with
/// `signal_ctx` as argument.
///
/// See [`Executor`] docs for details on `signal_fn`.
pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
let alarm = unsafe { unwrap!(driver::allocate_alarm()) }; let alarm = unsafe { unwrap!(driver::allocate_alarm()) };
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
@ -325,7 +312,7 @@ impl Executor {
Self { Self {
run_queue: RunQueue::new(), run_queue: RunQueue::new(),
signal_fn, signal_fn,
signal_ctx, signal_ctx: SignalCtx(signal_ctx),
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
timer_queue: timer_queue::TimerQueue::new(), timer_queue: timer_queue::TimerQueue::new(),
@ -346,7 +333,7 @@ impl Executor {
trace::task_ready_begin(task.as_ptr() as u32); trace::task_ready_begin(task.as_ptr() as u32);
if self.run_queue.enqueue(cs, task) { if self.run_queue.enqueue(cs, task) {
(self.signal_fn)(self.signal_ctx) (self.signal_fn)(self.signal_ctx.0)
} }
} }
@ -387,7 +374,8 @@ impl Executor {
/// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to /// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
/// somehow schedule for `poll()` to be called later, at a time you know for sure there's /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
/// no `poll()` already running. /// no `poll()` already running.
pub unsafe fn poll(&'static self) { pub(crate) unsafe fn poll(&'static self) {
#[allow(clippy::never_loop)]
loop { loop {
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]
self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
@ -441,6 +429,84 @@ impl Executor {
#[cfg(feature = "rtos-trace")] #[cfg(feature = "rtos-trace")]
trace::system_idle(); trace::system_idle();
} }
}
/// Raw executor.
///
/// This is the core of the Embassy executor. It is low-level, requiring manual
/// handling of wakeups and task polling. If you can, prefer using one of the
/// [higher level executors](crate::Executor).
///
/// The raw executor leaves it up to you to handle wakeups and scheduling:
///
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
/// that "want to run").
/// - You must supply a `signal_fn`. The executor will call it to notify you it has work
/// to do. You must arrange for `poll()` to be called as soon as possible.
///
/// `signal_fn` can be called from *any* context: any thread, any interrupt priority
/// level, etc. It may be called synchronously from any `Executor` method call as well.
/// You must deal with this correctly.
///
/// In particular, you must NOT call `poll` directly from `signal_fn`, as this violates
/// the requirement for `poll` to not be called reentrantly.
#[repr(transparent)]
pub struct Executor {
pub(crate) inner: SyncExecutor,
_not_sync: PhantomData<*mut ()>,
}
impl Executor {
pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
mem::transmute(inner)
}
/// Create a new executor.
///
/// When the executor has work to do, it will call `signal_fn` with
/// `signal_ctx` as argument.
///
/// See [`Executor`] docs for details on `signal_fn`.
pub fn new(signal_fn: fn(*mut ()), signal_ctx: *mut ()) -> Self {
Self {
inner: SyncExecutor::new(signal_fn, signal_ctx),
_not_sync: PhantomData,
}
}
/// Spawn a task in this executor.
///
/// # Safety
///
/// `task` must be a valid pointer to an initialized but not-already-spawned task.
///
/// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
/// In this case, the task's Future must be Send. This is because this is effectively
/// sending the task to the executor thread.
pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
self.inner.spawn(task)
}
/// Poll all queued tasks in this executor.
///
/// This loops over all tasks that are queued to be polled (i.e. they're
/// freshly spawned or they've been woken). Other tasks are not polled.
///
/// You must call `poll` after receiving a call to `signal_fn`. It is OK
/// to call `poll` even when not requested by `signal_fn`, but it wastes
/// energy.
///
/// # Safety
///
/// You must NOT call `poll` reentrantly on the same executor.
///
/// In particular, note that `poll` may call `signal_fn` synchronously. Therefore, you
/// must NOT directly call `poll()` from your `signal_fn`. Instead, `signal_fn` has to
/// somehow schedule for `poll()` to be called later, at a time you know for sure there's
/// no `poll()` already running.
pub unsafe fn poll(&'static self) {
self.inner.poll()
}
/// Get a spawner that spawns tasks in this executor. /// Get a spawner that spawns tasks in this executor.
/// ///
@ -483,9 +549,11 @@ impl embassy_time::queue::TimerQueue for TimerQueue {
fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) {
let task = waker::task_from_waker(waker); let task = waker::task_from_waker(waker);
let task = task.header(); let task = task.header();
unsafe {
let expires_at = task.expires_at.get(); let expires_at = task.expires_at.get();
task.expires_at.set(expires_at.min(at)); task.expires_at.set(expires_at.min(at));
} }
}
} }
#[cfg(feature = "integrated-timers")] #[cfg(feature = "integrated-timers")]

View file

@ -1,28 +1,32 @@
use core::cell::Cell;
use core::cmp::min; use core::cmp::min;
use atomic_polyfill::Ordering; use atomic_polyfill::Ordering;
use embassy_time::Instant; use embassy_time::Instant;
use super::{TaskRef, STATE_TIMER_QUEUED}; use super::{TaskRef, STATE_TIMER_QUEUED};
use crate::raw::util::SyncUnsafeCell;
pub(crate) struct TimerQueueItem { pub(crate) struct TimerQueueItem {
next: Cell<Option<TaskRef>>, next: SyncUnsafeCell<Option<TaskRef>>,
} }
impl TimerQueueItem { impl TimerQueueItem {
pub const fn new() -> Self { pub const fn new() -> Self {
Self { next: Cell::new(None) } Self {
next: SyncUnsafeCell::new(None),
}
} }
} }
pub(crate) struct TimerQueue { pub(crate) struct TimerQueue {
head: Cell<Option<TaskRef>>, head: SyncUnsafeCell<Option<TaskRef>>,
} }
impl TimerQueue { impl TimerQueue {
pub const fn new() -> Self { pub const fn new() -> Self {
Self { head: Cell::new(None) } Self {
head: SyncUnsafeCell::new(None),
}
} }
pub(crate) unsafe fn update(&self, p: TaskRef) { pub(crate) unsafe fn update(&self, p: TaskRef) {

View file

@ -25,3 +25,32 @@ impl<T> UninitCell<T> {
ptr::drop_in_place(self.as_mut_ptr()) ptr::drop_in_place(self.as_mut_ptr())
} }
} }
unsafe impl<T> Sync for UninitCell<T> {}
#[repr(transparent)]
pub struct SyncUnsafeCell<T> {
value: UnsafeCell<T>,
}
unsafe impl<T: Sync> Sync for SyncUnsafeCell<T> {}
impl<T> SyncUnsafeCell<T> {
#[inline]
pub const fn new(value: T) -> Self {
Self {
value: UnsafeCell::new(value),
}
}
pub unsafe fn set(&self, value: T) {
*self.value.get() = value;
}
pub unsafe fn get(&self) -> T
where
T: Copy,
{
*self.value.get()
}
}

View file

@ -92,6 +92,7 @@ impl Spawner {
poll_fn(|cx| { poll_fn(|cx| {
let task = raw::task_from_waker(cx.waker()); let task = raw::task_from_waker(cx.waker());
let executor = unsafe { task.header().executor.get().unwrap_unchecked() }; let executor = unsafe { task.header().executor.get().unwrap_unchecked() };
let executor = unsafe { raw::Executor::wrap(executor) };
Poll::Ready(Self::new(executor)) Poll::Ready(Self::new(executor))
}) })
.await .await
@ -130,9 +131,7 @@ impl Spawner {
/// spawner to other threads, but the spawner loses the ability to spawn /// spawner to other threads, but the spawner loses the ability to spawn
/// non-Send tasks. /// non-Send tasks.
pub fn make_send(&self) -> SendSpawner { pub fn make_send(&self) -> SendSpawner {
SendSpawner { SendSpawner::new(&self.executor.inner)
executor: self.executor,
}
} }
} }
@ -145,14 +144,11 @@ impl Spawner {
/// If you want to spawn non-Send tasks, use [Spawner]. /// If you want to spawn non-Send tasks, use [Spawner].
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct SendSpawner { pub struct SendSpawner {
executor: &'static raw::Executor, executor: &'static raw::SyncExecutor,
} }
unsafe impl Send for SendSpawner {}
unsafe impl Sync for SendSpawner {}
impl SendSpawner { impl SendSpawner {
pub(crate) fn new(executor: &'static raw::Executor) -> Self { pub(crate) fn new(executor: &'static raw::SyncExecutor) -> Self {
Self { executor } Self { executor }
} }