From b2720117c455bd0d3781f287446f86db2d94f8e4 Mon Sep 17 00:00:00 2001 From: Zoey Riordan Date: Tue, 30 Aug 2022 15:48:50 +0200 Subject: [PATCH] Deduplicate IO methods --- embassy-nrf/src/buffered_uarte.rs | 320 ++++++++++++------------------ 1 file changed, 126 insertions(+), 194 deletions(-) diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs index 1bb75414d..385dc7e4e 100644 --- a/embassy-nrf/src/buffered_uarte.rs +++ b/embassy-nrf/src/buffered_uarte.rs @@ -201,19 +201,129 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> { } pub fn split<'u>(&'u mut self) -> (BufferedUarteRx<'u, 'd, U, T>, BufferedUarteTx<'u, 'd, U, T>) { - ( - BufferedUarteRx { inner: &self.inner }, - BufferedUarteTx { inner: &&self.inner }, - ) + (BufferedUarteRx { inner: self }, BufferedUarteTx { inner: self }) + } + + async fn inner_read<'a>(&'a self, buf: &'a mut [u8]) -> Result { + poll_fn(move |cx| { + let mut do_pend = false; + let res = self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + trace!("poll_read"); + + // We have data ready in buffer? Return it. + let data = state.rx.pop_buf(); + if !data.is_empty() { + trace!(" got {:?} {:?}", data.as_ptr() as u32, data.len()); + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + state.rx.pop(len); + do_pend = true; + return Poll::Ready(Ok(len)); + } + + trace!(" empty"); + state.rx_waker.register(cx.waker()); + Poll::Pending + }); + if do_pend { + self.inner.borrow().pend(); + } + + res + }) + .await + } + + async fn inner_write<'a>(&'a self, buf: &'a [u8]) -> Result { + poll_fn(move |cx| { + let res = self.inner.borrow_mut().with(|state| { + trace!("poll_write: {:?}", buf.len()); + + let tx_buf = state.tx.push_buf(); + if tx_buf.is_empty() { + trace!("poll_write: pending"); + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + let n = min(tx_buf.len(), buf.len()); + tx_buf[..n].copy_from_slice(&buf[..n]); + state.tx.push(n); + + trace!("poll_write: queued {:?}", n); + + compiler_fence(Ordering::SeqCst); + + Poll::Ready(Ok(n)) + }); + + self.inner.borrow_mut().pend(); + + res + }) + .await + } + + async fn inner_flush<'a>(&'a self) -> Result<(), core::convert::Infallible> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { + trace!("poll_flush"); + + if !state.tx.is_empty() { + trace!("poll_flush: pending"); + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + }) + }) + .await + } + + async fn inner_fill_buf<'a>(&'a self) -> Result<&'a [u8], core::convert::Infallible> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + trace!("fill_buf"); + + // We have data ready in buffer? Return it. + let buf = state.rx.pop_buf(); + if !buf.is_empty() { + trace!(" got {:?} {:?}", buf.as_ptr() as u32, buf.len()); + let buf: &[u8] = buf; + // Safety: buffer lives as long as uart + let buf: &[u8] = unsafe { core::mem::transmute(buf) }; + return Poll::Ready(Ok(buf)); + } + + trace!(" empty"); + state.rx_waker.register(cx.waker()); + Poll::>::Pending + }) + }) + .await + } + + fn inner_consume(&self, amt: usize) { + let signal = self.inner.borrow_mut().with(|state| { + let full = state.rx.is_full(); + state.rx.pop(amt); + full + }); + if signal { + self.inner.borrow().pend(); + } } } pub struct BufferedUarteTx<'u, 'd, U: UarteInstance, T: TimerInstance> { - inner: &'u RefCell>>, + inner: &'u BufferedUarte<'d, U, T>, } pub struct BufferedUarteRx<'u, 'd, U: UarteInstance, T: TimerInstance> { - inner: &'u RefCell>>, + inner: &'u BufferedUarte<'d, U, T>, } impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::Io for BufferedUarte<'d, U, T> { @@ -234,33 +344,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read for Buffe Self: 'a; fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - poll_fn(move |cx| { - let mut do_pend = false; - let res = self.inner.borrow_mut().with(|state| { - compiler_fence(Ordering::SeqCst); - trace!("poll_read"); - - // We have data ready in buffer? Return it. - let data = state.rx.pop_buf(); - if !data.is_empty() { - trace!(" got {:?} {:?}", data.as_ptr() as u32, data.len()); - let len = data.len().min(buf.len()); - buf[..len].copy_from_slice(&data[..len]); - state.rx.pop(len); - do_pend = true; - return Poll::Ready(Ok(len)); - } - - trace!(" empty"); - state.rx_waker.register(cx.waker()); - Poll::Pending - }); - if do_pend { - self.inner.borrow().pend(); - } - - res - }) + self.inner_read(buf) } } @@ -270,33 +354,7 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Read f Self: 'a; fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - poll_fn(move |cx| { - let mut do_pend = false; - let res = self.inner.borrow_mut().with(|state| { - compiler_fence(Ordering::SeqCst); - trace!("poll_read"); - - // We have data ready in buffer? Return it. - let data = state.rx.pop_buf(); - if !data.is_empty() { - trace!(" got {:?} {:?}", data.as_ptr() as u32, data.len()); - let len = data.len().min(buf.len()); - buf[..len].copy_from_slice(&data[..len]); - state.rx.pop(len); - do_pend = true; - return Poll::Ready(Ok(len)); - } - - trace!(" empty"); - state.rx_waker.register(cx.waker()); - Poll::Pending - }); - if do_pend { - self.inner.borrow().pend(); - } - - res - }) + self.inner.inner_read(buf) } } @@ -306,37 +364,11 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRead for Bu Self: 'a; fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { - poll_fn(move |cx| { - self.inner.borrow_mut().with(|state| { - compiler_fence(Ordering::SeqCst); - trace!("fill_buf"); - - // We have data ready in buffer? Return it. - let buf = state.rx.pop_buf(); - if !buf.is_empty() { - trace!(" got {:?} {:?}", buf.as_ptr() as u32, buf.len()); - let buf: &[u8] = buf; - // Safety: buffer lives as long as uart - let buf: &[u8] = unsafe { core::mem::transmute(buf) }; - return Poll::Ready(Ok(buf)); - } - - trace!(" empty"); - state.rx_waker.register(cx.waker()); - Poll::>::Pending - }) - }) + self.inner_fill_buf() } fn consume(&mut self, amt: usize) { - let signal = self.inner.borrow_mut().with(|state| { - let full = state.rx.is_full(); - state.rx.pop(amt); - full - }); - if signal { - self.inner.borrow().pend(); - } + self.inner_consume(amt) } } @@ -346,37 +378,11 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::BufRea Self: 'a; fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { - poll_fn(move |cx| { - self.inner.borrow_mut().with(|state| { - compiler_fence(Ordering::SeqCst); - trace!("fill_buf"); - - // We have data ready in buffer? Return it. - let buf = state.rx.pop_buf(); - if !buf.is_empty() { - trace!(" got {:?} {:?}", buf.as_ptr() as u32, buf.len()); - let buf: &[u8] = buf; - // Safety: buffer lives as long as uart - let buf: &[u8] = unsafe { core::mem::transmute(buf) }; - return Poll::Ready(Ok(buf)); - } - - trace!(" empty"); - state.rx_waker.register(cx.waker()); - Poll::>::Pending - }) - }) + self.inner.inner_fill_buf() } fn consume(&mut self, amt: usize) { - let signal = self.inner.borrow_mut().with(|state| { - let full = state.rx.is_full(); - state.rx.pop(amt); - full - }); - if signal { - self.inner.borrow().pend(); - } + self.inner.inner_consume(amt) } } @@ -386,32 +392,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff Self: 'a; fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - poll_fn(move |cx| { - let res = self.inner.borrow_mut().with(|state| { - trace!("poll_write: {:?}", buf.len()); - - let tx_buf = state.tx.push_buf(); - if tx_buf.is_empty() { - trace!("poll_write: pending"); - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - let n = min(tx_buf.len(), buf.len()); - tx_buf[..n].copy_from_slice(&buf[..n]); - state.tx.push(n); - - trace!("poll_write: queued {:?}", n); - - compiler_fence(Ordering::SeqCst); - - Poll::Ready(Ok(n)) - }); - - self.inner.borrow_mut().pend(); - - res - }) + self.inner_write(buf) } type FlushFuture<'a> = impl Future> @@ -419,19 +400,7 @@ impl<'d, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write for Buff Self: 'a; fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - poll_fn(move |cx| { - self.inner.borrow_mut().with(|state| { - trace!("poll_flush"); - - if !state.tx.is_empty() { - trace!("poll_flush: pending"); - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - Poll::Ready(Ok(())) - }) - }) + self.inner_flush() } } @@ -441,32 +410,7 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write Self: 'a; fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - poll_fn(move |cx| { - let res = self.inner.borrow_mut().with(|state| { - trace!("poll_write: {:?}", buf.len()); - - let tx_buf = state.tx.push_buf(); - if tx_buf.is_empty() { - trace!("poll_write: pending"); - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - let n = min(tx_buf.len(), buf.len()); - tx_buf[..n].copy_from_slice(&buf[..n]); - state.tx.push(n); - - trace!("poll_write: queued {:?}", n); - - compiler_fence(Ordering::SeqCst); - - Poll::Ready(Ok(n)) - }); - - self.inner.borrow_mut().pend(); - - res - }) + self.inner.inner_write(buf) } type FlushFuture<'a> = impl Future> @@ -474,19 +418,7 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write Self: 'a; fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - poll_fn(move |cx| { - self.inner.borrow_mut().with(|state| { - trace!("poll_flush"); - - if !state.tx.is_empty() { - trace!("poll_flush: pending"); - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - Poll::Ready(Ok(())) - }) - }) + self.inner.inner_flush() } }