From c2404ee8ca6200d9037f096eee3f0eab98711778 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Tue, 6 Sep 2022 21:39:23 +0300 Subject: [PATCH 01/11] Initial generic timer queue impl --- embassy-time/Cargo.toml | 5 + embassy-time/src/lib.rs | 1 + embassy-time/src/queue.rs | 197 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 203 insertions(+) create mode 100644 embassy-time/src/queue.rs diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml index c51a71d01..0e3391d1f 100644 --- a/embassy-time/Cargo.toml +++ b/embassy-time/Cargo.toml @@ -26,6 +26,9 @@ unstable-traits = ["embedded-hal-1"] # To use this you must have a time driver provided. defmt-timestamp-uptime = ["defmt"] +# TODO: Doc +generic-queue = [] + # Set the `embassy_time` tick rate. # # At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used. @@ -111,9 +114,11 @@ embedded-hal-async = { version = "=0.1.0-alpha.2", optional = true} futures-util = { version = "0.3.17", default-features = false } embassy-macros = { version = "0.1.0", path = "../embassy-macros"} +embassy-sync = { version = "0.1", path = "../embassy-sync" } atomic-polyfill = "1.0.1" critical-section = "1.1" cfg-if = "1.0.0" +heapless = "0.7" # WASM dependencies wasm-bindgen = { version = "0.2.81", optional = true } diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs index 4edc883fe..0457a6571 100644 --- a/embassy-time/src/lib.rs +++ b/embassy-time/src/lib.rs @@ -11,6 +11,7 @@ mod delay; pub mod driver; mod duration; mod instant; +pub mod queue; mod tick; mod timer; diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs new file mode 100644 index 000000000..7e84090b1 --- /dev/null +++ b/embassy-time/src/queue.rs @@ -0,0 +1,197 @@ +//! Generic timer queue implementation +use core::cell::RefCell; +use core::cmp::Ordering; +use core::task::Waker; + +use atomic_polyfill::{AtomicBool, Ordering as AtomicOrdering}; +use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; +use embassy_sync::blocking_mutex::Mutex; +use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; + +use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle}; +use crate::Instant; + +#[derive(Debug)] +struct Timer { + at: Instant, + waker: Waker, +} + +impl PartialEq for Timer { + fn eq(&self, other: &Self) -> bool { + self.at == other.at + } +} + +impl Eq for Timer {} + +impl PartialOrd for Timer { + fn partial_cmp(&self, other: &Self) -> Option { + self.at.partial_cmp(&other.at) + } +} + +impl Ord for Timer { + fn cmp(&self, other: &Self) -> Ordering { + self.at.cmp(&other.at) + } +} + +struct InnerQueue { + queue: SortedLinkedList, + alarm_at: Instant, + alarm: Option, +} + +impl InnerQueue { + const fn new() -> Self { + Self { + queue: SortedLinkedList::new_u8(), + alarm_at: Instant::MAX, + alarm: None, + } + } + + fn schedule(&mut self, at: Instant, waker: &Waker) { + self.queue + .find_mut(|timer| timer.waker.will_wake(waker)) + .map(|mut timer| { + timer.waker = waker.clone(); + timer.at = at; + + timer.finish(); + }) + .unwrap_or_else(|| { + let mut timer = Timer { + waker: waker.clone(), + at, + }; + + loop { + match self.queue.push(timer) { + Ok(()) => break, + Err(e) => timer = e, + } + + self.queue.pop().unwrap().waker.wake(); + } + }); + + // Don't wait for the alarm callback to trigger and directly + // dispatch all timers that are already due + // + // Then update the alarm if necessary + self.dispatch(); + } + + fn dispatch(&mut self) { + let now = Instant::now(); + + while self.queue.peek().filter(|timer| timer.at <= now).is_some() { + self.queue.pop().unwrap().waker.wake(); + } + + self.update_alarm(); + } + + fn update_alarm(&mut self) { + if let Some(timer) = self.queue.peek() { + let new_at = timer.at; + + if self.alarm_at != new_at { + self.alarm_at = new_at; + set_alarm(self.alarm.unwrap(), new_at.as_ticks()); + } + } else { + self.alarm_at = Instant::MAX; + } + } + + fn handle_alarm(&mut self) { + self.alarm_at = Instant::MAX; + + self.dispatch(); + } +} + +/// TODO: Doc +pub struct Queue { + initialized: AtomicBool, + inner: Mutex>>, +} + +impl Queue { + /// TODO: Doc + pub const fn new() -> Self { + Self { + initialized: AtomicBool::new(false), + inner: Mutex::new(RefCell::new(InnerQueue::::new())), + } + } + + /// TODO: Doc + pub unsafe fn initialize(&'static self) { + if self.initialized.load(AtomicOrdering::SeqCst) { + panic!("Queue already initialized"); + } + + let handle = allocate_alarm().unwrap(); + self.inner.lock(|inner| inner.borrow_mut().alarm = Some(handle)); + + set_alarm_callback(handle, Self::handle_alarm, self as *const _ as _); + + self.initialized.store(true, AtomicOrdering::SeqCst); + } + + /// TODO: Doc + pub fn schedule(&'static self, at: Instant, waker: &Waker) { + self.check_initialized(); + + self.inner.lock(|inner| inner.borrow_mut().schedule(at, waker)); + } + + fn check_initialized(&self) { + if !self.initialized.load(AtomicOrdering::SeqCst) { + panic!("Queue is not initialized"); + } + } + + fn handle_alarm(ctx: *mut ()) { + let this = unsafe { (ctx as *const Self).as_ref().unwrap() }; + + this.check_initialized(); + this.inner.lock(|inner| inner.borrow_mut().handle_alarm()); + } +} + +/// TODO: Doc +pub unsafe fn initialize() { + extern "Rust" { + fn _embassy_time_generic_queue_initialize(); + } + + _embassy_time_generic_queue_initialize(); +} + +/// TODO: Doc +#[macro_export] +macro_rules! generic_queue { + (static $name:ident: $t: ty = $val:expr) => { + static $name: $t = $val; + + #[no_mangle] + fn _embassy_time_generic_queue_initialize() { + unsafe { + $crate::queue::Queue::initialize(&$name); + } + } + + #[no_mangle] + fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) { + $crate::queue::Queue::schedule(&$name, at, waker); + } + }; +} + +#[cfg(feature = "generic-queue")] +generic_queue!(static QUEUE: Queue = Queue::new()); From ba6e452cc5d6c33029f34d7cfb5cd5ea846979bd Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Tue, 20 Sep 2022 20:23:56 +0300 Subject: [PATCH 02/11] Documentation and initial testing framework Add mock waker First simple test Tests & documentation --- embassy-time/Cargo.toml | 6 +- embassy-time/src/lib.rs | 2 +- embassy-time/src/queue.rs | 486 +++++++++++++++++++++++++++++++++++--- 3 files changed, 459 insertions(+), 35 deletions(-) diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml index 0e3391d1f..4fbf97f0d 100644 --- a/embassy-time/Cargo.toml +++ b/embassy-time/Cargo.toml @@ -26,7 +26,8 @@ unstable-traits = ["embedded-hal-1"] # To use this you must have a time driver provided. defmt-timestamp-uptime = ["defmt"] -# TODO: Doc +# Create a global queue that can be used with any executor +# To use this you must have a time driver provided. generic-queue = [] # Set the `embassy_time` tick rate. @@ -124,3 +125,6 @@ heapless = "0.7" wasm-bindgen = { version = "0.2.81", optional = true } js-sys = { version = "0.3", optional = true } wasm-timer = { version = "0.2.5", optional = true } + +[dev-dependencies] +serial_test = "0.9" diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs index 0457a6571..50f437baf 100644 --- a/embassy-time/src/lib.rs +++ b/embassy-time/src/lib.rs @@ -1,4 +1,4 @@ -#![cfg_attr(not(any(feature = "std", feature = "wasm")), no_std)] +#![cfg_attr(not(any(feature = "std", feature = "wasm", test)), no_std)] #![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))] #![doc = include_str!("../README.md")] #![allow(clippy::new_without_default)] diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs index 7e84090b1..56ad5af8d 100644 --- a/embassy-time/src/queue.rs +++ b/embassy-time/src/queue.rs @@ -1,9 +1,53 @@ //! Generic timer queue implementation -use core::cell::RefCell; +//! +//! This module provides a timer queue that works with any executor. +//! +//! In terms of performance, this queue will likely be less efficient in comparison to executor-native queues, +//! like the one provided with e.g. the `embassy-executor` crate. +//! +//! # Enabling the queue +//! - Enable the Cargo feature `generic-queue`. This will automatically instantiate the queue. +//! +//! # Initializing the queue +//! - Call ```unsafe { embassy_time::queue::initialize(); }``` early on in your program, before any of your futures that utilize `embassy-time` are polled. +//! +//! # Customizing the queue +//! - It is possible to customize two aspects of the queue: +//! - Queue size: +//! By default, the queue can hold up to 128 timer schedules and their corresponding wakers. While it will not crash if more timer schedules are added, +//! the performance will degrade, as one of the already added wakers will be awoken, thus making room for the new timer schedule and its waker. +//! - The mutex (i.e. the [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation) utilized by the queue: +//! By default, the utilized [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation is [`CriticalSectionRawMutex`](embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex) +//! which is provided by the `critical-section` crate. This should work just fine, except in a few niche cases like running on +//! top of an RTOS which provides a [`Driver`](crate::driver::Driver) implementation that will call-back directly from an ISR. As the +//! `critical-section` implementation for RTOS-es will likely provide an RTOS mutex which cannot be locked from an ISR, user needs to instead +//! configure the queue with a "disable-all-interrupts" style of mutex. +//! - To customize any of these queue aspects, don't enable the `generic-queue` Cargo feature and instead instantiate the queue with the [`generic_queue`](crate::generic_queue) +//! macro, as per the example below. +//! +//! +//! # Example +//! +//! ```ignore +//! use embassy_time::queue::Queue; +//! +//! // You only need to invoke this macro in case you need to customize the queue. +//! // +//! // Otherwise, just depend on the `embassy-time` crate with feature `generic-queue` enabled, +//! // and the queue instantiation will be done for you behind the scenes. +//! embassy_time::generic_queue!(static QUEUE: Queue<200, MyCustomRawMutex> = Queue::new()); +//! +//! fn main() { +//! unsafe { +//! embassy_time::queue::initialize(); +//! } +//! } +//! ``` +use core::cell::{Cell, RefCell}; use core::cmp::Ordering; use core::task::Waker; -use atomic_polyfill::{AtomicBool, Ordering as AtomicOrdering}; +use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering}; use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; use embassy_sync::blocking_mutex::Mutex; use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; @@ -40,7 +84,6 @@ impl Ord for Timer { struct InnerQueue { queue: SortedLinkedList, alarm_at: Instant, - alarm: Option, } impl InnerQueue { @@ -48,11 +91,10 @@ impl InnerQueue { Self { queue: SortedLinkedList::new_u8(), alarm_at: Instant::MAX, - alarm: None, } } - fn schedule(&mut self, at: Instant, waker: &Waker) { + fn schedule(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) { self.queue .find_mut(|timer| timer.waker.will_wake(waker)) .map(|mut timer| { @@ -81,90 +123,128 @@ impl InnerQueue { // dispatch all timers that are already due // // Then update the alarm if necessary - self.dispatch(); + self.dispatch(alarm_schedule); } - fn dispatch(&mut self) { + fn dispatch(&mut self, alarm_schedule: &AtomicU64) { let now = Instant::now(); while self.queue.peek().filter(|timer| timer.at <= now).is_some() { self.queue.pop().unwrap().waker.wake(); } - self.update_alarm(); + self.update_alarm(alarm_schedule); } - fn update_alarm(&mut self) { + fn update_alarm(&mut self, alarm_schedule: &AtomicU64) { if let Some(timer) = self.queue.peek() { let new_at = timer.at; if self.alarm_at != new_at { self.alarm_at = new_at; - set_alarm(self.alarm.unwrap(), new_at.as_ticks()); + alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst); } } else { self.alarm_at = Instant::MAX; + alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst); } } - fn handle_alarm(&mut self) { + fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) { self.alarm_at = Instant::MAX; - self.dispatch(); + self.dispatch(alarm_schedule); } } -/// TODO: Doc +/// The generic queue implementation pub struct Queue { - initialized: AtomicBool, inner: Mutex>>, + alarm: Cell>, + alarm_schedule: AtomicU64, } impl Queue { - /// TODO: Doc + /// Create a Queue pub const fn new() -> Self { Self { - initialized: AtomicBool::new(false), inner: Mutex::new(RefCell::new(InnerQueue::::new())), + alarm: Cell::new(None), + alarm_schedule: AtomicU64::new(u64::MAX), } } - /// TODO: Doc + /// Initialize the queue + /// + /// This method is called from [`initialize`](crate::queue::initialize), so you are not expected to call it directly. + /// Call [`initialize`](crate::queue::initialize) instead. + /// + /// # Safety + /// It is UB call this function more than once, or to call it after any of your + /// futures that use `embassy-time` are polled already. pub unsafe fn initialize(&'static self) { - if self.initialized.load(AtomicOrdering::SeqCst) { - panic!("Queue already initialized"); + if self.alarm.get().is_some() { + panic!("Queue is already initialized"); } let handle = allocate_alarm().unwrap(); - self.inner.lock(|inner| inner.borrow_mut().alarm = Some(handle)); + self.alarm.set(Some(handle)); - set_alarm_callback(handle, Self::handle_alarm, self as *const _ as _); - - self.initialized.store(true, AtomicOrdering::SeqCst); + set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); } - /// TODO: Doc + /// Schedule a new waker to be awoken at moment `at` + /// + /// This method is called internally by [`embassy-time`](crate), so you are not expected to call it directly. pub fn schedule(&'static self, at: Instant, waker: &Waker) { self.check_initialized(); - self.inner.lock(|inner| inner.borrow_mut().schedule(at, waker)); + self.inner + .lock(|inner| inner.borrow_mut().schedule(at, waker, &self.alarm_schedule)); + + self.update_alarm(); } fn check_initialized(&self) { - if !self.initialized.load(AtomicOrdering::SeqCst) { - panic!("Queue is not initialized"); + if self.alarm.get().is_none() { + panic!("Queue is not initialized yet"); } } - fn handle_alarm(ctx: *mut ()) { - let this = unsafe { (ctx as *const Self).as_ref().unwrap() }; + fn update_alarm(&self) { + // Need to set the alarm when we are *not* holding the mutex on the inner queue + // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately + // call us back if the timestamp is in the past. + let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst); - this.check_initialized(); - this.inner.lock(|inner| inner.borrow_mut().handle_alarm()); + if alarm_at < u64::MAX { + set_alarm(self.alarm.get().unwrap(), alarm_at); + } + } + + fn handle_alarm(&self) { + self.check_initialized(); + self.inner + .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule)); + + self.update_alarm(); + } + + fn handle_alarm_callback(ctx: *mut ()) { + unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm(); } } -/// TODO: Doc +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} + +/// Initialize the queue +/// +/// Call this function early on in your program, before any of your futures that utilize `embassy-time` are polled. +/// +/// # Safety +/// It is UB call this function more than once, or to call it after any of your +/// futures that use `embassy-time` are polled already. pub unsafe fn initialize() { extern "Rust" { fn _embassy_time_generic_queue_initialize(); @@ -173,7 +253,12 @@ pub unsafe fn initialize() { _embassy_time_generic_queue_initialize(); } -/// TODO: Doc +/// Instantiates a global, generic (as in executor-agnostic) timer queue. +/// +/// Unless you plan to customize the queue (size or mutex), prefer +/// instantiating the queue via the `generic-queue` feature. +/// +/// See the module documentation for an example. #[macro_export] macro_rules! generic_queue { (static $name:ident: $t: ty = $val:expr) => { @@ -195,3 +280,338 @@ macro_rules! generic_queue { #[cfg(feature = "generic-queue")] generic_queue!(static QUEUE: Queue = Queue::new()); + +#[cfg(test)] +mod tests { + use core::cell::Cell; + use core::sync::atomic::Ordering; + use core::task::{RawWaker, RawWakerVTable, Waker}; + use std::rc::Rc; + use std::sync::Mutex; + + use embassy_sync::blocking_mutex::raw::RawMutex; + use serial_test::serial; + + use super::InnerQueue; + use crate::driver::{AlarmHandle, Driver}; + use crate::Instant; + + struct InnerTestDriver { + now: u64, + alarm: u64, + callback: fn(*mut ()), + ctx: *mut (), + } + + impl InnerTestDriver { + const fn new() -> Self { + Self { + now: 0, + alarm: u64::MAX, + callback: Self::noop, + ctx: core::ptr::null_mut(), + } + } + + fn noop(_ctx: *mut ()) {} + } + + unsafe impl Send for InnerTestDriver {} + + struct TestDriver(Mutex); + + impl TestDriver { + const fn new() -> Self { + Self(Mutex::new(InnerTestDriver::new())) + } + + fn reset(&self) { + *self.0.lock().unwrap() = InnerTestDriver::new(); + } + + fn set_now(&self, now: u64) { + let notify = { + let mut inner = self.0.lock().unwrap(); + + if inner.now < now { + inner.now = now; + + if inner.alarm <= now { + inner.alarm = u64::MAX; + + Some((inner.callback, inner.ctx)) + } else { + None + } + } else { + panic!("Going back in time?"); + } + }; + + if let Some((callback, ctx)) = notify { + (callback)(ctx); + } + } + } + + impl Driver for TestDriver { + fn now(&self) -> u64 { + self.0.lock().unwrap().now + } + + unsafe fn allocate_alarm(&self) -> Option { + Some(AlarmHandle::new(0)) + } + + fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { + let mut inner = self.0.lock().unwrap(); + + inner.callback = callback; + inner.ctx = ctx; + } + + fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) { + let notify = { + let mut inner = self.0.lock().unwrap(); + + if timestamp <= inner.now { + Some((inner.callback, inner.ctx)) + } else { + inner.alarm = timestamp; + None + } + }; + + if let Some((callback, ctx)) = notify { + (callback)(ctx); + } + } + } + + struct TestWaker { + pub awoken: Rc>, + pub waker: Waker, + } + + impl TestWaker { + fn new() -> Self { + let flag = Rc::new(Cell::new(false)); + + const VTABLE: RawWakerVTable = RawWakerVTable::new( + |data: *const ()| { + unsafe { + Rc::increment_strong_count(data as *const Cell); + } + + RawWaker::new(data as _, &VTABLE) + }, + |data: *const ()| unsafe { + let data = data as *const Cell; + data.as_ref().unwrap().set(true); + Rc::decrement_strong_count(data); + }, + |data: *const ()| unsafe { + (data as *const Cell).as_ref().unwrap().set(true); + }, + |data: *const ()| unsafe { + Rc::decrement_strong_count(data); + }, + ); + + let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE); + + Self { + awoken: flag.clone(), + waker: unsafe { Waker::from_raw(raw) }, + } + } + } + + // TODO: This impl should be part of `embassy-sync`, hidden behind the "std" feature gate + pub struct StdRawMutex(std::sync::Mutex<()>); + + unsafe impl RawMutex for StdRawMutex { + const INIT: Self = StdRawMutex(std::sync::Mutex::new(())); + + fn lock(&self, f: impl FnOnce() -> R) -> R { + let _guard = self.0.lock().unwrap(); + + f() + } + } + + const QUEUE_MAX_LEN: usize = 8; + + crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new()); + crate::generic_queue!(static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new()); + + fn setup() { + DRIVER.reset(); + + QUEUE.alarm.set(None); + QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst); + QUEUE.inner.lock(|inner| { + *inner.borrow_mut() = InnerQueue::new(); + }); + + unsafe { super::initialize() }; + } + + fn queue_len() -> usize { + QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count()) + } + + #[test] + #[serial] + #[should_panic(expected = "Queue is not initialized yet")] + fn test_not_initialized() { + static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(1), &waker.waker); + } + + #[test] + #[serial] + fn test_initialized() { + static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); + + assert!(QUEUE.alarm.get().is_none()); + + unsafe { QUEUE.initialize() }; + + assert!(QUEUE.alarm.get().is_some()); + } + + #[test] + #[serial] + #[should_panic(expected = "Queue is already initialized")] + fn test_already_initialized() { + static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); + + unsafe { QUEUE.initialize() }; + + assert!(QUEUE.alarm.get().is_some()); + + unsafe { QUEUE.initialize() }; + } + + #[test] + #[serial] + fn test_schedule() { + setup(); + + assert_eq!(queue_len(), 0); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(1), &waker.waker); + + assert!(!waker.awoken.get()); + assert_eq!(queue_len(), 1); + } + + #[test] + #[serial] + fn test_schedule_same() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(1), &waker.waker); + + assert_eq!(queue_len(), 1); + + QUEUE.schedule(Instant::from_secs(1), &waker.waker); + + assert_eq!(queue_len(), 1); + + QUEUE.schedule(Instant::from_secs(100), &waker.waker); + + assert_eq!(queue_len(), 1); + + let waker2 = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(100), &waker2.waker); + + assert_eq!(queue_len(), 2); + } + + #[test] + #[serial] + fn test_trigger() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(100), &waker.waker); + + assert!(!waker.awoken.get()); + + DRIVER.set_now(Instant::from_secs(99).as_ticks()); + + assert!(!waker.awoken.get()); + + assert_eq!(queue_len(), 1); + + DRIVER.set_now(Instant::from_secs(100).as_ticks()); + + assert!(waker.awoken.get()); + + assert_eq!(queue_len(), 0); + } + + #[test] + #[serial] + fn test_immediate_trigger() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(100), &waker.waker); + + DRIVER.set_now(Instant::from_secs(50).as_ticks()); + + let waker2 = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(40), &waker2.waker); + + assert!(!waker.awoken.get()); + assert!(waker2.awoken.get()); + assert_eq!(queue_len(), 1); + } + + #[test] + #[serial] + fn test_queue_overflow() { + setup(); + + for i in 1..QUEUE_MAX_LEN { + let waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(310), &waker.waker); + + assert_eq!(queue_len(), i); + assert!(!waker.awoken.get()); + } + + let first_waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(300), &first_waker.waker); + + assert_eq!(queue_len(), QUEUE_MAX_LEN); + assert!(!first_waker.awoken.get()); + + let second_waker = TestWaker::new(); + + QUEUE.schedule(Instant::from_secs(305), &second_waker.waker); + + assert_eq!(queue_len(), QUEUE_MAX_LEN); + assert!(first_waker.awoken.get()); + + QUEUE.schedule(Instant::from_secs(320), &TestWaker::new().waker); + assert_eq!(queue_len(), QUEUE_MAX_LEN); + assert!(second_waker.awoken.get()); + } +} From 53608a87ac4b6c8c60b5508551d12f5ba76ca2f6 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 26 Sep 2022 13:46:15 +0300 Subject: [PATCH 03/11] Address feedback after code review --- embassy-time/Cargo.toml | 16 +- embassy-time/src/lib.rs | 2 + embassy-time/src/queue.rs | 613 ++---------------------------- embassy-time/src/queue_generic.rs | 474 +++++++++++++++++++++++ 4 files changed, 518 insertions(+), 587 deletions(-) create mode 100644 embassy-time/src/queue_generic.rs diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml index 4fbf97f0d..e1ad4b9dd 100644 --- a/embassy-time/Cargo.toml +++ b/embassy-time/Cargo.toml @@ -26,10 +26,22 @@ unstable-traits = ["embedded-hal-1"] # To use this you must have a time driver provided. defmt-timestamp-uptime = ["defmt"] -# Create a global queue that can be used with any executor +# Create a global, generic queue that can be used with any executor # To use this you must have a time driver provided. generic-queue = [] +# Set the number of timers for the generic queue. +# +# At most 1 `generic-queue-*` feature can be enabled. If none is enabled, a default of 64 timers is used. +# +# When using embassy-time from libraries, you should *not* enable any `generic-queue-*` feature, to allow the +# end user to pick. +generic-queue-8 = ["generic-queue"] +generic-queue-16 = ["generic-queue"] +generic-queue-32 = ["generic-queue"] +generic-queue-64 = ["generic-queue"] +generic-queue-128 = ["generic-queue"] + # Set the `embassy_time` tick rate. # # At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used. @@ -128,3 +140,5 @@ wasm-timer = { version = "0.2.5", optional = true } [dev-dependencies] serial_test = "0.9" +critical-section = { version = "1.1", features = ["std"] } + diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs index 50f437baf..586aa28de 100644 --- a/embassy-time/src/lib.rs +++ b/embassy-time/src/lib.rs @@ -19,6 +19,8 @@ mod timer; mod driver_std; #[cfg(feature = "wasm")] mod driver_wasm; +#[cfg(feature = "generic-queue")] +mod queue_generic; pub use delay::{block_for, Delay}; pub use duration::Duration; diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs index 56ad5af8d..c6f8b440a 100644 --- a/embassy-time/src/queue.rs +++ b/embassy-time/src/queue.rs @@ -1,617 +1,58 @@ -//! Generic timer queue implementation +//! Timer queue implementation //! -//! This module provides a timer queue that works with any executor. +//! This module defines the interface a timer queue needs to implement to power the `embassy_time` module. //! -//! In terms of performance, this queue will likely be less efficient in comparison to executor-native queues, -//! like the one provided with e.g. the `embassy-executor` crate. +//! # Implementing a timer queue //! -//! # Enabling the queue -//! - Enable the Cargo feature `generic-queue`. This will automatically instantiate the queue. +//! - Define a struct `MyTimerQueue` +//! - Implement [`TimerQueue`] for it +//! - Register it as the global timer queue with [`timer_queue_impl`](crate::timer_queue_impl). //! -//! # Initializing the queue -//! - Call ```unsafe { embassy_time::queue::initialize(); }``` early on in your program, before any of your futures that utilize `embassy-time` are polled. +//! # Linkage details //! -//! # Customizing the queue -//! - It is possible to customize two aspects of the queue: -//! - Queue size: -//! By default, the queue can hold up to 128 timer schedules and their corresponding wakers. While it will not crash if more timer schedules are added, -//! the performance will degrade, as one of the already added wakers will be awoken, thus making room for the new timer schedule and its waker. -//! - The mutex (i.e. the [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation) utilized by the queue: -//! By default, the utilized [`RawMutex`](embassy_sync::blocking_mutex::raw::RawMutex) implementation is [`CriticalSectionRawMutex`](embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex) -//! which is provided by the `critical-section` crate. This should work just fine, except in a few niche cases like running on -//! top of an RTOS which provides a [`Driver`](crate::driver::Driver) implementation that will call-back directly from an ISR. As the -//! `critical-section` implementation for RTOS-es will likely provide an RTOS mutex which cannot be locked from an ISR, user needs to instead -//! configure the queue with a "disable-all-interrupts" style of mutex. -//! - To customize any of these queue aspects, don't enable the `generic-queue` Cargo feature and instead instantiate the queue with the [`generic_queue`](crate::generic_queue) -//! macro, as per the example below. +//! Check the documentation of the [`driver`](crate::driver) module for more information. //! +//! Similarly to driver, if there is none or multiple timer queues in the crate tree, linking will fail. //! //! # Example //! -//! ```ignore -//! use embassy_time::queue::Queue; +//! ``` +//! use core::task::Waker; //! -//! // You only need to invoke this macro in case you need to customize the queue. -//! // -//! // Otherwise, just depend on the `embassy-time` crate with feature `generic-queue` enabled, -//! // and the queue instantiation will be done for you behind the scenes. -//! embassy_time::generic_queue!(static QUEUE: Queue<200, MyCustomRawMutex> = Queue::new()); +//! use embassy_time::Instant; +//! use embassy_time::queue::{TimerQueue}; //! -//! fn main() { -//! unsafe { -//! embassy_time::queue::initialize(); +//! struct MyTimerQueue{}; // not public! +//! embassy_time::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{}); +//! +//! impl TimerQueue for MyTimerQueue { +//! fn schedule_wake(&'static self, at: Instant, waker: &Waker) { +//! todo!() //! } //! } //! ``` -use core::cell::{Cell, RefCell}; -use core::cmp::Ordering; use core::task::Waker; -use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering}; -use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; -use embassy_sync::blocking_mutex::Mutex; -use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; - -use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle}; use crate::Instant; -#[derive(Debug)] -struct Timer { - at: Instant, - waker: Waker, +/// Timer queue +pub trait TimerQueue { + /// Schedules a waker in the queue to be awoken at moment `at`. + /// If this moment is in the past, the waker might be awoken immediately. + fn schedule_wake(&'static self, at: Instant, waker: &Waker); } -impl PartialEq for Timer { - fn eq(&self, other: &Self) -> bool { - self.at == other.at - } -} - -impl Eq for Timer {} - -impl PartialOrd for Timer { - fn partial_cmp(&self, other: &Self) -> Option { - self.at.partial_cmp(&other.at) - } -} - -impl Ord for Timer { - fn cmp(&self, other: &Self) -> Ordering { - self.at.cmp(&other.at) - } -} - -struct InnerQueue { - queue: SortedLinkedList, - alarm_at: Instant, -} - -impl InnerQueue { - const fn new() -> Self { - Self { - queue: SortedLinkedList::new_u8(), - alarm_at: Instant::MAX, - } - } - - fn schedule(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) { - self.queue - .find_mut(|timer| timer.waker.will_wake(waker)) - .map(|mut timer| { - timer.waker = waker.clone(); - timer.at = at; - - timer.finish(); - }) - .unwrap_or_else(|| { - let mut timer = Timer { - waker: waker.clone(), - at, - }; - - loop { - match self.queue.push(timer) { - Ok(()) => break, - Err(e) => timer = e, - } - - self.queue.pop().unwrap().waker.wake(); - } - }); - - // Don't wait for the alarm callback to trigger and directly - // dispatch all timers that are already due - // - // Then update the alarm if necessary - self.dispatch(alarm_schedule); - } - - fn dispatch(&mut self, alarm_schedule: &AtomicU64) { - let now = Instant::now(); - - while self.queue.peek().filter(|timer| timer.at <= now).is_some() { - self.queue.pop().unwrap().waker.wake(); - } - - self.update_alarm(alarm_schedule); - } - - fn update_alarm(&mut self, alarm_schedule: &AtomicU64) { - if let Some(timer) = self.queue.peek() { - let new_at = timer.at; - - if self.alarm_at != new_at { - self.alarm_at = new_at; - alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst); - } - } else { - self.alarm_at = Instant::MAX; - alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst); - } - } - - fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) { - self.alarm_at = Instant::MAX; - - self.dispatch(alarm_schedule); - } -} - -/// The generic queue implementation -pub struct Queue { - inner: Mutex>>, - alarm: Cell>, - alarm_schedule: AtomicU64, -} - -impl Queue { - /// Create a Queue - pub const fn new() -> Self { - Self { - inner: Mutex::new(RefCell::new(InnerQueue::::new())), - alarm: Cell::new(None), - alarm_schedule: AtomicU64::new(u64::MAX), - } - } - - /// Initialize the queue - /// - /// This method is called from [`initialize`](crate::queue::initialize), so you are not expected to call it directly. - /// Call [`initialize`](crate::queue::initialize) instead. - /// - /// # Safety - /// It is UB call this function more than once, or to call it after any of your - /// futures that use `embassy-time` are polled already. - pub unsafe fn initialize(&'static self) { - if self.alarm.get().is_some() { - panic!("Queue is already initialized"); - } - - let handle = allocate_alarm().unwrap(); - self.alarm.set(Some(handle)); - - set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); - } - - /// Schedule a new waker to be awoken at moment `at` - /// - /// This method is called internally by [`embassy-time`](crate), so you are not expected to call it directly. - pub fn schedule(&'static self, at: Instant, waker: &Waker) { - self.check_initialized(); - - self.inner - .lock(|inner| inner.borrow_mut().schedule(at, waker, &self.alarm_schedule)); - - self.update_alarm(); - } - - fn check_initialized(&self) { - if self.alarm.get().is_none() { - panic!("Queue is not initialized yet"); - } - } - - fn update_alarm(&self) { - // Need to set the alarm when we are *not* holding the mutex on the inner queue - // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately - // call us back if the timestamp is in the past. - let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst); - - if alarm_at < u64::MAX { - set_alarm(self.alarm.get().unwrap(), alarm_at); - } - } - - fn handle_alarm(&self) { - self.check_initialized(); - self.inner - .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule)); - - self.update_alarm(); - } - - fn handle_alarm_callback(ctx: *mut ()) { - unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm(); - } -} - -unsafe impl Send for Queue {} -unsafe impl Sync for Queue {} - -/// Initialize the queue -/// -/// Call this function early on in your program, before any of your futures that utilize `embassy-time` are polled. -/// -/// # Safety -/// It is UB call this function more than once, or to call it after any of your -/// futures that use `embassy-time` are polled already. -pub unsafe fn initialize() { - extern "Rust" { - fn _embassy_time_generic_queue_initialize(); - } - - _embassy_time_generic_queue_initialize(); -} - -/// Instantiates a global, generic (as in executor-agnostic) timer queue. -/// -/// Unless you plan to customize the queue (size or mutex), prefer -/// instantiating the queue via the `generic-queue` feature. +/// Set the TimerQueue implementation. /// /// See the module documentation for an example. #[macro_export] -macro_rules! generic_queue { +macro_rules! timer_queue_impl { (static $name:ident: $t: ty = $val:expr) => { static $name: $t = $val; - #[no_mangle] - fn _embassy_time_generic_queue_initialize() { - unsafe { - $crate::queue::Queue::initialize(&$name); - } - } - #[no_mangle] fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) { - $crate::queue::Queue::schedule(&$name, at, waker); + <$t as $crate::queue::TimerQueue>::schedule_wake(&$name, at, waker); } }; } - -#[cfg(feature = "generic-queue")] -generic_queue!(static QUEUE: Queue = Queue::new()); - -#[cfg(test)] -mod tests { - use core::cell::Cell; - use core::sync::atomic::Ordering; - use core::task::{RawWaker, RawWakerVTable, Waker}; - use std::rc::Rc; - use std::sync::Mutex; - - use embassy_sync::blocking_mutex::raw::RawMutex; - use serial_test::serial; - - use super::InnerQueue; - use crate::driver::{AlarmHandle, Driver}; - use crate::Instant; - - struct InnerTestDriver { - now: u64, - alarm: u64, - callback: fn(*mut ()), - ctx: *mut (), - } - - impl InnerTestDriver { - const fn new() -> Self { - Self { - now: 0, - alarm: u64::MAX, - callback: Self::noop, - ctx: core::ptr::null_mut(), - } - } - - fn noop(_ctx: *mut ()) {} - } - - unsafe impl Send for InnerTestDriver {} - - struct TestDriver(Mutex); - - impl TestDriver { - const fn new() -> Self { - Self(Mutex::new(InnerTestDriver::new())) - } - - fn reset(&self) { - *self.0.lock().unwrap() = InnerTestDriver::new(); - } - - fn set_now(&self, now: u64) { - let notify = { - let mut inner = self.0.lock().unwrap(); - - if inner.now < now { - inner.now = now; - - if inner.alarm <= now { - inner.alarm = u64::MAX; - - Some((inner.callback, inner.ctx)) - } else { - None - } - } else { - panic!("Going back in time?"); - } - }; - - if let Some((callback, ctx)) = notify { - (callback)(ctx); - } - } - } - - impl Driver for TestDriver { - fn now(&self) -> u64 { - self.0.lock().unwrap().now - } - - unsafe fn allocate_alarm(&self) -> Option { - Some(AlarmHandle::new(0)) - } - - fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { - let mut inner = self.0.lock().unwrap(); - - inner.callback = callback; - inner.ctx = ctx; - } - - fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) { - let notify = { - let mut inner = self.0.lock().unwrap(); - - if timestamp <= inner.now { - Some((inner.callback, inner.ctx)) - } else { - inner.alarm = timestamp; - None - } - }; - - if let Some((callback, ctx)) = notify { - (callback)(ctx); - } - } - } - - struct TestWaker { - pub awoken: Rc>, - pub waker: Waker, - } - - impl TestWaker { - fn new() -> Self { - let flag = Rc::new(Cell::new(false)); - - const VTABLE: RawWakerVTable = RawWakerVTable::new( - |data: *const ()| { - unsafe { - Rc::increment_strong_count(data as *const Cell); - } - - RawWaker::new(data as _, &VTABLE) - }, - |data: *const ()| unsafe { - let data = data as *const Cell; - data.as_ref().unwrap().set(true); - Rc::decrement_strong_count(data); - }, - |data: *const ()| unsafe { - (data as *const Cell).as_ref().unwrap().set(true); - }, - |data: *const ()| unsafe { - Rc::decrement_strong_count(data); - }, - ); - - let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE); - - Self { - awoken: flag.clone(), - waker: unsafe { Waker::from_raw(raw) }, - } - } - } - - // TODO: This impl should be part of `embassy-sync`, hidden behind the "std" feature gate - pub struct StdRawMutex(std::sync::Mutex<()>); - - unsafe impl RawMutex for StdRawMutex { - const INIT: Self = StdRawMutex(std::sync::Mutex::new(())); - - fn lock(&self, f: impl FnOnce() -> R) -> R { - let _guard = self.0.lock().unwrap(); - - f() - } - } - - const QUEUE_MAX_LEN: usize = 8; - - crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new()); - crate::generic_queue!(static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new()); - - fn setup() { - DRIVER.reset(); - - QUEUE.alarm.set(None); - QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst); - QUEUE.inner.lock(|inner| { - *inner.borrow_mut() = InnerQueue::new(); - }); - - unsafe { super::initialize() }; - } - - fn queue_len() -> usize { - QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count()) - } - - #[test] - #[serial] - #[should_panic(expected = "Queue is not initialized yet")] - fn test_not_initialized() { - static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); - - let waker = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(1), &waker.waker); - } - - #[test] - #[serial] - fn test_initialized() { - static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); - - assert!(QUEUE.alarm.get().is_none()); - - unsafe { QUEUE.initialize() }; - - assert!(QUEUE.alarm.get().is_some()); - } - - #[test] - #[serial] - #[should_panic(expected = "Queue is already initialized")] - fn test_already_initialized() { - static QUEUE: super::Queue<{ QUEUE_MAX_LEN }, StdRawMutex> = super::Queue::new(); - - unsafe { QUEUE.initialize() }; - - assert!(QUEUE.alarm.get().is_some()); - - unsafe { QUEUE.initialize() }; - } - - #[test] - #[serial] - fn test_schedule() { - setup(); - - assert_eq!(queue_len(), 0); - - let waker = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(1), &waker.waker); - - assert!(!waker.awoken.get()); - assert_eq!(queue_len(), 1); - } - - #[test] - #[serial] - fn test_schedule_same() { - setup(); - - let waker = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(1), &waker.waker); - - assert_eq!(queue_len(), 1); - - QUEUE.schedule(Instant::from_secs(1), &waker.waker); - - assert_eq!(queue_len(), 1); - - QUEUE.schedule(Instant::from_secs(100), &waker.waker); - - assert_eq!(queue_len(), 1); - - let waker2 = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(100), &waker2.waker); - - assert_eq!(queue_len(), 2); - } - - #[test] - #[serial] - fn test_trigger() { - setup(); - - let waker = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(100), &waker.waker); - - assert!(!waker.awoken.get()); - - DRIVER.set_now(Instant::from_secs(99).as_ticks()); - - assert!(!waker.awoken.get()); - - assert_eq!(queue_len(), 1); - - DRIVER.set_now(Instant::from_secs(100).as_ticks()); - - assert!(waker.awoken.get()); - - assert_eq!(queue_len(), 0); - } - - #[test] - #[serial] - fn test_immediate_trigger() { - setup(); - - let waker = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(100), &waker.waker); - - DRIVER.set_now(Instant::from_secs(50).as_ticks()); - - let waker2 = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(40), &waker2.waker); - - assert!(!waker.awoken.get()); - assert!(waker2.awoken.get()); - assert_eq!(queue_len(), 1); - } - - #[test] - #[serial] - fn test_queue_overflow() { - setup(); - - for i in 1..QUEUE_MAX_LEN { - let waker = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(310), &waker.waker); - - assert_eq!(queue_len(), i); - assert!(!waker.awoken.get()); - } - - let first_waker = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(300), &first_waker.waker); - - assert_eq!(queue_len(), QUEUE_MAX_LEN); - assert!(!first_waker.awoken.get()); - - let second_waker = TestWaker::new(); - - QUEUE.schedule(Instant::from_secs(305), &second_waker.waker); - - assert_eq!(queue_len(), QUEUE_MAX_LEN); - assert!(first_waker.awoken.get()); - - QUEUE.schedule(Instant::from_secs(320), &TestWaker::new().waker); - assert_eq!(queue_len(), QUEUE_MAX_LEN); - assert!(second_waker.awoken.get()); - } -} diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs new file mode 100644 index 000000000..1c4e5398b --- /dev/null +++ b/embassy-time/src/queue_generic.rs @@ -0,0 +1,474 @@ +use core::cell::RefCell; +use core::cmp::Ordering; +use core::task::Waker; + +use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering}; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::blocking_mutex::Mutex; +use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; + +use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle}; +use crate::queue::TimerQueue; +use crate::Instant; + +#[cfg(feature = "generic-queue-8")] +const QUEUE_SIZE: usize = 8; +#[cfg(feature = "generic-queue-16")] +const QUEUE_SIZE: usize = 16; +#[cfg(feature = "generic-queue-32")] +const QUEUE_SIZE: usize = 32; +#[cfg(feature = "generic-queue-64")] +const QUEUE_SIZE: usize = 32; +#[cfg(feature = "generic-queue-128")] +const QUEUE_SIZE: usize = 128; +#[cfg(not(any( + feature = "generic-queue-8", + feature = "generic-queue-16", + feature = "generic-queue-32", + feature = "generic-queue-64", + feature = "generic-queue-128" +)))] +const QUEUE_SIZE: usize = 64; + +#[derive(Debug)] +struct Timer { + at: Instant, + waker: Waker, +} + +impl PartialEq for Timer { + fn eq(&self, other: &Self) -> bool { + self.at == other.at + } +} + +impl Eq for Timer {} + +impl PartialOrd for Timer { + fn partial_cmp(&self, other: &Self) -> Option { + self.at.partial_cmp(&other.at) + } +} + +impl Ord for Timer { + fn cmp(&self, other: &Self) -> Ordering { + self.at.cmp(&other.at) + } +} + +struct InnerQueue { + queue: SortedLinkedList, + alarm: Option, + alarm_at: Instant, +} + +impl InnerQueue { + const fn new() -> Self { + Self { + queue: SortedLinkedList::new_u8(), + alarm: None, + alarm_at: Instant::MAX, + } + } + + fn schedule_wake(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) { + self.queue + .find_mut(|timer| timer.waker.will_wake(waker)) + .map(|mut timer| { + timer.at = at; + timer.finish(); + }) + .unwrap_or_else(|| { + let mut timer = Timer { + waker: waker.clone(), + at, + }; + + loop { + match self.queue.push(timer) { + Ok(()) => break, + Err(e) => timer = e, + } + + self.queue.pop().unwrap().waker.wake(); + } + }); + + // Don't wait for the alarm callback to trigger and directly + // dispatch all timers that are already due + // + // Then update the alarm if necessary + self.dispatch(alarm_schedule); + } + + fn dispatch(&mut self, alarm_schedule: &AtomicU64) { + let now = Instant::now(); + + while self.queue.peek().filter(|timer| timer.at <= now).is_some() { + self.queue.pop().unwrap().waker.wake(); + } + + self.update_alarm(alarm_schedule); + } + + fn update_alarm(&mut self, alarm_schedule: &AtomicU64) { + if let Some(timer) = self.queue.peek() { + let new_at = timer.at; + + if self.alarm_at != new_at { + self.alarm_at = new_at; + alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst); + } + } else { + self.alarm_at = Instant::MAX; + alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst); + } + } + + fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) { + self.alarm_at = Instant::MAX; + + self.dispatch(alarm_schedule); + } +} + +struct Queue { + inner: Mutex>, + alarm_schedule: AtomicU64, +} + +impl Queue { + const fn new() -> Self { + Self { + inner: Mutex::new(RefCell::new(InnerQueue::new())), + alarm_schedule: AtomicU64::new(u64::MAX), + } + } + + fn schedule_wake(&'static self, at: Instant, waker: &Waker) { + self.inner.lock(|inner| { + let mut inner = inner.borrow_mut(); + + if inner.alarm.is_none() { + let handle = unsafe { allocate_alarm() }.unwrap(); + inner.alarm = Some(handle); + + set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); + } + + inner.schedule_wake(at, waker, &self.alarm_schedule) + }); + + self.update_alarm(); + } + + fn update_alarm(&self) { + // Need to set the alarm when we are *not* holding the mutex on the inner queue + // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately + // call us back if the timestamp is in the past. + let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst); + + if alarm_at < u64::MAX { + set_alarm(self.inner.lock(|inner| inner.borrow().alarm.unwrap()), alarm_at); + } + } + + fn handle_alarm(&self) { + self.inner + .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule)); + + self.update_alarm(); + } + + fn handle_alarm_callback(ctx: *mut ()) { + unsafe { (ctx as *const Self).as_ref().unwrap() }.handle_alarm(); + } +} + +impl TimerQueue for Queue { + fn schedule_wake(&'static self, at: Instant, waker: &Waker) { + Queue::schedule_wake(self, at, waker); + } +} + +crate::timer_queue_impl!(static QUEUE: Queue = Queue::new()); + +#[cfg(test)] +mod tests { + use core::cell::Cell; + use core::sync::atomic::Ordering; + use core::task::{RawWaker, RawWakerVTable, Waker}; + use std::rc::Rc; + use std::sync::Mutex; + + use serial_test::serial; + + use super::InnerQueue; + use crate::driver::{AlarmHandle, Driver}; + use crate::queue_generic::QUEUE; + use crate::Instant; + + struct InnerTestDriver { + now: u64, + alarm: u64, + callback: fn(*mut ()), + ctx: *mut (), + } + + impl InnerTestDriver { + const fn new() -> Self { + Self { + now: 0, + alarm: u64::MAX, + callback: Self::noop, + ctx: core::ptr::null_mut(), + } + } + + fn noop(_ctx: *mut ()) {} + } + + unsafe impl Send for InnerTestDriver {} + + struct TestDriver(Mutex); + + impl TestDriver { + const fn new() -> Self { + Self(Mutex::new(InnerTestDriver::new())) + } + + fn reset(&self) { + *self.0.lock().unwrap() = InnerTestDriver::new(); + } + + fn set_now(&self, now: u64) { + let notify = { + let mut inner = self.0.lock().unwrap(); + + if inner.now < now { + inner.now = now; + + if inner.alarm <= now { + inner.alarm = u64::MAX; + + Some((inner.callback, inner.ctx)) + } else { + None + } + } else { + panic!("Going back in time?"); + } + }; + + if let Some((callback, ctx)) = notify { + (callback)(ctx); + } + } + } + + impl Driver for TestDriver { + fn now(&self) -> u64 { + self.0.lock().unwrap().now + } + + unsafe fn allocate_alarm(&self) -> Option { + Some(AlarmHandle::new(0)) + } + + fn set_alarm_callback(&self, _alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { + let mut inner = self.0.lock().unwrap(); + + inner.callback = callback; + inner.ctx = ctx; + } + + fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) { + let notify = { + let mut inner = self.0.lock().unwrap(); + + if timestamp <= inner.now { + Some((inner.callback, inner.ctx)) + } else { + inner.alarm = timestamp; + None + } + }; + + if let Some((callback, ctx)) = notify { + (callback)(ctx); + } + } + } + + struct TestWaker { + pub awoken: Rc>, + pub waker: Waker, + } + + impl TestWaker { + fn new() -> Self { + let flag = Rc::new(Cell::new(false)); + + const VTABLE: RawWakerVTable = RawWakerVTable::new( + |data: *const ()| { + unsafe { + Rc::increment_strong_count(data as *const Cell); + } + + RawWaker::new(data as _, &VTABLE) + }, + |data: *const ()| unsafe { + let data = data as *const Cell; + data.as_ref().unwrap().set(true); + Rc::decrement_strong_count(data); + }, + |data: *const ()| unsafe { + (data as *const Cell).as_ref().unwrap().set(true); + }, + |data: *const ()| unsafe { + Rc::decrement_strong_count(data); + }, + ); + + let raw = RawWaker::new(Rc::into_raw(flag.clone()) as _, &VTABLE); + + Self { + awoken: flag.clone(), + waker: unsafe { Waker::from_raw(raw) }, + } + } + } + + crate::time_driver_impl!(static DRIVER: TestDriver = TestDriver::new()); + + fn setup() { + DRIVER.reset(); + + QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst); + QUEUE.inner.lock(|inner| { + *inner.borrow_mut() = InnerQueue::new(); + }); + } + + fn queue_len() -> usize { + QUEUE.inner.lock(|inner| inner.borrow().queue.iter().count()) + } + + #[test] + #[serial] + fn test_schedule() { + setup(); + + assert_eq!(queue_len(), 0); + + let waker = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); + + assert!(!waker.awoken.get()); + assert_eq!(queue_len(), 1); + } + + #[test] + #[serial] + fn test_schedule_same() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); + + assert_eq!(queue_len(), 1); + + QUEUE.schedule_wake(Instant::from_secs(1), &waker.waker); + + assert_eq!(queue_len(), 1); + + QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); + + assert_eq!(queue_len(), 1); + + let waker2 = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(100), &waker2.waker); + + assert_eq!(queue_len(), 2); + } + + #[test] + #[serial] + fn test_trigger() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); + + assert!(!waker.awoken.get()); + + DRIVER.set_now(Instant::from_secs(99).as_ticks()); + + assert!(!waker.awoken.get()); + + assert_eq!(queue_len(), 1); + + DRIVER.set_now(Instant::from_secs(100).as_ticks()); + + assert!(waker.awoken.get()); + + assert_eq!(queue_len(), 0); + } + + #[test] + #[serial] + fn test_immediate_trigger() { + setup(); + + let waker = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(100), &waker.waker); + + DRIVER.set_now(Instant::from_secs(50).as_ticks()); + + let waker2 = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(40), &waker2.waker); + + assert!(!waker.awoken.get()); + assert!(waker2.awoken.get()); + assert_eq!(queue_len(), 1); + } + + #[test] + #[serial] + fn test_queue_overflow() { + setup(); + + for i in 1..super::QUEUE_SIZE { + let waker = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(310), &waker.waker); + + assert_eq!(queue_len(), i); + assert!(!waker.awoken.get()); + } + + let first_waker = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(300), &first_waker.waker); + + assert_eq!(queue_len(), super::QUEUE_SIZE); + assert!(!first_waker.awoken.get()); + + let second_waker = TestWaker::new(); + + QUEUE.schedule_wake(Instant::from_secs(305), &second_waker.waker); + + assert_eq!(queue_len(), super::QUEUE_SIZE); + assert!(first_waker.awoken.get()); + + QUEUE.schedule_wake(Instant::from_secs(320), &TestWaker::new().waker); + assert_eq!(queue_len(), super::QUEUE_SIZE); + assert!(second_waker.awoken.get()); + } +} From 4d5550070fe5e80ff2296a71239c568c774b9ceb Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 24 Oct 2022 09:17:43 +0300 Subject: [PATCH 04/11] Change time Driver contract to never fire the alarm synchronously --- embassy-executor/src/raw/mod.rs | 78 +++++++++++++++++-------------- embassy-nrf/src/time_driver.rs | 19 ++++---- embassy-rp/src/timer.rs | 17 +++---- embassy-stm32/src/time_driver.rs | 18 +++---- embassy-time/src/driver.rs | 13 +++--- embassy-time/src/driver_std.rs | 4 +- embassy-time/src/driver_wasm.rs | 4 +- embassy-time/src/queue_generic.rs | 77 +++++++++++------------------- 8 files changed, 113 insertions(+), 117 deletions(-) diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index e1258ebb5..5bcb1e6e7 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -354,46 +354,54 @@ impl Executor { /// somehow schedule for `poll()` to be called later, at a time you know for sure there's /// no `poll()` already running. pub unsafe fn poll(&'static self) { - #[cfg(feature = "integrated-timers")] - self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); + loop { + #[cfg(feature = "integrated-timers")] + self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task)); - self.run_queue.dequeue_all(|p| { - let task = p.as_ref(); + self.run_queue.dequeue_all(|p| { + let task = p.as_ref(); + + #[cfg(feature = "integrated-timers")] + task.expires_at.set(Instant::MAX); + + let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); + if state & STATE_SPAWNED == 0 { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } + + #[cfg(feature = "rtos-trace")] + trace::task_exec_begin(p.as_ptr() as u32); + + // Run the task + task.poll_fn.read()(p as _); + + #[cfg(feature = "rtos-trace")] + trace::task_exec_end(); + + // Enqueue or update into timer_queue + #[cfg(feature = "integrated-timers")] + self.timer_queue.update(p); + }); #[cfg(feature = "integrated-timers")] - task.expires_at.set(Instant::MAX); - - let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); - if state & STATE_SPAWNED == 0 { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - return; + { + // If this is already in the past, set_alarm might return false + // In that case do another poll loop iteration. + let next_expiration = self.timer_queue.next_expiration(); + if driver::set_alarm(self.alarm, next_expiration.as_ticks()) { + break; + } } - #[cfg(feature = "rtos-trace")] - trace::task_exec_begin(p.as_ptr() as u32); - - // Run the task - task.poll_fn.read()(p as _); - - #[cfg(feature = "rtos-trace")] - trace::task_exec_end(); - - // Enqueue or update into timer_queue - #[cfg(feature = "integrated-timers")] - self.timer_queue.update(p); - }); - - #[cfg(feature = "integrated-timers")] - { - // If this is already in the past, set_alarm will immediately trigger the alarm. - // This will cause `signal_fn` to be called, which will cause `poll()` to be called again, - // so we immediately do another poll loop iteration. - let next_expiration = self.timer_queue.next_expiration(); - driver::set_alarm(self.alarm, next_expiration.as_ticks()); + #[cfg(not(feature = "integrated-timers"))] + { + break; + } } #[cfg(feature = "rtos-trace")] diff --git a/embassy-nrf/src/time_driver.rs b/embassy-nrf/src/time_driver.rs index c32a44637..0d03ad529 100644 --- a/embassy-nrf/src/time_driver.rs +++ b/embassy-nrf/src/time_driver.rs @@ -243,20 +243,19 @@ impl Driver for RtcDriver { }) } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { critical_section::with(|cs| { + let t = self.now(); + + // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. + if timestamp <= t { + return false; + } + let n = alarm.id() as _; let alarm = self.get_alarm(cs, alarm); alarm.timestamp.set(timestamp); - let t = self.now(); - - // If alarm timestamp has passed, trigger it instantly. - if timestamp <= t { - self.trigger_alarm(n, cs); - return; - } - let r = rtc(); // If it hasn't triggered yet, setup it in the compare channel. @@ -287,6 +286,8 @@ impl Driver for RtcDriver { // It will be setup later by `next_period`. r.intenclr.write(|w| unsafe { w.bits(compare_n(n)) }); } + + true }) } } diff --git a/embassy-rp/src/timer.rs b/embassy-rp/src/timer.rs index 5215c0c0f..8f280f550 100644 --- a/embassy-rp/src/timer.rs +++ b/embassy-rp/src/timer.rs @@ -68,9 +68,16 @@ impl Driver for TimerDriver { }) } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { let n = alarm.id() as usize; critical_section::with(|cs| { + let now = self.now(); + + // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. + if timestamp <= now { + return false; + } + let alarm = &self.alarms.borrow(cs)[n]; alarm.timestamp.set(timestamp); @@ -80,13 +87,7 @@ impl Driver for TimerDriver { // it is checked if the alarm time has passed. unsafe { pac::TIMER.alarm(n).write_value(timestamp as u32) }; - let now = self.now(); - - // If alarm timestamp has passed, trigger it instantly. - // This disarms it. - if timestamp <= now { - self.trigger_alarm(n, cs); - } + true }) } } diff --git a/embassy-stm32/src/time_driver.rs b/embassy-stm32/src/time_driver.rs index ed3225c51..e4c266e7f 100644 --- a/embassy-stm32/src/time_driver.rs +++ b/embassy-stm32/src/time_driver.rs @@ -292,21 +292,21 @@ impl Driver for RtcDriver { }) } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { critical_section::with(|cs| { + let t = self.now(); + + // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. + if timestamp <= t { + return false; + } + let r = T::regs_gp16(); let n = alarm.id() as _; let alarm = self.get_alarm(cs, alarm); alarm.timestamp.set(timestamp); - let t = self.now(); - if timestamp <= t { - unsafe { r.dier().modify(|w| w.set_ccie(n + 1, false)) }; - self.trigger_alarm(n, cs); - return; - } - let safe_timestamp = timestamp.max(t + 3); // Write the CCR value regardless of whether we're going to enable it now or not. @@ -317,6 +317,8 @@ impl Driver for RtcDriver { let diff = timestamp - t; // NOTE(unsafe) We're in a critical section unsafe { r.dier().modify(|w| w.set_ccie(n + 1, diff < 0xc000)) }; + + true }) } } diff --git a/embassy-time/src/driver.rs b/embassy-time/src/driver.rs index 79ae14b91..5c2ad3b23 100644 --- a/embassy-time/src/driver.rs +++ b/embassy-time/src/driver.rs @@ -105,20 +105,21 @@ pub trait Driver: Send + Sync + 'static { /// Sets an alarm at the given timestamp. When the current timestamp reaches the alarm /// timestamp, the provided callback function will be called. /// - /// If `timestamp` is already in the past, the alarm callback must be immediately fired. - /// In this case, it is allowed (but not mandatory) to call the alarm callback synchronously from `set_alarm`. + /// The `Driver` implementation should guarantee that the alarm callback is never called synchronously from `set_alarm`. + /// Rather - if `timestamp` is already in the past - `false` should be returned and alarm should not be set, + /// or alternatively, the driver should return `true` and arrange to call the alarm callback as soon as possible, but not synchronously. /// /// When callback is called, it is guaranteed that now() will return a value greater or equal than timestamp. /// /// Only one alarm can be active at a time for each AlarmHandle. This overwrites any previously-set alarm if any. - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64); + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool; } extern "Rust" { fn _embassy_time_now() -> u64; fn _embassy_time_allocate_alarm() -> Option; fn _embassy_time_set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()); - fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64); + fn _embassy_time_set_alarm(alarm: AlarmHandle, timestamp: u64) -> bool; } /// See [`Driver::now`] @@ -139,7 +140,7 @@ pub fn set_alarm_callback(alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ( } /// See [`Driver::set_alarm`] -pub fn set_alarm(alarm: AlarmHandle, timestamp: u64) { +pub fn set_alarm(alarm: AlarmHandle, timestamp: u64) -> bool { unsafe { _embassy_time_set_alarm(alarm, timestamp) } } @@ -167,7 +168,7 @@ macro_rules! time_driver_impl { } #[no_mangle] - fn _embassy_time_set_alarm(alarm: $crate::driver::AlarmHandle, timestamp: u64) { + fn _embassy_time_set_alarm(alarm: $crate::driver::AlarmHandle, timestamp: u64) -> bool { <$t as $crate::driver::Driver>::set_alarm(&$name, alarm, timestamp) } }; diff --git a/embassy-time/src/driver_std.rs b/embassy-time/src/driver_std.rs index 2ddb2e604..fc7fd1979 100644 --- a/embassy-time/src/driver_std.rs +++ b/embassy-time/src/driver_std.rs @@ -127,12 +127,14 @@ impl Driver for TimeDriver { alarm.ctx = ctx; } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { self.init(); let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap(); let alarm = &mut alarms[alarm.id() as usize]; alarm.timestamp = timestamp; unsafe { self.signaler.as_ref() }.signal(); + + true } } diff --git a/embassy-time/src/driver_wasm.rs b/embassy-time/src/driver_wasm.rs index e4497e6a2..d7a6b0d8d 100644 --- a/embassy-time/src/driver_wasm.rs +++ b/embassy-time/src/driver_wasm.rs @@ -81,13 +81,15 @@ impl Driver for TimeDriver { } } - fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { + fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) -> bool { self.init(); let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap(); let alarm = &mut alarms[alarm.id() as usize]; alarm.closure.replace(Closure::new(move || { callback(ctx); })); + + true } fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs index 1c4e5398b..83f734848 100644 --- a/embassy-time/src/queue_generic.rs +++ b/embassy-time/src/queue_generic.rs @@ -2,7 +2,6 @@ use core::cell::RefCell; use core::cmp::Ordering; use core::task::Waker; -use atomic_polyfill::{AtomicU64, Ordering as AtomicOrdering}; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::Mutex; use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; @@ -71,7 +70,7 @@ impl InnerQueue { } } - fn schedule_wake(&mut self, at: Instant, waker: &Waker, alarm_schedule: &AtomicU64) { + fn schedule_wake(&mut self, at: Instant, waker: &Waker) { self.queue .find_mut(|timer| timer.waker.will_wake(waker)) .map(|mut timer| { @@ -98,50 +97,54 @@ impl InnerQueue { // dispatch all timers that are already due // // Then update the alarm if necessary - self.dispatch(alarm_schedule); + self.dispatch(); } - fn dispatch(&mut self, alarm_schedule: &AtomicU64) { - let now = Instant::now(); + fn dispatch(&mut self) { + loop { + let now = Instant::now(); - while self.queue.peek().filter(|timer| timer.at <= now).is_some() { - self.queue.pop().unwrap().waker.wake(); + while self.queue.peek().filter(|timer| timer.at <= now).is_some() { + self.queue.pop().unwrap().waker.wake(); + } + + if self.update_alarm() { + break; + } } - - self.update_alarm(alarm_schedule); } - fn update_alarm(&mut self, alarm_schedule: &AtomicU64) { + fn update_alarm(&mut self) -> bool { if let Some(timer) = self.queue.peek() { let new_at = timer.at; if self.alarm_at != new_at { self.alarm_at = new_at; - alarm_schedule.store(new_at.as_ticks(), AtomicOrdering::SeqCst); + + return set_alarm(self.alarm.unwrap(), self.alarm_at.as_ticks()); } } else { self.alarm_at = Instant::MAX; - alarm_schedule.store(Instant::MAX.as_ticks(), AtomicOrdering::SeqCst); } + + true } - fn handle_alarm(&mut self, alarm_schedule: &AtomicU64) { + fn handle_alarm(&mut self) { self.alarm_at = Instant::MAX; - self.dispatch(alarm_schedule); + self.dispatch(); } } struct Queue { inner: Mutex>, - alarm_schedule: AtomicU64, } impl Queue { const fn new() -> Self { Self { inner: Mutex::new(RefCell::new(InnerQueue::new())), - alarm_schedule: AtomicU64::new(u64::MAX), } } @@ -156,28 +159,12 @@ impl Queue { set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); } - inner.schedule_wake(at, waker, &self.alarm_schedule) + inner.schedule_wake(at, waker) }); - - self.update_alarm(); - } - - fn update_alarm(&self) { - // Need to set the alarm when we are *not* holding the mutex on the inner queue - // because mutexes are not re-entrant, which is a problem because `set_alarm` might immediately - // call us back if the timestamp is in the past. - let alarm_at = self.alarm_schedule.swap(u64::MAX, AtomicOrdering::SeqCst); - - if alarm_at < u64::MAX { - set_alarm(self.inner.lock(|inner| inner.borrow().alarm.unwrap()), alarm_at); - } } fn handle_alarm(&self) { - self.inner - .lock(|inner| inner.borrow_mut().handle_alarm(&self.alarm_schedule)); - - self.update_alarm(); + self.inner.lock(|inner| inner.borrow_mut().handle_alarm()); } fn handle_alarm_callback(ctx: *mut ()) { @@ -196,7 +183,6 @@ crate::timer_queue_impl!(static QUEUE: Queue = Queue::new()); #[cfg(test)] mod tests { use core::cell::Cell; - use core::sync::atomic::Ordering; use core::task::{RawWaker, RawWakerVTable, Waker}; use std::rc::Rc; use std::sync::Mutex; @@ -282,20 +268,14 @@ mod tests { inner.ctx = ctx; } - fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) { - let notify = { - let mut inner = self.0.lock().unwrap(); + fn set_alarm(&self, _alarm: AlarmHandle, timestamp: u64) -> bool { + let mut inner = self.0.lock().unwrap(); - if timestamp <= inner.now { - Some((inner.callback, inner.ctx)) - } else { - inner.alarm = timestamp; - None - } - }; - - if let Some((callback, ctx)) = notify { - (callback)(ctx); + if timestamp <= inner.now { + false + } else { + inner.alarm = timestamp; + true } } } @@ -344,7 +324,6 @@ mod tests { fn setup() { DRIVER.reset(); - QUEUE.alarm_schedule.store(u64::MAX, Ordering::SeqCst); QUEUE.inner.lock(|inner| { *inner.borrow_mut() = InnerQueue::new(); }); From f78c706b89feed71a7e0a3eaf332f55813698c7f Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 24 Oct 2022 11:10:59 +0300 Subject: [PATCH 05/11] Address review feedback --- embassy-nrf/src/time_driver.rs | 18 +++++++++++------- embassy-rp/src/timer.rs | 20 ++++++++++++-------- embassy-stm32/src/time_driver.rs | 18 +++++++++++------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/embassy-nrf/src/time_driver.rs b/embassy-nrf/src/time_driver.rs index 0d03ad529..bc2c8a3c1 100644 --- a/embassy-nrf/src/time_driver.rs +++ b/embassy-nrf/src/time_driver.rs @@ -245,19 +245,23 @@ impl Driver for RtcDriver { fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { critical_section::with(|cs| { - let t = self.now(); - - // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. - if timestamp <= t { - return false; - } - let n = alarm.id() as _; let alarm = self.get_alarm(cs, alarm); alarm.timestamp.set(timestamp); let r = rtc(); + let t = self.now(); + if timestamp <= t { + // If alarm timestamp has passed the alarm will not fire. + // Disarm the alarm and return `false` to indicate that. + r.intenclr.write(|w| unsafe { w.bits(compare_n(n)) }); + + alarm.timestamp.set(u64::MAX); + + return false; + } + // If it hasn't triggered yet, setup it in the compare channel. // Write the CC value regardless of whether we're going to enable it now or not. diff --git a/embassy-rp/src/timer.rs b/embassy-rp/src/timer.rs index 8f280f550..80efd779f 100644 --- a/embassy-rp/src/timer.rs +++ b/embassy-rp/src/timer.rs @@ -71,13 +71,6 @@ impl Driver for TimerDriver { fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { let n = alarm.id() as usize; critical_section::with(|cs| { - let now = self.now(); - - // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. - if timestamp <= now { - return false; - } - let alarm = &self.alarms.borrow(cs)[n]; alarm.timestamp.set(timestamp); @@ -87,7 +80,18 @@ impl Driver for TimerDriver { // it is checked if the alarm time has passed. unsafe { pac::TIMER.alarm(n).write_value(timestamp as u32) }; - true + let now = self.now(); + if timestamp <= now { + // If alarm timestamp has passed the alarm will not fire. + // Disarm the alarm and return `false` to indicate that. + unsafe { pac::TIMER.armed().write(|w| w.set_armed(1 << n)) } + + alarm.timestamp.set(u64::MAX); + + false + } else { + true + } }) } } diff --git a/embassy-stm32/src/time_driver.rs b/embassy-stm32/src/time_driver.rs index e4c266e7f..ab55cc081 100644 --- a/embassy-stm32/src/time_driver.rs +++ b/embassy-stm32/src/time_driver.rs @@ -294,19 +294,23 @@ impl Driver for RtcDriver { fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { critical_section::with(|cs| { - let t = self.now(); - - // If alarm timestamp has passed don't set the alarm and return `false` to indicate that. - if timestamp <= t { - return false; - } - let r = T::regs_gp16(); let n = alarm.id() as _; let alarm = self.get_alarm(cs, alarm); alarm.timestamp.set(timestamp); + let t = self.now(); + if timestamp <= t { + // If alarm timestamp has passed the alarm will not fire. + // Disarm the alarm and return `false` to indicate that. + unsafe { r.dier().modify(|w| w.set_ccie(n + 1, false)) }; + + alarm.timestamp.set(u64::MAX); + + return false; + } + let safe_timestamp = timestamp.max(t + 3); // Write the CCR value regardless of whether we're going to enable it now or not. From e3cf4255c6395ff5174432ab332189adf1ef431a Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 24 Oct 2022 11:31:54 +0300 Subject: [PATCH 06/11] Help compiler with type inference --- embassy-stm32/src/time_driver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embassy-stm32/src/time_driver.rs b/embassy-stm32/src/time_driver.rs index ab55cc081..8e84570a4 100644 --- a/embassy-stm32/src/time_driver.rs +++ b/embassy-stm32/src/time_driver.rs @@ -296,7 +296,7 @@ impl Driver for RtcDriver { critical_section::with(|cs| { let r = T::regs_gp16(); - let n = alarm.id() as _; + let n = alarm.id() as usize; let alarm = self.get_alarm(cs, alarm); alarm.timestamp.set(timestamp); From 516f4ce94684d9b3f9310c9972f878dbf883600c Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 24 Oct 2022 12:15:53 +0300 Subject: [PATCH 07/11] Fix embassy-time wasm build and fix a bug in wasm time driver --- embassy-time/src/driver_wasm.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/embassy-time/src/driver_wasm.rs b/embassy-time/src/driver_wasm.rs index d7a6b0d8d..63d049897 100644 --- a/embassy-time/src/driver_wasm.rs +++ b/embassy-time/src/driver_wasm.rs @@ -81,26 +81,32 @@ impl Driver for TimeDriver { } } - fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) -> bool { + fn set_alarm_callback(&self, alarm: AlarmHandle, callback: fn(*mut ()), ctx: *mut ()) { self.init(); let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap(); let alarm = &mut alarms[alarm.id() as usize]; alarm.closure.replace(Closure::new(move || { callback(ctx); })); - - true } - fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) { + fn set_alarm(&self, alarm: AlarmHandle, timestamp: u64) -> bool { self.init(); let mut alarms = unsafe { self.alarms.as_ref() }.lock().unwrap(); let alarm = &mut alarms[alarm.id() as usize]; - let timeout = (timestamp - self.now()) as u32; if let Some(token) = alarm.token { clearTimeout(token); } - alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000)); + + let now = self.now(); + if timestamp <= now { + false + } else { + let timeout = (timestamp - now) as u32; + alarm.token = Some(setTimeout(alarm.closure.as_ref().unwrap(), timeout / 1000)); + + true + } } } From ac6995f9e656a724d92590e722ac0c25f417893b Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Wed, 26 Oct 2022 17:48:22 +0300 Subject: [PATCH 08/11] Fix a bug identified during code review --- embassy-time/src/queue_generic.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs index 83f734848..6769d6a58 100644 --- a/embassy-time/src/queue_generic.rs +++ b/embassy-time/src/queue_generic.rs @@ -1,5 +1,5 @@ use core::cell::RefCell; -use core::cmp::Ordering; +use core::cmp::{min, Ordering}; use core::task::Waker; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; @@ -74,7 +74,7 @@ impl InnerQueue { self.queue .find_mut(|timer| timer.waker.will_wake(waker)) .map(|mut timer| { - timer.at = at; + timer.at = min(timer.at, at); timer.finish(); }) .unwrap_or_else(|| { From 560eecdb737642e6aba132408039195c7d6b6dd8 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Wed, 26 Oct 2022 18:05:27 +0300 Subject: [PATCH 09/11] Remove the _embassy_time_schedule_wake magic --- embassy-executor/src/raw/mod.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 5bcb1e6e7..181dabe8e 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -444,14 +444,21 @@ pub unsafe fn wake_task(task: NonNull) { } #[cfg(feature = "integrated-timers")] -#[no_mangle] -unsafe fn _embassy_time_schedule_wake(at: Instant, waker: &core::task::Waker) { - let task = waker::task_from_waker(waker); - let task = task.as_ref(); - let expires_at = task.expires_at.get(); - task.expires_at.set(expires_at.min(at)); +struct TimerQueue; + +#[cfg(feature = "integrated-timers")] +impl embassy_time::queue::TimerQueue for TimerQueue { + fn schedule_wake(&'static self, at: Instant, waker: &core::task::Waker) { + let task = waker::task_from_waker(waker); + let task = unsafe { task.as_ref() }; + let expires_at = task.expires_at.get(); + task.expires_at.set(expires_at.min(at)); + } } +#[cfg(feature = "integrated-timers")] +embassy_time::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue); + #[cfg(feature = "rtos-trace")] impl rtos_trace::RtosTraceOSCallbacks for Executor { fn task_list() { From 4976cbbe6040d5e147e7c42bd29b72d6223b05b0 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 26 Oct 2022 20:02:58 +0200 Subject: [PATCH 10/11] time/generic-queue: ensure queue goes in .bss instead of .data --- embassy-time/src/queue_generic.rs | 38 +++++++++++++++---------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs index 6769d6a58..8a355b327 100644 --- a/embassy-time/src/queue_generic.rs +++ b/embassy-time/src/queue_generic.rs @@ -57,19 +57,11 @@ impl Ord for Timer { struct InnerQueue { queue: SortedLinkedList, - alarm: Option, + alarm: AlarmHandle, alarm_at: Instant, } impl InnerQueue { - const fn new() -> Self { - Self { - queue: SortedLinkedList::new_u8(), - alarm: None, - alarm_at: Instant::MAX, - } - } - fn schedule_wake(&mut self, at: Instant, waker: &Waker) { self.queue .find_mut(|timer| timer.waker.will_wake(waker)) @@ -121,7 +113,7 @@ impl InnerQueue { if self.alarm_at != new_at { self.alarm_at = new_at; - return set_alarm(self.alarm.unwrap(), self.alarm_at.as_ticks()); + return set_alarm(self.alarm, self.alarm_at.as_ticks()); } } else { self.alarm_at = Instant::MAX; @@ -138,13 +130,13 @@ impl InnerQueue { } struct Queue { - inner: Mutex>, + inner: Mutex>>, } impl Queue { const fn new() -> Self { Self { - inner: Mutex::new(RefCell::new(InnerQueue::new())), + inner: Mutex::new(RefCell::new(None)), } } @@ -152,19 +144,25 @@ impl Queue { self.inner.lock(|inner| { let mut inner = inner.borrow_mut(); - if inner.alarm.is_none() { - let handle = unsafe { allocate_alarm() }.unwrap(); - inner.alarm = Some(handle); + if inner.is_none() {} - set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); - } - - inner.schedule_wake(at, waker) + inner + .get_or_insert_with(|| { + let handle = unsafe { allocate_alarm() }.unwrap(); + set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); + InnerQueue { + queue: SortedLinkedList::new_u8(), + alarm: handle, + alarm_at: Instant::MAX, + } + }) + .schedule_wake(at, waker) }); } fn handle_alarm(&self) { - self.inner.lock(|inner| inner.borrow_mut().handle_alarm()); + self.inner + .lock(|inner| inner.borrow_mut().as_mut().unwrap().handle_alarm()); } fn handle_alarm_callback(ctx: *mut ()) { From f9da6271cea7035b2c9f27cfe479aa81889168d1 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 26 Oct 2022 21:00:50 +0200 Subject: [PATCH 11/11] time/generic_queue: use Vec instead of SortedLinkedList --- embassy-time/src/queue_generic.rs | 46 +++++++++++++++---------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/embassy-time/src/queue_generic.rs b/embassy-time/src/queue_generic.rs index 8a355b327..20ae7e6cc 100644 --- a/embassy-time/src/queue_generic.rs +++ b/embassy-time/src/queue_generic.rs @@ -4,7 +4,7 @@ use core::task::Waker; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::Mutex; -use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; +use heapless::Vec; use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle}; use crate::queue::TimerQueue; @@ -56,18 +56,17 @@ impl Ord for Timer { } struct InnerQueue { - queue: SortedLinkedList, + queue: Vec, alarm: AlarmHandle, - alarm_at: Instant, } impl InnerQueue { fn schedule_wake(&mut self, at: Instant, waker: &Waker) { self.queue - .find_mut(|timer| timer.waker.will_wake(waker)) + .iter_mut() + .find(|timer| timer.waker.will_wake(waker)) .map(|mut timer| { timer.at = min(timer.at, at); - timer.finish(); }) .unwrap_or_else(|| { let mut timer = Timer { @@ -96,35 +95,35 @@ impl InnerQueue { loop { let now = Instant::now(); - while self.queue.peek().filter(|timer| timer.at <= now).is_some() { - self.queue.pop().unwrap().waker.wake(); + let mut next_alarm = Instant::MAX; + + let mut i = 0; + while i < self.queue.len() { + let timer = &self.queue[i]; + if timer.at <= now { + let timer = self.queue.swap_remove(i); + timer.waker.wake(); + } else { + next_alarm = min(next_alarm, timer.at); + i += 1; + } } - if self.update_alarm() { + if self.update_alarm(next_alarm) { break; } } } - fn update_alarm(&mut self) -> bool { - if let Some(timer) = self.queue.peek() { - let new_at = timer.at; - - if self.alarm_at != new_at { - self.alarm_at = new_at; - - return set_alarm(self.alarm, self.alarm_at.as_ticks()); - } + fn update_alarm(&mut self, next_alarm: Instant) -> bool { + if next_alarm == Instant::MAX { + true } else { - self.alarm_at = Instant::MAX; + set_alarm(self.alarm, next_alarm.as_ticks()) } - - true } fn handle_alarm(&mut self) { - self.alarm_at = Instant::MAX; - self.dispatch(); } } @@ -151,9 +150,8 @@ impl Queue { let handle = unsafe { allocate_alarm() }.unwrap(); set_alarm_callback(handle, Self::handle_alarm_callback, self as *const _ as _); InnerQueue { - queue: SortedLinkedList::new_u8(), + queue: Vec::new(), alarm: handle, - alarm_at: Instant::MAX, } }) .schedule_wake(at, waker)