Merge #1490
1490: sync: do will_wake check in MultiWakerRegistration. r=Dirbaio a=Dirbaio Co-authored-by: Dario Nieuwenhuis <dirbaio@dirbaio.net>
This commit is contained in:
commit
c5c5b64729
2 changed files with 40 additions and 43 deletions
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
use core::cell::RefCell;
|
use core::cell::RefCell;
|
||||||
use core::fmt::Debug;
|
use core::fmt::Debug;
|
||||||
use core::task::{Context, Poll, Waker};
|
use core::task::{Context, Poll};
|
||||||
|
|
||||||
use heapless::Deque;
|
use heapless::Deque;
|
||||||
|
|
||||||
|
@ -179,7 +179,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
|
||||||
// No, so we need to reregister our waker and sleep again
|
// No, so we need to reregister our waker and sleep again
|
||||||
None => {
|
None => {
|
||||||
if let Some(cx) = cx {
|
if let Some(cx) = cx {
|
||||||
s.register_subscriber_waker(cx.waker());
|
s.subscriber_wakers.register(cx.waker());
|
||||||
}
|
}
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
|
@ -206,7 +206,7 @@ impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usi
|
||||||
// The queue is full, so we need to reregister our waker and go to sleep
|
// The queue is full, so we need to reregister our waker and go to sleep
|
||||||
Err(message) => {
|
Err(message) => {
|
||||||
if let Some(cx) = cx {
|
if let Some(cx) = cx {
|
||||||
s.register_publisher_waker(cx.waker());
|
s.publisher_wakers.register(cx.waker());
|
||||||
}
|
}
|
||||||
Err(message)
|
Err(message)
|
||||||
}
|
}
|
||||||
|
@ -335,34 +335,6 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
|
||||||
Some(WaitResult::Message(message))
|
Some(WaitResult::Message(message))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register_subscriber_waker(&mut self, waker: &Waker) {
|
|
||||||
match self.subscriber_wakers.register(waker) {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(_) => {
|
|
||||||
// All waker slots were full. This can only happen when there was a subscriber that now has dropped.
|
|
||||||
// We need to throw it away. It's a bit inefficient, but we can wake everything.
|
|
||||||
// Any future that is still active will simply reregister.
|
|
||||||
// This won't happen a lot, so it's ok.
|
|
||||||
self.subscriber_wakers.wake();
|
|
||||||
self.subscriber_wakers.register(waker).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn register_publisher_waker(&mut self, waker: &Waker) {
|
|
||||||
match self.publisher_wakers.register(waker) {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(_) => {
|
|
||||||
// All waker slots were full. This can only happen when there was a publisher that now has dropped.
|
|
||||||
// We need to throw it away. It's a bit inefficient, but we can wake everything.
|
|
||||||
// Any future that is still active will simply reregister.
|
|
||||||
// This won't happen a lot, so it's ok.
|
|
||||||
self.publisher_wakers.wake();
|
|
||||||
self.publisher_wakers.register(waker).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
|
fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
|
||||||
self.subscriber_count -= 1;
|
self.subscriber_count -= 1;
|
||||||
|
|
||||||
|
|
|
@ -1,33 +1,58 @@
|
||||||
use core::task::Waker;
|
use core::task::Waker;
|
||||||
|
|
||||||
use super::WakerRegistration;
|
use heapless::Vec;
|
||||||
|
|
||||||
/// Utility struct to register and wake multiple wakers.
|
/// Utility struct to register and wake multiple wakers.
|
||||||
pub struct MultiWakerRegistration<const N: usize> {
|
pub struct MultiWakerRegistration<const N: usize> {
|
||||||
wakers: [WakerRegistration; N],
|
wakers: Vec<Waker, N>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<const N: usize> MultiWakerRegistration<N> {
|
impl<const N: usize> MultiWakerRegistration<N> {
|
||||||
/// Create a new empty instance
|
/// Create a new empty instance
|
||||||
pub const fn new() -> Self {
|
pub const fn new() -> Self {
|
||||||
const WAKER: WakerRegistration = WakerRegistration::new();
|
Self { wakers: Vec::new() }
|
||||||
Self { wakers: [WAKER; N] }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a waker. If the buffer is full the function returns it in the error
|
/// Register a waker. If the buffer is full the function returns it in the error
|
||||||
pub fn register<'a>(&mut self, w: &'a Waker) -> Result<(), &'a Waker> {
|
pub fn register<'a>(&mut self, w: &'a Waker) {
|
||||||
if let Some(waker_slot) = self.wakers.iter_mut().find(|waker_slot| !waker_slot.occupied()) {
|
// If we already have some waker that wakes the same task as `w`, do nothing.
|
||||||
waker_slot.register(w);
|
// This avoids cloning wakers, and avoids unnecessary mass-wakes.
|
||||||
Ok(())
|
for w2 in &self.wakers {
|
||||||
} else {
|
if w.will_wake(w2) {
|
||||||
Err(w)
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.wakers.is_full() {
|
||||||
|
// All waker slots were full. It's a bit inefficient, but we can wake everything.
|
||||||
|
// Any future that is still active will simply reregister.
|
||||||
|
// This won't happen a lot, so it's ok.
|
||||||
|
self.wake();
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.wakers.push(w.clone()).is_err() {
|
||||||
|
// This can't happen unless N=0
|
||||||
|
// (Either `wakers` wasn't full, or it was in which case `wake()` empied it)
|
||||||
|
panic!("tried to push a waker to a zero-length MultiWakerRegistration")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wake all registered wakers. This clears the buffer
|
/// Wake all registered wakers. This clears the buffer
|
||||||
pub fn wake(&mut self) {
|
pub fn wake(&mut self) {
|
||||||
for waker_slot in self.wakers.iter_mut() {
|
// heapless::Vec has no `drain()`, do it unsafely ourselves...
|
||||||
waker_slot.wake()
|
|
||||||
|
// First set length to 0, without dropping the contents.
|
||||||
|
// This is necessary for soundness: if wake() panics and we're using panic=unwind.
|
||||||
|
// Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state.
|
||||||
|
// (it'll leak wakers, but that's not UB)
|
||||||
|
let len = self.wakers.len();
|
||||||
|
unsafe { self.wakers.set_len(0) }
|
||||||
|
|
||||||
|
for i in 0..len {
|
||||||
|
// Move a waker out of the vec.
|
||||||
|
let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() };
|
||||||
|
// Wake it by value, which consumes (drops) it.
|
||||||
|
waker.wake();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue