934: (embassy-rp): Add Buffered UART implementation r=MathiasKoch a=MathiasKoch

### Questions & concerns: 
- ~~Would it make sense to add `RxBufferedUart` and `TxBufferedUart`, for cases where you would want to only buffer one way?~~
- ~~Do I need to be monitoring more interrupt flags than `Receive` & `Receive timeout`?~~

This PR adds working `BufferedUart` implementation, along with `RxBufferedUart` and `TxBufferedUart`. The implementation leaves room for improvement with respect to performance, as it still does not utilize DMA nor the internal UART buffers.

Co-authored-by: Mathias <mk@blackbird.online>
Co-authored-by: Dario Nieuwenhuis <dirbaio@dirbaio.net>
This commit is contained in:
bors[bot] 2022-09-27 06:00:33 +00:00 committed by GitHub
commit 82d4360756
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 622 additions and 4 deletions

View file

@ -27,7 +27,7 @@ intrinsics = []
rom-v2-intrinsics = []
# Enable nightly-only features
nightly = ["embassy-executor/nightly", "embedded-hal-1", "embedded-hal-async", "embassy-embedded-hal/nightly", "dep:embassy-usb"]
nightly = ["embassy-executor/nightly", "embedded-hal-1", "embedded-hal-async", "embassy-embedded-hal/nightly", "dep:embassy-usb", "dep:embedded-io"]
# Implement embedded-hal 1.0 alpha traits.
# Implement embedded-hal-async traits if `nightly` is set as well.
@ -52,6 +52,7 @@ cortex-m = "0.7.6"
critical-section = "1.1"
futures = { version = "0.3.17", default-features = false, features = ["async-await"] }
chrono = { version = "0.4", default-features = false, optional = true }
embedded-io = { version = "0.3.0", features = ["async"], optional = true }
rp2040-pac2 = { git = "https://github.com/embassy-rs/rp2040-pac2", rev="017e3c9007b2d3b6965f0d85b5bf8ce3fa6d7364", features = ["rt"] }
#rp2040-pac2 = { path = "../../rp2040-pac2", features = ["rt"] }

View file

@ -0,0 +1,489 @@
use core::future::{poll_fn, Future};
use core::task::{Poll, Waker};
use atomic_polyfill::{compiler_fence, Ordering};
use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage};
use embassy_hal_common::ring_buffer::RingBuffer;
use embassy_sync::waitqueue::WakerRegistration;
use super::*;
pub struct State<'d, T: Instance>(StateStorage<FullStateInner<'d, T>>);
impl<'d, T: Instance> State<'d, T> {
pub const fn new() -> Self {
Self(StateStorage::new())
}
}
pub struct RxState<'d, T: Instance>(StateStorage<RxStateInner<'d, T>>);
impl<'d, T: Instance> RxState<'d, T> {
pub const fn new() -> Self {
Self(StateStorage::new())
}
}
pub struct TxState<'d, T: Instance>(StateStorage<TxStateInner<'d, T>>);
impl<'d, T: Instance> TxState<'d, T> {
pub const fn new() -> Self {
Self(StateStorage::new())
}
}
struct RxStateInner<'d, T: Instance> {
phantom: PhantomData<&'d mut T>,
waker: WakerRegistration,
buf: RingBuffer<'d>,
}
struct TxStateInner<'d, T: Instance> {
phantom: PhantomData<&'d mut T>,
waker: WakerRegistration,
buf: RingBuffer<'d>,
}
struct FullStateInner<'d, T: Instance> {
rx: RxStateInner<'d, T>,
tx: TxStateInner<'d, T>,
}
unsafe impl<'d, T: Instance> Send for RxStateInner<'d, T> {}
unsafe impl<'d, T: Instance> Sync for RxStateInner<'d, T> {}
unsafe impl<'d, T: Instance> Send for TxStateInner<'d, T> {}
unsafe impl<'d, T: Instance> Sync for TxStateInner<'d, T> {}
unsafe impl<'d, T: Instance> Send for FullStateInner<'d, T> {}
unsafe impl<'d, T: Instance> Sync for FullStateInner<'d, T> {}
pub struct BufferedUart<'d, T: Instance> {
inner: PeripheralMutex<'d, FullStateInner<'d, T>>,
}
pub struct BufferedUartRx<'d, T: Instance> {
inner: PeripheralMutex<'d, RxStateInner<'d, T>>,
}
pub struct BufferedUartTx<'d, T: Instance> {
inner: PeripheralMutex<'d, TxStateInner<'d, T>>,
}
impl<'d, T: Instance> Unpin for BufferedUart<'d, T> {}
impl<'d, T: Instance> Unpin for BufferedUartRx<'d, T> {}
impl<'d, T: Instance> Unpin for BufferedUartTx<'d, T> {}
impl<'d, T: Instance> BufferedUart<'d, T> {
pub fn new<M: Mode>(
state: &'d mut State<'d, T>,
_uart: Uart<'d, T, M>,
irq: impl Peripheral<P = T::Interrupt> + 'd,
tx_buffer: &'d mut [u8],
rx_buffer: &'d mut [u8],
) -> BufferedUart<'d, T> {
into_ref!(irq);
let r = T::regs();
unsafe {
r.uartimsc().modify(|w| {
w.set_rxim(true);
w.set_rtim(true);
w.set_txim(true);
});
}
Self {
inner: PeripheralMutex::new(irq, &mut state.0, move || FullStateInner {
tx: TxStateInner {
phantom: PhantomData,
waker: WakerRegistration::new(),
buf: RingBuffer::new(tx_buffer),
},
rx: RxStateInner {
phantom: PhantomData,
waker: WakerRegistration::new(),
buf: RingBuffer::new(rx_buffer),
},
}),
}
}
}
impl<'d, T: Instance> BufferedUartRx<'d, T> {
pub fn new<M: Mode>(
state: &'d mut RxState<'d, T>,
_uart: UartRx<'d, T, M>,
irq: impl Peripheral<P = T::Interrupt> + 'd,
rx_buffer: &'d mut [u8],
) -> BufferedUartRx<'d, T> {
into_ref!(irq);
let r = T::regs();
unsafe {
r.uartimsc().modify(|w| {
w.set_rxim(true);
w.set_rtim(true);
});
}
Self {
inner: PeripheralMutex::new(irq, &mut state.0, move || RxStateInner {
phantom: PhantomData,
buf: RingBuffer::new(rx_buffer),
waker: WakerRegistration::new(),
}),
}
}
}
impl<'d, T: Instance> BufferedUartTx<'d, T> {
pub fn new<M: Mode>(
state: &'d mut TxState<'d, T>,
_uart: UartTx<'d, T, M>,
irq: impl Peripheral<P = T::Interrupt> + 'd,
tx_buffer: &'d mut [u8],
) -> BufferedUartTx<'d, T> {
into_ref!(irq);
let r = T::regs();
unsafe {
r.uartimsc().modify(|w| {
w.set_txim(true);
});
}
Self {
inner: PeripheralMutex::new(irq, &mut state.0, move || TxStateInner {
phantom: PhantomData,
buf: RingBuffer::new(tx_buffer),
waker: WakerRegistration::new(),
}),
}
}
}
impl<'d, T: Instance> PeripheralState for FullStateInner<'d, T>
where
Self: 'd,
{
type Interrupt = T::Interrupt;
fn on_interrupt(&mut self) {
self.rx.on_interrupt();
self.tx.on_interrupt();
}
}
impl<'d, T: Instance> RxStateInner<'d, T>
where
Self: 'd,
{
fn read(&mut self, buf: &mut [u8], waker: &Waker) -> (Poll<Result<usize, Error>>, bool) {
// We have data ready in buffer? Return it.
let mut do_pend = false;
let data = self.buf.pop_buf();
if !data.is_empty() {
let len = data.len().min(buf.len());
buf[..len].copy_from_slice(&data[..len]);
if self.buf.is_full() {
do_pend = true;
}
self.buf.pop(len);
return (Poll::Ready(Ok(len)), do_pend);
}
self.waker.register(waker);
(Poll::Pending, do_pend)
}
fn fill_buf<'a>(&mut self, waker: &Waker) -> Poll<Result<&'a [u8], Error>> {
// We have data ready in buffer? Return it.
let buf = self.buf.pop_buf();
if !buf.is_empty() {
let buf: &[u8] = buf;
// Safety: buffer lives as long as uart
let buf: &[u8] = unsafe { core::mem::transmute(buf) };
return Poll::Ready(Ok(buf));
}
self.waker.register(waker);
Poll::Pending
}
fn consume(&mut self, amt: usize) -> bool {
let full = self.buf.is_full();
self.buf.pop(amt);
full
}
}
impl<'d, T: Instance> PeripheralState for RxStateInner<'d, T>
where
Self: 'd,
{
type Interrupt = T::Interrupt;
fn on_interrupt(&mut self) {
let r = T::regs();
unsafe {
let ris = r.uartris().read();
// Clear interrupt flags
r.uarticr().modify(|w| {
w.set_rxic(true);
w.set_rtic(true);
});
if ris.peris() {
warn!("Parity error");
r.uarticr().modify(|w| {
w.set_peic(true);
});
}
if ris.feris() {
warn!("Framing error");
r.uarticr().modify(|w| {
w.set_feic(true);
});
}
if ris.beris() {
warn!("Break error");
r.uarticr().modify(|w| {
w.set_beic(true);
});
}
if ris.oeris() {
warn!("Overrun error");
r.uarticr().modify(|w| {
w.set_oeic(true);
});
}
if !r.uartfr().read().rxfe() {
let buf = self.buf.push_buf();
if !buf.is_empty() {
buf[0] = r.uartdr().read().data();
self.buf.push(1);
} else {
warn!("RX buffer full, discard received byte");
}
if self.buf.is_full() {
self.waker.wake();
}
}
if ris.rtris() {
self.waker.wake();
};
}
}
}
impl<'d, T: Instance> TxStateInner<'d, T>
where
Self: 'd,
{
fn write(&mut self, buf: &[u8], waker: &Waker) -> (Poll<Result<usize, Error>>, bool) {
let empty = self.buf.is_empty();
let tx_buf = self.buf.push_buf();
if tx_buf.is_empty() {
self.waker.register(waker);
return (Poll::Pending, empty);
}
let n = core::cmp::min(tx_buf.len(), buf.len());
tx_buf[..n].copy_from_slice(&buf[..n]);
self.buf.push(n);
(Poll::Ready(Ok(n)), empty)
}
fn flush(&mut self, waker: &Waker) -> Poll<Result<(), Error>> {
if !self.buf.is_empty() {
self.waker.register(waker);
return Poll::Pending;
}
Poll::Ready(Ok(()))
}
}
impl<'d, T: Instance> PeripheralState for TxStateInner<'d, T>
where
Self: 'd,
{
type Interrupt = T::Interrupt;
fn on_interrupt(&mut self) {
let r = T::regs();
unsafe {
let buf = self.buf.pop_buf();
if !buf.is_empty() {
r.uartimsc().modify(|w| {
w.set_txim(true);
});
r.uartdr().write(|w| w.set_data(buf[0].into()));
self.buf.pop(1);
self.waker.wake();
} else {
// Disable interrupt until we have something to transmit again
r.uartimsc().modify(|w| {
w.set_txim(false);
});
}
}
}
}
impl embedded_io::Error for Error {
fn kind(&self) -> embedded_io::ErrorKind {
embedded_io::ErrorKind::Other
}
}
impl<'d, T: Instance> embedded_io::Io for BufferedUart<'d, T> {
type Error = Error;
}
impl<'d, T: Instance> embedded_io::Io for BufferedUartRx<'d, T> {
type Error = Error;
}
impl<'d, T: Instance> embedded_io::Io for BufferedUartTx<'d, T> {
type Error = Error;
}
impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUart<'d, T> {
type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
where
Self: 'a;
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
poll_fn(move |cx| {
let (res, do_pend) = self.inner.with(|state| {
compiler_fence(Ordering::SeqCst);
state.rx.read(buf, cx.waker())
});
if do_pend {
self.inner.pend();
}
res
})
}
}
impl<'d, T: Instance + 'd> embedded_io::asynch::Read for BufferedUartRx<'d, T> {
type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
where
Self: 'a;
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
poll_fn(move |cx| {
let (res, do_pend) = self.inner.with(|state| {
compiler_fence(Ordering::SeqCst);
state.read(buf, cx.waker())
});
if do_pend {
self.inner.pend();
}
res
})
}
}
impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUart<'d, T> {
type FillBufFuture<'a> = impl Future<Output = Result<&'a [u8], Self::Error>>
where
Self: 'a;
fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> {
poll_fn(move |cx| {
self.inner.with(|state| {
compiler_fence(Ordering::SeqCst);
state.rx.fill_buf(cx.waker())
})
})
}
fn consume(&mut self, amt: usize) {
let signal = self.inner.with(|state| state.rx.consume(amt));
if signal {
self.inner.pend();
}
}
}
impl<'d, T: Instance + 'd> embedded_io::asynch::BufRead for BufferedUartRx<'d, T> {
type FillBufFuture<'a> = impl Future<Output = Result<&'a [u8], Self::Error>>
where
Self: 'a;
fn fill_buf<'a>(&'a mut self) -> Self::FillBufFuture<'a> {
poll_fn(move |cx| {
self.inner.with(|state| {
compiler_fence(Ordering::SeqCst);
state.fill_buf(cx.waker())
})
})
}
fn consume(&mut self, amt: usize) {
let signal = self.inner.with(|state| state.consume(amt));
if signal {
self.inner.pend();
}
}
}
impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUart<'d, T> {
type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
where
Self: 'a;
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
poll_fn(move |cx| {
let (poll, empty) = self.inner.with(|state| state.tx.write(buf, cx.waker()));
if empty {
self.inner.pend();
}
poll
})
}
type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
where
Self: 'a;
fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
poll_fn(move |cx| self.inner.with(|state| state.tx.flush(cx.waker())))
}
}
impl<'d, T: Instance + 'd> embedded_io::asynch::Write for BufferedUartTx<'d, T> {
type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
where
Self: 'a;
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
poll_fn(move |cx| {
let (poll, empty) = self.inner.with(|state| state.write(buf, cx.waker()));
if empty {
self.inner.pend();
}
poll
})
}
type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
where
Self: 'a;
fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
poll_fn(move |cx| self.inner.with(|state| state.flush(cx.waker())))
}
}

View file

@ -346,6 +346,11 @@ impl<'d, T: Instance, M: Mode> Uart<'d, T, M> {
w.set_fen(true);
});
r.uartifls().write(|w| {
w.set_rxiflsel(0b000);
w.set_txiflsel(0b000);
});
r.uartcr().write(|w| {
w.set_uarten(true);
w.set_rxe(true);
@ -475,6 +480,75 @@ mod eh1 {
impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::ErrorType for UartRx<'d, T, M> {
type Error = Error;
}
impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::nb::Read for UartRx<'d, T, M> {
fn read(&mut self) -> nb::Result<u8, Self::Error> {
let r = T::regs();
unsafe {
let dr = r.uartdr().read();
if dr.oe() {
Err(nb::Error::Other(Error::Overrun))
} else if dr.be() {
Err(nb::Error::Other(Error::Break))
} else if dr.pe() {
Err(nb::Error::Other(Error::Parity))
} else if dr.fe() {
Err(nb::Error::Other(Error::Framing))
} else if dr.fe() {
Ok(dr.data())
} else {
Err(nb::Error::WouldBlock)
}
}
}
}
impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::blocking::Write for UartTx<'d, T, M> {
fn write(&mut self, buffer: &[u8]) -> Result<(), Self::Error> {
self.blocking_write(buffer)
}
fn flush(&mut self) -> Result<(), Self::Error> {
self.blocking_flush()
}
}
impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::nb::Write for UartTx<'d, T, M> {
fn write(&mut self, char: u8) -> nb::Result<(), Self::Error> {
self.blocking_write(&[char]).map_err(nb::Error::Other)
}
fn flush(&mut self) -> nb::Result<(), Self::Error> {
self.blocking_flush().map_err(nb::Error::Other)
}
}
impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::nb::Read for Uart<'d, T, M> {
fn read(&mut self) -> Result<u8, nb::Error<Self::Error>> {
embedded_hal_02::serial::Read::read(&mut self.rx)
}
}
impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::blocking::Write for Uart<'d, T, M> {
fn write(&mut self, buffer: &[u8]) -> Result<(), Self::Error> {
self.blocking_write(buffer)
}
fn flush(&mut self) -> Result<(), Self::Error> {
self.blocking_flush()
}
}
impl<'d, T: Instance, M: Mode> embedded_hal_1::serial::nb::Write for Uart<'d, T, M> {
fn write(&mut self, char: u8) -> nb::Result<(), Self::Error> {
self.blocking_write(&[char]).map_err(nb::Error::Other)
}
fn flush(&mut self) -> nb::Result<(), Self::Error> {
self.blocking_flush().map_err(nb::Error::Other)
}
}
}
#[cfg(all(
@ -532,6 +606,11 @@ mod eha {
}
}
#[cfg(feature = "nightly")]
mod buffered;
#[cfg(feature = "nightly")]
pub use buffered::*;
mod sealed {
use super::*;
@ -541,6 +620,8 @@ mod sealed {
const TX_DREQ: u8;
const RX_DREQ: u8;
type Interrupt: crate::interrupt::Interrupt;
fn regs() -> pac::uart::Uart;
}
pub trait TxPin<T: Instance> {}
@ -572,6 +653,8 @@ macro_rules! impl_instance {
const TX_DREQ: u8 = $tx_dreq;
const RX_DREQ: u8 = $rx_dreq;
type Interrupt = crate::interrupt::$irq;
fn regs() -> pac::uart::Uart {
pac::$inst
}
@ -580,8 +663,8 @@ macro_rules! impl_instance {
};
}
impl_instance!(UART0, UART0, 20, 21);
impl_instance!(UART1, UART1, 22, 23);
impl_instance!(UART0, UART0_IRQ, 20, 21);
impl_instance!(UART1, UART1_IRQ, 22, 23);
pub trait TxPin<T: Instance>: sealed::TxPin<T> + crate::gpio::Pin {}
pub trait RxPin<T: Instance>: sealed::RxPin<T> + crate::gpio::Pin {}

View file

@ -3,7 +3,7 @@ build-std = ["core"]
build-std-features = ["panic_immediate_abort"]
[target.'cfg(all(target_arch = "arm", target_os = "none"))']
#runner = "teleprobe client run --target bluepill-stm32f103c8 --elf"
#runner = "teleprobe client run --target rpi-pico --elf"
runner = "teleprobe local run --chip RP2040 --elf"
rustflags = [

View file

@ -20,6 +20,7 @@ embedded-hal-1 = { package = "embedded-hal", version = "1.0.0-alpha.8" }
embedded-hal-async = { version = "0.1.0-alpha.1" }
panic-probe = { version = "0.3.0", features = ["print-defmt"] }
futures = { version = "0.3.17", default-features = false, features = ["async-await"] }
embedded-io = { version = "0.3.0", features = ["async"] }
[profile.dev]
debug = 2

View file

@ -0,0 +1,44 @@
#![no_std]
#![no_main]
#![feature(type_alias_impl_trait)]
use defmt::{assert_eq, *};
use embassy_executor::Spawner;
use embassy_rp::interrupt;
use embassy_rp::uart::{BufferedUart, Config, State, Uart};
use embedded_io::asynch::{Read, Write};
use {defmt_rtt as _, panic_probe as _};
#[embassy_executor::main]
async fn main(_spawner: Spawner) {
let p = embassy_rp::init(Default::default());
info!("Hello World!");
let (tx, rx, uart) = (p.PIN_0, p.PIN_1, p.UART0);
let config = Config::default();
let uart = Uart::new_blocking(uart, tx, rx, config);
let irq = interrupt::take!(UART0_IRQ);
let tx_buf = &mut [0u8; 16];
let rx_buf = &mut [0u8; 16];
let mut state = State::new();
let mut uart = BufferedUart::new(&mut state, uart, irq, tx_buf, rx_buf);
// Make sure we send more bytes than fits in the FIFO, to test the actual
// bufferedUart.
let data = [
1_u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
30, 31, 32,
];
uart.write_all(&data).await.unwrap();
info!("Done writing");
let mut buf = [0; 32];
uart.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, data);
info!("Test OK");
cortex_m::asm::bkpt();
}