Provides AsyncWrite with flush
As per Tokio and others, this commit provides a `poll_flush` method on `AsyncWrite` so that a best-effort attempt at wakening once all bytes are flushed can be made.
This commit is contained in:
parent
60b7c50d8b
commit
7256ff3e71
7 changed files with 100 additions and 0 deletions
|
@ -125,5 +125,12 @@ mod tests {
|
||||||
let buf = rb.pop_buf();
|
let buf = rb.pop_buf();
|
||||||
assert_eq!(1, buf.len());
|
assert_eq!(1, buf.len());
|
||||||
assert_eq!(4, buf[0]);
|
assert_eq!(4, buf[0]);
|
||||||
|
rb.pop(1);
|
||||||
|
|
||||||
|
let buf = rb.pop_buf();
|
||||||
|
assert_eq!(0, buf.len());
|
||||||
|
|
||||||
|
let buf = rb.push_buf();
|
||||||
|
assert_eq!(4, buf.len());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,6 +106,17 @@ where
|
||||||
serial.poll_write(cx, buf)
|
serial.poll_write(cx, buf)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
let this = self.get_mut();
|
||||||
|
let mut mutex = this.inner.borrow_mut();
|
||||||
|
mutex.with(|state| {
|
||||||
|
let serial = state.classes.get_serial();
|
||||||
|
let serial = Pin::new(serial);
|
||||||
|
|
||||||
|
serial.poll_flush(cx)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct UsbSerial<'bus, 'a, B: UsbBus> {
|
pub struct UsbSerial<'bus, 'a, B: UsbBus> {
|
||||||
|
@ -167,6 +178,10 @@ impl<'bus, 'a, B: UsbBus> AsyncWrite for UsbSerial<'bus, 'a, B> {
|
||||||
this.flush_write();
|
this.flush_write();
|
||||||
Poll::Ready(Ok(count))
|
Poll::Ready(Ok(count))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Keeps track of the type of the last written packet.
|
/// Keeps track of the type of the last written packet.
|
||||||
|
|
|
@ -266,6 +266,20 @@ impl<'d, U: UarteInstance, T: TimerInstance> AsyncWrite for BufferedUarte<'d, U,
|
||||||
|
|
||||||
poll
|
poll
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<embassy::io::Result<()>> {
|
||||||
|
self.inner.with(|state| {
|
||||||
|
trace!("poll_flush");
|
||||||
|
|
||||||
|
if !state.tx.is_empty() {
|
||||||
|
trace!("poll_flush: pending");
|
||||||
|
state.tx_waker.register(cx.waker());
|
||||||
|
return Poll::Pending;
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> {
|
impl<'a, U: UarteInstance, T: TimerInstance> Drop for StateInner<'a, U, T> {
|
||||||
|
|
|
@ -89,6 +89,15 @@ pub trait AsyncWrite {
|
||||||
/// `poll_write` must try to make progress by flushing the underlying object if
|
/// `poll_write` must try to make progress by flushing the underlying object if
|
||||||
/// that is the only way the underlying object can become writable again.
|
/// that is the only way the underlying object can become writable again.
|
||||||
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>;
|
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>;
|
||||||
|
|
||||||
|
/// Attempt to flush the object, ensuring that any buffered data reach their destination.
|
||||||
|
///
|
||||||
|
/// On success, returns Poll::Ready(Ok(())).
|
||||||
|
///
|
||||||
|
/// If flushing cannot immediately complete, this method returns [Poll::Pending] and arranges for the
|
||||||
|
/// current task (via cx.waker()) to receive a notification when the object can make progress
|
||||||
|
/// towards flushing.
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! defer_async_read {
|
macro_rules! defer_async_read {
|
||||||
|
@ -135,6 +144,10 @@ macro_rules! deref_async_write {
|
||||||
) -> Poll<Result<usize>> {
|
) -> Poll<Result<usize>> {
|
||||||
Pin::new(&mut **self).poll_write(cx, buf)
|
Pin::new(&mut **self).poll_write(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||||
|
Pin::new(&mut **self).poll_flush(cx)
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,4 +168,8 @@ where
|
||||||
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
|
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
|
||||||
self.get_mut().as_mut().poll_write(cx, buf)
|
self.get_mut().as_mut().poll_write(cx, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||||
|
self.get_mut().as_mut().poll_flush(cx)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
32
embassy/src/io/util/flush.rs
Normal file
32
embassy/src/io/util/flush.rs
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
use core::pin::Pin;
|
||||||
|
use futures::future::Future;
|
||||||
|
use futures::ready;
|
||||||
|
use futures::task::{Context, Poll};
|
||||||
|
|
||||||
|
use super::super::error::Result;
|
||||||
|
use super::super::traits::AsyncWrite;
|
||||||
|
|
||||||
|
/// Future for the [`flush`](super::AsyncWriteExt::flush) method.
|
||||||
|
#[derive(Debug)]
|
||||||
|
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||||
|
pub struct Flush<'a, W: ?Sized> {
|
||||||
|
writer: &'a mut W,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {}
|
||||||
|
|
||||||
|
impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> {
|
||||||
|
pub(super) fn new(writer: &'a mut W) -> Self {
|
||||||
|
Flush { writer }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<W: AsyncWrite + ?Sized + Unpin> Future for Flush<'_, W> {
|
||||||
|
type Output = Result<()>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||||
|
let this = &mut *self;
|
||||||
|
let _ = ready!(Pin::new(&mut this.writer).poll_flush(cx))?;
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,6 +27,9 @@ pub use self::skip_while::SkipWhile;
|
||||||
mod drain;
|
mod drain;
|
||||||
pub use self::drain::Drain;
|
pub use self::drain::Drain;
|
||||||
|
|
||||||
|
mod flush;
|
||||||
|
pub use self::flush::Flush;
|
||||||
|
|
||||||
mod write;
|
mod write;
|
||||||
pub use self::write::Write;
|
pub use self::write::Write;
|
||||||
|
|
||||||
|
@ -160,6 +163,15 @@ pub trait AsyncWriteExt: AsyncWrite {
|
||||||
{
|
{
|
||||||
Write::new(self, buf)
|
Write::new(self, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Awaits until all bytes have actually been written, and
|
||||||
|
/// not just enqueued as per the other "write" methods.
|
||||||
|
fn flush<'a>(&mut self) -> Flush<Self>
|
||||||
|
where
|
||||||
|
Self: Unpin,
|
||||||
|
{
|
||||||
|
Flush::new(self)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {}
|
impl<R: AsyncWrite + ?Sized> AsyncWriteExt for R {}
|
||||||
|
|
|
@ -61,5 +61,8 @@ async fn main(_spawner: Spawner, p: Peripherals) {
|
||||||
info!("writing...");
|
info!("writing...");
|
||||||
unwrap!(u.write_all(&buf).await);
|
unwrap!(u.write_all(&buf).await);
|
||||||
info!("write done");
|
info!("write done");
|
||||||
|
|
||||||
|
// Wait until the bytes are actually finished being transmitted
|
||||||
|
unwrap!(u.flush().await);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue