diff --git a/embassy-rp/Cargo.toml b/embassy-rp/Cargo.toml index c43fd7e72..d0cf8025c 100644 --- a/embassy-rp/Cargo.toml +++ b/embassy-rp/Cargo.toml @@ -27,7 +27,7 @@ intrinsics = [] rom-v2-intrinsics = [] # Enable nightly-only features -nightly = ["embassy-executor/nightly", "embedded-hal-1", "embedded-hal-async", "embassy-embedded-hal/nightly", "dep:embassy-usb"] +nightly = ["embassy-executor/nightly", "embedded-hal-1", "embedded-hal-async", "embassy-embedded-hal/nightly", "dep:embassy-usb", "dep:embedded-io"] # Implement embedded-hal 1.0 alpha traits. # Implement embedded-hal-async traits if `nightly` is set as well. @@ -52,6 +52,7 @@ cortex-m = "0.7.6" critical-section = "1.1" futures = { version = "0.3.17", default-features = false, features = ["async-await"] } chrono = { version = "0.4", default-features = false, optional = true } +embedded-io = { version = "0.3.0", features = ["async"], optional = true } rp2040-pac2 = { git = "https://github.com/embassy-rs/rp2040-pac2", rev="017e3c9007b2d3b6965f0d85b5bf8ce3fa6d7364", features = ["rt"] } #rp2040-pac2 = { path = "../../rp2040-pac2", features = ["rt"] } diff --git a/embassy-rp/src/uart/buffered.rs b/embassy-rp/src/uart/buffered.rs new file mode 100644 index 000000000..87e16f0eb --- /dev/null +++ b/embassy-rp/src/uart/buffered.rs @@ -0,0 +1,489 @@ +use core::future::{poll_fn, Future}; +use core::task::{Poll, Waker}; + +use atomic_polyfill::{compiler_fence, Ordering}; +use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage}; +use embassy_hal_common::ring_buffer::RingBuffer; +use embassy_sync::waitqueue::WakerRegistration; + +use super::*; + +pub struct State<'d, T: Instance>(StateStorage>); +impl<'d, T: Instance> State<'d, T> { + pub const fn new() -> Self { + Self(StateStorage::new()) + } +} + +pub struct RxState<'d, T: Instance>(StateStorage>); +impl<'d, T: Instance> RxState<'d, T> { + pub const fn new() -> Self { + Self(StateStorage::new()) + } +} + +pub struct TxState<'d, T: Instance>(StateStorage>); +impl<'d, T: Instance> TxState<'d, T> { + pub const fn new() -> Self { + Self(StateStorage::new()) + } +} + +struct RxStateInner<'d, T: Instance> { + phantom: PhantomData<&'d mut T>, + + waker: WakerRegistration, + buf: RingBuffer<'d>, +} + +struct TxStateInner<'d, T: Instance> { + phantom: PhantomData<&'d mut T>, + + waker: WakerRegistration, + buf: RingBuffer<'d>, +} + +struct FullStateInner<'d, T: Instance> { + rx: RxStateInner<'d, T>, + tx: TxStateInner<'d, T>, +} + +unsafe impl<'d, T: Instance> Send for RxStateInner<'d, T> {} +unsafe impl<'d, T: Instance> Sync for RxStateInner<'d, T> {} + +unsafe impl<'d, T: Instance> Send for TxStateInner<'d, T> {} +unsafe impl<'d, T: Instance> Sync for TxStateInner<'d, T> {} + +unsafe impl<'d, T: Instance> Send for FullStateInner<'d, T> {} +unsafe impl<'d, T: Instance> Sync for FullStateInner<'d, T> {} + +pub struct BufferedUart<'d, T: Instance> { + inner: PeripheralMutex<'d, FullStateInner<'d, T>>, +} + +pub struct BufferedUartRx<'d, T: Instance> { + inner: PeripheralMutex<'d, RxStateInner<'d, T>>, +} + +pub struct BufferedUartTx<'d, T: Instance> { + inner: PeripheralMutex<'d, TxStateInner<'d, T>>, +} + +impl<'d, T: Instance> Unpin for BufferedUart<'d, T> {} +impl<'d, T: Instance> Unpin for BufferedUartRx<'d, T> {} +impl<'d, T: Instance> Unpin for BufferedUartTx<'d, T> {} + +impl<'d, T: Instance> BufferedUart<'d, T> { + pub fn new( + state: &'d mut State<'d, T>, + _uart: Uart<'d, T, M>, + irq: impl Peripheral

+ 'd, + tx_buffer: &'d mut [u8], + rx_buffer: &'d mut [u8], + ) -> BufferedUart<'d, T> { + into_ref!(irq); + + let r = T::regs(); + unsafe { + r.uartimsc().modify(|w| { + w.set_rxim(true); + w.set_rtim(true); + w.set_txim(true); + }); + } + + Self { + inner: PeripheralMutex::new(irq, &mut state.0, move || FullStateInner { + tx: TxStateInner { + phantom: PhantomData, + waker: WakerRegistration::new(), + buf: RingBuffer::new(tx_buffer), + }, + rx: RxStateInner { + phantom: PhantomData, + waker: WakerRegistration::new(), + buf: RingBuffer::new(rx_buffer), + }, + }), + } + } +} + +impl<'d, T: Instance> BufferedUartRx<'d, T> { + pub fn new( + state: &'d mut RxState<'d, T>, + _uart: UartRx<'d, T, M>, + irq: impl Peripheral

+ 'd, + rx_buffer: &'d mut [u8], + ) -> BufferedUartRx<'d, T> { + into_ref!(irq); + + let r = T::regs(); + unsafe { + r.uartimsc().modify(|w| { + w.set_rxim(true); + w.set_rtim(true); + }); + } + + Self { + inner: PeripheralMutex::new(irq, &mut state.0, move || RxStateInner { + phantom: PhantomData, + + buf: RingBuffer::new(rx_buffer), + waker: WakerRegistration::new(), + }), + } + } +} + +impl<'d, T: Instance> BufferedUartTx<'d, T> { + pub fn new( + state: &'d mut TxState<'d, T>, + _uart: UartTx<'d, T, M>, + irq: impl Peripheral

+ 'd, + tx_buffer: &'d mut [u8], + ) -> BufferedUartTx<'d, T> { + into_ref!(irq); + + let r = T::regs(); + unsafe { + r.uartimsc().modify(|w| { + w.set_txim(true); + }); + } + + Self { + inner: PeripheralMutex::new(irq, &mut state.0, move || TxStateInner { + phantom: PhantomData, + + buf: RingBuffer::new(tx_buffer), + waker: WakerRegistration::new(), + }), + } + } +} + +impl<'d, T: Instance> PeripheralState for FullStateInner<'d, T> +where + Self: 'd, +{ + type Interrupt = T::Interrupt; + fn on_interrupt(&mut self) { + self.rx.on_interrupt(); + self.tx.on_interrupt(); + } +} + +impl<'d, T: Instance> RxStateInner<'d, T> +where + Self: 'd, +{ + fn read(&mut self, buf: &mut [u8], waker: &Waker) -> (Poll>, bool) { + // We have data ready in buffer? Return it. + let mut do_pend = false; + let data = self.buf.pop_buf(); + if !data.is_empty() { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + + if self.buf.is_full() { + do_pend = true; + } + self.buf.pop(len); + + return (Poll::Ready(Ok(len)), do_pend); + } + + self.waker.register(waker); + (Poll::Pending, do_pend) + } + + fn fill_buf<'a>(&mut self, waker: &Waker) -> Poll> { + // We have data ready in buffer? Return it. + let buf = self.buf.pop_buf(); + if !buf.is_empty() { + let buf: &[u8] = buf; + // Safety: buffer lives as long as uart + let buf: &[u8] = unsafe { core::mem::transmute(buf) }; + return Poll::Ready(Ok(buf)); + } + + self.waker.register(waker); + Poll::Pending + } + + fn consume(&mut self, amt: usize) -> bool { + let full = self.buf.is_full(); + self.buf.pop(amt); + full + } +} + +impl<'d, T: Instance> PeripheralState for RxStateInner<'d, T> +where + Self: 'd, +{ + type Interrupt = T::Interrupt; + fn on_interrupt(&mut self) { + let r = T::regs(); + unsafe { + let ris = r.uartris().read(); + // Clear interrupt flags + r.uarticr().modify(|w| { + w.set_rxic(true); + w.set_rtic(true); + }); + + if ris.peris() { + warn!("Parity error"); + r.uarticr().modify(|w| { + w.set_peic(true); + }); + } + if ris.feris() { + warn!("Framing error"); + r.uarticr().modify(|w| { + w.set_feic(true); + }); + } + if ris.beris() { + warn!("Break error"); + r.uarticr().modify(|w| { + w.set_beic(true); + }); + } + if ris.oeris() { + warn!("Overrun error"); + r.uarticr().modify(|w| { + w.set_oeic(true); + }); + } + + if !r.uartfr().read().rxfe() { + let buf = self.buf.push_buf(); + if !buf.is_empty() { + buf[0] = r.uartdr().read().data(); + self.buf.push(1); + } else { + warn!("RX buffer full, discard received byte"); + } + + if self.buf.is_full() { + self.waker.wake(); + } + } + + if ris.rtris() { + self.waker.wake(); + }; + } + } +} + +impl<'d, T: Instance> TxStateInner<'d, T> +where + Self: 'd, +{ + fn write(&mut self, buf: &[u8], waker: &Waker) -> (Poll>, bool) { + let empty = self.buf.is_empty(); + let tx_buf = self.buf.push_buf(); + if tx_buf.is_empty() { + self.waker.register(waker); + return (Poll::Pending, empty); + } + + let n = core::cmp::min(tx_buf.len(), buf.len()); + tx_buf[..n].copy_from_slice(&buf[..n]); + self.buf.push(n); + + (Poll::Ready(Ok(n)), empty) + } + + fn flush(&mut self, waker: &Waker) -> Poll> { + if !self.buf.is_empty() { + self.waker.register(waker); + return Poll::Pending; + } + + Poll::Ready(Ok(())) + } +} + +impl<'d, T: Instance> PeripheralState for TxStateInner<'d, T> +where + Self: 'd, +{ + type Interrupt = T::Interrupt; + fn on_interrupt(&mut self) { + let r = T::regs(); + unsafe { + let buf = self.buf.pop_buf(); + if !buf.is_empty() { + r.uartimsc().modify(|w| { + w.set_txim(true); + }); + r.uartdr().write(|w| w.set_data(buf[0].into())); + self.buf.pop(1); + self.waker.wake(); + } else { + // Disable interrupt until we have something to transmit again + r.uartimsc().modify(|w| { + w.set_txim(false); + }); + } + } + } +} + +impl embedded_io::Error for Error { + fn kind(&self) -> embedded_io::ErrorKind { + embedded_io::ErrorKind::Other + } +} + +impl<'d, T: Instance> embedded_io::Io for BufferedUart<'d, T> { + type Error = Error; +} + +impl<'d, T: Instance> embedded_io::Io for BufferedUartRx<'d, T> { + type Error = Error; +} + +impl<'d, T: Instance> embedded_io::Io for BufferedUartTx<'d, T> { + type Error = Error; +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUart<'d, T> { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + poll_fn(move |cx| { + let (res, do_pend) = self.inner.with(|state| { + compiler_fence(Ordering::SeqCst); + state.rx.read(buf, cx.waker()) + }); + + if do_pend { + self.inner.pend(); + } + + res + }) + } +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUartRx<'d, T> { + type ReadFuture<'a> = impl Future> + where + Self: 'a; + + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { + poll_fn(move |cx| { + let (res, do_pend) = self.inner.with(|state| { + compiler_fence(Ordering::SeqCst); + state.read(buf, cx.waker()) + }); + + if do_pend { + self.inner.pend(); + } + + res + }) + } +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUart<'d, T> { + type FillBufFuture<'a> = impl Future> + where + Self: 'a; + + fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { + poll_fn(move |cx| { + self.inner.with(|state| { + compiler_fence(Ordering::SeqCst); + state.rx.fill_buf(cx.waker()) + }) + }) + } + + fn consume(&mut self, amt: usize) { + let signal = self.inner.with(|state| state.rx.consume(amt)); + if signal { + self.inner.pend(); + } + } +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUartRx<'d, T> { + type FillBufFuture<'a> = impl Future> + where + Self: 'a; + + fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> { + poll_fn(move |cx| { + self.inner.with(|state| { + compiler_fence(Ordering::SeqCst); + state.fill_buf(cx.waker()) + }) + }) + } + + fn consume(&mut self, amt: usize) { + let signal = self.inner.with(|state| state.consume(amt)); + if signal { + self.inner.pend(); + } + } +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + poll_fn(move |cx| { + let (poll, empty) = self.inner.with(|state| state.tx.write(buf, cx.waker())); + if empty { + self.inner.pend(); + } + poll + }) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + poll_fn(move |cx| self.inner.with(|state| state.tx.flush(cx.waker()))) + } +} + +impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUartTx<'d, T> { + type WriteFuture<'a> = impl Future> + where + Self: 'a; + + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { + poll_fn(move |cx| { + let (poll, empty) = self.inner.with(|state| state.write(buf, cx.waker())); + if empty { + self.inner.pend(); + } + poll + }) + } + + type FlushFuture<'a> = impl Future> + where + Self: 'a; + + fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { + poll_fn(move |cx| self.inner.with(|state| state.flush(cx.waker()))) + } +} diff --git a/embassy-rp/src/uart.rs b/embassy-rp/src/uart/mod.rs similarity index 86% rename from embassy-rp/src/uart.rs rename to embassy-rp/src/uart/mod.rs index 987b716b4..d9285ee51 100644 --- a/embassy-rp/src/uart.rs +++ b/embassy-rp/src/uart/mod.rs @@ -346,6 +346,11 @@ impl<'d, T: Instance, M: Mode> Uart<'d, T, M> { w.set_fen(true); }); + r.uartifls().write(|w| { + w.set_rxiflsel(0b000); + w.set_txiflsel(0b000); + }); + r.uartcr().write(|w| { w.set_uarten(true); w.set_rxe(true); @@ -475,6 +480,75 @@ mod eh1 { impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::ErrorType for UartRx<'d, T, M> { type Error = Error; } + + impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::nb::Read for UartRx<'d, T, M> { + fn read(&mut self) -> nb::Result { + let r = T::regs(); + unsafe { + let dr = r.uartdr().read(); + + if dr.oe() { + Err(nb::Error::Other(Error::Overrun)) + } else if dr.be() { + Err(nb::Error::Other(Error::Break)) + } else if dr.pe() { + Err(nb::Error::Other(Error::Parity)) + } else if dr.fe() { + Err(nb::Error::Other(Error::Framing)) + } else if dr.fe() { + Ok(dr.data()) + } else { + Err(nb::Error::WouldBlock) + } + } + } + } + + impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::blocking::Write for UartTx<'d, T, M> { + fn write(&mut self, buffer: &[u8]) -> Result<(), Self::Error> { + self.blocking_write(buffer) + } + + fn flush(&mut self) -> Result<(), Self::Error> { + self.blocking_flush() + } + } + + impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::nb::Write for UartTx<'d, T, M> { + fn write(&mut self, char: u8) -> nb::Result<(), Self::Error> { + self.blocking_write(&[char]).map_err(nb::Error::Other) + } + + fn flush(&mut self) -> nb::Result<(), Self::Error> { + self.blocking_flush().map_err(nb::Error::Other) + } + } + + impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::nb::Read for Uart<'d, T, M> { + fn read(&mut self) -> Result> { + embedded_hal_02::serial::Read::read(&mut self.rx) + } + } + + impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::blocking::Write for Uart<'d, T, M> { + fn write(&mut self, buffer: &[u8]) -> Result<(), Self::Error> { + self.blocking_write(buffer) + } + + fn flush(&mut self) -> Result<(), Self::Error> { + self.blocking_flush() + } + } + + impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::nb::Write for Uart<'d, T, M> { + fn write(&mut self, char: u8) -> nb::Result<(), Self::Error> { + self.blocking_write(&[char]).map_err(nb::Error::Other) + } + + fn flush(&mut self) -> nb::Result<(), Self::Error> { + self.blocking_flush().map_err(nb::Error::Other) + } + } } #[cfg(all( @@ -532,6 +606,11 @@ mod eha { } } +#[cfg(feature = "nightly")] +mod buffered; +#[cfg(feature = "nightly")] +pub use buffered::*; + mod sealed { use super::*; @@ -541,6 +620,8 @@ mod sealed { const TX_DREQ: u8; const RX_DREQ: u8; + type Interrupt: crate::interrupt::Interrupt; + fn regs() -> pac::uart::Uart; } pub trait TxPin {} @@ -572,6 +653,8 @@ macro_rules! impl_instance { const TX_DREQ: u8 = $tx_dreq; const RX_DREQ: u8 = $rx_dreq; + type Interrupt = crate::interrupt::$irq; + fn regs() -> pac::uart::Uart { pac::$inst } @@ -580,8 +663,8 @@ macro_rules! impl_instance { }; } -impl_instance!(UART0, UART0, 20, 21); -impl_instance!(UART1, UART1, 22, 23); +impl_instance!(UART0, UART0_IRQ, 20, 21); +impl_instance!(UART1, UART1_IRQ, 22, 23); pub trait TxPin: sealed::TxPin + crate::gpio::Pin {} pub trait RxPin: sealed::RxPin + crate::gpio::Pin {} diff --git a/tests/rp/.cargo/config.toml b/tests/rp/.cargo/config.toml index 0330025e4..9611db3a0 100644 --- a/tests/rp/.cargo/config.toml +++ b/tests/rp/.cargo/config.toml @@ -3,7 +3,7 @@ build-std = ["core"] build-std-features = ["panic_immediate_abort"] [target.'cfg(all(target_arch = "arm", target_os = "none"))'] -#runner = "teleprobe client run --target bluepill-stm32f103c8 --elf" +#runner = "teleprobe client run --target rpi-pico --elf" runner = "teleprobe local run --chip RP2040 --elf" rustflags = [ diff --git a/tests/rp/Cargo.toml b/tests/rp/Cargo.toml index 7e2717ddf..503373759 100644 --- a/tests/rp/Cargo.toml +++ b/tests/rp/Cargo.toml @@ -20,6 +20,7 @@ embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.8" } embedded-hal-async = { version = "0.1.0-alpha.1" } panic-probe = { version = "0.3.0", features = ["print-defmt"] } futures = { version = "0.3.17", default-features = false, features = ["async-await"] } +embedded-io = { version = "0.3.0", features = ["async"] } [profile.dev] debug = 2 diff --git a/tests/rp/src/bin/uart_buffered.rs b/tests/rp/src/bin/uart_buffered.rs new file mode 100644 index 000000000..9cc20bb98 --- /dev/null +++ b/tests/rp/src/bin/uart_buffered.rs @@ -0,0 +1,44 @@ +#![no_std] +#![no_main] +#![feature(type_alias_impl_trait)] + +use defmt::{assert_eq, *}; +use embassy_executor::Spawner; +use embassy_rp::interrupt; +use embassy_rp::uart::{BufferedUart, Config, State, Uart}; +use embedded_io::asynch::{Read, Write}; +use {defmt_rtt as _, panic_probe as _}; + +#[embassy_executor::main] +async fn main(_spawner: Spawner) { + let p = embassy_rp::init(Default::default()); + info!("Hello World!"); + + let (tx, rx, uart) = (p.PIN_0, p.PIN_1, p.UART0); + + let config = Config::default(); + let uart = Uart::new_blocking(uart, tx, rx, config); + + let irq = interrupt::take!(UART0_IRQ); + let tx_buf = &mut [0u8; 16]; + let rx_buf = &mut [0u8; 16]; + let mut state = State::new(); + let mut uart = BufferedUart::new(&mut state, uart, irq, tx_buf, rx_buf); + + // Make sure we send more bytes than fits in the FIFO, to test the actual + // bufferedUart. + + let data = [ + 1_u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, 32, + ]; + uart.write_all(&data).await.unwrap(); + info!("Done writing"); + + let mut buf = [0; 32]; + uart.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, data); + + info!("Test OK"); + cortex_m::asm::bkpt(); +}