sync/pipe: impl BufRead.

This commit is contained in:
Dario Nieuwenhuis 2023-08-28 01:53:15 +02:00
parent 88146eb53e
commit 6c165f8dc0
3 changed files with 233 additions and 133 deletions

View file

@ -1,7 +1,8 @@
//! Async byte stream pipe. //! Async byte stream pipe.
use core::cell::RefCell; use core::cell::{RefCell, UnsafeCell};
use core::future::Future; use core::future::Future;
use core::ops::Range;
use core::pin::Pin; use core::pin::Pin;
use core::task::{Context, Poll}; use core::task::{Context, Poll};
@ -82,17 +83,6 @@ where
pipe: &'p Pipe<M, N>, pipe: &'p Pipe<M, N>,
} }
impl<'p, M, const N: usize> Clone for Reader<'p, M, N>
where
M: RawMutex,
{
fn clone(&self) -> Self {
Reader { pipe: self.pipe }
}
}
impl<'p, M, const N: usize> Copy for Reader<'p, M, N> where M: RawMutex {}
impl<'p, M, const N: usize> Reader<'p, M, N> impl<'p, M, const N: usize> Reader<'p, M, N>
where where
M: RawMutex, M: RawMutex,
@ -110,6 +100,29 @@ where
pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
self.pipe.try_read(buf) self.pipe.try_read(buf)
} }
/// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
///
/// If no bytes are currently available to read, this function waits until at least one byte is available.
///
/// If the reader is at end-of-file (EOF), an empty slice is returned.
pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> {
FillBufFuture { pipe: Some(self.pipe) }
}
/// Try returning contents of the internal buffer.
///
/// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
///
/// If the reader is at end-of-file (EOF), an empty slice is returned.
pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
unsafe { self.pipe.try_fill_buf_with_context(None) }
}
/// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
pub fn consume(&mut self, amt: usize) {
self.pipe.consume(amt)
}
} }
/// Future returned by [`Pipe::read`] and [`Reader::read`]. /// Future returned by [`Pipe::read`] and [`Reader::read`].
@ -138,6 +151,35 @@ where
impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
/// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FillBufFuture<'p, M, const N: usize>
where
M: RawMutex,
{
pipe: Option<&'p Pipe<M, N>>,
}
impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N>
where
M: RawMutex,
{
type Output = &'p [u8];
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pipe = self.pipe.take().unwrap();
match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
Ok(buf) => Poll::Ready(buf),
Err(TryReadError::Empty) => {
self.pipe = Some(pipe);
Poll::Pending
}
}
}
}
impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {}
/// Error returned by [`try_read`](Pipe::try_read). /// Error returned by [`try_read`](Pipe::try_read).
#[derive(PartialEq, Eq, Clone, Copy, Debug)] #[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))] #[cfg_attr(feature = "defmt", derive(defmt::Format))]
@ -162,67 +204,24 @@ struct PipeState<const N: usize> {
write_waker: WakerRegistration, write_waker: WakerRegistration,
} }
impl<const N: usize> PipeState<N> { #[repr(transparent)]
const fn new() -> Self { struct Buffer<const N: usize>(UnsafeCell<[u8; N]>);
PipeState {
buffer: RingBuffer::new(), impl<const N: usize> Buffer<N> {
read_waker: WakerRegistration::new(), unsafe fn get<'a>(&self, r: Range<usize>) -> &'a [u8] {
write_waker: WakerRegistration::new(), let p = self.0.get() as *const u8;
} core::slice::from_raw_parts(p.add(r.start), r.end - r.start)
} }
fn clear(&mut self) { unsafe fn get_mut<'a>(&self, r: Range<usize>) -> &'a mut [u8] {
self.buffer.clear(); let p = self.0.get() as *mut u8;
self.write_waker.wake(); core::slice::from_raw_parts_mut(p.add(r.start), r.end - r.start)
}
fn try_read(&mut self, buf: &mut [u8]) -> Result<usize, TryReadError> {
self.try_read_with_context(None, buf)
}
fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
if self.buffer.is_full() {
self.write_waker.wake();
}
let available = self.buffer.pop_buf();
if available.is_empty() {
if let Some(cx) = cx {
self.read_waker.register(cx.waker());
}
return Err(TryReadError::Empty);
}
let n = available.len().min(buf.len());
buf[..n].copy_from_slice(&available[..n]);
self.buffer.pop(n);
Ok(n)
}
fn try_write(&mut self, buf: &[u8]) -> Result<usize, TryWriteError> {
self.try_write_with_context(None, buf)
}
fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
if self.buffer.is_empty() {
self.read_waker.wake();
}
let available = self.buffer.push_buf();
if available.is_empty() {
if let Some(cx) = cx {
self.write_waker.register(cx.waker());
}
return Err(TryWriteError::Full);
}
let n = available.len().min(buf.len());
available[..n].copy_from_slice(&buf[..n]);
self.buffer.push(n);
Ok(n)
} }
} }
unsafe impl<const N: usize> Send for Buffer<N> {}
unsafe impl<const N: usize> Sync for Buffer<N> {}
/// A bounded byte-oriented pipe for communicating between asynchronous tasks /// A bounded byte-oriented pipe for communicating between asynchronous tasks
/// with backpressure. /// with backpressure.
/// ///
@ -234,6 +233,7 @@ pub struct Pipe<M, const N: usize>
where where
M: RawMutex, M: RawMutex,
{ {
buf: Buffer<N>,
inner: Mutex<M, RefCell<PipeState<N>>>, inner: Mutex<M, RefCell<PipeState<N>>>,
} }
@ -252,7 +252,12 @@ where
/// ``` /// ```
pub const fn new() -> Self { pub const fn new() -> Self {
Self { Self {
inner: Mutex::new(RefCell::new(PipeState::new())), buf: Buffer(UnsafeCell::new([0; N])),
inner: Mutex::new(RefCell::new(PipeState {
buffer: RingBuffer::new(),
read_waker: WakerRegistration::new(),
write_waker: WakerRegistration::new(),
})),
} }
} }
@ -261,21 +266,91 @@ where
} }
fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> { fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
self.lock(|c| c.try_read_with_context(cx, buf)) self.inner.lock(|rc: &RefCell<PipeState<N>>| {
let s = &mut *rc.borrow_mut();
if s.buffer.is_full() {
s.write_waker.wake();
}
let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
if available.is_empty() {
if let Some(cx) = cx {
s.read_waker.register(cx.waker());
}
return Err(TryReadError::Empty);
}
let n = available.len().min(buf.len());
buf[..n].copy_from_slice(&available[..n]);
s.buffer.pop(n);
Ok(n)
})
}
// safety: While the returned slice is alive,
// no `read` or `consume` methods in the pipe must be called.
unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
self.inner.lock(|rc: &RefCell<PipeState<N>>| {
let s = &mut *rc.borrow_mut();
if s.buffer.is_full() {
s.write_waker.wake();
}
let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
if available.is_empty() {
if let Some(cx) = cx {
s.read_waker.register(cx.waker());
}
return Err(TryReadError::Empty);
}
Ok(available)
})
}
fn consume(&self, amt: usize) {
self.inner.lock(|rc: &RefCell<PipeState<N>>| {
let s = &mut *rc.borrow_mut();
let available = s.buffer.pop_buf();
assert!(amt <= available.len());
s.buffer.pop(amt);
})
} }
fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> { fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
self.lock(|c| c.try_write_with_context(cx, buf)) self.inner.lock(|rc: &RefCell<PipeState<N>>| {
let s = &mut *rc.borrow_mut();
if s.buffer.is_empty() {
s.read_waker.wake();
}
let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) };
if available.is_empty() {
if let Some(cx) = cx {
s.write_waker.register(cx.waker());
}
return Err(TryWriteError::Full);
}
let n = available.len().min(buf.len());
available[..n].copy_from_slice(&buf[..n]);
s.buffer.push(n);
Ok(n)
})
} }
/// Get a writer for this pipe. /// Split this pipe into a BufRead-capable reader and a writer.
pub fn writer(&self) -> Writer<'_, M, N> { ///
Writer { pipe: self } /// The reader and writer borrow the current pipe mutably, so it is not
} /// possible to use it directly while they exist. This is needed because
/// implementing `BufRead` requires there is a single reader.
/// Get a reader for this pipe. ///
pub fn reader(&self) -> Reader<'_, M, N> { /// The writer is cloneable, the reader is not.
Reader { pipe: self } pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) {
(Reader { pipe: self }, Writer { pipe: self })
} }
/// Write some bytes to the pipe. /// Write some bytes to the pipe.
@ -312,7 +387,7 @@ where
/// or return an error if the pipe is empty. See [`write`](Self::write) for a variant /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant
/// that waits instead of returning an error. /// that waits instead of returning an error.
pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> { pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
self.lock(|c| c.try_write(buf)) self.try_write_with_context(None, buf)
} }
/// Read some bytes from the pipe. /// Read some bytes from the pipe.
@ -339,12 +414,17 @@ where
/// or return an error if the pipe is empty. See [`read`](Self::read) for a variant /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant
/// that waits instead of returning an error. /// that waits instead of returning an error.
pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> { pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
self.lock(|c| c.try_read(buf)) self.try_read_with_context(None, buf)
} }
/// Clear the data in the pipe's buffer. /// Clear the data in the pipe's buffer.
pub fn clear(&self) { pub fn clear(&self) {
self.lock(|c| c.clear()) self.inner.lock(|rc: &RefCell<PipeState<N>>| {
let s = &mut *rc.borrow_mut();
s.buffer.clear();
s.write_waker.wake();
})
} }
/// Return whether the pipe is full (no free space in the buffer) /// Return whether the pipe is full (no free space in the buffer)
@ -433,6 +513,16 @@ mod io_impls {
} }
} }
impl<M: RawMutex, const N: usize> embedded_io_async::BufRead for Reader<'_, M, N> {
async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
Ok(Reader::fill_buf(self).await)
}
fn consume(&mut self, amt: usize) {
Reader::consume(self, amt)
}
}
impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Writer<'_, M, N> { impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Writer<'_, M, N> {
type Error = Infallible; type Error = Infallible;
} }
@ -457,43 +547,39 @@ mod tests {
use super::*; use super::*;
use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
fn capacity<const N: usize>(c: &PipeState<N>) -> usize {
N - c.buffer.len()
}
#[test] #[test]
fn writing_once() { fn writing_once() {
let mut c = PipeState::<3>::new(); let c = Pipe::<NoopRawMutex, 3>::new();
assert!(c.try_write(&[1]).is_ok()); assert!(c.try_write(&[1]).is_ok());
assert_eq!(capacity(&c), 2); assert_eq!(c.free_capacity(), 2);
} }
#[test] #[test]
fn writing_when_full() { fn writing_when_full() {
let mut c = PipeState::<3>::new(); let c = Pipe::<NoopRawMutex, 3>::new();
assert_eq!(c.try_write(&[42]), Ok(1)); assert_eq!(c.try_write(&[42]), Ok(1));
assert_eq!(c.try_write(&[43]), Ok(1)); assert_eq!(c.try_write(&[43]), Ok(1));
assert_eq!(c.try_write(&[44]), Ok(1)); assert_eq!(c.try_write(&[44]), Ok(1));
assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
assert_eq!(capacity(&c), 0); assert_eq!(c.free_capacity(), 0);
} }
#[test] #[test]
fn receiving_once_with_one_send() { fn receiving_once_with_one_send() {
let mut c = PipeState::<3>::new(); let c = Pipe::<NoopRawMutex, 3>::new();
assert!(c.try_write(&[42]).is_ok()); assert!(c.try_write(&[42]).is_ok());
let mut buf = [0; 16]; let mut buf = [0; 16];
assert_eq!(c.try_read(&mut buf), Ok(1)); assert_eq!(c.try_read(&mut buf), Ok(1));
assert_eq!(buf[0], 42); assert_eq!(buf[0], 42);
assert_eq!(capacity(&c), 3); assert_eq!(c.free_capacity(), 3);
} }
#[test] #[test]
fn receiving_when_empty() { fn receiving_when_empty() {
let mut c = PipeState::<3>::new(); let c = Pipe::<NoopRawMutex, 3>::new();
let mut buf = [0; 16]; let mut buf = [0; 16];
assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
assert_eq!(capacity(&c), 3); assert_eq!(c.free_capacity(), 3);
} }
#[test] #[test]
@ -506,13 +592,37 @@ mod tests {
} }
#[test] #[test]
fn cloning() { fn read_buf() {
let c = Pipe::<NoopRawMutex, 3>::new(); let mut c = Pipe::<NoopRawMutex, 3>::new();
let r1 = c.reader(); let (mut r, w) = c.split();
let w1 = c.writer(); assert!(w.try_write(&[42, 43]).is_ok());
let buf = r.try_fill_buf().unwrap();
assert_eq!(buf, &[42, 43]);
let buf = r.try_fill_buf().unwrap();
assert_eq!(buf, &[42, 43]);
r.consume(1);
let buf = r.try_fill_buf().unwrap();
assert_eq!(buf, &[43]);
r.consume(1);
assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
assert_eq!(w.try_write(&[45, 46]), Ok(2));
let buf = r.try_fill_buf().unwrap();
assert_eq!(buf, &[44]); // only one byte due to wraparound.
r.consume(1);
let buf = r.try_fill_buf().unwrap();
assert_eq!(buf, &[45, 46]);
assert!(w.try_write(&[47]).is_ok());
let buf = r.try_fill_buf().unwrap();
assert_eq!(buf, &[45, 46, 47]);
r.consume(3);
}
let _ = r1.clone(); #[test]
let _ = w1.clone(); fn writer_is_cloneable() {
let mut c = Pipe::<NoopRawMutex, 3>::new();
let (_r, w) = c.split();
let _ = w.clone();
} }
#[futures_test::test] #[futures_test::test]

View file

@ -1,5 +1,6 @@
use core::ops::Range;
pub struct RingBuffer<const N: usize> { pub struct RingBuffer<const N: usize> {
buf: [u8; N],
start: usize, start: usize,
end: usize, end: usize,
empty: bool, empty: bool,
@ -8,27 +9,26 @@ pub struct RingBuffer<const N: usize> {
impl<const N: usize> RingBuffer<N> { impl<const N: usize> RingBuffer<N> {
pub const fn new() -> Self { pub const fn new() -> Self {
Self { Self {
buf: [0; N],
start: 0, start: 0,
end: 0, end: 0,
empty: true, empty: true,
} }
} }
pub fn push_buf(&mut self) -> &mut [u8] { pub fn push_buf(&mut self) -> Range<usize> {
if self.start == self.end && !self.empty { if self.start == self.end && !self.empty {
trace!(" ringbuf: push_buf empty"); trace!(" ringbuf: push_buf empty");
return &mut self.buf[..0]; return 0..0;
} }
let n = if self.start <= self.end { let n = if self.start <= self.end {
self.buf.len() - self.end N - self.end
} else { } else {
self.start - self.end self.start - self.end
}; };
trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n);
&mut self.buf[self.end..self.end + n] self.end..self.end + n
} }
pub fn push(&mut self, n: usize) { pub fn push(&mut self, n: usize) {
@ -41,20 +41,20 @@ impl<const N: usize> RingBuffer<N> {
self.empty = false; self.empty = false;
} }
pub fn pop_buf(&mut self) -> &mut [u8] { pub fn pop_buf(&mut self) -> Range<usize> {
if self.empty { if self.empty {
trace!(" ringbuf: pop_buf empty"); trace!(" ringbuf: pop_buf empty");
return &mut self.buf[..0]; return 0..0;
} }
let n = if self.end <= self.start { let n = if self.end <= self.start {
self.buf.len() - self.start N - self.start
} else { } else {
self.end - self.start self.end - self.start
}; };
trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n);
&mut self.buf[self.start..self.start + n] self.start..self.start + n
} }
pub fn pop(&mut self, n: usize) { pub fn pop(&mut self, n: usize) {
@ -93,8 +93,8 @@ impl<const N: usize> RingBuffer<N> {
} }
fn wrap(&self, n: usize) -> usize { fn wrap(&self, n: usize) -> usize {
assert!(n <= self.buf.len()); assert!(n <= N);
if n == self.buf.len() { if n == N {
0 0
} else { } else {
n n
@ -110,37 +110,29 @@ mod tests {
fn push_pop() { fn push_pop() {
let mut rb: RingBuffer<4> = RingBuffer::new(); let mut rb: RingBuffer<4> = RingBuffer::new();
let buf = rb.push_buf(); let buf = rb.push_buf();
assert_eq!(4, buf.len()); assert_eq!(0..4, buf);
buf[0] = 1;
buf[1] = 2;
buf[2] = 3;
buf[3] = 4;
rb.push(4); rb.push(4);
let buf = rb.pop_buf(); let buf = rb.pop_buf();
assert_eq!(4, buf.len()); assert_eq!(0..4, buf);
assert_eq!(1, buf[0]);
rb.pop(1); rb.pop(1);
let buf = rb.pop_buf(); let buf = rb.pop_buf();
assert_eq!(3, buf.len()); assert_eq!(1..4, buf);
assert_eq!(2, buf[0]);
rb.pop(1); rb.pop(1);
let buf = rb.pop_buf(); let buf = rb.pop_buf();
assert_eq!(2, buf.len()); assert_eq!(2..4, buf);
assert_eq!(3, buf[0]);
rb.pop(1); rb.pop(1);
let buf = rb.pop_buf(); let buf = rb.pop_buf();
assert_eq!(1, buf.len()); assert_eq!(3..4, buf);
assert_eq!(4, buf[0]);
rb.pop(1); rb.pop(1);
let buf = rb.pop_buf(); let buf = rb.pop_buf();
assert_eq!(0, buf.len()); assert_eq!(0..0, buf);
let buf = rb.push_buf(); let buf = rb.push_buf();
assert_eq!(4, buf.len()); assert_eq!(0..4, buf);
} }
} }

View file

@ -91,13 +91,11 @@ async fn main(_spawner: Spawner) {
let (mut uart_tx, mut uart_rx) = uart.split(); let (mut uart_tx, mut uart_rx) = uart.split();
// Pipe setup // Pipe setup
let usb_pipe: Pipe<NoopRawMutex, 20> = Pipe::new(); let mut usb_pipe: Pipe<NoopRawMutex, 20> = Pipe::new();
let mut usb_pipe_writer = usb_pipe.writer(); let (mut usb_pipe_reader, mut usb_pipe_writer) = usb_pipe.split();
let mut usb_pipe_reader = usb_pipe.reader();
let uart_pipe: Pipe<NoopRawMutex, 20> = Pipe::new(); let mut uart_pipe: Pipe<NoopRawMutex, 20> = Pipe::new();
let mut uart_pipe_writer = uart_pipe.writer(); let (mut uart_pipe_reader, mut uart_pipe_writer) = uart_pipe.split();
let mut uart_pipe_reader = uart_pipe.reader();
let (mut usb_tx, mut usb_rx) = class.split(); let (mut usb_tx, mut usb_rx) = class.split();