embassy/channel: use heapless::Deque.

This commit is contained in:
Dario Nieuwenhuis 2021-09-11 02:49:16 +02:00
parent 67fa6b06fa
commit b78f4695c4
2 changed files with 22 additions and 50 deletions

View file

@ -42,6 +42,7 @@ embassy-traits = { version = "0.1.0", path = "../embassy-traits"}
atomic-polyfill = "0.1.3" atomic-polyfill = "0.1.3"
critical-section = "0.2.1" critical-section = "0.2.1"
embedded-hal = "0.2.6" embedded-hal = "0.2.6"
heapless = "0.7.5"
[dev-dependencies] [dev-dependencies]
embassy = { path = ".", features = ["executor-agnostic"] } embassy = { path = ".", features = ["executor-agnostic"] }

View file

@ -40,14 +40,13 @@
use core::cell::UnsafeCell; use core::cell::UnsafeCell;
use core::fmt; use core::fmt;
use core::marker::PhantomData; use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::pin::Pin; use core::pin::Pin;
use core::ptr;
use core::task::Context; use core::task::Context;
use core::task::Poll; use core::task::Poll;
use core::task::Waker; use core::task::Waker;
use futures::Future; use futures::Future;
use heapless::Deque;
use crate::blocking_mutex::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex}; use crate::blocking_mutex::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex};
use crate::waitqueue::WakerRegistration; use crate::waitqueue::WakerRegistration;
@ -446,10 +445,7 @@ impl<T> defmt::Format for TrySendError<T> {
} }
struct ChannelState<T, const N: usize> { struct ChannelState<T, const N: usize> {
buf: [MaybeUninit<UnsafeCell<T>>; N], queue: Deque<T, N>,
read_pos: usize,
write_pos: usize,
full: bool,
closed: bool, closed: bool,
receiver_registered: bool, receiver_registered: bool,
senders_registered: u32, senders_registered: u32,
@ -458,14 +454,9 @@ struct ChannelState<T, const N: usize> {
} }
impl<T, const N: usize> ChannelState<T, N> { impl<T, const N: usize> ChannelState<T, N> {
const INIT: MaybeUninit<UnsafeCell<T>> = MaybeUninit::uninit();
const fn new() -> Self { const fn new() -> Self {
ChannelState { ChannelState {
buf: [Self::INIT; N], queue: Deque::new(),
read_pos: 0,
write_pos: 0,
full: false,
closed: false, closed: false,
receiver_registered: false, receiver_registered: false,
senders_registered: 0, senders_registered: 0,
@ -479,17 +470,16 @@ impl<T, const N: usize> ChannelState<T, N> {
} }
fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> { fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
if self.read_pos != self.write_pos || self.full { if self.queue.is_full() {
if self.full {
self.full = false;
self.senders_waker.wake(); self.senders_waker.wake();
} }
let message = unsafe { (self.buf[self.read_pos]).assume_init_mut().get().read() };
self.read_pos = (self.read_pos + 1) % self.buf.len(); if let Some(message) = self.queue.pop_front() {
Ok(message) Ok(message)
} else if !self.closed { } else if !self.closed {
cx.into_iter() if let Some(cx) = cx {
.for_each(|cx| self.set_receiver_waker(&cx.waker())); self.set_receiver_waker(cx.waker());
}
Err(TryRecvError::Empty) Err(TryRecvError::Empty)
} else { } else {
Err(TryRecvError::Closed) Err(TryRecvError::Closed)
@ -505,22 +495,21 @@ impl<T, const N: usize> ChannelState<T, N> {
message: T, message: T,
cx: Option<&mut Context<'_>>, cx: Option<&mut Context<'_>>,
) -> Result<(), TrySendError<T>> { ) -> Result<(), TrySendError<T>> {
if !self.closed { if self.closed {
if !self.full { return Err(TrySendError::Closed(message));
self.buf[self.write_pos] = MaybeUninit::new(message.into());
self.write_pos = (self.write_pos + 1) % self.buf.len();
if self.write_pos == self.read_pos {
self.full = true;
} }
match self.queue.push_back(message) {
Ok(()) => {
self.receiver_waker.wake(); self.receiver_waker.wake();
Ok(()) Ok(())
} else { }
Err(message) => {
cx.into_iter() cx.into_iter()
.for_each(|cx| self.set_senders_waker(&cx.waker())); .for_each(|cx| self.set_senders_waker(&cx.waker()));
Err(TrySendError::Full(message)) Err(TrySendError::Full(message))
} }
} else {
Err(TrySendError::Closed(message))
} }
} }
@ -585,16 +574,6 @@ impl<T, const N: usize> ChannelState<T, N> {
} }
} }
impl<T, const N: usize> Drop for ChannelState<T, N> {
fn drop(&mut self) {
while self.read_pos != self.write_pos || self.full {
self.full = false;
unsafe { ptr::drop_in_place(self.buf[self.read_pos].as_mut_ptr()) };
self.read_pos = (self.read_pos + 1) % N;
}
}
}
/// A a bounded mpsc channel for communicating between asynchronous tasks /// A a bounded mpsc channel for communicating between asynchronous tasks
/// with backpressure. /// with backpressure.
/// ///
@ -676,15 +655,7 @@ mod tests {
use super::*; use super::*;
fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize { fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
if !c.full { c.queue.capacity() - c.queue.len()
if c.write_pos > c.read_pos {
(c.buf.len() - c.write_pos) + c.read_pos
} else {
(c.buf.len() - c.read_pos) + c.write_pos
}
} else {
0
}
} }
#[test] #[test]