From 4781feafc43b640b870fbde5f9f9c78934628dd6 Mon Sep 17 00:00:00 2001 From: Zoey Riordan Date: Tue, 30 Aug 2022 15:27:25 +0200 Subject: [PATCH 1/3] Add split() method to BufferedUarte in embassy-nrf --- embassy-nrf/src/buffered_uarte.rs | 179 ++++++++++++++++++++++++++++-- 1 file changed, 167 insertions(+), 12 deletions(-) diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 62af544ae..1bb75414d 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs @@ -13,6 +13,7 @@ //! //! Please also see [crate::uarte] to understand when [BufferedUarte] should be used. +use core::cell::RefCell; use core::cmp::min; use core::future::Future; use core::sync::atomic::{compiler_fence, Ordering}; @@ -71,7 +72,7 @@ struct StateInner<'d, U: UarteInstance, T: TimerInstance> { /// Interface to a UARTE instance pub struct BufferedUarte<'d, U: UarteInstance, T: TimerInstance> { - inner: PeripheralMutex<'d, StateInner<'d, U, T>>, + inner: RefCell>>, } impl<'d, U: UarteInstance, T: TimerInstance> Unpin for BufferedUarte<'d, U, T> {} @@ -169,7 +170,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { ppi_ch2.enable(); Self { - inner: PeripheralMutex::new(irq, &mut state.0, move || StateInner { + inner: RefCell::new(PeripheralMutex::new(irq, &mut state.0, move || StateInner { _peri: peri, timer, _ppi_ch1: ppi_ch1, @@ -182,13 +183,13 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { tx: RingBuffer::new(tx_buffer), tx_state: TxState::Idle, tx_waker: WakerRegistration::new(), - }), + })), } } /// Adjust the baud rate to the provided value. pub fn set_baudrate(&mut self, baudrate: Baudrate) { - self.inner.with(|state| { + self.inner.borrow_mut().with(|state| { let r = U::regs(); let timeout = 0x8000_0000 / (baudrate as u32 / 40); @@ -198,12 +199,35 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { r.baudrate.write(|w| w.baudrate().variant(baudrate)); }); } + + pub fn split<'u>(&'u mut self) -> (BufferedUarteRx<'u, 'd, U, T>, BufferedUarteTx<'u, 'd, U, T>) { + ( + BufferedUarteRx { inner: &self.inner }, + BufferedUarteTx { inner: &&self.inner }, + ) + } +} + +pub struct BufferedUarteTx<'u, 'd, U: UarteInstance, T: TimerInstance> { + inner: &'u RefCell>>, +} + +pub struct BufferedUarteRx<'u, 'd, U: UarteInstance, T: TimerInstance> { + inner: &'u RefCell>>, } impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::Io for BufferedUarte<'d, U, T> { type Error = core::convert::Infallible; } +impl<'u, 'd, U: UarteInstance, T: TimerInstance> embedded_io::Io for BufferedUarteRx<'u, 'd, U, T> { + type Error = core::convert::Infallible; +} + +impl<'u, 'd, U: UarteInstance, T: TimerInstance> embedded_io::Io for BufferedUarteTx<'u, 'd, U, T> { + type Error = core::convert::Infallible; +} + impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for BufferedUarte<'d, U, T> { type ReadFuture<'a> = impl Future> where @@ -212,7 +236,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for Buffe fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { poll_fn(move |cx| { let mut do_pend = false; - let res = self.inner.with(|state| { + let res = self.inner.borrow_mut().with(|state| { compiler_fence(Ordering::SeqCst); trace!("poll_read"); @@ -232,7 +256,43 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for Buffe Poll::Pending }); if do_pend { - self.inner.pend(); + self.inner.borrow().pend(); + } + + res + }) + } +} + +impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for BufferedUarteRx<'u, 'd, U, T> { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + poll_fn(move |cx| { + let mut do_pend = false; + let res = self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + trace!("poll_read"); + + // We have data ready in buffer? Return it. + let data = state.rx.pop_buf(); + if !data.is_empty() { + trace!(" got {:?} {:?}", data.as_ptr() as u32, data.len()); + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + state.rx.pop(len); + do_pend = true; + return Poll::Ready(Ok(len)); + } + + trace!(" empty"); + state.rx_waker.register(cx.waker()); + Poll::Pending + }); + if do_pend { + self.inner.borrow().pend(); } res @@ -247,7 +307,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRead for Bu fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { poll_fn(move |cx| { - self.inner.with(|state| { + self.inner.borrow_mut().with(|state| { compiler_fence(Ordering::SeqCst); trace!("fill_buf"); @@ -269,13 +329,53 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRead for Bu } fn consume(&mut self, amt: usize) { - let signal = self.inner.with(|state| { + let signal = self.inner.borrow_mut().with(|state| { let full = state.rx.is_full(); state.rx.pop(amt); full }); if signal { - self.inner.pend(); + self.inner.borrow().pend(); + } + } +} + +impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRead for BufferedUarteRx<'u, 'd, U, T> { + type FillBufFuture<'a> = impl Future> + where + Self: 'a; + + fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + trace!("fill_buf"); + + // We have data ready in buffer? Return it. + let buf = state.rx.pop_buf(); + if !buf.is_empty() { + trace!(" got {:?} {:?}", buf.as_ptr() as u32, buf.len()); + let buf: &[u8] = buf; + // Safety: buffer lives as long as uart + let buf: &[u8] = unsafe { core::mem::transmute(buf) }; + return Poll::Ready(Ok(buf)); + } + + trace!(" empty"); + state.rx_waker.register(cx.waker()); + Poll::>::Pending + }) + }) + } + + fn consume(&mut self, amt: usize) { + let signal = self.inner.borrow_mut().with(|state| { + let full = state.rx.is_full(); + state.rx.pop(amt); + full + }); + if signal { + self.inner.borrow().pend(); } } } @@ -287,7 +387,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { poll_fn(move |cx| { - let res = self.inner.with(|state| { + let res = self.inner.borrow_mut().with(|state| { trace!("poll_write: {:?}", buf.len()); let tx_buf = state.tx.push_buf(); @@ -308,7 +408,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff Poll::Ready(Ok(n)) }); - self.inner.pend(); + self.inner.borrow_mut().pend(); res }) @@ -320,7 +420,62 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { poll_fn(move |cx| { - self.inner.with(|state| { + self.inner.borrow_mut().with(|state| { + trace!("poll_flush"); + + if !state.tx.is_empty() { + trace!("poll_flush: pending"); + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + }) + }) + } +} + +impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for BufferedUarteTx<'u, 'd, U, T> { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + poll_fn(move |cx| { + let res = self.inner.borrow_mut().with(|state| { + trace!("poll_write: {:?}", buf.len()); + + let tx_buf = state.tx.push_buf(); + if tx_buf.is_empty() { + trace!("poll_write: pending"); + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + let n = min(tx_buf.len(), buf.len()); + tx_buf[..n].copy_from_slice(&buf[..n]); + state.tx.push(n); + + trace!("poll_write: queued {:?}", n); + + compiler_fence(Ordering::SeqCst); + + Poll::Ready(Ok(n)) + }); + + self.inner.borrow_mut().pend(); + + res + }) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { trace!("poll_flush"); if !state.tx.is_empty() { From b2720117c455bd0d3781f287446f86db2d94f8e4 Mon Sep 17 00:00:00 2001 From: Zoey Riordan Date: Tue, 30 Aug 2022 15:48:50 +0200 Subject: [PATCH 2/3] Deduplicate IO methods --- embassy-nrf/src/buffered_uarte.rs | 320 ++++++++++++------------------ 1 file changed, 126 insertions(+), 194 deletions(-) diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 1bb75414d..385dc7e4e 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs @@ -201,19 +201,129 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { } pub fn split<'u>(&'u mut self) -> (BufferedUarteRx<'u, 'd, U, T>, BufferedUarteTx<'u, 'd, U, T>) { - ( - BufferedUarteRx { inner: &self.inner }, - BufferedUarteTx { inner: &&self.inner }, - ) + (BufferedUarteRx { inner: self }, BufferedUarteTx { inner: self }) + } + + async fn inner_read<'a>(&'a self, buf: &'a mut [u8]) -> Result { + poll_fn(move |cx| { + let mut do_pend = false; + let res = self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + trace!("poll_read"); + + // We have data ready in buffer? Return it. + let data = state.rx.pop_buf(); + if !data.is_empty() { + trace!(" got {:?} {:?}", data.as_ptr() as u32, data.len()); + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + state.rx.pop(len); + do_pend = true; + return Poll::Ready(Ok(len)); + } + + trace!(" empty"); + state.rx_waker.register(cx.waker()); + Poll::Pending + }); + if do_pend { + self.inner.borrow().pend(); + } + + res + }) + .await + } + + async fn inner_write<'a>(&'a self, buf: &'a [u8]) -> Result { + poll_fn(move |cx| { + let res = self.inner.borrow_mut().with(|state| { + trace!("poll_write: {:?}", buf.len()); + + let tx_buf = state.tx.push_buf(); + if tx_buf.is_empty() { + trace!("poll_write: pending"); + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + let n = min(tx_buf.len(), buf.len()); + tx_buf[..n].copy_from_slice(&buf[..n]); + state.tx.push(n); + + trace!("poll_write: queued {:?}", n); + + compiler_fence(Ordering::SeqCst); + + Poll::Ready(Ok(n)) + }); + + self.inner.borrow_mut().pend(); + + res + }) + .await + } + + async fn inner_flush<'a>(&'a self) -> Result<(), core::convert::Infallible> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { + trace!("poll_flush"); + + if !state.tx.is_empty() { + trace!("poll_flush: pending"); + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + }) + }) + .await + } + + async fn inner_fill_buf<'a>(&'a self) -> Result<&'a [u8], core::convert::Infallible> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + trace!("fill_buf"); + + // We have data ready in buffer? Return it. + let buf = state.rx.pop_buf(); + if !buf.is_empty() { + trace!(" got {:?} {:?}", buf.as_ptr() as u32, buf.len()); + let buf: &[u8] = buf; + // Safety: buffer lives as long as uart + let buf: &[u8] = unsafe { core::mem::transmute(buf) }; + return Poll::Ready(Ok(buf)); + } + + trace!(" empty"); + state.rx_waker.register(cx.waker()); + Poll::>::Pending + }) + }) + .await + } + + fn inner_consume(&self, amt: usize) { + let signal = self.inner.borrow_mut().with(|state| { + let full = state.rx.is_full(); + state.rx.pop(amt); + full + }); + if signal { + self.inner.borrow().pend(); + } } } pub struct BufferedUarteTx<'u, 'd, U: UarteInstance, T: TimerInstance> { - inner: &'u RefCell>>, + inner: &'u BufferedUarte<'d, U, T>, } pub struct BufferedUarteRx<'u, 'd, U: UarteInstance, T: TimerInstance> { - inner: &'u RefCell>>, + inner: &'u BufferedUarte<'d, U, T>, } impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::Io for BufferedUarte<'d, U, T> { @@ -234,33 +344,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for Buffe Self: 'a; fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - poll_fn(move |cx| { - let mut do_pend = false; - let res = self.inner.borrow_mut().with(|state| { - compiler_fence(Ordering::SeqCst); - trace!("poll_read"); - - // We have data ready in buffer? Return it. - let data = state.rx.pop_buf(); - if !data.is_empty() { - trace!(" got {:?} {:?}", data.as_ptr() as u32, data.len()); - let len = data.len().min(buf.len()); - buf[..len].copy_from_slice(&data[..len]); - state.rx.pop(len); - do_pend = true; - return Poll::Ready(Ok(len)); - } - - trace!(" empty"); - state.rx_waker.register(cx.waker()); - Poll::Pending - }); - if do_pend { - self.inner.borrow().pend(); - } - - res - }) + self.inner_read(buf) } } @@ -270,33 +354,7 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read f Self: 'a; fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - poll_fn(move |cx| { - let mut do_pend = false; - let res = self.inner.borrow_mut().with(|state| { - compiler_fence(Ordering::SeqCst); - trace!("poll_read"); - - // We have data ready in buffer? Return it. - let data = state.rx.pop_buf(); - if !data.is_empty() { - trace!(" got {:?} {:?}", data.as_ptr() as u32, data.len()); - let len = data.len().min(buf.len()); - buf[..len].copy_from_slice(&data[..len]); - state.rx.pop(len); - do_pend = true; - return Poll::Ready(Ok(len)); - } - - trace!(" empty"); - state.rx_waker.register(cx.waker()); - Poll::Pending - }); - if do_pend { - self.inner.borrow().pend(); - } - - res - }) + self.inner.inner_read(buf) } } @@ -306,37 +364,11 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRead for Bu Self: 'a; fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { - poll_fn(move |cx| { - self.inner.borrow_mut().with(|state| { - compiler_fence(Ordering::SeqCst); - trace!("fill_buf"); - - // We have data ready in buffer? Return it. - let buf = state.rx.pop_buf(); - if !buf.is_empty() { - trace!(" got {:?} {:?}", buf.as_ptr() as u32, buf.len()); - let buf: &[u8] = buf; - // Safety: buffer lives as long as uart - let buf: &[u8] = unsafe { core::mem::transmute(buf) }; - return Poll::Ready(Ok(buf)); - } - - trace!(" empty"); - state.rx_waker.register(cx.waker()); - Poll::>::Pending - }) - }) + self.inner_fill_buf() } fn consume(&mut self, amt: usize) { - let signal = self.inner.borrow_mut().with(|state| { - let full = state.rx.is_full(); - state.rx.pop(amt); - full - }); - if signal { - self.inner.borrow().pend(); - } + self.inner_consume(amt) } } @@ -346,37 +378,11 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRea Self: 'a; fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { - poll_fn(move |cx| { - self.inner.borrow_mut().with(|state| { - compiler_fence(Ordering::SeqCst); - trace!("fill_buf"); - - // We have data ready in buffer? Return it. - let buf = state.rx.pop_buf(); - if !buf.is_empty() { - trace!(" got {:?} {:?}", buf.as_ptr() as u32, buf.len()); - let buf: &[u8] = buf; - // Safety: buffer lives as long as uart - let buf: &[u8] = unsafe { core::mem::transmute(buf) }; - return Poll::Ready(Ok(buf)); - } - - trace!(" empty"); - state.rx_waker.register(cx.waker()); - Poll::>::Pending - }) - }) + self.inner.inner_fill_buf() } fn consume(&mut self, amt: usize) { - let signal = self.inner.borrow_mut().with(|state| { - let full = state.rx.is_full(); - state.rx.pop(amt); - full - }); - if signal { - self.inner.borrow().pend(); - } + self.inner.inner_consume(amt) } } @@ -386,32 +392,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff Self: 'a; fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - poll_fn(move |cx| { - let res = self.inner.borrow_mut().with(|state| { - trace!("poll_write: {:?}", buf.len()); - - let tx_buf = state.tx.push_buf(); - if tx_buf.is_empty() { - trace!("poll_write: pending"); - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - let n = min(tx_buf.len(), buf.len()); - tx_buf[..n].copy_from_slice(&buf[..n]); - state.tx.push(n); - - trace!("poll_write: queued {:?}", n); - - compiler_fence(Ordering::SeqCst); - - Poll::Ready(Ok(n)) - }); - - self.inner.borrow_mut().pend(); - - res - }) + self.inner_write(buf) } type FlushFuture<'a> = impl Future> @@ -419,19 +400,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff Self: 'a; fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - poll_fn(move |cx| { - self.inner.borrow_mut().with(|state| { - trace!("poll_flush"); - - if !state.tx.is_empty() { - trace!("poll_flush: pending"); - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - Poll::Ready(Ok(())) - }) - }) + self.inner_flush() } } @@ -441,32 +410,7 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write Self: 'a; fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - poll_fn(move |cx| { - let res = self.inner.borrow_mut().with(|state| { - trace!("poll_write: {:?}", buf.len()); - - let tx_buf = state.tx.push_buf(); - if tx_buf.is_empty() { - trace!("poll_write: pending"); - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - let n = min(tx_buf.len(), buf.len()); - tx_buf[..n].copy_from_slice(&buf[..n]); - state.tx.push(n); - - trace!("poll_write: queued {:?}", n); - - compiler_fence(Ordering::SeqCst); - - Poll::Ready(Ok(n)) - }); - - self.inner.borrow_mut().pend(); - - res - }) + self.inner.inner_write(buf) } type FlushFuture<'a> = impl Future> @@ -474,19 +418,7 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write Self: 'a; fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - poll_fn(move |cx| { - self.inner.borrow_mut().with(|state| { - trace!("poll_flush"); - - if !state.tx.is_empty() { - trace!("poll_flush: pending"); - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - Poll::Ready(Ok(())) - }) - }) + self.inner.inner_flush() } } From 171077bacf2a289dae115e0db00c37f3a721df53 Mon Sep 17 00:00:00 2001 From: Zoey Riordan Date: Tue, 30 Aug 2022 15:57:38 +0200 Subject: [PATCH 3/3] Avoid double-borrow --- embassy-nrf/src/buffered_uarte.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 385dc7e4e..c3cba2470 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs @@ -207,7 +207,8 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { async fn inner_read<'a>(&'a self, buf: &'a mut [u8]) -> Result { poll_fn(move |cx| { let mut do_pend = false; - let res = self.inner.borrow_mut().with(|state| { + let mut inner = self.inner.borrow_mut(); + let res = inner.with(|state| { compiler_fence(Ordering::SeqCst); trace!("poll_read"); @@ -227,7 +228,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { Poll::Pending }); if do_pend { - self.inner.borrow().pend(); + inner.pend(); } res @@ -237,7 +238,8 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { async fn inner_write<'a>(&'a self, buf: &'a [u8]) -> Result { poll_fn(move |cx| { - let res = self.inner.borrow_mut().with(|state| { + let mut inner = self.inner.borrow_mut(); + let res = inner.with(|state| { trace!("poll_write: {:?}", buf.len()); let tx_buf = state.tx.push_buf(); @@ -258,7 +260,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { Poll::Ready(Ok(n)) }); - self.inner.borrow_mut().pend(); + inner.pend(); res }) @@ -307,13 +309,14 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { } fn inner_consume(&self, amt: usize) { - let signal = self.inner.borrow_mut().with(|state| { + let mut inner = self.inner.borrow_mut(); + let signal = inner.with(|state| { let full = state.rx.is_full(); state.rx.pop(amt); full }); if signal { - self.inner.borrow().pend(); + inner.pend(); } } }