diff --git a/embassy-stm32/src/usart/buffered.rs b/embassy-stm32/src/usart/buffered.rs index 46c49a997..2a711bc06 100644 --- a/embassy-stm32/src/usart/buffered.rs +++ b/embassy-stm32/src/usart/buffered.rs @@ -1,3 +1,4 @@ +use core::cell::RefCell; use core::future::{poll_fn, Future}; use core::task::Poll; @@ -29,7 +30,15 @@ unsafe impl<'d, T: BasicInstance> Send for StateInner<'d, T> {} unsafe impl<'d, T: BasicInstance> Sync for StateInner<'d, T> {} pub struct BufferedUart<'d, T: BasicInstance> { - inner: PeripheralMutex<'d, StateInner<'d, T>>, + inner: RefCell>>, +} + +pub struct BufferedUartTx<'u, 'd, T: BasicInstance> { + inner: &'u BufferedUart<'d, T>, +} + +pub struct BufferedUartRx<'u, 'd, T: BasicInstance> { + inner: &'u BufferedUart<'d, T>, } impl<'d, T: BasicInstance> Unpin for BufferedUart<'d, T> {} @@ -53,14 +62,124 @@ impl<'d, T: BasicInstance> BufferedUart<'d, T> { } Self { - inner: PeripheralMutex::new(irq, &mut state.0, move || StateInner { + inner: RefCell::new(PeripheralMutex::new(irq, &mut state.0, move || StateInner { phantom: PhantomData, tx: RingBuffer::new(tx_buffer), tx_waker: WakerRegistration::new(), rx: RingBuffer::new(rx_buffer), rx_waker: WakerRegistration::new(), - }), + })), + } + } + + pub fn split<'u>(&'u mut self) -> (BufferedUartRx<'u, 'd, T>, BufferedUartTx<'u, 'd, T>) { + (BufferedUartRx { inner: self }, BufferedUartTx { inner: self }) + } + + async fn inner_read<'a>(&'a self, buf: &'a mut [u8]) -> Result { + poll_fn(move |cx| { + let mut do_pend = false; + let mut inner = self.inner.borrow_mut(); + let res = inner.with(|state| { + compiler_fence(Ordering::SeqCst); + + // We have data ready in buffer? Return it. + let data = state.rx.pop_buf(); + if !data.is_empty() { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + + if state.rx.is_full() { + do_pend = true; + } + state.rx.pop(len); + + return Poll::Ready(Ok(len)); + } + + state.rx_waker.register(cx.waker()); + Poll::Pending + }); + + if do_pend { + inner.pend(); + } + + res + }) + .await + } + + async fn inner_write<'a>(&'a self, buf: &'a [u8]) -> Result { + poll_fn(move |cx| { + let mut inner = self.inner.borrow_mut(); + let (poll, empty) = inner.with(|state| { + let empty = state.tx.is_empty(); + let tx_buf = state.tx.push_buf(); + if tx_buf.is_empty() { + state.tx_waker.register(cx.waker()); + return (Poll::Pending, empty); + } + + let n = core::cmp::min(tx_buf.len(), buf.len()); + tx_buf[..n].copy_from_slice(&buf[..n]); + state.tx.push(n); + + (Poll::Ready(Ok(n)), empty) + }); + if empty { + inner.pend(); + } + poll + }) + .await + } + + async fn inner_flush<'a>(&'a self) -> Result<(), Error> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { + if !state.tx.is_empty() { + state.tx_waker.register(cx.waker()); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + }) + }) + .await + } + + async fn inner_fill_buf<'a>(&'a self) -> Result<&'a [u8], Error> { + poll_fn(move |cx| { + self.inner.borrow_mut().with(|state| { + compiler_fence(Ordering::SeqCst); + + // We have data ready in buffer? Return it. + let buf = state.rx.pop_buf(); + if !buf.is_empty() { + let buf: &[u8] = buf; + // Safety: buffer lives as long as uart + let buf: &[u8] = unsafe { core::mem::transmute(buf) }; + return Poll::Ready(Ok(buf)); + } + + state.rx_waker.register(cx.waker()); + Poll::>::Pending + }) + }) + .await + } + + fn inner_consume(&self, amt: usize) { + let mut inner = self.inner.borrow_mut(); + let signal = inner.with(|state| { + let full = state.rx.is_full(); + state.rx.pop(amt); + full + }); + if signal { + inner.pend(); } } } @@ -155,41 +274,31 @@ impl<'d, T: BasicInstance> embedded_io::Io for BufferedUart<'d, T> { type Error = Error; } +impl<'u, 'd, T: BasicInstance> embedded_io::Io for BufferedUartRx<'u, 'd, T> { + type Error = Error; +} + +impl<'u, 'd, T: BasicInstance> embedded_io::Io for BufferedUartTx<'u, 'd, T> { + type Error = Error; +} + impl<'d, T: BasicInstance> embedded_io::asynch::Read for BufferedUart<'d, T> { type ReadFuture<'a> = impl Future> where Self: 'a; fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { - poll_fn(move |cx| { - let mut do_pend = false; - let res = self.inner.with(|state| { - compiler_fence(Ordering::SeqCst); + self.inner_read(buf) + } +} - // We have data ready in buffer? Return it. - let data = state.rx.pop_buf(); - if !data.is_empty() { - let len = data.len().min(buf.len()); - buf[..len].copy_from_slice(&data[..len]); +impl<'u, 'd, T: BasicInstance> embedded_io::asynch::Read for BufferedUartRx<'u, 'd, T> { + type ReadFuture<'a> = impl Future> + where + Self: 'a; - if state.rx.is_full() { - do_pend = true; - } - state.rx.pop(len); - - return Poll::Ready(Ok(len)); - } - - state.rx_waker.register(cx.waker()); - Poll::Pending - }); - - if do_pend { - self.inner.pend(); - } - - res - }) + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + self.inner.inner_read(buf) } } @@ -199,34 +308,25 @@ impl<'d, T: BasicInstance> embedded_io::asynch::BufRead for BufferedUart<'d, T> Self: 'a; fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { - poll_fn(move |cx| { - self.inner.with(|state| { - compiler_fence(Ordering::SeqCst); - - // We have data ready in buffer? Return it. - let buf = state.rx.pop_buf(); - if !buf.is_empty() { - let buf: &[u8] = buf; - // Safety: buffer lives as long as uart - let buf: &[u8] = unsafe { core::mem::transmute(buf) }; - return Poll::Ready(Ok(buf)); - } - - state.rx_waker.register(cx.waker()); - Poll::>::Pending - }) - }) + self.inner_fill_buf() } fn consume(&mut self, amt: usize) { - let signal = self.inner.with(|state| { - let full = state.rx.is_full(); - state.rx.pop(amt); - full - }); - if signal { - self.inner.pend(); - } + self.inner_consume(amt) + } +} + +impl<'u, 'd, T: BasicInstance> embedded_io::asynch::BufRead for BufferedUartRx<'u, 'd, T> { + type FillBufFuture<'a> = impl Future> + where + Self: 'a; + + fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { + self.inner.inner_fill_buf() + } + + fn consume(&mut self, amt: usize) { + self.inner.inner_consume(amt) } } @@ -236,26 +336,7 @@ impl<'d, T: BasicInstance> embedded_io::asynch::Write for BufferedUart<'d, T> { Self: 'a; fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { - poll_fn(move |cx| { - let (poll, empty) = self.inner.with(|state| { - let empty = state.tx.is_empty(); - let tx_buf = state.tx.push_buf(); - if tx_buf.is_empty() { - state.tx_waker.register(cx.waker()); - return (Poll::Pending, empty); - } - - let n = core::cmp::min(tx_buf.len(), buf.len()); - tx_buf[..n].copy_from_slice(&buf[..n]); - state.tx.push(n); - - (Poll::Ready(Ok(n)), empty) - }); - if empty { - self.inner.pend(); - } - poll - }) + self.inner_write(buf) } type FlushFuture<'a> = impl Future> @@ -263,15 +344,24 @@ impl<'d, T: BasicInstance> embedded_io::asynch::Write for BufferedUart<'d, T> { Self: 'a; fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { - poll_fn(move |cx| { - self.inner.with(|state| { - if !state.tx.is_empty() { - state.tx_waker.register(cx.waker()); - return Poll::Pending; - } - - Poll::Ready(Ok(())) - }) - }) + self.inner_flush() + } +} + +impl<'u, 'd, T: BasicInstance> embedded_io::asynch::Write for BufferedUartTx<'u, 'd, T> { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + self.inner.inner_write(buf) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + self.inner.inner_flush() } }