Use atomics to share state instead of a RefCell

This commit is contained in:
Liam Murphy 2021-06-30 15:55:52 +10:00
parent a64dec517c
commit 53b95588df

View file

@ -1,14 +1,14 @@
use core::cell::RefCell;
use core::convert::Infallible; use core::convert::Infallible;
use core::future::Future; use core::future::Future;
use core::marker::PhantomData; use core::marker::PhantomData;
use core::ptr::NonNull; use core::ptr;
use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering;
use core::task::Poll; use core::task::Poll;
use core::task::Waker;
use embassy::interrupt::InterruptExt; use embassy::interrupt::InterruptExt;
use embassy::traits; use embassy::traits;
use embassy::util::CriticalSectionMutex; use embassy::util::AtomicWaker;
use embassy::util::OnDrop; use embassy::util::OnDrop;
use embassy::util::Unborrow; use embassy::util::Unborrow;
use embassy_extras::unborrow; use embassy_extras::unborrow;
@ -25,25 +25,18 @@ impl RNG {
} }
} }
static STATE: CriticalSectionMutex<RefCell<State>> = static STATE: State = State {
CriticalSectionMutex::new(RefCell::new(State { ptr: AtomicPtr::new(ptr::null_mut()),
buffer: None, end: AtomicPtr::new(ptr::null_mut()),
waker: None, waker: AtomicWaker::new(),
index: 0, };
}));
struct State { struct State {
buffer: Option<NonNull<[u8]>>, ptr: AtomicPtr<u8>,
waker: Option<Waker>, end: AtomicPtr<u8>,
index: usize, waker: AtomicWaker,
} }
// SAFETY: `NonNull` is `!Send` because of the possibility of it being aliased.
// However, `buffer` is only used within `on_interrupt`,
// and the original `&mut` passed to `fill_bytes` cannot be used because the safety contract of `Rng::new`
// means that it must still be borrowed by `RngFuture`, and so `rustc` will not let it be accessed.
unsafe impl Send for State {}
/// A wrapper around an nRF RNG peripheral. /// A wrapper around an nRF RNG peripheral.
/// ///
/// It has a non-blocking API, through `embassy::traits::Rng`, and a blocking api through `rand`. /// It has a non-blocking API, through `embassy::traits::Rng`, and a blocking api through `rand`.
@ -70,7 +63,7 @@ impl<'d> Rng<'d> {
phantom: PhantomData, phantom: PhantomData,
}; };
Self::stop(); this.stop();
this.disable_irq(); this.disable_irq();
this.irq.set_handler(Self::on_interrupt); this.irq.set_handler(Self::on_interrupt);
@ -81,25 +74,54 @@ impl<'d> Rng<'d> {
} }
fn on_interrupt(_: *mut ()) { fn on_interrupt(_: *mut ()) {
critical_section::with(|cs| { // Clear the event.
let mut state = STATE.borrow(cs).borrow_mut();
// SAFETY: the safety requirements on `Rng::new` make sure that the original `&mut`'s lifetime is still valid,
// meaning it can't be aliased and is a valid pointer.
let buffer = unsafe { state.buffer.unwrap().as_mut() };
buffer[state.index] = RNG::regs().value.read().value().bits();
state.index += 1;
if state.index == buffer.len() {
// Stop the RNG within the interrupt so that it doesn't get triggered again on the way to waking the future.
Self::stop();
if let Some(waker) = state.waker.take() {
waker.wake();
}
}
RNG::regs().events_valrdy.reset(); RNG::regs().events_valrdy.reset();
// Mutate the slice within a critical section,
// so that the future isn't dropped in between us loading the pointer and actually dereferencing it.
let (ptr, end) = critical_section::with(|_| {
let ptr = STATE.ptr.load(Ordering::Relaxed);
// We need to make sure we haven't already filled the whole slice,
// in case the interrupt fired again before the executor got back to the future.
let end = STATE.end.load(Ordering::Relaxed);
if !ptr.is_null() && ptr != end {
// If the future was dropped, the pointer would have been set to null,
// so we're still good to mutate the slice.
// The safety contract of `Rng::new` means that the future can't have been dropped
// without calling its destructor.
unsafe {
*ptr = RNG::regs().value.read().value().bits();
}
}
(ptr, end)
}); });
if ptr.is_null() || ptr == end {
// If the future was dropped, there's nothing to do.
// If `ptr == end`, we were called by mistake, so return.
return;
} }
fn stop() { let new_ptr = unsafe { ptr.add(1) };
match STATE
.ptr
.compare_exchange(ptr, new_ptr, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(ptr) => {
let end = STATE.end.load(Ordering::Relaxed);
// It doesn't matter if `end` was changed under our feet, because then this will just be false.
if ptr == end {
STATE.waker.wake();
}
}
Err(_) => {
// If the future was dropped or finished, there's no point trying to wake it.
// It will have already stopped the RNG, so there's no need to do that either.
}
}
}
fn stop(&self) {
RNG::regs().tasks_stop.write(|w| unsafe { w.bits(1) }) RNG::regs().tasks_stop.write(|w| unsafe { w.bits(1) })
} }
@ -140,38 +162,42 @@ impl<'d> traits::rng::Rng for Rng<'d> {
fn fill_bytes<'a>(&'a mut self, dest: &'a mut [u8]) -> Self::RngFuture<'a> { fn fill_bytes<'a>(&'a mut self, dest: &'a mut [u8]) -> Self::RngFuture<'a> {
async move { async move {
critical_section::with(|cs| { if dest.len() == 0 {
let mut state = STATE.borrow(cs).borrow_mut(); return Ok(()); // Nothing to fill
state.buffer = Some(dest.into()); }
});
let range = dest.as_mut_ptr_range();
// Even if we've preempted the interrupt, it can't preempt us again,
// so we don't need to worry about the order we write these in.
STATE.ptr.store(range.start, Ordering::Relaxed);
STATE.end.store(range.end, Ordering::Relaxed);
self.enable_irq(); self.enable_irq();
self.start(); self.start();
let on_drop = OnDrop::new(|| { let on_drop = OnDrop::new(|| {
Self::stop(); self.stop();
self.disable_irq(); self.disable_irq();
// The interrupt is now disabled and can't preempt us anymore, so the order doesn't matter here.
STATE.ptr.store(ptr::null_mut(), Ordering::Relaxed);
STATE.end.store(ptr::null_mut(), Ordering::Relaxed);
}); });
poll_fn(|cx| { poll_fn(|cx| {
critical_section::with(|cs| { STATE.waker.register(cx.waker());
let mut state = STATE.borrow(cs).borrow_mut();
state.waker = Some(cx.waker().clone());
// SAFETY: see safety message in interrupt handler.
// Also, both here and in the interrupt handler, we're in a critical section,
// so they can't interfere with each other.
let buffer = unsafe { state.buffer.unwrap().as_ref() };
if state.index == buffer.len() { // The interrupt will never modify `end`, so load it first and then get the most up-to-date `ptr`.
// Reset the state for next time let end = STATE.end.load(Ordering::Relaxed);
state.buffer = None; let ptr = STATE.ptr.load(Ordering::Relaxed);
state.index = 0;
if ptr == end {
// We're done.
Poll::Ready(()) Poll::Ready(())
} else { } else {
Poll::Pending Poll::Pending
} }
}) })
})
.await; .await;
// Trigger the teardown // Trigger the teardown
@ -193,7 +219,7 @@ impl<'d> RngCore for Rng<'d> {
*byte = regs.value.read().value().bits(); *byte = regs.value.read().value().bits();
} }
Self::stop(); self.stop();
} }
fn next_u32(&mut self) -> u32 { fn next_u32(&mut self) -> u32 {