Implement Channel::poll_receive(..) -> Poll<T>

This commit is contained in:
Ruben De Smet 2023-08-11 11:30:29 +02:00
parent f9d251cd5c
commit b1ec460b9a
No known key found for this signature in database
GPG key ID: 1AE26A210C14115B
2 changed files with 40 additions and 5 deletions

View file

@ -46,7 +46,7 @@ impl<'d> embassy_net_driver::Driver for Driver<'d> {
}
fn transmit(&mut self, cx: &mut Context) -> Option<Self::TxToken<'_>> {
if self.runner.tx_buf_channel.poll_ready_to_receive(cx) {
if self.runner.tx_buf_channel.poll_ready_to_receive(cx).is_ready() {
Some(TxToken {
tx: &self.runner.tx_channel,
tx_buf: &self.runner.tx_buf_channel,

View file

@ -165,6 +165,13 @@ where
pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
self.channel.poll_ready_to_receive(cx)
}
/// Poll the channel for the next item
///
/// See [`Channel::poll_receive()`]
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
self.channel.poll_receive(cx)
}
}
/// Receive-only access to a [`Channel`] without knowing channel size.
@ -201,6 +208,13 @@ impl<'ch, T> DynamicReceiver<'ch, T> {
pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
self.channel.poll_ready_to_receive(cx)
}
/// Poll the channel for the next item
///
/// See [`Channel::poll_receive()`]
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
self.channel.poll_receive(cx)
}
}
impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
@ -228,10 +242,7 @@ where
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
match self.channel.try_recv_with_context(Some(cx)) {
Ok(v) => Poll::Ready(v),
Err(TryRecvError::Empty) => Poll::Pending,
}
self.channel.poll_receive(cx)
}
}
@ -317,6 +328,8 @@ trait DynamicChannel<T> {
fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
}
/// Error returned by [`try_recv`](Channel::try_recv).
@ -370,6 +383,19 @@ impl<T, const N: usize> ChannelState<T, N> {
}
}
fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
if self.queue.is_full() {
self.senders_waker.wake();
}
if let Some(message) = self.queue.pop_front() {
Poll::Ready(message)
} else {
self.receiver_waker.register(cx.waker());
Poll::Pending
}
}
fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
self.receiver_waker.register(cx.waker());
@ -452,6 +478,11 @@ where
self.lock(|c| c.try_recv_with_context(cx))
}
/// Poll the channel for the next message
pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
self.lock(|c| c.poll_receive(cx))
}
fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
self.lock(|c| c.try_send_with_context(m, cx))
}
@ -539,6 +570,10 @@ where
fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
Channel::poll_ready_to_receive(self, cx)
}
fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
Channel::poll_receive(self, cx)
}
}
#[cfg(test)]