Split executor into multiple files, remove old timers implementation.
This commit is contained in:
7 changed files with 325 additions and 451 deletions
@ -1,301 +0,0 @@
use core::cell::Cell;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::mem::MaybeUninit;
use core::pin::Pin;
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicPtr, AtomicU32, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
// UninitCell
struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
impl<T> UninitCell<T> {
const fn uninit() -> Self {
unsafe fn as_mut_ptr(&self) -> *mut T {
unsafe fn as_mut(&self) -> &mut T {
&mut *self.as_mut_ptr()
unsafe fn write(&self, val: T) {
ptr::write(self.as_mut_ptr(), val)
unsafe fn drop_in_place(&self) {
impl<T: Copy> UninitCell<T> {
unsafe fn read(&self) -> T {
// Data structures
const STATE_RUNNING: u32 = 1 << 0;
const STATE_QUEUED: u32 = 1 << 1;
struct Header {
state: AtomicU32,
next: AtomicPtr<Header>,
executor: Cell<*const Executor>,
poll_fn: UninitCell<unsafe fn(*mut Header)>, // Valid if STATE_RUNNING
// repr(C) is needed to guarantee that header is located at offset 0
// This makes it safe to cast between Header and Task pointers.
pub struct Task<F: Future + 'static> {
header: Header,
future: UninitCell<F>, // Valid if STATE_RUNNING
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum SpawnError {
// Atomic task queue using a very, very simple lock-free linked-list queue:
// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
// null. Then the batch is iterated following the next pointers until null is reached.
// Note that batches will be iterated in the opposite order as they were enqueued. This should
// be OK for our use case. Hopefully it doesn't create executor fairness problems.
struct Queue {
head: AtomicPtr<Header>,
impl Queue {
const fn new() -> Self {
Self {
head: AtomicPtr::new(ptr::null_mut()),
/// Enqueues an item. Returns true if the queue was empty.
unsafe fn enqueue(&self, item: *mut Header) -> bool {
let mut prev = self.head.load(Ordering::Acquire);
loop {
(*item).next.store(prev, Ordering::Relaxed);
match self
.compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire)
Ok(_) => break,
Err(next_prev) => prev = next_prev,
unsafe fn dequeue_all(&self, on_task: impl Fn(*mut Header)) {
let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
while !task.is_null() {
// If the task re-enqueues itself, the `next` pointer will get overwritten.
// Therefore, first read the next pointer, and only then process the task.
let next = (*task).next.load(Ordering::Relaxed);
task = next
// Waker
static WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
unsafe fn waker_clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &WAKER_VTABLE)
unsafe fn waker_wake(p: *const ()) {
let header = &*(p as *const Header);
let mut current = header.state.load(Ordering::Acquire);
loop {
// If already scheduled, or if not started,
if (current & STATE_QUEUED != 0) || (current & STATE_RUNNING == 0) {
// Mark it as scheduled
let new = current | STATE_QUEUED;
match header
.compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire)
Ok(_) => break,
Err(next_current) => current = next_current,
// We have just marked the task as scheduled, so enqueue it.
let executor = &*header.executor.get();
executor.enqueue(p as *mut Header);
unsafe fn waker_drop(_: *const ()) {
// nop
// Task
impl<F: Future + 'static> Task<F> {
pub const fn new() -> Self {
Self {
header: Header {
state: AtomicU32::new(0),
next: AtomicPtr::new(ptr::null_mut()),
executor: Cell::new(ptr::null()),
poll_fn: UninitCell::uninit(),
future: UninitCell::uninit(),
pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
for task in pool {
if task
.compare_and_swap(0, state, Ordering::AcqRel)
== 0
// Initialize the task
return SpawnToken {
header: Some(NonNull::new_unchecked(&task.header as *const Header as _)),
return SpawnToken { header: None };
unsafe fn poll(p: *mut Header) {
let this = &*(p as *const Task<F>);
let future = Pin::new_unchecked(this.future.as_mut());
let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE));
let mut cx = Context::from_waker(&waker);
match future.poll(&mut cx) {
Poll::Ready(_) => {
.fetch_and(!STATE_RUNNING, Ordering::AcqRel);
Poll::Pending => {}
unsafe impl<F: Future + 'static> Sync for Task<F> {}
// Spawn token
#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
pub struct SpawnToken {
header: Option<NonNull<Header>>,
impl Drop for SpawnToken {
fn drop(&mut self) {
// TODO deallocate the task instead.
panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()")
// Executor
pub struct Executor {
queue: Queue,
signal_fn: fn(),
not_send: PhantomData<*mut ()>,
impl Executor {
pub const fn new(signal_fn: fn()) -> Self {
Self {
queue: Queue::new(),
signal_fn: signal_fn,
not_send: PhantomData,
unsafe fn enqueue(&self, item: *mut Header) {
if self.queue.enqueue(item) {
/// Spawn a future on this executor.
pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
let header = token.header;
match header {
Some(header) => unsafe {
let header = header.as_ref();
self.enqueue(header as *const _ as _);
None => Err(SpawnError::Busy),
/// Runs the executor until the queue is empty.
pub fn run(&self) {
unsafe {
self.queue.dequeue_all(|p| {
let header = &*p;
let state = header.state.fetch_and(!STATE_QUEUED, Ordering::AcqRel);
if state & STATE_RUNNING == 0 {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
// Run the task
header.poll_fn.read()(p as _);
@ -1,9 +1,224 @@
mod executor;
mod timer_executor;
// for time::Timer
pub(crate) use timer_executor::current_timer_queue;
pub use embassy_macros::task;
pub use executor::{Executor, SpawnError, SpawnToken, Task};
pub use timer_executor::TimerExecutor;
use core::cell::Cell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::ptr;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicU32, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
mod run_queue;
mod util;
use self::run_queue::{RunQueue, RunQueueItem};
use self::util::UninitCell;
/// Task is spawned and future hasn't finished running yet.
const STATE_RUNNING: u32 = 1 << 0;
/// Task is in the executor run queue
const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
const STATE_TIMER_QUEUED: u32 = 1 << 2;
pub(crate) struct TaskHeader {
state: AtomicU32,
run_queue_item: RunQueueItem,
executor: Cell<*const Executor>, // Valid if state != 0
poll_fn: UninitCell<unsafe fn(*mut TaskHeader)>, // Valid if STATE_RUNNING
// repr(C) is needed to guarantee that header is located at offset 0
// This makes it safe to cast between Header and Task pointers.
pub struct Task<F: Future + 'static> {
header: TaskHeader,
future: UninitCell<F>, // Valid if STATE_RUNNING
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum SpawnError {
// Waker
static WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake, waker_wake, waker_drop);
unsafe fn waker_clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &WAKER_VTABLE)
unsafe fn waker_wake(p: *const ()) {
let header = &*(p as *const TaskHeader);
let mut current = header.state.load(Ordering::Acquire);
loop {
// If already scheduled, or if not started,
if (current & STATE_RUN_QUEUED != 0) || (current & STATE_RUNNING == 0) {
// Mark it as scheduled
let new = current | STATE_RUN_QUEUED;
match header
.compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire)
Ok(_) => break,
Err(next_current) => current = next_current,
// We have just marked the task as scheduled, so enqueue it.
let executor = &*header.executor.get();
executor.enqueue(p as *mut TaskHeader);
unsafe fn waker_drop(_: *const ()) {
// nop
// Task
impl<F: Future + 'static> Task<F> {
pub const fn new() -> Self {
Self {
header: TaskHeader {
state: AtomicU32::new(0),
run_queue_item: RunQueueItem::new(),
executor: Cell::new(ptr::null()),
poll_fn: UninitCell::uninit(),
future: UninitCell::uninit(),
pub unsafe fn spawn(pool: &'static [Self], future: impl FnOnce() -> F) -> SpawnToken {
for task in pool {
if task
.compare_and_swap(0, state, Ordering::AcqRel)
== 0
// Initialize the task
return SpawnToken {
header: Some(NonNull::new_unchecked(
&task.header as *const TaskHeader as _,
return SpawnToken { header: None };
unsafe fn poll(p: *mut TaskHeader) {
let this = &*(p as *const Task<F>);
let future = Pin::new_unchecked(this.future.as_mut());
let waker = Waker::from_raw(RawWaker::new(p as _, &WAKER_VTABLE));
let mut cx = Context::from_waker(&waker);
match future.poll(&mut cx) {
Poll::Ready(_) => {
.fetch_and(!STATE_RUNNING, Ordering::AcqRel);
Poll::Pending => {}
unsafe impl<F: Future + 'static> Sync for Task<F> {}
// Spawn token
#[must_use = "Calling a task function does nothing on its own. You must pass the returned SpawnToken to Executor::spawn()"]
pub struct SpawnToken {
header: Option<NonNull<TaskHeader>>,
impl Drop for SpawnToken {
fn drop(&mut self) {
// TODO deallocate the task instead.
panic!("SpawnToken instances may not be dropped. You must pass them to Executor::spawn()")
// Executor
pub struct Executor {
run_queue: RunQueue,
signal_fn: fn(),
not_send: PhantomData<*mut ()>,
impl Executor {
pub const fn new(signal_fn: fn()) -> Self {
Self {
run_queue: RunQueue::new(),
signal_fn: signal_fn,
not_send: PhantomData,
unsafe fn enqueue(&self, item: *mut TaskHeader) {
if self.run_queue.enqueue(item) {
/// Spawn a future on this executor.
pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
let header = token.header;
match header {
Some(header) => unsafe {
let header = header.as_ref();
self.enqueue(header as *const _ as _);
None => Err(SpawnError::Busy),
/// Runs the executor until the queue is empty.
pub fn run(&self) {
unsafe {
self.run_queue.dequeue_all(|p| {
let header = &*p;
let state = header.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
if state & STATE_RUNNING == 0 {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
// Run the task
header.poll_fn.read()(p as _);
Normal file
Normal file
@ -0,0 +1,70 @@
use core::ptr;
use core::sync::atomic::{AtomicPtr, Ordering};
use super::TaskHeader;
pub(crate) struct RunQueueItem {
next: AtomicPtr<TaskHeader>,
impl RunQueueItem {
pub const fn new() -> Self {
Self {
next: AtomicPtr::new(ptr::null_mut()),
/// Atomic task queue using a very, very simple lock-free linked-list queue:
/// To enqueue a task, task.next is set to the old head, and head is atomically set to task.
/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
/// null. Then the batch is iterated following the next pointers until null is reached.
/// Note that batches will be iterated in the reverse order as they were enqueued. This is OK
/// for our purposes: it can't crate fairness problems since the next batch won't run until the
/// current batch is completely processed, so even if a task enqueues itself instantly (for example
/// by waking its own waker) can't prevent other tasks from running.
pub(crate) struct RunQueue {
head: AtomicPtr<TaskHeader>,
impl RunQueue {
pub const fn new() -> Self {
Self {
head: AtomicPtr::new(ptr::null_mut()),
/// Enqueues an item. Returns true if the queue was empty.
pub(crate) unsafe fn enqueue(&self, item: *mut TaskHeader) -> bool {
let mut prev = self.head.load(Ordering::Acquire);
loop {
(*item).run_queue_item.next.store(prev, Ordering::Relaxed);
match self
.compare_exchange_weak(prev, item, Ordering::AcqRel, Ordering::Acquire)
Ok(_) => break,
Err(next_prev) => prev = next_prev,
pub(crate) unsafe fn dequeue_all(&self, on_task: impl Fn(*mut TaskHeader)) {
let mut task = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
while !task.is_null() {
// If the task re-enqueues itself, the `next` pointer will get overwritten.
// Therefore, first read the next pointer, and only then process the task.
let next = (*task).run_queue_item.next.load(Ordering::Relaxed);
task = next
@ -1,77 +0,0 @@
use super::executor::{Executor, SpawnError, SpawnToken};
use core::ptr;
use core::sync::atomic::{AtomicPtr, Ordering};
use futures_intrusive::timer as fi;
use crate::time::Alarm;
pub(crate) struct IntrusiveClock;
impl fi::Clock for IntrusiveClock {
fn now(&self) -> u64 {
pub(crate) type TimerQueue = fi::LocalTimerService;
pub struct TimerExecutor<A: Alarm> {
inner: Executor,
alarm: A,
timer_queue: TimerQueue,
impl<A: Alarm> TimerExecutor<A> {
pub fn new(alarm: A, signal_fn: fn()) -> Self {
Self {
inner: Executor::new(signal_fn),
timer_queue: TimerQueue::new(&IntrusiveClock),
/// Spawn a future on this executor.
/// safety: can only be called from the executor thread
pub fn spawn(&'static self, token: SpawnToken) -> Result<(), SpawnError> {
/// Runs the executor until the queue is empty.
/// safety: can only be called from the executor thread
pub fn run(&'static self) {
with_timer_queue(&self.timer_queue, || {
match self.timer_queue.next_expiration() {
// If this is in the past, set_alarm will immediately trigger the alarm,
// which will make the wfe immediately return so we do another loop iteration.
Some(at) => self.alarm.set(at),
None => self.alarm.clear(),
static CURRENT_TIMER_QUEUE: AtomicPtr<TimerQueue> = AtomicPtr::new(ptr::null_mut());
fn with_timer_queue<R>(svc: &'static TimerQueue, f: impl FnOnce() -> R) -> R {
let svc = svc as *const _ as *mut _;
let prev_svc = CURRENT_TIMER_QUEUE.swap(svc, Ordering::Relaxed);
let r = f();
let svc2 = CURRENT_TIMER_QUEUE.swap(prev_svc, Ordering::Relaxed);
assert_eq!(svc, svc2);
pub(crate) fn current_timer_queue() -> &'static TimerQueue {
unsafe {
Normal file
Normal file
@ -0,0 +1,32 @@
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::ptr;
pub(crate) struct UninitCell<T>(MaybeUninit<UnsafeCell<T>>);
impl<T> UninitCell<T> {
pub const fn uninit() -> Self {
pub unsafe fn as_mut_ptr(&self) -> *mut T {
pub unsafe fn as_mut(&self) -> &mut T {
&mut *self.as_mut_ptr()
pub unsafe fn write(&self, val: T) {
ptr::write(self.as_mut_ptr(), val)
pub unsafe fn drop_in_place(&self) {
impl<T: Copy> UninitCell<T> {
pub unsafe fn read(&self) -> T {
@ -1,11 +1,9 @@
mod duration;
mod instant;
mod timer;
mod traits;
pub use duration::Duration;
pub use instant::Instant;
pub use timer::{Ticker, Timer};
pub use traits::*;
use crate::fmt::*;
@ -1,63 +0,0 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::Stream;
use futures_intrusive::timer::{LocalTimer, LocalTimerFuture};
use super::{Duration, Instant};
use crate::executor::current_timer_queue;
pub struct Timer {
inner: LocalTimerFuture<'static>,
impl Timer {
pub fn at(when: Instant) -> Self {
Self {
inner: current_timer_queue().deadline(when.as_ticks()),
pub fn after(dur: Duration) -> Self {
Self::at(Instant::now() + dur)
impl Future for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().inner) }.poll(cx)
pub struct Ticker {
inner: LocalTimerFuture<'static>,
next: Instant,
dur: Duration,
impl Ticker {
pub fn every(dur: Duration) -> Self {
let next = Instant::now() + dur;
Self {
inner: current_timer_queue().deadline(next.as_ticks()),
impl Stream for Ticker {
type Item = ();
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = unsafe { self.get_unchecked_mut() };
match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) {
Poll::Ready(_) => {
this.next += this.dur;
this.inner = current_timer_queue().deadline(this.next.as_ticks());
Poll::Pending => Poll::Pending,
Add table
Reference in a new issue