diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs
index 435c97db8..6c76eed76 100644
--- a/embassy/src/executor/mod.rs
+++ b/embassy/src/executor/mod.rs
@@ -8,16 +8,17 @@ use core::pin::Pin;
 use core::ptr;
 use core::ptr::NonNull;
 use core::sync::atomic::{AtomicU32, Ordering};
-use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+use core::task::{Context, Poll, RawWaker, Waker};
 
 mod run_queue;
 mod util;
+mod waker;
 
 use self::run_queue::{RunQueue, RunQueueItem};
 use self::util::UninitCell;
 
-/// Task is spawned and future hasn't finished running yet.
-const STATE_RUNNING: u32 = 1 << 0;
+/// Task is spawned (has a future)
+const STATE_SPAWNED: u32 = 1 << 0;
 /// Task is in the executor run queue
 const STATE_RUN_QUEUED: u32 = 1 << 1;
 /// Task is in the executor timer queue
@@ -27,7 +28,36 @@ pub(crate) struct TaskHeader {
     state: AtomicU32,
     run_queue_item: RunQueueItem,
     executor: Cell<*const Executor>, // Valid if state != 0
-    poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_RUNNING
+    poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_SPAWNED
+}
+
+impl TaskHeader {
+    pub(crate) unsafe fn enqueue(&self) {
+        let mut current = self.state.load(Ordering::Acquire);
+        loop {
+            // If already scheduled, or if not started,
+            if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
+                return;
+            }
+
+            // Mark it as scheduled
+            let new = current | STATE_RUN_QUEUED;
+
+            match self.state.compare_exchange_weak(
+                current,
+                new,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => break,
+                Err(next_current) => current = next_current,
+            }
+        }
+
+        // We have just marked the task as scheduled, so enqueue it.
+        let executor = &*self.executor.get();
+        executor.enqueue(self as *const TaskHeader as *mut TaskHeader);
+    }
 }
 
 // repr(C) is needed to guarantee that header is located at offset 0
@@ -35,59 +65,9 @@ pub(crate) struct TaskHeader {
 #[repr(C)]
 pub struct Task<F: Future + 'static> {
     header: TaskHeader,
-    future: UninitCell<F>, // Valid if STATE_RUNNING
+    future: UninitCell<F>, // Valid if STATE_SPAWNED
 }
 
-#[derive(Copy, Clone, Debug)]
-#[cfg_attr(feature = "defmt", derive(defmt::Format))]
-pub enum SpawnError {
-    Busy,
-}
-
-//=============
-// Waker
-
-static WAKER_VTABLE: RawWakerVTable =
-    RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
-
-unsafe fn waker_clone(p: *const ()) -> RawWaker {
-    RawWaker::new(p, &WAKER_VTABLE)
-}
-
-unsafe fn waker_wake(p: *const ()) {
-    let header = &*(p as *const TaskHeader);
-
-    let mut current = header.state.load(Ordering::Acquire);
-    loop {
-        // If already scheduled, or if not started,
-        if (current & STATE_RUN_QUEUED != 0) || (current & STATE_RUNNING == 0) {
-            return;
-        }
-
-        // Mark it as scheduled
-        let new = current | STATE_RUN_QUEUED;
-
-        match header
-            .state
-            .compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire)
-        {
-            Ok(_) => break,
-            Err(next_current) => current = next_current,
-        }
-    }
-
-    // We have just marked the task as scheduled, so enqueue it.
-    let executor = &*header.executor.get();
-    executor.enqueue(p as *mut TaskHeader);
-}
-
-unsafe fn waker_drop(_: *const ()) {
-    // nop
-}
-
-//=============
-// Task
-
 impl<F: Future + 'static> Task<F> {
     pub const fn new() -> Self {
         Self {
@@ -103,7 +83,7 @@ impl<F: Future + 'static> Task<F> {
 
     pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
         for task in pool {
-            let state = STATE_RUNNING | STATE_RUN_QUEUED;
+            let state = STATE_SPAWNED | STATE_RUN_QUEUED;
             if task
                 .header
                 .state
@@ -129,14 +109,14 @@ impl<F: Future + 'static> Task<F> {
         let this = &*(p as *const Task<F>);
 
         let future = Pin::new_unchecked(this.future.as_mut());
-        let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE));
+        let waker = waker::from_task(p);
         let mut cx = Context::from_waker(&waker);
         match future.poll(&mut cx) {
             Poll::Ready(_) => {
                 this.future.drop_in_place();
                 this.header
                     .state
-                    .fetch_and(!STATE_RUNNING, Ordering::AcqRel);
+                    .fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
             }
             Poll::Pending => {}
         }
@@ -145,9 +125,6 @@ impl<F: Future + 'static> Task<F> {
 
 unsafe impl<F: Future + 'static> Sync for Task<F> {}
 
-//=============
-// Spawn token
-
 #[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
 pub struct SpawnToken {
     header: Option<NonNull<TaskHeader>>,
@@ -160,8 +137,11 @@ impl Drop for SpawnToken {
     }
 }
 
-//=============
-// Executor
+#[derive(Copy, Clone, Debug)]
+#[cfg_attr(feature = "defmt", derive(defmt::Format))]
+pub enum SpawnError {
+    Busy,
+}
 
 pub struct Executor {
     run_queue: RunQueue,
@@ -207,7 +187,7 @@ impl Executor {
             let header = &*p;
 
             let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
-            if state & STATE_RUNNING == 0 {
+            if state & STATE_SPAWNED == 0 {
                 // If task is not running, ignore it. This can happen in the following scenario:
                 //   - Task gets dequeued, poll starts
                 //   - While task is being polled, it gets woken. It gets placed in the queue.
diff --git a/embassy/src/executor/waker.rs b/embassy/src/executor/waker.rs
new file mode 100644
index 000000000..662857dea
--- /dev/null
+++ b/embassy/src/executor/waker.rs
@@ -0,0 +1,22 @@
+use core::task::{RawWaker, RawWakerVTable, Waker};
+
+use super::TaskHeader;
+
+static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
+
+unsafe fn clone(p: *const ()) -> RawWaker {
+    RawWaker::new(p, &VTABLE)
+}
+
+unsafe fn wake(p: *const ()) {
+    let header = &*(p as *const TaskHeader);
+    header.enqueue();
+}
+
+unsafe fn drop(_: *const ()) {
+    // nop
+}
+
+pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker {
+    Waker::from_raw(RawWaker::new(p as _, &VTABLE))
+}
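
The enqueue logic this diff moves from the old waker_wake free function into TaskHeader::enqueue is unchanged: a wake only pushes the task onto the run queue if the task is spawned and not already queued, claiming the STATE_RUN_QUEUED bit with a compare_exchange_weak loop. A minimal standalone sketch of that guard, using plain std atomics and a made-up helper name (try_mark_run_queued) rather than embassy's types:

use std::sync::atomic::{AtomicU32, Ordering};

const STATE_SPAWNED: u32 = 1 << 0;
const STATE_RUN_QUEUED: u32 = 1 << 1;

// Returns true if the caller won the race and must push the task onto the
// run queue; false if the task has no future or is already queued.
fn try_mark_run_queued(state: &AtomicU32) -> bool {
    let mut current = state.load(Ordering::Acquire);
    loop {
        // Not spawned, or already queued: nothing to do.
        if (current & STATE_RUN_QUEUED != 0) || (current & STATE_SPAWNED == 0) {
            return false;
        }
        match state.compare_exchange_weak(
            current,
            current | STATE_RUN_QUEUED,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return true,
            // Lost a race with a concurrent wake; reload the state and retry.
            Err(next_current) => current = next_current,
        }
    }
}

fn main() {
    let state = AtomicU32::new(STATE_SPAWNED);
    assert!(try_mark_run_queued(&state)); // first wake claims the queue bit
    assert!(!try_mark_run_queued(&state)); // second wake is a no-op
    assert!(!try_mark_run_queued(&AtomicU32::new(0))); // never-spawned task is ignored
}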
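
The new waker.rs can keep its vtable trivial because the waker's data pointer is just a raw pointer to a task's TaskHeader, and tasks live in a &'static pool (see Task::spawn above): clone copies the pointer and drop is a no-op, since the waker owns nothing. A standalone sketch of the same pattern (illustrative only; a static AtomicBool stands in for the TaskHeader, and setting a flag stands in for enqueue()):

use core::task::{RawWaker, RawWakerVTable, Waker};
use std::sync::atomic::{AtomicBool, Ordering};

static WOKEN: AtomicBool = AtomicBool::new(false);

static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);

unsafe fn clone(p: *const ()) -> RawWaker {
    // The waker is only a pointer to static data, so cloning copies the pointer.
    RawWaker::new(p, &VTABLE)
}

unsafe fn wake(p: *const ()) {
    // Stand-in for TaskHeader::enqueue(): record that a wake happened.
    (*(p as *const AtomicBool)).store(true, Ordering::Release);
}

unsafe fn drop(_: *const ()) {
    // nop: the waker borrows statically allocated storage and owns nothing.
}

fn main() {
    let raw = RawWaker::new(&WOKEN as *const AtomicBool as *const (), &VTABLE);
    let waker = unsafe { Waker::from_raw(raw) };
    waker.wake_by_ref(); // dispatches through the vtable's wake slot
    assert!(WOKEN.load(Ordering::Acquire));
}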