534: Provides AsyncWrite with flush r=huntc a=huntc

As per Tokio and others, this commit provides a `poll_flush` method on `AsyncWrite` so that a best-effort attempt at wakening once all bytes are flushed can be made.

Co-authored-by: huntc <huntchr@gmail.com>
This commit is contained in:
bors[bot] 2021-12-10 04:26:11 +00:00 committed by GitHub
commit dce3f8c47d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 127 additions and 0 deletions

View file

@ -125,5 +125,12 @@ mod tests {
let buf = rb.pop_buf(); let buf = rb.pop_buf();
assert_eq!(1, buf.len()); assert_eq!(1, buf.len());
assert_eq!(4, buf[0]); assert_eq!(4, buf[0]);
rb.pop(1);
let buf = rb.pop_buf();
assert_eq!(0, buf.len());
let buf = rb.push_buf();
assert_eq!(4, buf.len());
} }
} }

View file

@ -106,6 +106,17 @@ where
serial.poll_write(cx, buf) serial.poll_write(cx, buf)
}) })
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();
let mut mutex = this.inner.borrow_mut();
mutex.with(|state| {
let serial = state.classes.get_serial();
let serial = Pin::new(serial);
serial.poll_flush(cx)
})
}
} }
pub struct UsbSerial<'bus, 'a, B: UsbBus> { pub struct UsbSerial<'bus, 'a, B: UsbBus> {
@ -167,6 +178,10 @@ impl<'bus, 'a, B: UsbBus> AsyncWrite for UsbSerial<'bus, 'a, B> {
this.flush_write(); this.flush_write();
Poll::Ready(Ok(count)) Poll::Ready(Ok(count))
} }
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
} }
/// Keeps track of the type of the last written packet. /// Keeps track of the type of the last written packet.

View file

@ -200,4 +200,8 @@ impl<'a> AsyncWrite for TcpSocket<'a> {
Err(e) => Poll::Ready(Err(to_ioerr(e))), Err(e) => Poll::Ready(Err(to_ioerr(e))),
}) })
} }
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
}
} }

View file

@ -266,6 +266,20 @@ impl<'d, U: UarteInstance, T: TimerInstance> AsyncWrite for BufferedUarte<'d, U,
poll poll
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<embassy::io::Result<()>> {
self.inner.with(|state| {
trace!("poll_flush");
if !state.tx.is_empty() {
trace!("poll_flush: pending");
state.tx_waker.register(cx.waker());
return Poll::Pending;
}
Poll::Ready(Ok(()))
})
}
} }
impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> { impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> {

View file

@ -449,6 +449,20 @@ mod buffered {
} }
poll poll
} }
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), embassy::io::Error>> {
self.inner.with(|state| {
if !state.tx.is_empty() {
state.tx_waker.register(cx.waker());
return Poll::Pending;
}
Poll::Ready(Ok(()))
})
}
} }
} }

View file

@ -32,4 +32,10 @@ impl<T: std_io::AsyncWrite> AsyncWrite for FromStdIo<T> {
.poll_write(cx, buf) .poll_write(cx, buf)
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let Self(inner) = unsafe { self.get_unchecked_mut() };
unsafe { Pin::new_unchecked(inner) }
.poll_flush(cx)
.map_err(|e| e.into())
}
} }

View file

@ -89,6 +89,15 @@ pub trait AsyncWrite {
/// `poll_write` must try to make progress by flushing the underlying object if /// `poll_write` must try to make progress by flushing the underlying object if
/// that is the only way the underlying object can become writable again. /// that is the only way the underlying object can become writable again.
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>; fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>;
/// Attempt to flush the object, ensuring that any buffered data reach their destination.
///
/// On success, returns Poll::Ready(Ok(())).
///
/// If flushing cannot immediately complete, this method returns [Poll::Pending] and arranges for the
/// current task (via cx.waker()) to receive a notification when the object can make progress
/// towards flushing.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
} }
macro_rules! defer_async_read { macro_rules! defer_async_read {
@ -135,6 +144,10 @@ macro_rules! deref_async_write {
) -> Poll<Result<usize>> { ) -> Poll<Result<usize>> {
Pin::new(&mut **self).poll_write(cx, buf) Pin::new(&mut **self).poll_write(cx, buf)
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut **self).poll_flush(cx)
}
}; };
} }
@ -155,4 +168,8 @@ where
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
self.get_mut().as_mut().poll_write(cx, buf) self.get_mut().as_mut().poll_write(cx, buf)
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.get_mut().as_mut().poll_flush(cx)
}
} }

View file

@ -0,0 +1,32 @@
use core::pin::Pin;
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
use super::super::error::Result;
use super::super::traits::AsyncWrite;
/// Future for the [`flush`](super::AsyncWriteExt::flush) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Flush<'a, W: ?Sized> {
writer: &'a mut W,
}
impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> {
pub(super) fn new(writer: &'a mut W) -> Self {
Flush { writer }
}
}
impl<W: AsyncWrite + ?Sized + Unpin> Future for Flush<'_, W> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = &mut *self;
let _ = ready!(Pin::new(&mut this.writer).poll_flush(cx))?;
Poll::Ready(Ok(()))
}
}

View file

@ -27,6 +27,9 @@ pub use self::skip_while::SkipWhile;
mod drain; mod drain;
pub use self::drain::Drain; pub use self::drain::Drain;
mod flush;
pub use self::flush::Flush;
mod write; mod write;
pub use self::write::Write; pub use self::write::Write;
@ -160,6 +163,15 @@ pub trait AsyncWriteExt: AsyncWrite {
{ {
Write::new(self, buf) Write::new(self, buf)
} }
/// Awaits until all bytes have actually been written, and
/// not just enqueued as per the other "write" methods.
fn flush<'a>(&mut self) -> Flush<Self>
where
Self: Unpin,
{
Flush::new(self)
}
} }
impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {} impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {}

View file

@ -32,6 +32,9 @@ impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
Pin::new(unsafe { &mut *self.handle.get() }).poll_write(cx, buf) Pin::new(unsafe { &mut *self.handle.get() }).poll_write(cx, buf)
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(unsafe { &mut *self.handle.get() }).poll_flush(cx)
}
} }
pub fn split<T: AsyncBufRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) { pub fn split<T: AsyncBufRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {

View file

@ -61,5 +61,8 @@ async fn main(_spawner: Spawner, p: Peripherals) {
info!("writing..."); info!("writing...");
unwrap!(u.write_all(&buf).await); unwrap!(u.write_all(&buf).await);
info!("write done"); info!("write done");
// Wait until the bytes are actually finished being transmitted
unwrap!(u.flush().await);
} }
} }