diff --git a/embassy-stm32-wpan/src/mac/control.rs b/embassy-stm32-wpan/src/mac/control.rs index 6e45e595f..2f8a7d07f 100644 --- a/embassy-stm32-wpan/src/mac/control.rs +++ b/embassy-stm32-wpan/src/mac/control.rs @@ -6,13 +6,13 @@ pub struct Error { } pub struct Control<'a> { - runner: &'a Runner, + runner: &'a Runner<'a>, } impl<'a> Control<'a> { - pub(crate) fn new(runner: &'a Runner) -> Self { - Self { runner: runner } - } + pub(crate) fn new(runner: &'a Runner<'a>) -> Self { + Self { runner: runner } + } pub async fn init(&mut self) { // TODO diff --git a/embassy-stm32-wpan/src/mac/driver.rs b/embassy-stm32-wpan/src/mac/driver.rs index 118f6908f..8ebfb2b72 100644 --- a/embassy-stm32-wpan/src/mac/driver.rs +++ b/embassy-stm32-wpan/src/mac/driver.rs @@ -4,16 +4,20 @@ use core::task::Context; use embassy_net_driver::{Capabilities, LinkState, Medium}; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::channel::Channel; +use super::event::MacEvent; +use crate::mac::event::Event; use crate::mac::runner::Runner; use crate::mac::MTU; pub struct Driver<'d> { - runner: &'d Runner, + runner: &'d Runner<'d>, } impl<'d> Driver<'d> { - pub(crate) fn new(runner: &'d Runner) -> Self { + pub(crate) fn new(runner: &'d Runner<'d>) -> Self { Self { runner: runner } } } @@ -21,33 +25,32 @@ impl<'d> Driver<'d> { impl<'d> embassy_net_driver::Driver for Driver<'d> { // type RxToken<'a> = RxToken<'a, 'd> where Self: 'a; // type TxToken<'a> = TxToken<'a, 'd> where Self: 'a; - type RxToken<'a> = RxToken where Self: 'a; - type TxToken<'a> = TxToken where Self: 'a; + type RxToken<'a> = RxToken<'d> where Self: 'a; + type TxToken<'a> = TxToken<'d> where Self: 'a; fn receive(&mut self, cx: &mut Context) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { - self.runner.rx_waker.register(cx.waker()); - - // WAKER.register(cx.waker()); - // if self.rx.available().is_some() && self.tx.available().is_some() { - // Some((RxToken { rx: &mut self.rx }, TxToken { tx: &mut self.tx })) - // } else { - // None - // } - - None + if self.runner.rx_channel.poll_ready_to_receive(cx) && self.runner.tx_channel.poll_ready_to_receive(cx) { + Some(( + RxToken { + rx: &self.runner.rx_channel, + }, + TxToken { + tx: &self.runner.tx_channel, + }, + )) + } else { + None + } } fn transmit(&mut self, cx: &mut Context) -> Option> { - self.runner.tx_waker.register(cx.waker()); - - // WAKER.register(cx.waker()); - // / if self.tx.available().is_some() { - // / Some(TxToken { tx: &mut self.tx }) - // / } else { - // / None - // / } - - None + if self.runner.tx_channel.poll_ready_to_receive(cx) { + Some(TxToken { + tx: &self.runner.tx_channel, + }) + } else { + None + } } fn capabilities(&self) -> Capabilities { @@ -76,30 +79,38 @@ impl<'d> embassy_net_driver::Driver for Driver<'d> { } } -pub struct RxToken { - // rx: &'a mut RDesRing<'d>, +pub struct RxToken<'d> { + rx: &'d Channel, } -impl embassy_net_driver::RxToken for RxToken { +impl<'d> embassy_net_driver::RxToken for RxToken<'d> { fn consume(self, f: F) -> R where F: FnOnce(&mut [u8]) -> R, { - // NOTE(unwrap): we checked the queue wasn't full when creating the token. - // let pkt = unwrap!(self.rx.available()); + // Only valid data events should be put into the queue + + let event = self.rx.try_recv().unwrap(); + let mac_event = event.mac_event().unwrap(); + let data_event = match mac_event { + MacEvent::McpsDataInd(data_event) => data_event, + _ => unreachable!(), + }; let pkt = &mut []; let r = f(&mut pkt[0..]); - // self.rx.pop_packet(); + + // let r = f(&mut data_event.payload()); r } } -pub struct TxToken { +pub struct TxToken<'d> { + tx: &'d Channel, // tx: &'a mut TDesRing<'d>, } -impl embassy_net_driver::TxToken for TxToken { +impl<'d> embassy_net_driver::TxToken for TxToken<'d> { fn consume(self, len: usize, f: F) -> R where F: FnOnce(&mut [u8]) -> R, diff --git a/embassy-stm32-wpan/src/mac/mod.rs b/embassy-stm32-wpan/src/mac/mod.rs index 2f9d1c81e..3dcda17ae 100644 --- a/embassy-stm32-wpan/src/mac/mod.rs +++ b/embassy-stm32-wpan/src/mac/mod.rs @@ -18,7 +18,7 @@ pub use crate::mac::runner::Runner; const MTU: usize = 127; -pub async fn new<'a>(runner: &'a Runner) -> (Control<'a>, Driver<'a>) { +pub async fn new<'a>(runner: &'a Runner<'a>) -> (Control<'a>, Driver<'a>) { (Control::new(runner), Driver::new(runner)) } diff --git a/embassy-stm32-wpan/src/mac/runner.rs b/embassy-stm32-wpan/src/mac/runner.rs index d545d6c96..911ff60b9 100644 --- a/embassy-stm32-wpan/src/mac/runner.rs +++ b/embassy-stm32-wpan/src/mac/runner.rs @@ -1,4 +1,6 @@ use embassy_futures::select::{select3, Either3}; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::channel::Channel; use embassy_sync::waitqueue::AtomicWaker; use crate::mac::event::{Event, MacEvent}; @@ -44,22 +46,18 @@ impl TxRing { } } -pub struct Runner { +pub struct Runner<'a> { mac_subsystem: Mac, - pub(crate) rx_ring: Option, - pub(crate) tx_ring: TxRing, - pub(crate) rx_waker: AtomicWaker, - pub(crate) tx_waker: AtomicWaker, + pub(crate) rx_channel: Channel, + pub(crate) tx_channel: Channel, } -impl Runner { +impl<'a> Runner<'a> { pub fn new(mac: Mac) -> Self { Self { mac_subsystem: mac, - rx_ring: None, - tx_ring: TxRing::new(), - rx_waker: AtomicWaker::new(), - tx_waker: AtomicWaker::new(), + rx_channel: Channel::new(), + tx_channel: Channel::new(), } } @@ -73,8 +71,7 @@ impl Runner { if let Ok(evt) = event.mac_event() { match evt { MacEvent::McpsDataInd(data_ind) => { - // TODO: store mac_event in rx_ring - self.rx_waker.wake(); + self.rx_channel.try_send(event); } _ => {} } diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index 77352874d..f421af392 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -335,6 +335,12 @@ impl ChannelState { } } + fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> bool { + self.receiver_waker.register(cx.waker()); + + !self.queue.is_empty() + } + fn try_send(&mut self, message: T) -> Result<(), TrySendError> { self.try_send_with_context(message, None) } @@ -353,6 +359,12 @@ impl ChannelState { } } } + + fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> bool { + self.senders_waker.register(cx.waker()); + + !self.queue.is_full() + } } /// A bounded channel for communicating between asynchronous tasks @@ -401,6 +413,16 @@ where self.lock(|c| c.try_send_with_context(m, cx)) } + /// Allows a poll_fn to poll until the channel is ready to receive + pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> bool { + self.lock(|c| c.poll_ready_to_receive(cx)) + } + + /// Allows a poll_fn to poll until the channel is ready to send + pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> bool { + self.lock(|c| c.poll_ready_to_send(cx)) + } + /// Get a sender for this channel. pub fn sender(&self) -> Sender<'_, M, T, N> { Sender { channel: self } diff --git a/examples/stm32wb/src/bin/mac_ffd_net.rs b/examples/stm32wb/src/bin/mac_ffd_net.rs index b1cf051bf..6072f418c 100644 --- a/examples/stm32wb/src/bin/mac_ffd_net.rs +++ b/examples/stm32wb/src/bin/mac_ffd_net.rs @@ -25,7 +25,7 @@ async fn run_mm_queue(memory_manager: mm::MemoryManager) { } #[embassy_executor::task] -async fn run_mac(runner: &'static Runner) { +async fn run_mac(runner: &'static Runner<'static>) { runner.run().await; }