diff --git a/embassy-nrf/src/buffered_uarte.rs b/embassy-nrf/src/buffered_uarte.rs
index 112f084c1..79f9a1f78 100644
--- a/embassy-nrf/src/buffered_uarte.rs
+++ b/embassy-nrf/src/buffered_uarte.rs
@@ -1,10 +1,5 @@
 //! Async buffered UART driver.
 //!
-//! WARNING!!! The functionality provided here is intended to be used only
-//! in situations where hardware flow control are available i.e. CTS and RTS.
-//! This is a problem that should be addressed at a later stage and can be
-//! fully explained at <https://github.com/embassy-rs/embassy/issues/536>.
-//!
 //! Note that discarding a future from a read or write operation may lead to losing
 //! data. For example, when using `futures_util::future::select` and completion occurs
 //! on the "other" future, you should capture the incomplete future and continue to use
@@ -13,82 +8,120 @@
 //!
 //! Please also see [crate::uarte] to understand when [BufferedUarte] should be used.
 
-use core::cell::RefCell;
 use core::cmp::min;
 use core::future::poll_fn;
-use core::sync::atomic::{compiler_fence, Ordering};
+use core::slice;
+use core::sync::atomic::{compiler_fence, AtomicU8, AtomicUsize, Ordering};
 use core::task::Poll;
 
-use embassy_cortex_m::peripheral::{PeripheralMutex, PeripheralState, StateStorage};
-use embassy_hal_common::ring_buffer::RingBuffer;
+use embassy_cortex_m::interrupt::Interrupt;
+use embassy_hal_common::atomic_ring_buffer::RingBuffer;
 use embassy_hal_common::{into_ref, PeripheralRef};
-use embassy_sync::waitqueue::WakerRegistration;
+use embassy_sync::waitqueue::AtomicWaker;
 // Re-export SVD variants to allow user to directly set values
 pub use pac::uarte0::{baudrate::BAUDRATE_A as Baudrate, config::PARITY_A as Parity};
 
-use crate::gpio::{self, Pin as GpioPin};
+use crate::gpio::sealed::Pin;
+use crate::gpio::{self, AnyPin, Pin as GpioPin, PselBits};
 use crate::interrupt::InterruptExt;
-use crate::ppi::{AnyConfigurableChannel, ConfigurableChannel, Event, Ppi, Task};
-use crate::timer::{Frequency, Instance as TimerInstance, Timer};
+use crate::ppi::{
+    self, AnyConfigurableChannel, AnyGroup, Channel, ConfigurableChannel, Event, Group, Ppi, PpiGroup, Task,
+};
+use crate::timer::{Instance as TimerInstance, Timer};
 use crate::uarte::{apply_workaround_for_enable_anomaly, Config, Instance as UarteInstance};
 use crate::{pac, Peripheral};
 
-#[derive(Copy, Clone, Debug, PartialEq)]
-enum RxState {
-    Idle,
-    Receiving,
-}
+mod sealed {
+    use super::*;
 
-#[derive(Copy, Clone, Debug, PartialEq)]
-enum TxState {
-    Idle,
-    Transmitting(usize),
-}
+    pub struct State {
+        pub tx_waker: AtomicWaker,
+        pub tx_buf: RingBuffer,
+        pub tx_count: AtomicUsize,
 
-/// A type for storing the state of the UARTE peripheral that can be stored in a static.
-pub struct State<'d, U: UarteInstance, T: TimerInstance>(StateStorage<StateInner<'d, U, T>>);
-impl<'d, U: UarteInstance, T: TimerInstance> State<'d, U, T> {
-    /// Create an instance for storing UARTE peripheral state.
-    pub fn new() -> Self {
-        Self(StateStorage::new())
+        pub rx_waker: AtomicWaker,
+        pub rx_buf: RingBuffer,
+        pub rx_bufs: AtomicU8,
+        pub rx_ppi_ch: AtomicU8,
     }
 }
 
-struct StateInner<'d, U: UarteInstance, T: TimerInstance> {
-    _peri: PeripheralRef<'d, U>,
-    timer: Timer<'d, T>,
-    _ppi_ch1: Ppi<'d, AnyConfigurableChannel, 1, 2>,
-    _ppi_ch2: Ppi<'d, AnyConfigurableChannel, 1, 1>,
+pub(crate) use sealed::State;
 
-    rx: RingBuffer<'d>,
-    rx_state: RxState,
-    rx_waker: WakerRegistration,
+impl State {
+    pub(crate) const fn new() -> Self {
+        Self {
+            tx_waker: AtomicWaker::new(),
+            tx_buf: RingBuffer::new(),
+            tx_count: AtomicUsize::new(0),
 
-    tx: RingBuffer<'d>,
-    tx_state: TxState,
-    tx_waker: WakerRegistration,
+            rx_waker: AtomicWaker::new(),
+            rx_buf: RingBuffer::new(),
+            rx_bufs: AtomicU8::new(0),
+            rx_ppi_ch: AtomicU8::new(0),
+        }
+    }
 }
 
 /// Buffered UARTE driver.
 pub struct BufferedUarte<'d, U: UarteInstance, T: TimerInstance> {
-    inner: RefCell<PeripheralMutex<'d, StateInner<'d, U, T>>>,
+    _peri: PeripheralRef<'d, U>,
+    timer: Timer<'d, T>,
+    _ppi_ch1: Ppi<'d, AnyConfigurableChannel, 1, 1>,
+    _ppi_ch2: Ppi<'d, AnyConfigurableChannel, 1, 2>,
+    _ppi_group: PpiGroup<'d, AnyGroup>,
 }
 
 impl<'d, U: UarteInstance, T: TimerInstance> Unpin for BufferedUarte<'d, U, T> {}
 
 impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
-    /// Create a new instance of a BufferedUarte.
+    /// Create a new BufferedUarte without hardware flow control.
     ///
-    /// See the [module documentation](crate::buffered_uarte) for more details about the intended use.
+    /// # Panics
     ///
-    /// The BufferedUarte uses the provided state to store the buffers and peripheral state. The timer and ppi channels are used to 'emulate' idle line detection so that read operations
-    /// can return early if there is no data to receive.
+    /// Panics if `rx_buffer.len()` is odd.
     pub fn new(
-        state: &'d mut State<'d, U, T>,
-        peri: impl Peripheral<P = U> + 'd,
+        uarte: impl Peripheral<P = U> + 'd,
         timer: impl Peripheral<P = T> + 'd,
-        ppi_ch1: impl Peripheral<P = impl ConfigurableChannel + 'd> + 'd,
-        ppi_ch2: impl Peripheral<P = impl ConfigurableChannel + 'd> + 'd,
+        ppi_ch1: impl Peripheral<P = impl ConfigurableChannel> + 'd,
+        ppi_ch2: impl Peripheral<P = impl ConfigurableChannel> + 'd,
+        ppi_group: impl Peripheral<P = impl Group> + 'd,
+        irq: impl Peripheral<P = U::Interrupt> + 'd,
+        rxd: impl Peripheral<P = impl GpioPin> + 'd,
+        txd: impl Peripheral<P = impl GpioPin> + 'd,
+        config: Config,
+        rx_buffer: &'d mut [u8],
+        tx_buffer: &'d mut [u8],
+    ) -> Self {
+        into_ref!(rxd, txd, ppi_ch1, ppi_ch2, ppi_group);
+        Self::new_inner(
+            uarte,
+            timer,
+            ppi_ch1.map_into(),
+            ppi_ch2.map_into(),
+            ppi_group.map_into(),
+            irq,
+            rxd.map_into(),
+            txd.map_into(),
+            None,
+            None,
+            config,
+            rx_buffer,
+            tx_buffer,
+        )
+    }
+
+    /// Create a new BufferedUarte with hardware flow control (RTS/CTS)
+    ///
+    /// # Panics
+    ///
+    /// Panics if `rx_buffer.len()` is odd.
+    pub fn new_with_rtscts(
+        uarte: impl Peripheral<P = U> + 'd,
+        timer: impl Peripheral<P = T> + 'd,
+        ppi_ch1: impl Peripheral<P = impl ConfigurableChannel> + 'd,
+        ppi_ch2: impl Peripheral<P = impl ConfigurableChannel> + 'd,
+        ppi_group: impl Peripheral<P = impl Group> + 'd,
         irq: impl Peripheral<P = U::Interrupt> + 'd,
         rxd: impl Peripheral<P = impl GpioPin> + 'd,
         txd: impl Peripheral<P = impl GpioPin> + 'd,
@@ -98,12 +131,45 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
         rx_buffer: &'d mut [u8],
         tx_buffer: &'d mut [u8],
     ) -> Self {
-        into_ref!(peri, ppi_ch1, ppi_ch2, irq, rxd, txd, cts, rts);
+        into_ref!(rxd, txd, cts, rts, ppi_ch1, ppi_ch2, ppi_group);
+        Self::new_inner(
+            uarte,
+            timer,
+            ppi_ch1.map_into(),
+            ppi_ch2.map_into(),
+            ppi_group.map_into(),
+            irq,
+            rxd.map_into(),
+            txd.map_into(),
+            Some(cts.map_into()),
+            Some(rts.map_into()),
+            config,
+            rx_buffer,
+            tx_buffer,
+        )
+    }
+
+    fn new_inner(
+        peri: impl Peripheral<P = U> + 'd,
+        timer: impl Peripheral<P = T> + 'd,
+        ppi_ch1: PeripheralRef<'d, AnyConfigurableChannel>,
+        ppi_ch2: PeripheralRef<'d, AnyConfigurableChannel>,
+        ppi_group: PeripheralRef<'d, AnyGroup>,
+        irq: impl Peripheral<P = U::Interrupt> + 'd,
+        rxd: PeripheralRef<'d, AnyPin>,
+        txd: PeripheralRef<'d, AnyPin>,
+        cts: Option<PeripheralRef<'d, AnyPin>>,
+        rts: Option<PeripheralRef<'d, AnyPin>>,
+        config: Config,
+        rx_buffer: &'d mut [u8],
+        tx_buffer: &'d mut [u8],
+    ) -> Self {
+        into_ref!(peri, timer, irq);
+
+        assert!(rx_buffer.len() % 2 == 0);
 
         let r = U::regs();
 
-        let mut timer = Timer::new(timer);
-
         rxd.conf().write(|w| w.input().connect().drive().h0h1());
         r.psel.rxd.write(|w| unsafe { w.bits(rxd.psel_bits()) });
 
@@ -111,92 +177,200 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
         txd.conf().write(|w| w.dir().output().drive().h0h1());
         r.psel.txd.write(|w| unsafe { w.bits(txd.psel_bits()) });
 
-        cts.conf().write(|w| w.input().connect().drive().h0h1());
+        if let Some(pin) = &cts {
+            pin.conf().write(|w| w.input().connect().drive().h0h1());
+        }
         r.psel.cts.write(|w| unsafe { w.bits(cts.psel_bits()) });
 
-        rts.set_high();
-        rts.conf().write(|w| w.dir().output().drive().h0h1());
+        if let Some(pin) = &rts {
+            pin.set_high();
+            pin.conf().write(|w| w.dir().output().drive().h0h1());
+        }
         r.psel.rts.write(|w| unsafe { w.bits(rts.psel_bits()) });
 
-        r.baudrate.write(|w| w.baudrate().variant(config.baudrate));
-        r.config.write(|w| w.parity().variant(config.parity));
+        // Initialize state
+        let s = U::buffered_state();
+        s.tx_count.store(0, Ordering::Relaxed);
+        s.rx_bufs.store(0, Ordering::Relaxed);
+        let len = tx_buffer.len();
+        unsafe { s.tx_buf.init(tx_buffer.as_mut_ptr(), len) };
+        let len = rx_buffer.len();
+        unsafe { s.rx_buf.init(rx_buffer.as_mut_ptr(), len) };
 
         // Configure
         r.config.write(|w| {
-            w.hwfc().bit(true);
+            w.hwfc().bit(false);
             w.parity().variant(config.parity);
             w
         });
         r.baudrate.write(|w| w.baudrate().variant(config.baudrate));
 
-        // Enable interrupts
-        r.intenset.write(|w| w.endrx().set().endtx().set());
+        // clear errors
+        let errors = r.errorsrc.read().bits();
+        r.errorsrc.write(|w| unsafe { w.bits(errors) });
 
-        // Disable the irq, let the Registration enable it when everything is set up.
-        irq.disable();
-        irq.pend();
+        r.events_rxstarted.reset();
+        r.events_txstarted.reset();
+        r.events_error.reset();
+        r.events_endrx.reset();
+        r.events_endtx.reset();
+
+        // Enable interrupts
+        r.intenclr.write(|w| unsafe { w.bits(!0) });
+        r.intenset.write(|w| {
+            w.endtx().set();
+            w.rxstarted().set();
+            w.error().set();
+            w
+        });
 
         // Enable UARTE instance
         apply_workaround_for_enable_anomaly(&r);
         r.enable.write(|w| w.enable().enabled());
 
-        // BAUDRATE register values are `baudrate * 2^32 / 16000000`
-        // source: https://devzone.nordicsemi.com/f/nordic-q-a/391/uart-baudrate-register-values
-        //
-        // We want to stop RX if line is idle for 2 bytes worth of time
-        // That is 20 bits (each byte is 1 start bit + 8 data bits + 1 stop bit)
-        // This gives us the amount of 16M ticks for 20 bits.
-        let timeout = 0x8000_0000 / (config.baudrate as u32 / 40);
+        // Configure byte counter.
+        let mut timer = Timer::new_counter(timer);
+        timer.cc(1).write(rx_buffer.len() as u32 * 2);
+        timer.cc(1).short_compare_clear();
+        timer.clear();
+        timer.start();
 
-        timer.set_frequency(Frequency::F16MHz);
-        timer.cc(0).write(timeout);
-        timer.cc(0).short_compare_clear();
-        timer.cc(0).short_compare_stop();
-
-        let mut ppi_ch1 = Ppi::new_one_to_two(
-            ppi_ch1.map_into(),
-            Event::from_reg(&r.events_rxdrdy),
-            timer.task_clear(),
-            timer.task_start(),
-        );
+        let mut ppi_ch1 = Ppi::new_one_to_one(ppi_ch1, Event::from_reg(&r.events_rxdrdy), timer.task_count());
         ppi_ch1.enable();
 
-        let mut ppi_ch2 = Ppi::new_one_to_one(
-            ppi_ch2.map_into(),
-            timer.cc(0).event_compare(),
-            Task::from_reg(&r.tasks_stoprx),
+        s.rx_ppi_ch.store(ppi_ch2.number() as u8, Ordering::Relaxed);
+        let mut ppi_group = PpiGroup::new(ppi_group);
+        let mut ppi_ch2 = Ppi::new_one_to_two(
+            ppi_ch2,
+            Event::from_reg(&r.events_endrx),
+            Task::from_reg(&r.tasks_startrx),
+            ppi_group.task_disable_all(),
         );
-        ppi_ch2.enable();
+        ppi_ch2.disable();
+        ppi_group.add_channel(&ppi_ch2);
+
+        irq.disable();
+        irq.set_handler(Self::on_interrupt);
+        irq.pend();
+        irq.enable();
 
         Self {
-            inner: RefCell::new(PeripheralMutex::new(irq, &mut state.0, move || StateInner {
-                _peri: peri,
-                timer,
-                _ppi_ch1: ppi_ch1,
-                _ppi_ch2: ppi_ch2,
-
-                rx: RingBuffer::new(rx_buffer),
-                rx_state: RxState::Idle,
-                rx_waker: WakerRegistration::new(),
-
-                tx: RingBuffer::new(tx_buffer),
-                tx_state: TxState::Idle,
-                tx_waker: WakerRegistration::new(),
-            })),
+            _peri: peri,
+            timer,
+            _ppi_ch1: ppi_ch1,
+            _ppi_ch2: ppi_ch2,
+            _ppi_group: ppi_group,
         }
     }
 
+    fn pend_irq() {
+        unsafe { <U::Interrupt as Interrupt>::steal() }.pend()
+    }
+
+    fn on_interrupt(_: *mut ()) {
+        //trace!("irq: start");
+        let r = U::regs();
+        let s = U::buffered_state();
+
+        let buf_len = s.rx_buf.len();
+        let half_len = buf_len / 2;
+        let mut tx = unsafe { s.tx_buf.reader() };
+        let mut rx = unsafe { s.rx_buf.writer() };
+
+        if r.events_error.read().bits() != 0 {
+            r.events_error.reset();
+            let errs = r.errorsrc.read();
+            r.errorsrc.write(|w| unsafe { w.bits(errs.bits()) });
+
+            if errs.overrun().bit() {
+                panic!("BufferedUarte overrun");
+            }
+        }
+
+        // Received some bytes, wake task.
+        if r.inten.read().rxdrdy().bit_is_set() && r.events_rxdrdy.read().events_rxdrdy().bit_is_set() {
+            r.intenclr.write(|w| w.rxdrdy().clear());
+            r.events_rxdrdy.reset();
+            s.rx_waker.wake();
+        }
+
+        // If not RXing, start.
+        if s.rx_bufs.load(Ordering::Relaxed) == 0 {
+            let (ptr, len) = rx.push_buf();
+            if len >= half_len {
+                //trace!("  irq_rx: starting {:?}", half_len);
+                s.rx_bufs.store(1, Ordering::Relaxed);
+
+                // Set up the DMA read
+                r.rxd.ptr.write(|w| unsafe { w.ptr().bits(ptr as u32) });
+                r.rxd.maxcnt.write(|w| unsafe { w.maxcnt().bits(half_len as _) });
+
+                // Start UARTE Receive transaction
+                r.tasks_startrx.write(|w| unsafe { w.bits(1) });
+                rx.push_done(half_len);
+                r.intenset.write(|w| w.rxstarted().set());
+            }
+        }
+
+        if r.events_rxstarted.read().bits() != 0 {
+            //trace!("  irq_rx: rxstarted");
+            let (ptr, len) = rx.push_buf();
+            if len >= half_len {
+                //trace!("  irq_rx: starting second {:?}", half_len);
+
+                // Set up the DMA read
+                r.rxd.ptr.write(|w| unsafe { w.ptr().bits(ptr as u32) });
+                r.rxd.maxcnt.write(|w| unsafe { w.maxcnt().bits(half_len as _) });
+
+                let chn = s.rx_ppi_ch.load(Ordering::Relaxed);
+
+                ppi::regs().chenset.write(|w| unsafe { w.bits(1 << chn) });
+
+                rx.push_done(half_len);
+
+                r.events_rxstarted.reset();
+            } else {
+                //trace!("  irq_rx: rxstarted no buf");
+                r.intenclr.write(|w| w.rxstarted().clear());
+            }
+        }
+
+        // =============================
+
+        // TX end
+        if r.events_endtx.read().bits() != 0 {
+            r.events_endtx.reset();
+
+            let n = s.tx_count.load(Ordering::Relaxed);
+            //trace!("  irq_tx: endtx {:?}", n);
+            tx.pop_done(n);
+            s.tx_waker.wake();
+            s.tx_count.store(0, Ordering::Relaxed);
+        }
+
+        // If not TXing, start.
+        if s.tx_count.load(Ordering::Relaxed) == 0 {
+            let (ptr, len) = tx.pop_buf();
+            if len != 0 {
+                //trace!("  irq_tx: starting {:?}", len);
+                s.tx_count.store(len, Ordering::Relaxed);
+
+                // Set up the DMA write
+                r.txd.ptr.write(|w| unsafe { w.ptr().bits(ptr as u32) });
+                r.txd.maxcnt.write(|w| unsafe { w.maxcnt().bits(len as _) });
+
+                // Start UARTE Transmit transaction
+                r.tasks_starttx.write(|w| unsafe { w.bits(1) });
+            }
+        }
+
+        //trace!("irq: end");
+    }
+
     /// Adjust the baud rate to the provided value.
     pub fn set_baudrate(&mut self, baudrate: Baudrate) {
-        self.inner.borrow_mut().with(|state| {
-            let r = U::regs();
-
-            let timeout = 0x8000_0000 / (baudrate as u32 / 40);
-            state.timer.cc(0).write(timeout);
-            state.timer.clear();
-
-            r.baudrate.write(|w| w.baudrate().variant(baudrate));
-        });
+        let r = U::regs();
+        r.baudrate.write(|w| w.baudrate().variant(baudrate));
     }
 
     /// Split the UART in reader and writer parts.
@@ -206,120 +380,117 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
         (BufferedUarteRx { inner: self }, BufferedUarteTx { inner: self })
     }
 
-    async fn inner_read<'a>(&'a self, buf: &'a mut [u8]) -> Result<usize, core::convert::Infallible> {
-        poll_fn(move |cx| {
-            let mut do_pend = false;
-            let mut inner = self.inner.borrow_mut();
-            let res = inner.with(|state| {
-                compiler_fence(Ordering::SeqCst);
-                trace!("poll_read");
-
-                // We have data ready in buffer? Return it.
-                let data = state.rx.pop_buf();
-                if !data.is_empty() {
-                    trace!("  got {:?} {:?}", data.as_ptr() as u32, data.len());
-                    let len = data.len().min(buf.len());
-                    buf[..len].copy_from_slice(&data[..len]);
-                    state.rx.pop(len);
-                    do_pend = true;
-                    return Poll::Ready(Ok(len));
-                }
-
-                trace!("  empty");
-                state.rx_waker.register(cx.waker());
-                Poll::Pending
-            });
-            if do_pend {
-                inner.pend();
-            }
-
-            res
-        })
-        .await
+    async fn inner_read(&self, buf: &mut [u8]) -> Result<usize, core::convert::Infallible> {
+        let data = self.inner_fill_buf().await?;
+        let n = data.len().min(buf.len());
+        buf[..n].copy_from_slice(&data[..n]);
+        self.inner_consume(n);
+        Ok(n)
     }
 
     async fn inner_write<'a>(&'a self, buf: &'a [u8]) -> Result<usize, core::convert::Infallible> {
         poll_fn(move |cx| {
-            let mut inner = self.inner.borrow_mut();
-            let res = inner.with(|state| {
-                trace!("poll_write: {:?}", buf.len());
+            //trace!("poll_write: {:?}", buf.len());
+            let s = U::buffered_state();
+            let mut tx = unsafe { s.tx_buf.writer() };
 
-                let tx_buf = state.tx.push_buf();
-                if tx_buf.is_empty() {
-                    trace!("poll_write: pending");
-                    state.tx_waker.register(cx.waker());
-                    return Poll::Pending;
-                }
+            let tx_buf = tx.push_slice();
+            if tx_buf.is_empty() {
+                //trace!("poll_write: pending");
+                s.tx_waker.register(cx.waker());
+                return Poll::Pending;
+            }
 
-                let n = min(tx_buf.len(), buf.len());
-                tx_buf[..n].copy_from_slice(&buf[..n]);
-                state.tx.push(n);
+            let n = min(tx_buf.len(), buf.len());
+            tx_buf[..n].copy_from_slice(&buf[..n]);
+            tx.push_done(n);
 
-                trace!("poll_write: queued {:?}", n);
+            //trace!("poll_write: queued {:?}", n);
 
-                compiler_fence(Ordering::SeqCst);
+            compiler_fence(Ordering::SeqCst);
+            Self::pend_irq();
 
-                Poll::Ready(Ok(n))
-            });
-
-            inner.pend();
-
-            res
+            Poll::Ready(Ok(n))
         })
         .await
     }
 
     async fn inner_flush<'a>(&'a self) -> Result<(), core::convert::Infallible> {
         poll_fn(move |cx| {
-            self.inner.borrow_mut().with(|state| {
-                trace!("poll_flush");
+            //trace!("poll_flush");
+            let s = U::buffered_state();
+            if !s.tx_buf.is_empty() {
+                //trace!("poll_flush: pending");
+                s.tx_waker.register(cx.waker());
+                return Poll::Pending;
+            }
 
-                if !state.tx.is_empty() {
-                    trace!("poll_flush: pending");
-                    state.tx_waker.register(cx.waker());
-                    return Poll::Pending;
-                }
-
-                Poll::Ready(Ok(()))
-            })
+            Poll::Ready(Ok(()))
         })
         .await
     }
 
     async fn inner_fill_buf<'a>(&'a self) -> Result<&'a [u8], core::convert::Infallible> {
         poll_fn(move |cx| {
-            self.inner.borrow_mut().with(|state| {
-                compiler_fence(Ordering::SeqCst);
-                trace!("fill_buf");
+            compiler_fence(Ordering::SeqCst);
+            //trace!("poll_read");
 
-                // We have data ready in buffer? Return it.
-                let buf = state.rx.pop_buf();
-                if !buf.is_empty() {
-                    trace!("  got {:?} {:?}", buf.as_ptr() as u32, buf.len());
-                    let buf: &[u8] = buf;
-                    // Safety: buffer lives as long as uart
-                    let buf: &[u8] = unsafe { core::mem::transmute(buf) };
-                    return Poll::Ready(Ok(buf));
-                }
+            let r = U::regs();
+            let s = U::buffered_state();
 
-                trace!("  empty");
-                state.rx_waker.register(cx.waker());
-                Poll::<Result<&[u8], core::convert::Infallible>>::Pending
-            })
+            // Read the RXDRDY counter.
+            T::regs().tasks_capture[0].write(|w| unsafe { w.bits(1) });
+            let mut end = T::regs().cc[0].read().bits() as usize;
+            //trace!("  rxdrdy count = {:?}", end);
+
+            // We've set a compare channel that resets the counter to 0 when it reaches `len*2`.
+            // However, it's unclear if that's instant, or there's a small window where you can
+            // still read `len()*2`.
+            // This could happen if in one clock cycle the counter is updated, and in the next the
+            // clear takes effect. The docs are very sparse, they just say "Task delays: After TIMER
+            // is started, the CLEAR, COUNT, and STOP tasks are guaranteed to take effect within one
+            // clock cycle of the PCLK16M." :shrug:
+            // So, we wrap the counter ourselves, just in case.
+            if end > s.rx_buf.len() * 2 {
+                end = 0
+            }
+
+            // This logic mirrors `atomic_ring_buffer::Reader::pop_buf()`
+            let mut start = s.rx_buf.start.load(Ordering::Relaxed);
+            let len = s.rx_buf.len();
+            if start == end {
+                //trace!("  empty");
+                s.rx_waker.register(cx.waker());
+                r.intenset.write(|w| w.rxdrdy().set_bit());
+                return Poll::Pending;
+            }
+
+            if start >= len {
+                start -= len
+            }
+            if end >= len {
+                end -= len
+            }
+
+            let n = if end > start { end - start } else { len - start };
+            assert!(n != 0);
+            //trace!("  uarte ringbuf: pop_buf {:?}..{:?}", start, start + n);
+
+            let buf = s.rx_buf.buf.load(Ordering::Relaxed);
+            Poll::Ready(Ok(unsafe { slice::from_raw_parts(buf.add(start), n) }))
         })
         .await
     }
 
     fn inner_consume(&self, amt: usize) {
-        let mut inner = self.inner.borrow_mut();
-        let signal = inner.with(|state| {
-            let full = state.rx.is_full();
-            state.rx.pop(amt);
-            full
-        });
-        if signal {
-            inner.pend();
+        if amt == 0 {
+            return;
         }
+
+        let s = U::buffered_state();
+        let mut rx = unsafe { s.rx_buf.reader() };
+        rx.pop_done(amt);
+        U::regs().intenset.write(|w| w.rxstarted().set());
     }
 }
 
@@ -397,7 +568,7 @@ impl<'u, 'd: 'u, U: UarteInstance, T: TimerInstance> embedded_io::asynch::Write
     }
 }
 
-impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> {
+impl<'a, U: UarteInstance, T: TimerInstance> Drop for BufferedUarte<'a, U, T> {
     fn drop(&mut self) {
         let r = U::regs();
 
@@ -418,108 +589,11 @@ impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> {
         gpio::deconfigure_pin(r.psel.txd.read().bits());
         gpio::deconfigure_pin(r.psel.rts.read().bits());
         gpio::deconfigure_pin(r.psel.cts.read().bits());
-    }
-}
-
-impl<'a, U: UarteInstance, T: TimerInstance> PeripheralState for StateInner<'a, U, T> {
-    type Interrupt = U::Interrupt;
-    fn on_interrupt(&mut self) {
-        trace!("irq: start");
-        let r = U::regs();
-
-        loop {
-            match self.rx_state {
-                RxState::Idle => {
-                    trace!("  irq_rx: in state idle");
-
-                    let buf = self.rx.push_buf();
-                    if !buf.is_empty() {
-                        trace!("  irq_rx: starting {:?}", buf.len());
-                        self.rx_state = RxState::Receiving;
-
-                        // Set up the DMA read
-                        r.rxd.ptr.write(|w|
-                            // The PTR field is a full 32 bits wide and accepts the full range
-                            // of values.
-                            unsafe { w.ptr().bits(buf.as_ptr() as u32) });
-                        r.rxd.maxcnt.write(|w|
-                            // We're giving it the length of the buffer, so no danger of
-                            // accessing invalid memory. We have verified that the length of the
-                            // buffer fits in an `u8`, so the cast to `u8` is also fine.
-                            //
-                            // The MAXCNT field is at least 8 bits wide and accepts the full
-                            // range of values.
-                            unsafe { w.maxcnt().bits(buf.len() as _) });
-                        trace!("  irq_rx: buf {:?} {:?}", buf.as_ptr() as u32, buf.len());
-
-                        // Start UARTE Receive transaction
-                        r.tasks_startrx.write(|w| unsafe { w.bits(1) });
-                    }
-                    break;
-                }
-                RxState::Receiving => {
-                    trace!("  irq_rx: in state receiving");
-                    if r.events_endrx.read().bits() != 0 {
-                        self.timer.stop();
-
-                        let n: usize = r.rxd.amount.read().amount().bits() as usize;
-                        trace!("  irq_rx: endrx {:?}", n);
-                        self.rx.push(n);
-
-                        r.events_endrx.reset();
-
-                        self.rx_waker.wake();
-                        self.rx_state = RxState::Idle;
-                    } else {
-                        break;
-                    }
-                }
-            }
-        }
-
-        loop {
-            match self.tx_state {
-                TxState::Idle => {
-                    trace!("  irq_tx: in state Idle");
-                    let buf = self.tx.pop_buf();
-                    if !buf.is_empty() {
-                        trace!("  irq_tx: starting {:?}", buf.len());
-                        self.tx_state = TxState::Transmitting(buf.len());
-
-                        // Set up the DMA write
-                        r.txd.ptr.write(|w|
-                            // The PTR field is a full 32 bits wide and accepts the full range
-                            // of values.
-                            unsafe { w.ptr().bits(buf.as_ptr() as u32) });
-                        r.txd.maxcnt.write(|w|
-                            // We're giving it the length of the buffer, so no danger of
-                            // accessing invalid memory. We have verified that the length of the
-                            // buffer fits in an `u8`, so the cast to `u8` is also fine.
-                            //
-                            // The MAXCNT field is 8 bits wide and accepts the full range of
-                            // values.
-                            unsafe { w.maxcnt().bits(buf.len() as _) });
-
-                        // Start UARTE Transmit transaction
-                        r.tasks_starttx.write(|w| unsafe { w.bits(1) });
-                    }
-                    break;
-                }
-                TxState::Transmitting(n) => {
-                    trace!("  irq_tx: in state Transmitting");
-                    if r.events_endtx.read().bits() != 0 {
-                        r.events_endtx.reset();
-
-                        trace!("  irq_tx: endtx {:?}", n);
-                        self.tx.pop(n);
-                        self.tx_waker.wake();
-                        self.tx_state = TxState::Idle;
-                    } else {
-                        break;
-                    }
-                }
-            }
-        }
-        trace!("irq: end");
+
+        let s = U::buffered_state();
+        unsafe {
+            s.rx_buf.deinit();
+            s.tx_buf.deinit();
+        }
     }
 }
diff --git a/embassy-nrf/src/uarte.rs b/embassy-nrf/src/uarte.rs
index 48457744b..00afbd059 100644
--- a/embassy-nrf/src/uarte.rs
+++ b/embassy-nrf/src/uarte.rs
@@ -883,6 +883,7 @@ pub(crate) mod sealed {
     pub trait Instance {
         fn regs() -> &'static pac::uarte0::RegisterBlock;
         fn state() -> &'static State;
+        fn buffered_state() -> &'static crate::buffered_uarte::State;
     }
 }
 
@@ -902,6 +903,10 @@ macro_rules! impl_uarte {
                 static STATE: crate::uarte::sealed::State = crate::uarte::sealed::State::new();
                 &STATE
             }
+            fn buffered_state() -> &'static crate::buffered_uarte::State {
+                static STATE: crate::buffered_uarte::State = crate::buffered_uarte::State::new();
+                &STATE
+            }
         }
         impl crate::uarte::Instance for peripherals::$type {
             type Interrupt = crate::interrupt::$irq;
diff --git a/examples/nrf52840/src/bin/buffered_uart.rs b/examples/nrf52840/src/bin/buffered_uart.rs
index ea566f4b2..584e6b2bc 100644
--- a/examples/nrf52840/src/bin/buffered_uart.rs
+++ b/examples/nrf52840/src/bin/buffered_uart.rs
@@ -4,10 +4,9 @@
 
 use defmt::*;
 use embassy_executor::Spawner;
-use embassy_nrf::buffered_uarte::{BufferedUarte, State};
+use embassy_nrf::buffered_uarte::BufferedUarte;
 use embassy_nrf::{interrupt, uarte};
 use embedded_io::asynch::{BufRead, Write};
-use futures::pin_mut;
 use {defmt_rtt as _, panic_probe as _};
 
 #[embassy_executor::main]
@@ -21,24 +20,19 @@ async fn main(_spawner: Spawner) {
     let mut rx_buffer = [0u8; 4096];
 
     let irq = interrupt::take!(UARTE0_UART0);
-    let mut state = State::new();
-    // Please note - important to have hardware flow control (https://github.com/embassy-rs/embassy/issues/536)
-    let u = BufferedUarte::new(
-        &mut state,
+    let mut u = BufferedUarte::new(
         p.UARTE0,
         p.TIMER0,
         p.PPI_CH0,
         p.PPI_CH1,
+        p.PPI_GROUP0,
         irq,
         p.P0_08,
         p.P0_06,
-        p.P0_07,
-        p.P0_05,
         config,
         &mut rx_buffer,
         &mut tx_buffer,
     );
-    pin_mut!(u);
 
     info!("uarte initialized!");