diff --git a/Cargo.toml b/Cargo.toml index 3ad62f18d..792804046 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ debug = 2 debug-assertions = false incremental = false lto = "fat" -opt-level = 3 +opt-level = 's' overflow-checks = false # do not optimize proc-macro crates = faster builds from scratch diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index 08d273dbb..8b05e29fe 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml @@ -22,6 +22,5 @@ rand_core = { version = "0.5.1", optional = true, features = ["std"] } cortex-m = "0.6.4" futures = { version = "0.3.5", default-features = false } pin-project = { version = "1.0.2", default-features = false } -futures-intrusive = { version = "0.3.1", default-features = false } embassy-macros = { version = "0.1.0", path = "../embassy-macros"} diff --git a/embassy/src/executor/mod.rs b/embassy/src/executor/mod.rs index 6c76eed76..c1ec3832a 100644 --- a/embassy/src/executor/mod.rs +++ b/embassy/src/executor/mod.rs @@ -1,6 +1,5 @@ pub use embassy_macros::task; -use core::cell::Cell; use core::future::Future; use core::marker::PhantomData; use core::mem; @@ -8,30 +7,54 @@ use core::pin::Pin; use core::ptr; use core::ptr::NonNull; use core::sync::atomic::{AtomicU32, Ordering}; -use core::task::{Context, Poll, RawWaker, Waker}; +use core::task::{Context, Poll, Waker}; +use core::{ + cell::{Cell, UnsafeCell}, + cmp::min, +}; mod run_queue; +pub(crate) mod timer; +mod timer_queue; mod util; mod waker; use self::run_queue::{RunQueue, RunQueueItem}; +use self::timer_queue::{TimerQueue, TimerQueueItem}; use self::util::UninitCell; +use crate::{ + fmt::{panic, *}, + time::{Alarm, Instant}, +}; /// Task is spawned (has a future) -const STATE_SPAWNED: u32 = 1 << 0; +pub(crate) const STATE_SPAWNED: u32 = 1 << 0; /// Task is in the executor run queue -const STATE_RUN_QUEUED: u32 = 1 << 1; +pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1; /// Task is in the executor timer queue -const STATE_TIMER_QUEUED: u32 = 1 << 2; +pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2; pub(crate) struct TaskHeader { state: AtomicU32, run_queue_item: RunQueueItem, + expires_at: Cell, + timer_queue_item: TimerQueueItem, executor: Cell<*const Executor>, // Valid if state != 0 poll_fn: UninitCell, // Valid if STATE_SPAWNED } impl TaskHeader { + const fn new() -> Self { + Self { + state: AtomicU32::new(0), + expires_at: Cell::new(Instant::from_ticks(0)), + run_queue_item: RunQueueItem::new(), + timer_queue_item: TimerQueueItem::new(), + executor: Cell::new(ptr::null()), + poll_fn: UninitCell::uninit(), + } + } + pub(crate) unsafe fn enqueue(&self) { let mut current = self.state.load(Ordering::Acquire); loop { @@ -71,12 +94,7 @@ pub struct Task { impl Task { pub const fn new() -> Self { Self { - header: TaskHeader { - state: AtomicU32::new(0), - run_queue_item: RunQueueItem::new(), - executor: Cell::new(ptr::null()), - poll_fn: UninitCell::uninit(), - }, + header: TaskHeader::new(), future: UninitCell::uninit(), } } @@ -144,7 +162,9 @@ pub enum SpawnError { } pub struct Executor { + alarm: Option<&'static dyn Alarm>, run_queue: RunQueue, + timer_queue: TimerQueue, signal_fn: fn(), not_send: PhantomData<*mut ()>, } @@ -152,7 +172,18 @@ pub struct Executor { impl Executor { pub const fn new(signal_fn: fn()) -> Self { Self { + alarm: None, run_queue: RunQueue::new(), + timer_queue: TimerQueue::new(), + signal_fn: signal_fn, + not_send: PhantomData, + } + } + pub const fn new_with_alarm(alarm: &'static dyn Alarm, signal_fn: fn()) -> Self { + Self { + alarm: Some(alarm), + run_queue: RunQueue::new(), + timer_queue: TimerQueue::new(), signal_fn: signal_fn, not_send: PhantomData, } @@ -183,8 +214,13 @@ impl Executor { /// Runs the executor until the queue is empty. pub fn run(&self) { unsafe { + self.timer_queue.dequeue_expired(Instant::now(), |p| { + self.enqueue(p); + }); + self.run_queue.dequeue_all(|p| { let header = &*p; + header.expires_at.set(Instant::MAX); let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); if state & STATE_SPAWNED == 0 { @@ -198,7 +234,25 @@ impl Executor { // Run the task header.poll_fn.read()(p as _); + + // Enqueue or update into timer_queue + self.timer_queue.update(p); }); + + // If this is in the past, set_alarm will immediately trigger the alarm, + // which will make the wfe immediately return so we do another loop iteration. + if let Some(alarm) = self.alarm { + let next_expiration = self.timer_queue.next_expiration(); + alarm.set_callback(self.signal_fn); + alarm.set(next_expiration.as_ticks()); + } } } } + +pub(crate) unsafe fn register_timer(at: Instant, waker: &Waker) { + let p = waker::task_from_waker(waker); + let header = &*p; + let expires_at = header.expires_at.get(); + header.expires_at.set(min(expires_at, at)); +} diff --git a/embassy/src/executor/timer.rs b/embassy/src/executor/timer.rs new file mode 100644 index 000000000..05c14b880 --- /dev/null +++ b/embassy/src/executor/timer.rs @@ -0,0 +1,67 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures::Stream; + +use crate::time::{Duration, Instant}; + +pub struct Timer { + expires_at: Instant, +} + +impl Timer { + pub fn at(expires_at: Instant) -> Self { + Self { expires_at } + } + + pub fn after(duration: Duration) -> Self { + Self { + expires_at: Instant::now() + duration, + } + } +} + +impl Unpin for Timer {} + +impl Future for Timer { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.expires_at <= Instant::now() { + Poll::Ready(()) + } else { + unsafe { super::register_timer(self.expires_at, cx.waker()) }; + Poll::Pending + } + } +} + +pub struct Ticker { + expires_at: Instant, + duration: Duration, +} + +impl Ticker { + pub fn every(duration: Duration) -> Self { + let expires_at = Instant::now() + duration; + Self { + expires_at, + duration, + } + } +} + +impl Unpin for Ticker {} + +impl Stream for Ticker { + type Item = (); + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.expires_at <= Instant::now() { + let dur = self.duration; + self.expires_at += dur; + Poll::Ready(Some(())) + } else { + unsafe { super::register_timer(self.expires_at, cx.waker()) }; + Poll::Pending + } + } +} diff --git a/embassy/src/executor/timer_queue.rs b/embassy/src/executor/timer_queue.rs new file mode 100644 index 000000000..428b6cf63 --- /dev/null +++ b/embassy/src/executor/timer_queue.rs @@ -0,0 +1,85 @@ +use core::cell::Cell; +use core::sync::atomic::{AtomicPtr, Ordering}; +use core::{cmp::min, ptr}; + +use crate::time::Instant; + +use super::{TaskHeader, STATE_TIMER_QUEUED}; + +pub(crate) struct TimerQueueItem { + next: Cell<*mut TaskHeader>, +} + +impl TimerQueueItem { + pub const fn new() -> Self { + Self { + next: Cell::new(ptr::null_mut()), + } + } +} + +pub(crate) struct TimerQueue { + head: Cell<*mut TaskHeader>, +} + +impl TimerQueue { + pub const fn new() -> Self { + Self { + head: Cell::new(ptr::null_mut()), + } + } + + pub(crate) unsafe fn update(&self, p: *mut TaskHeader) { + let header = &*p; + if header.expires_at.get() != Instant::MAX { + let old_state = header.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel); + let is_new = old_state & STATE_TIMER_QUEUED == 0; + + if is_new { + header.timer_queue_item.next.set(self.head.get()); + self.head.set(p); + } + } + } + + pub(crate) unsafe fn next_expiration(&self) -> Instant { + let mut res = Instant::MAX; + self.retain(|p| { + let header = &*p; + let expires = header.expires_at.get(); + res = min(res, expires); + expires != Instant::MAX + }); + res + } + + pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(*mut TaskHeader)) { + self.retain(|p| { + let header = &*p; + if header.expires_at.get() <= now { + on_task(p); + false + } else { + true + } + }); + } + + pub(crate) unsafe fn retain(&self, mut f: impl FnMut(*mut TaskHeader) -> bool) { + let mut prev = &self.head; + while !prev.get().is_null() { + let p = prev.get(); + let header = &*p; + if f(p) { + // Skip to next + prev = &header.timer_queue_item.next; + } else { + // Remove it + prev.set(header.timer_queue_item.next.get()); + header + .state + .fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel); + } + } + } +} diff --git a/embassy/src/executor/waker.rs b/embassy/src/executor/waker.rs index 662857dea..5a604d865 100644 --- a/embassy/src/executor/waker.rs +++ b/embassy/src/executor/waker.rs @@ -1,15 +1,16 @@ +use core::mem; use core::task::{RawWaker, RawWakerVTable, Waker}; use super::TaskHeader; -static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); +const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop); unsafe fn clone(p: *const ()) -> RawWaker { RawWaker::new(p, &VTABLE) } unsafe fn wake(p: *const ()) { - let header = &*(p as *const TaskHeader); + let header = &*task_from_ptr(p); header.enqueue(); } @@ -20,3 +21,18 @@ unsafe fn drop(_: *const ()) { pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker { Waker::from_raw(RawWaker::new(p as _, &VTABLE)) } + +pub(crate) unsafe fn task_from_ptr(p: *const ()) -> *mut TaskHeader { + p as *mut TaskHeader +} + +pub(crate) unsafe fn task_from_waker(w: &Waker) -> *mut TaskHeader { + let w: &WakerHack = mem::transmute(w); + assert_eq!(w.vtable, &VTABLE); + task_from_ptr(w.data) +} + +struct WakerHack { + data: *const (), + vtable: &'static RawWakerVTable, +} diff --git a/embassy/src/time/instant.rs b/embassy/src/time/instant.rs index 75098081f..e1aff11c4 100644 --- a/embassy/src/time/instant.rs +++ b/embassy/src/time/instant.rs @@ -12,6 +12,9 @@ pub struct Instant { } impl Instant { + pub const MIN: Instant = Instant { ticks: u64::MIN }; + pub const MAX: Instant = Instant { ticks: u64::MAX }; + pub fn now() -> Instant { Instant { ticks: now() } } diff --git a/embassy/src/time/mod.rs b/embassy/src/time/mod.rs index 896838371..7722c3562 100644 --- a/embassy/src/time/mod.rs +++ b/embassy/src/time/mod.rs @@ -2,6 +2,7 @@ mod duration; mod instant; mod traits; +pub use crate::executor::timer::{Ticker, Timer}; pub use duration::Duration; pub use instant::Instant; pub use traits::*; diff --git a/examples/src/bin/multiprio.rs b/examples/src/bin/multiprio.rs index e73747ac6..be7e91a9d 100644 --- a/examples/src/bin/multiprio.rs +++ b/examples/src/bin/multiprio.rs @@ -64,7 +64,7 @@ use example_common::*; use cortex_m_rt::entry; use nrf52840_hal::clocks; -use embassy::executor::{task, TimerExecutor}; +use embassy::executor::{task, Executor}; use embassy::time::{Duration, Instant, Timer}; use embassy::util::Forever; use embassy_nrf::{interrupt, pac, rtc}; @@ -112,9 +112,12 @@ async fn run_low() { } static RTC: Forever> = Forever::new(); -static EXECUTOR_LOW: Forever>> = Forever::new(); -static EXECUTOR_MED: Forever>> = Forever::new(); -static EXECUTOR_HIGH: Forever>> = Forever::new(); +static ALARM_LOW: Forever> = Forever::new(); +static EXECUTOR_LOW: Forever = Forever::new(); +static ALARM_MED: Forever> = Forever::new(); +static EXECUTOR_MED: Forever = Forever::new(); +static ALARM_HIGH: Forever> = Forever::new(); +static EXECUTOR_HIGH: Forever = Forever::new(); #[entry] fn main() -> ! { @@ -131,11 +134,14 @@ fn main() -> ! { rtc.start(); unsafe { embassy::time::set_clock(rtc) }; - let executor_low = EXECUTOR_LOW.put(TimerExecutor::new(rtc.alarm0(), cortex_m::asm::sev)); - let executor_med = EXECUTOR_MED.put(TimerExecutor::new(rtc.alarm1(), || { + let alarm_low = ALARM_LOW.put(rtc.alarm0()); + let executor_low = EXECUTOR_LOW.put(Executor::new_with_alarm(alarm_low, cortex_m::asm::sev)); + let alarm_med = ALARM_MED.put(rtc.alarm1()); + let executor_med = EXECUTOR_MED.put(Executor::new_with_alarm(alarm_med, || { interrupt::pend(interrupt::SWI0_EGU0) })); - let executor_high = EXECUTOR_HIGH.put(TimerExecutor::new(rtc.alarm2(), || { + let alarm_high = ALARM_HIGH.put(rtc.alarm2()); + let executor_high = EXECUTOR_HIGH.put(Executor::new_with_alarm(alarm_high, || { interrupt::pend(interrupt::SWI1_EGU1) })); diff --git a/examples/src/bin/rtc_async.rs b/examples/src/bin/rtc_async.rs index b4ee736b7..aec70a072 100644 --- a/examples/src/bin/rtc_async.rs +++ b/examples/src/bin/rtc_async.rs @@ -10,7 +10,7 @@ use core::mem::MaybeUninit; use cortex_m_rt::entry; use nrf52840_hal::clocks; -use embassy::executor::{task, TimerExecutor}; +use embassy::executor::{task, Executor}; use embassy::time::{Clock, Duration, Timer}; use embassy::util::Forever; use embassy_nrf::pac; @@ -33,7 +33,8 @@ async fn run2() { } static RTC: Forever> = Forever::new(); -static EXECUTOR: Forever>> = Forever::new(); +static ALARM: Forever> = Forever::new(); +static EXECUTOR: Forever = Forever::new(); #[entry] fn main() -> ! { @@ -51,7 +52,8 @@ fn main() -> ! { unsafe { embassy::time::set_clock(rtc) }; - let executor = EXECUTOR.put(TimerExecutor::new(rtc.alarm0(), cortex_m::asm::sev)); + let alarm = ALARM.put(rtc.alarm0()); + let executor = EXECUTOR.put(Executor::new_with_alarm(alarm, cortex_m::asm::sev)); unwrap!(executor.spawn(run1())); unwrap!(executor.spawn(run2()));