From dcd0c38109ed6711d91c4bdff42825f25e3ee402 Mon Sep 17 00:00:00 2001 From: huntc <huntchr@gmail.com> Date: Sun, 11 Jul 2021 10:54:35 +1000 Subject: [PATCH] Return a new future each time recv is called --- embassy/src/util/mpsc.rs | 31 ++++++++++++------------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index e54c507c1..8f1bba764 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs @@ -141,7 +141,18 @@ where /// /// [`close`]: Self::close pub async fn recv(&mut self) -> Option<T> { - self.await + futures::future::poll_fn(|cx| self.recv_poll(cx)).await + } + + fn recv_poll(self: &mut Self, cx: &mut Context<'_>) -> Poll<Option<T>> { + match self.try_recv() { + Ok(v) => Poll::Ready(Some(v)), + Err(TryRecvError::Closed) => Poll::Ready(None), + Err(TryRecvError::Empty) => { + self.channel.get().set_receiver_waker(cx.waker().clone()); + Poll::Pending + } + } } /// Attempts to immediately receive a message on this `Receiver` @@ -167,24 +178,6 @@ where } } -impl<'ch, M, T, const N: usize> Future for Receiver<'ch, M, T, N> -where - M: Mutex<Data = ()>, -{ - type Output = Option<T>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match self.try_recv() { - Ok(v) => Poll::Ready(Some(v)), - Err(TryRecvError::Closed) => Poll::Ready(None), - Err(TryRecvError::Empty) => { - self.channel.get().set_receiver_waker(cx.waker().clone()); - Poll::Pending - } - } - } -} - impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N> where M: Mutex<Data = ()>,