More efficient timer queue, integrated into Executor directly.

This commit is contained in:
Dario Nieuwenhuis 2020-12-26 23:44:53 +01:00
parent 8b7a42a4f9
commit 692d8bb813
10 changed files with 258 additions and 25 deletions

View file

@ -25,7 +25,7 @@ debug = 2
debug-assertions = false
incremental = false
lto = "fat"
opt-level = 3
opt-level = 's'
overflow-checks = false
# do not optimize proc-macro crates = faster builds from scratch

View file

@ -22,6 +22,5 @@ rand_core = { version = "0.5.1", optional = true, features = ["std"] }
cortex-m = "0.6.4"
futures = { version = "0.3.5", default-features = false }
pin-project = { version = "1.0.2", default-features = false }
futures-intrusive = { version = "0.3.1", default-features = false }
embassy-macros = { version = "0.1.0", path = "../embassy-macros"}

View file

@ -1,6 +1,5 @@
pub use embassy_macros::task;
use core::cell::Cell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
@ -8,30 +7,54 @@ use core::pin::Pin;
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicU32, Ordering};
use core::task::{Context, Poll, RawWaker, Waker};
use core::task::{Context, Poll, Waker};
use core::{
cell::{Cell, UnsafeCell},
cmp::min,
};
mod run_queue;
pub(crate) mod timer;
mod timer_queue;
mod util;
mod waker;
use self::run_queue::{RunQueue, RunQueueItem};
use self::timer_queue::{TimerQueue, TimerQueueItem};
use self::util::UninitCell;
use crate::{
fmt::{panic, *},
time::{Alarm, Instant},
};
/// Task is spawned (has a future)
const STATE_SPAWNED: u32 = 1 << 0;
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
const STATE_RUN_QUEUED: u32 = 1 << 1;
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
const STATE_TIMER_QUEUED: u32 = 1 << 2;
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
pub(crate) struct TaskHeader {
state: AtomicU32,
run_queue_item: RunQueueItem,
expires_at: Cell<Instant>,
timer_queue_item: TimerQueueItem,
executor: Cell<*const Executor>, // Valid if state != 0
poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_SPAWNED
}
impl TaskHeader {
const fn new() -> Self {
Self {
state: AtomicU32::new(0),
expires_at: Cell::new(Instant::from_ticks(0)),
run_queue_item: RunQueueItem::new(),
timer_queue_item: TimerQueueItem::new(),
executor: Cell::new(ptr::null()),
poll_fn: UninitCell::uninit(),
}
}
pub(crate) unsafe fn enqueue(&self) {
let mut current = self.state.load(Ordering::Acquire);
loop {
@ -71,12 +94,7 @@ pub struct Task<F: Future + 'static> {
impl<F: Future + 'static> Task<F> {
pub const fn new() -> Self {
Self {
header: TaskHeader {
state: AtomicU32::new(0),
run_queue_item: RunQueueItem::new(),
executor: Cell::new(ptr::null()),
poll_fn: UninitCell::uninit(),
},
header: TaskHeader::new(),
future: UninitCell::uninit(),
}
}
@ -144,7 +162,9 @@ pub enum SpawnError {
}
pub struct Executor {
alarm: Option<&'static dyn Alarm>,
run_queue: RunQueue,
timer_queue: TimerQueue,
signal_fn: fn(),
not_send: PhantomData<*mut ()>,
}
@ -152,7 +172,18 @@ pub struct Executor {
impl Executor {
pub const fn new(signal_fn: fn()) -> Self {
Self {
alarm: None,
run_queue: RunQueue::new(),
timer_queue: TimerQueue::new(),
signal_fn: signal_fn,
not_send: PhantomData,
}
}
pub const fn new_with_alarm(alarm: &'static dyn Alarm, signal_fn: fn()) -> Self {
Self {
alarm: Some(alarm),
run_queue: RunQueue::new(),
timer_queue: TimerQueue::new(),
signal_fn: signal_fn,
not_send: PhantomData,
}
@ -183,8 +214,13 @@ impl Executor {
/// Runs the executor until the queue is empty.
pub fn run(&self) {
unsafe {
self.timer_queue.dequeue_expired(Instant::now(), |p| {
self.enqueue(p);
});
self.run_queue.dequeue_all(|p| {
let header = &*p;
header.expires_at.set(Instant::MAX);
let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
if state & STATE_SPAWNED == 0 {
@ -198,7 +234,25 @@ impl Executor {
// Run the task
header.poll_fn.read()(p as _);
// Enqueue or update into timer_queue
self.timer_queue.update(p);
});
// If this is in the past, set_alarm will immediately trigger the alarm,
// which will make the wfe immediately return so we do another loop iteration.
if let Some(alarm) = self.alarm {
let next_expiration = self.timer_queue.next_expiration();
alarm.set_callback(self.signal_fn);
alarm.set(next_expiration.as_ticks());
}
}
}
}
pub(crate) unsafe fn register_timer(at: Instant, waker: &Waker) {
let p = waker::task_from_waker(waker);
let header = &*p;
let expires_at = header.expires_at.get();
header.expires_at.set(min(expires_at, at));
}

View file

@ -0,0 +1,67 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::Stream;
use crate::time::{Duration, Instant};
pub struct Timer {
expires_at: Instant,
}
impl Timer {
pub fn at(expires_at: Instant) -> Self {
Self { expires_at }
}
pub fn after(duration: Duration) -> Self {
Self {
expires_at: Instant::now() + duration,
}
}
}
impl Unpin for Timer {}
impl Future for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.expires_at <= Instant::now() {
Poll::Ready(())
} else {
unsafe { super::register_timer(self.expires_at, cx.waker()) };
Poll::Pending
}
}
}
pub struct Ticker {
expires_at: Instant,
duration: Duration,
}
impl Ticker {
pub fn every(duration: Duration) -> Self {
let expires_at = Instant::now() + duration;
Self {
expires_at,
duration,
}
}
}
impl Unpin for Ticker {}
impl Stream for Ticker {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.expires_at <= Instant::now() {
let dur = self.duration;
self.expires_at += dur;
Poll::Ready(Some(()))
} else {
unsafe { super::register_timer(self.expires_at, cx.waker()) };
Poll::Pending
}
}
}

View file

@ -0,0 +1,85 @@
use core::cell::Cell;
use core::sync::atomic::{AtomicPtr, Ordering};
use core::{cmp::min, ptr};
use crate::time::Instant;
use super::{TaskHeader, STATE_TIMER_QUEUED};
pub(crate) struct TimerQueueItem {
next: Cell<*mut TaskHeader>,
}
impl TimerQueueItem {
pub const fn new() -> Self {
Self {
next: Cell::new(ptr::null_mut()),
}
}
}
pub(crate) struct TimerQueue {
head: Cell<*mut TaskHeader>,
}
impl TimerQueue {
pub const fn new() -> Self {
Self {
head: Cell::new(ptr::null_mut()),
}
}
pub(crate) unsafe fn update(&self, p: *mut TaskHeader) {
let header = &*p;
if header.expires_at.get() != Instant::MAX {
let old_state = header.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
let is_new = old_state & STATE_TIMER_QUEUED == 0;
if is_new {
header.timer_queue_item.next.set(self.head.get());
self.head.set(p);
}
}
}
pub(crate) unsafe fn next_expiration(&self) -> Instant {
let mut res = Instant::MAX;
self.retain(|p| {
let header = &*p;
let expires = header.expires_at.get();
res = min(res, expires);
expires != Instant::MAX
});
res
}
pub(crate) unsafe fn dequeue_expired(&self, now: Instant, on_task: impl Fn(*mut TaskHeader)) {
self.retain(|p| {
let header = &*p;
if header.expires_at.get() <= now {
on_task(p);
false
} else {
true
}
});
}
pub(crate) unsafe fn retain(&self, mut f: impl FnMut(*mut TaskHeader) -> bool) {
let mut prev = &self.head;
while !prev.get().is_null() {
let p = prev.get();
let header = &*p;
if f(p) {
// Skip to next
prev = &header.timer_queue_item.next;
} else {
// Remove it
prev.set(header.timer_queue_item.next.get());
header
.state
.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
}
}
}
}

View file

@ -1,15 +1,16 @@
use core::mem;
use core::task::{RawWaker, RawWakerVTable, Waker};
use super::TaskHeader;
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
unsafe fn clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &VTABLE)
}
unsafe fn wake(p: *const ()) {
let header = &*(p as *const TaskHeader);
let header = &*task_from_ptr(p);
header.enqueue();
}
@ -20,3 +21,18 @@ unsafe fn drop(_: *const ()) {
pub(crate) unsafe fn from_task(p: *mut TaskHeader) -> Waker {
Waker::from_raw(RawWaker::new(p as _, &VTABLE))
}
pub(crate) unsafe fn task_from_ptr(p: *const ()) -> *mut TaskHeader {
p as *mut TaskHeader
}
pub(crate) unsafe fn task_from_waker(w: &Waker) -> *mut TaskHeader {
let w: &WakerHack = mem::transmute(w);
assert_eq!(w.vtable, &VTABLE);
task_from_ptr(w.data)
}
struct WakerHack {
data: *const (),
vtable: &'static RawWakerVTable,
}

View file

@ -12,6 +12,9 @@ pub struct Instant {
}
impl Instant {
pub const MIN: Instant = Instant { ticks: u64::MIN };
pub const MAX: Instant = Instant { ticks: u64::MAX };
pub fn now() -> Instant {
Instant { ticks: now() }
}

View file

@ -2,6 +2,7 @@ mod duration;
mod instant;
mod traits;
pub use crate::executor::timer::{Ticker, Timer};
pub use duration::Duration;
pub use instant::Instant;
pub use traits::*;

View file

@ -64,7 +64,7 @@ use example_common::*;
use cortex_m_rt::entry;
use nrf52840_hal::clocks;
use embassy::executor::{task, TimerExecutor};
use embassy::executor::{task, Executor};
use embassy::time::{Duration, Instant, Timer};
use embassy::util::Forever;
use embassy_nrf::{interrupt, pac, rtc};
@ -112,9 +112,12 @@ async fn run_low() {
}
static RTC: Forever<rtc::RTC<pac::RTC1>> = Forever::new();
static EXECUTOR_LOW: Forever<TimerExecutor<rtc::Alarm<pac::RTC1>>> = Forever::new();
static EXECUTOR_MED: Forever<TimerExecutor<rtc::Alarm<pac::RTC1>>> = Forever::new();
static EXECUTOR_HIGH: Forever<TimerExecutor<rtc::Alarm<pac::RTC1>>> = Forever::new();
static ALARM_LOW: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
static EXECUTOR_LOW: Forever<Executor> = Forever::new();
static ALARM_MED: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
static EXECUTOR_MED: Forever<Executor> = Forever::new();
static ALARM_HIGH: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
static EXECUTOR_HIGH: Forever<Executor> = Forever::new();
#[entry]
fn main() -> ! {
@ -131,11 +134,14 @@ fn main() -> ! {
rtc.start();
unsafe { embassy::time::set_clock(rtc) };
let executor_low = EXECUTOR_LOW.put(TimerExecutor::new(rtc.alarm0(), cortex_m::asm::sev));
let executor_med = EXECUTOR_MED.put(TimerExecutor::new(rtc.alarm1(), || {
let alarm_low = ALARM_LOW.put(rtc.alarm0());
let executor_low = EXECUTOR_LOW.put(Executor::new_with_alarm(alarm_low, cortex_m::asm::sev));
let alarm_med = ALARM_MED.put(rtc.alarm1());
let executor_med = EXECUTOR_MED.put(Executor::new_with_alarm(alarm_med, || {
interrupt::pend(interrupt::SWI0_EGU0)
}));
let executor_high = EXECUTOR_HIGH.put(TimerExecutor::new(rtc.alarm2(), || {
let alarm_high = ALARM_HIGH.put(rtc.alarm2());
let executor_high = EXECUTOR_HIGH.put(Executor::new_with_alarm(alarm_high, || {
interrupt::pend(interrupt::SWI1_EGU1)
}));

View file

@ -10,7 +10,7 @@ use core::mem::MaybeUninit;
use cortex_m_rt::entry;
use nrf52840_hal::clocks;
use embassy::executor::{task, TimerExecutor};
use embassy::executor::{task, Executor};
use embassy::time::{Clock, Duration, Timer};
use embassy::util::Forever;
use embassy_nrf::pac;
@ -33,7 +33,8 @@ async fn run2() {
}
static RTC: Forever<rtc::RTC<pac::RTC1>> = Forever::new();
static EXECUTOR: Forever<TimerExecutor<rtc::Alarm<pac::RTC1>>> = Forever::new();
static ALARM: Forever<rtc::Alarm<pac::RTC1>> = Forever::new();
static EXECUTOR: Forever<Executor> = Forever::new();
#[entry]
fn main() -> ! {
@ -51,7 +52,8 @@ fn main() -> ! {
unsafe { embassy::time::set_clock(rtc) };
let executor = EXECUTOR.put(TimerExecutor::new(rtc.alarm0(), cortex_m::asm::sev));
let alarm = ALARM.put(rtc.alarm0());
let executor = EXECUTOR.put(Executor::new_with_alarm(alarm, cortex_m::asm::sev));
unwrap!(executor.spawn(run1()));
unwrap!(executor.spawn(run2()));