net-ppp: nicer processing loop structure that can't deadlock.
This commit is contained in:
parent
aacf14b62a
commit
2303382dfd
1 changed files with 41 additions and 43 deletions
|
@ -8,11 +8,9 @@ mod fmt;
|
||||||
use core::convert::Infallible;
|
use core::convert::Infallible;
|
||||||
use core::mem::MaybeUninit;
|
use core::mem::MaybeUninit;
|
||||||
|
|
||||||
use embassy_futures::select::{select3, Either3};
|
use embassy_futures::select::{select, Either};
|
||||||
use embassy_net_driver_channel as ch;
|
use embassy_net_driver_channel as ch;
|
||||||
use embassy_net_driver_channel::driver::LinkState;
|
use embassy_net_driver_channel::driver::LinkState;
|
||||||
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
|
|
||||||
use embassy_sync::signal::Signal;
|
|
||||||
use embedded_io_async::{BufRead, Write, WriteAllError};
|
use embedded_io_async::{BufRead, Write, WriteAllError};
|
||||||
use ppproto::pppos::{BufferFullError, PPPoS, PPPoSAction};
|
use ppproto::pppos::{BufferFullError, PPPoS, PPPoSAction};
|
||||||
|
|
||||||
|
@ -75,22 +73,50 @@ impl<'d, R: BufRead, W: Write> Runner<'d, R, W> {
|
||||||
let mut rx_buf = [0; 2048];
|
let mut rx_buf = [0; 2048];
|
||||||
let mut tx_buf = [0; 2048];
|
let mut tx_buf = [0; 2048];
|
||||||
|
|
||||||
let poll_signal: Signal<NoopRawMutex, ()> = Signal::new();
|
let mut needs_poll = true;
|
||||||
poll_signal.signal(());
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut poll = false;
|
let rx_fut = async {
|
||||||
match select3(self.r.fill_buf(), tx_chan.tx_buf(), poll_signal.wait()).await {
|
let buf = rx_chan.rx_buf().await;
|
||||||
Either3::First(r) => {
|
let rx_data = match needs_poll {
|
||||||
let data = r.map_err(RunError::Read)?;
|
true => &[][..],
|
||||||
if data.is_empty() {
|
false => match self.r.fill_buf().await {
|
||||||
return Err(RunError::Eof);
|
Ok(rx_data) if rx_data.len() == 0 => return Err(RunError::Eof),
|
||||||
}
|
Ok(rx_data) => rx_data,
|
||||||
let n = ppp.consume(data, &mut rx_buf);
|
Err(e) => return Err(RunError::Read(e)),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
Ok((buf, rx_data))
|
||||||
|
};
|
||||||
|
let tx_fut = tx_chan.tx_buf();
|
||||||
|
match select(rx_fut, tx_fut).await {
|
||||||
|
Either::First(r) => {
|
||||||
|
needs_poll = false;
|
||||||
|
|
||||||
|
let (buf, rx_data) = r?;
|
||||||
|
let n = ppp.consume(rx_data, &mut rx_buf);
|
||||||
self.r.consume(n);
|
self.r.consume(n);
|
||||||
poll = true;
|
|
||||||
|
match ppp.poll(&mut tx_buf, &mut rx_buf) {
|
||||||
|
PPPoSAction::None => {}
|
||||||
|
PPPoSAction::Received(rg) => {
|
||||||
|
let pkt = &rx_buf[rg];
|
||||||
|
buf[..pkt.len()].copy_from_slice(pkt);
|
||||||
|
rx_chan.rx_done(pkt.len());
|
||||||
}
|
}
|
||||||
Either3::Second(pkt) => {
|
PPPoSAction::Transmit(n) => match self.w.write_all(&tx_buf[..n]).await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(WriteAllError::WriteZero) => return Err(RunError::WriteZero),
|
||||||
|
Err(WriteAllError::Other(e)) => return Err(RunError::Write(e)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
match ppp.status().phase {
|
||||||
|
ppproto::Phase::Open => state_chan.set_link_state(LinkState::Up),
|
||||||
|
_ => state_chan.set_link_state(LinkState::Down),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Either::Second(pkt) => {
|
||||||
match ppp.send(pkt, &mut tx_buf) {
|
match ppp.send(pkt, &mut tx_buf) {
|
||||||
Ok(n) => match self.w.write_all(&tx_buf[..n]).await {
|
Ok(n) => match self.w.write_all(&tx_buf[..n]).await {
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
|
@ -101,34 +127,6 @@ impl<'d, R: BufRead, W: Write> Runner<'d, R, W> {
|
||||||
}
|
}
|
||||||
tx_chan.tx_done();
|
tx_chan.tx_done();
|
||||||
}
|
}
|
||||||
Either3::Third(_) => poll = true,
|
|
||||||
}
|
|
||||||
|
|
||||||
if poll {
|
|
||||||
match ppp.poll(&mut tx_buf, &mut rx_buf) {
|
|
||||||
PPPoSAction::None => {}
|
|
||||||
PPPoSAction::Received(rg) => {
|
|
||||||
let pkt = &rx_buf[rg];
|
|
||||||
let buf = rx_chan.rx_buf().await; // TODO: fix possible deadlock
|
|
||||||
buf[..pkt.len()].copy_from_slice(pkt);
|
|
||||||
rx_chan.rx_done(pkt.len());
|
|
||||||
|
|
||||||
poll_signal.signal(());
|
|
||||||
}
|
|
||||||
PPPoSAction::Transmit(n) => {
|
|
||||||
match self.w.write_all(&tx_buf[..n]).await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(WriteAllError::WriteZero) => return Err(RunError::WriteZero),
|
|
||||||
Err(WriteAllError::Other(e)) => return Err(RunError::Write(e)),
|
|
||||||
}
|
|
||||||
poll_signal.signal(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match ppp.status().phase {
|
|
||||||
ppproto::Phase::Open => state_chan.set_link_state(LinkState::Up),
|
|
||||||
_ => state_chan.set_link_state(LinkState::Down),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue