Merge pull request #396 from embassy-rs/channel-fixes
embassy/channel: several improvements
This commit is contained in:
commit
f1c35b40c7
5 changed files with 116 additions and 185 deletions
|
@ -42,6 +42,7 @@ embassy-traits = { version = "0.1.0", path = "../embassy-traits"}
|
||||||
atomic-polyfill = "0.1.3"
|
atomic-polyfill = "0.1.3"
|
||||||
critical-section = "0.2.1"
|
critical-section = "0.2.1"
|
||||||
embedded-hal = "0.2.6"
|
embedded-hal = "0.2.6"
|
||||||
|
heapless = "0.7.5"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
embassy = { path = ".", features = ["executor-agnostic"] }
|
embassy = { path = ".", features = ["executor-agnostic"] }
|
||||||
|
|
19
embassy/src/blocking_mutex/kind.rs
Normal file
19
embassy/src/blocking_mutex/kind.rs
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
use super::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex};
|
||||||
|
|
||||||
|
pub trait MutexKind {
|
||||||
|
type Mutex<T>: Mutex<Data = T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum CriticalSection {}
|
||||||
|
impl MutexKind for CriticalSection {
|
||||||
|
type Mutex<T> = CriticalSectionMutex<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum ThreadMode {}
|
||||||
|
impl MutexKind for ThreadMode {
|
||||||
|
type Mutex<T> = ThreadModeMutex<T>;
|
||||||
|
}
|
||||||
|
pub enum Noop {}
|
||||||
|
impl MutexKind for Noop {
|
||||||
|
type Mutex<T> = NoopMutex<T>;
|
||||||
|
}
|
|
@ -1,5 +1,7 @@
|
||||||
//! Blocking mutex (not async)
|
//! Blocking mutex (not async)
|
||||||
|
|
||||||
|
pub mod kind;
|
||||||
|
|
||||||
use core::cell::UnsafeCell;
|
use core::cell::UnsafeCell;
|
||||||
use critical_section::CriticalSection;
|
use critical_section::CriticalSection;
|
||||||
|
|
||||||
|
@ -13,7 +15,7 @@ pub trait Mutex {
|
||||||
fn new(data: Self::Data) -> Self;
|
fn new(data: Self::Data) -> Self;
|
||||||
|
|
||||||
/// Creates a critical section and grants temporary access to the protected data.
|
/// Creates a critical section and grants temporary access to the protected data.
|
||||||
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R;
|
fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A "mutex" based on critical sections
|
/// A "mutex" based on critical sections
|
||||||
|
@ -55,7 +57,7 @@ impl<T> Mutex for CriticalSectionMutex<T> {
|
||||||
Self::new(data)
|
Self::new(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
||||||
critical_section::with(|cs| f(self.borrow(cs)))
|
critical_section::with(|cs| f(self.borrow(cs)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,7 +104,7 @@ impl<T> Mutex for ThreadModeMutex<T> {
|
||||||
Self::new(data)
|
Self::new(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
||||||
f(self.borrow())
|
f(self.borrow())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -155,7 +157,7 @@ impl<T> Mutex for NoopMutex<T> {
|
||||||
Self::new(data)
|
Self::new(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lock<R>(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
fn lock<R>(&self, f: impl FnOnce(&Self::Data) -> R) -> R {
|
||||||
f(self.borrow())
|
f(self.borrow())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,19 +37,18 @@
|
||||||
//!
|
//!
|
||||||
//! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html
|
//! This channel and its associated types were derived from https://docs.rs/tokio/0.1.22/tokio/sync/mpsc/fn.channel.html
|
||||||
|
|
||||||
use core::cell::UnsafeCell;
|
use core::cell::RefCell;
|
||||||
use core::fmt;
|
use core::fmt;
|
||||||
use core::marker::PhantomData;
|
|
||||||
use core::mem::MaybeUninit;
|
|
||||||
use core::pin::Pin;
|
use core::pin::Pin;
|
||||||
use core::ptr;
|
|
||||||
use core::task::Context;
|
use core::task::Context;
|
||||||
use core::task::Poll;
|
use core::task::Poll;
|
||||||
use core::task::Waker;
|
use core::task::Waker;
|
||||||
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
|
use heapless::Deque;
|
||||||
|
|
||||||
use crate::blocking_mutex::{CriticalSectionMutex, Mutex, NoopMutex, ThreadModeMutex};
|
use crate::blocking_mutex::kind::MutexKind;
|
||||||
|
use crate::blocking_mutex::Mutex;
|
||||||
use crate::waitqueue::WakerRegistration;
|
use crate::waitqueue::WakerRegistration;
|
||||||
|
|
||||||
/// Send values to the associated `Receiver`.
|
/// Send values to the associated `Receiver`.
|
||||||
|
@ -57,36 +56,19 @@ use crate::waitqueue::WakerRegistration;
|
||||||
/// Instances are created by the [`split`](split) function.
|
/// Instances are created by the [`split`](split) function.
|
||||||
pub struct Sender<'ch, M, T, const N: usize>
|
pub struct Sender<'ch, M, T, const N: usize>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
|
channel: &'ch Channel<M, T, N>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Safe to pass the sender around
|
|
||||||
unsafe impl<'ch, M, T, const N: usize> Send for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync
|
|
||||||
{}
|
|
||||||
unsafe impl<'ch, M, T, const N: usize> Sync for Sender<'ch, M, T, N> where M: Mutex<Data = ()> + Sync
|
|
||||||
{}
|
|
||||||
|
|
||||||
/// Receive values from the associated `Sender`.
|
/// Receive values from the associated `Sender`.
|
||||||
///
|
///
|
||||||
/// Instances are created by the [`split`](split) function.
|
/// Instances are created by the [`split`](split) function.
|
||||||
pub struct Receiver<'ch, M, T, const N: usize>
|
pub struct Receiver<'ch, M, T, const N: usize>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
|
||||||
channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
|
|
||||||
_receiver_consumed: &'ch mut PhantomData<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Safe to pass the receiver around
|
|
||||||
unsafe impl<'ch, M, T, const N: usize> Send for Receiver<'ch, M, T, N> where
|
|
||||||
M: Mutex<Data = ()> + Sync
|
|
||||||
{
|
|
||||||
}
|
|
||||||
unsafe impl<'ch, M, T, const N: usize> Sync for Receiver<'ch, M, T, N> where
|
|
||||||
M: Mutex<Data = ()> + Sync
|
|
||||||
{
|
{
|
||||||
|
channel: &'ch Channel<M, T, N>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
|
/// Splits a bounded mpsc channel into a `Sender` and `Receiver`.
|
||||||
|
@ -117,16 +99,11 @@ pub fn split<M, T, const N: usize>(
|
||||||
channel: &mut Channel<M, T, N>,
|
channel: &mut Channel<M, T, N>,
|
||||||
) -> (Sender<M, T, N>, Receiver<M, T, N>)
|
) -> (Sender<M, T, N>, Receiver<M, T, N>)
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
let sender = Sender {
|
let sender = Sender { channel };
|
||||||
channel_cell: &channel.channel_cell,
|
let receiver = Receiver { channel };
|
||||||
};
|
channel.lock(|c| {
|
||||||
let receiver = Receiver {
|
|
||||||
channel_cell: &channel.channel_cell,
|
|
||||||
_receiver_consumed: &mut channel.receiver_consumed,
|
|
||||||
};
|
|
||||||
Channel::lock(&channel.channel_cell, |c| {
|
|
||||||
c.register_receiver();
|
c.register_receiver();
|
||||||
c.register_sender();
|
c.register_sender();
|
||||||
});
|
});
|
||||||
|
@ -135,7 +112,7 @@ where
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
|
impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
/// Receives the next value for this receiver.
|
/// Receives the next value for this receiver.
|
||||||
///
|
///
|
||||||
|
@ -155,7 +132,7 @@ where
|
||||||
/// [`close`]: Self::close
|
/// [`close`]: Self::close
|
||||||
pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> {
|
pub fn recv<'m>(&'m mut self) -> RecvFuture<'m, M, T, N> {
|
||||||
RecvFuture {
|
RecvFuture {
|
||||||
channel_cell: self.channel_cell,
|
channel: self.channel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +141,7 @@ where
|
||||||
/// This method will either receive a message from the channel immediately or return an error
|
/// This method will either receive a message from the channel immediately or return an error
|
||||||
/// if the channel is empty.
|
/// if the channel is empty.
|
||||||
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
||||||
Channel::lock(self.channel_cell, |c| c.try_recv())
|
self.channel.lock(|c| c.try_recv())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Closes the receiving half of a channel without dropping it.
|
/// Closes the receiving half of a channel without dropping it.
|
||||||
|
@ -178,56 +155,45 @@ where
|
||||||
/// until those are released.
|
/// until those are released.
|
||||||
///
|
///
|
||||||
pub fn close(&mut self) {
|
pub fn close(&mut self) {
|
||||||
Channel::lock(self.channel_cell, |c| c.close())
|
self.channel.lock(|c| c.close())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
|
impl<'ch, M, T, const N: usize> Drop for Receiver<'ch, M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
Channel::lock(self.channel_cell, |c| c.deregister_receiver())
|
self.channel.lock(|c| c.deregister_receiver())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RecvFuture<'ch, M, T, const N: usize>
|
pub struct RecvFuture<'ch, M, T, const N: usize>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
channel_cell: &'ch UnsafeCell<ChannelCell<M, T, N>>,
|
channel: &'ch Channel<M, T, N>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
|
impl<'ch, M, T, const N: usize> Future for RecvFuture<'ch, M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
type Output = Option<T>;
|
type Output = Option<T>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
||||||
Channel::lock(self.channel_cell, |c| {
|
self.channel
|
||||||
match c.try_recv_with_context(Some(cx)) {
|
.lock(|c| match c.try_recv_with_context(Some(cx)) {
|
||||||
Ok(v) => Poll::Ready(Some(v)),
|
Ok(v) => Poll::Ready(Some(v)),
|
||||||
Err(TryRecvError::Closed) => Poll::Ready(None),
|
Err(TryRecvError::Closed) => Poll::Ready(None),
|
||||||
Err(TryRecvError::Empty) => Poll::Pending,
|
Err(TryRecvError::Empty) => Poll::Pending,
|
||||||
}
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Safe to pass the receive future around since it locks channel whenever polled
|
|
||||||
unsafe impl<'ch, M, T, const N: usize> Send for RecvFuture<'ch, M, T, N> where
|
|
||||||
M: Mutex<Data = ()> + Sync
|
|
||||||
{
|
|
||||||
}
|
|
||||||
unsafe impl<'ch, M, T, const N: usize> Sync for RecvFuture<'ch, M, T, N> where
|
|
||||||
M: Mutex<Data = ()> + Sync
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
|
impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
/// Sends a value, waiting until there is capacity.
|
/// Sends a value, waiting until there is capacity.
|
||||||
///
|
///
|
||||||
|
@ -249,7 +215,7 @@ where
|
||||||
/// [`Receiver`]: Receiver
|
/// [`Receiver`]: Receiver
|
||||||
pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
|
pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
|
||||||
SendFuture {
|
SendFuture {
|
||||||
sender: self.clone(),
|
channel: self.channel,
|
||||||
message: Some(message),
|
message: Some(message),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -275,7 +241,7 @@ where
|
||||||
/// [`channel`]: channel
|
/// [`channel`]: channel
|
||||||
/// [`close`]: Receiver::close
|
/// [`close`]: Receiver::close
|
||||||
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
|
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
|
||||||
Channel::lock(self.channel_cell, |c| c.try_send(message))
|
self.channel.lock(|c| c.try_send(message))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Completes when the receiver has dropped.
|
/// Completes when the receiver has dropped.
|
||||||
|
@ -284,7 +250,7 @@ where
|
||||||
/// values is canceled and immediately stop doing work.
|
/// values is canceled and immediately stop doing work.
|
||||||
pub async fn closed(&self) {
|
pub async fn closed(&self) {
|
||||||
CloseFuture {
|
CloseFuture {
|
||||||
sender: self.clone(),
|
channel: self.channel,
|
||||||
}
|
}
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -296,29 +262,27 @@ where
|
||||||
/// [`Receiver`]: crate::sync::mpsc::Receiver
|
/// [`Receiver`]: crate::sync::mpsc::Receiver
|
||||||
/// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
|
/// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
|
||||||
pub fn is_closed(&self) -> bool {
|
pub fn is_closed(&self) -> bool {
|
||||||
Channel::lock(self.channel_cell, |c| c.is_closed())
|
self.channel.lock(|c| c.is_closed())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SendFuture<'ch, M, T, const N: usize>
|
pub struct SendFuture<'ch, M, T, const N: usize>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
sender: Sender<'ch, M, T, N>,
|
channel: &'ch Channel<M, T, N>,
|
||||||
message: Option<T>,
|
message: Option<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
|
impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
type Output = Result<(), SendError<T>>;
|
type Output = Result<(), SendError<T>>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
match self.message.take() {
|
match self.message.take() {
|
||||||
Some(m) => match Channel::lock(self.sender.channel_cell, |c| {
|
Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
|
||||||
c.try_send_with_context(m, Some(cx))
|
|
||||||
}) {
|
|
||||||
Ok(..) => Poll::Ready(Ok(())),
|
Ok(..) => Poll::Ready(Ok(())),
|
||||||
Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
|
Err(TrySendError::Closed(m)) => Poll::Ready(Err(SendError(m))),
|
||||||
Err(TrySendError::Full(m)) => {
|
Err(TrySendError::Full(m)) => {
|
||||||
|
@ -331,25 +295,23 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: Mutex<Data = ()> {}
|
impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: MutexKind {}
|
||||||
|
|
||||||
struct CloseFuture<'ch, M, T, const N: usize>
|
struct CloseFuture<'ch, M, T, const N: usize>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
sender: Sender<'ch, M, T, N>,
|
channel: &'ch Channel<M, T, N>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
|
impl<'ch, M, T, const N: usize> Future for CloseFuture<'ch, M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
type Output = ();
|
type Output = ();
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
if Channel::lock(self.sender.channel_cell, |c| {
|
if self.channel.lock(|c| c.is_closed_with_context(Some(cx))) {
|
||||||
c.is_closed_with_context(Some(cx))
|
|
||||||
}) {
|
|
||||||
Poll::Ready(())
|
Poll::Ready(())
|
||||||
} else {
|
} else {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
|
@ -359,22 +321,21 @@ where
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
|
impl<'ch, M, T, const N: usize> Drop for Sender<'ch, M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
Channel::lock(self.channel_cell, |c| c.deregister_sender())
|
self.channel.lock(|c| c.deregister_sender())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
|
impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
#[allow(clippy::clone_double_ref)]
|
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Channel::lock(self.channel_cell, |c| c.register_sender());
|
self.channel.lock(|c| c.register_sender());
|
||||||
Sender {
|
Sender {
|
||||||
channel_cell: self.channel_cell.clone(),
|
channel: self.channel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -446,10 +407,7 @@ impl<T> defmt::Format for TrySendError<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ChannelState<T, const N: usize> {
|
struct ChannelState<T, const N: usize> {
|
||||||
buf: [MaybeUninit<UnsafeCell<T>>; N],
|
queue: Deque<T, N>,
|
||||||
read_pos: usize,
|
|
||||||
write_pos: usize,
|
|
||||||
full: bool,
|
|
||||||
closed: bool,
|
closed: bool,
|
||||||
receiver_registered: bool,
|
receiver_registered: bool,
|
||||||
senders_registered: u32,
|
senders_registered: u32,
|
||||||
|
@ -458,14 +416,9 @@ struct ChannelState<T, const N: usize> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, const N: usize> ChannelState<T, N> {
|
impl<T, const N: usize> ChannelState<T, N> {
|
||||||
const INIT: MaybeUninit<UnsafeCell<T>> = MaybeUninit::uninit();
|
|
||||||
|
|
||||||
const fn new() -> Self {
|
const fn new() -> Self {
|
||||||
ChannelState {
|
ChannelState {
|
||||||
buf: [Self::INIT; N],
|
queue: Deque::new(),
|
||||||
read_pos: 0,
|
|
||||||
write_pos: 0,
|
|
||||||
full: false,
|
|
||||||
closed: false,
|
closed: false,
|
||||||
receiver_registered: false,
|
receiver_registered: false,
|
||||||
senders_registered: 0,
|
senders_registered: 0,
|
||||||
|
@ -479,17 +432,16 @@ impl<T, const N: usize> ChannelState<T, N> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||||
if self.read_pos != self.write_pos || self.full {
|
if self.queue.is_full() {
|
||||||
if self.full {
|
self.senders_waker.wake();
|
||||||
self.full = false;
|
}
|
||||||
self.senders_waker.wake();
|
|
||||||
}
|
if let Some(message) = self.queue.pop_front() {
|
||||||
let message = unsafe { (self.buf[self.read_pos]).assume_init_mut().get().read() };
|
|
||||||
self.read_pos = (self.read_pos + 1) % self.buf.len();
|
|
||||||
Ok(message)
|
Ok(message)
|
||||||
} else if !self.closed {
|
} else if !self.closed {
|
||||||
cx.into_iter()
|
if let Some(cx) = cx {
|
||||||
.for_each(|cx| self.set_receiver_waker(&cx.waker()));
|
self.set_receiver_waker(cx.waker());
|
||||||
|
}
|
||||||
Err(TryRecvError::Empty)
|
Err(TryRecvError::Empty)
|
||||||
} else {
|
} else {
|
||||||
Err(TryRecvError::Closed)
|
Err(TryRecvError::Closed)
|
||||||
|
@ -505,22 +457,21 @@ impl<T, const N: usize> ChannelState<T, N> {
|
||||||
message: T,
|
message: T,
|
||||||
cx: Option<&mut Context<'_>>,
|
cx: Option<&mut Context<'_>>,
|
||||||
) -> Result<(), TrySendError<T>> {
|
) -> Result<(), TrySendError<T>> {
|
||||||
if !self.closed {
|
if self.closed {
|
||||||
if !self.full {
|
return Err(TrySendError::Closed(message));
|
||||||
self.buf[self.write_pos] = MaybeUninit::new(message.into());
|
}
|
||||||
self.write_pos = (self.write_pos + 1) % self.buf.len();
|
|
||||||
if self.write_pos == self.read_pos {
|
match self.queue.push_back(message) {
|
||||||
self.full = true;
|
Ok(()) => {
|
||||||
}
|
|
||||||
self.receiver_waker.wake();
|
self.receiver_waker.wake();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
}
|
||||||
|
Err(message) => {
|
||||||
cx.into_iter()
|
cx.into_iter()
|
||||||
.for_each(|cx| self.set_senders_waker(&cx.waker()));
|
.for_each(|cx| self.set_senders_waker(&cx.waker()));
|
||||||
Err(TrySendError::Full(message))
|
Err(TrySendError::Full(message))
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
Err(TrySendError::Closed(message))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,16 +536,6 @@ impl<T, const N: usize> ChannelState<T, N> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, const N: usize> Drop for ChannelState<T, N> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
while self.read_pos != self.write_pos || self.full {
|
|
||||||
self.full = false;
|
|
||||||
unsafe { ptr::drop_in_place(self.buf[self.read_pos].as_mut_ptr()) };
|
|
||||||
self.read_pos = (self.read_pos + 1) % N;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A a bounded mpsc channel for communicating between asynchronous tasks
|
/// A a bounded mpsc channel for communicating between asynchronous tasks
|
||||||
/// with backpressure.
|
/// with backpressure.
|
||||||
///
|
///
|
||||||
|
@ -605,61 +546,35 @@ impl<T, const N: usize> Drop for ChannelState<T, N> {
|
||||||
/// All data sent will become available in the same order as it was sent.
|
/// All data sent will become available in the same order as it was sent.
|
||||||
pub struct Channel<M, T, const N: usize>
|
pub struct Channel<M, T, const N: usize>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
channel_cell: UnsafeCell<ChannelCell<M, T, N>>,
|
inner: M::Mutex<RefCell<ChannelState<T, N>>>,
|
||||||
receiver_consumed: PhantomData<()>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ChannelCell<M, T, const N: usize>
|
|
||||||
where
|
|
||||||
M: Mutex<Data = ()>,
|
|
||||||
{
|
|
||||||
mutex: M,
|
|
||||||
state: ChannelState<T, N>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type WithCriticalSections = CriticalSectionMutex<()>;
|
|
||||||
|
|
||||||
pub type WithThreadModeOnly = ThreadModeMutex<()>;
|
|
||||||
|
|
||||||
pub type WithNoThreads = NoopMutex<()>;
|
|
||||||
|
|
||||||
impl<M, T, const N: usize> Channel<M, T, N>
|
impl<M, T, const N: usize> Channel<M, T, N>
|
||||||
where
|
where
|
||||||
M: Mutex<Data = ()>,
|
M: MutexKind,
|
||||||
{
|
{
|
||||||
/// Establish a new bounded channel. For example, to create one with a NoopMutex:
|
/// Establish a new bounded channel. For example, to create one with a NoopMutex:
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
/// use embassy::channel::mpsc;
|
/// use embassy::channel::mpsc;
|
||||||
/// use embassy::channel::mpsc::{Channel, WithNoThreads};
|
/// use embassy::blocking_mutex::kind::Noop;
|
||||||
|
/// use embassy::channel::mpsc::Channel;
|
||||||
///
|
///
|
||||||
/// // Declare a bounded channel of 3 u32s.
|
/// // Declare a bounded channel of 3 u32s.
|
||||||
/// let mut channel = Channel::<WithNoThreads, u32, 3>::new();
|
/// let mut channel = Channel::<Noop, u32, 3>::new();
|
||||||
/// // once we have a channel, obtain its sender and receiver
|
/// // once we have a channel, obtain its sender and receiver
|
||||||
/// let (sender, receiver) = mpsc::split(&mut channel);
|
/// let (sender, receiver) = mpsc::split(&mut channel);
|
||||||
/// ```
|
/// ```
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
let mutex = M::new(());
|
Self {
|
||||||
let state = ChannelState::new();
|
inner: M::Mutex::new(RefCell::new(ChannelState::new())),
|
||||||
let channel_cell = ChannelCell { mutex, state };
|
|
||||||
Channel {
|
|
||||||
channel_cell: UnsafeCell::new(channel_cell),
|
|
||||||
receiver_consumed: PhantomData,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lock<R>(
|
fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
|
||||||
channel_cell: &UnsafeCell<ChannelCell<M, T, N>>,
|
self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
|
||||||
f: impl FnOnce(&mut ChannelState<T, N>) -> R,
|
|
||||||
) -> R {
|
|
||||||
unsafe {
|
|
||||||
let channel_cell = &mut *(channel_cell.get());
|
|
||||||
let mutex = &mut channel_cell.mutex;
|
|
||||||
let mut state = &mut channel_cell.state;
|
|
||||||
mutex.lock(|_| f(&mut state))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -671,20 +586,13 @@ mod tests {
|
||||||
use futures_executor::ThreadPool;
|
use futures_executor::ThreadPool;
|
||||||
use futures_timer::Delay;
|
use futures_timer::Delay;
|
||||||
|
|
||||||
|
use crate::blocking_mutex::kind::{CriticalSection, Noop};
|
||||||
use crate::util::Forever;
|
use crate::util::Forever;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
|
fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
|
||||||
if !c.full {
|
c.queue.capacity() - c.queue.len()
|
||||||
if c.write_pos > c.read_pos {
|
|
||||||
(c.buf.len() - c.write_pos) + c.read_pos
|
|
||||||
} else {
|
|
||||||
(c.buf.len() - c.read_pos) + c.write_pos
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -747,7 +655,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn simple_send_and_receive() {
|
fn simple_send_and_receive() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
let mut c = Channel::<Noop, u32, 3>::new();
|
||||||
let (s, r) = split(&mut c);
|
let (s, r) = split(&mut c);
|
||||||
assert!(s.clone().try_send(1).is_ok());
|
assert!(s.clone().try_send(1).is_ok());
|
||||||
assert_eq!(r.try_recv().unwrap(), 1);
|
assert_eq!(r.try_recv().unwrap(), 1);
|
||||||
|
@ -755,7 +663,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_close_without_sender() {
|
fn should_close_without_sender() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
let mut c = Channel::<Noop, u32, 3>::new();
|
||||||
let (s, r) = split(&mut c);
|
let (s, r) = split(&mut c);
|
||||||
drop(s);
|
drop(s);
|
||||||
match r.try_recv() {
|
match r.try_recv() {
|
||||||
|
@ -766,7 +674,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_close_once_drained() {
|
fn should_close_once_drained() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
let mut c = Channel::<Noop, u32, 3>::new();
|
||||||
let (s, r) = split(&mut c);
|
let (s, r) = split(&mut c);
|
||||||
assert!(s.try_send(1).is_ok());
|
assert!(s.try_send(1).is_ok());
|
||||||
drop(s);
|
drop(s);
|
||||||
|
@ -779,7 +687,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_reject_send_when_receiver_dropped() {
|
fn should_reject_send_when_receiver_dropped() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
let mut c = Channel::<Noop, u32, 3>::new();
|
||||||
let (s, r) = split(&mut c);
|
let (s, r) = split(&mut c);
|
||||||
drop(r);
|
drop(r);
|
||||||
match s.try_send(1) {
|
match s.try_send(1) {
|
||||||
|
@ -790,7 +698,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_reject_send_when_channel_closed() {
|
fn should_reject_send_when_channel_closed() {
|
||||||
let mut c = Channel::<WithNoThreads, u32, 3>::new();
|
let mut c = Channel::<Noop, u32, 3>::new();
|
||||||
let (s, mut r) = split(&mut c);
|
let (s, mut r) = split(&mut c);
|
||||||
assert!(s.try_send(1).is_ok());
|
assert!(s.try_send(1).is_ok());
|
||||||
r.close();
|
r.close();
|
||||||
|
@ -806,7 +714,7 @@ mod tests {
|
||||||
async fn receiver_closes_when_sender_dropped_async() {
|
async fn receiver_closes_when_sender_dropped_async() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|
||||||
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new();
|
static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new();
|
||||||
let c = CHANNEL.put(Channel::new());
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, mut r) = split(c);
|
let (s, mut r) = split(c);
|
||||||
assert!(executor
|
assert!(executor
|
||||||
|
@ -821,7 +729,7 @@ mod tests {
|
||||||
async fn receiver_receives_given_try_send_async() {
|
async fn receiver_receives_given_try_send_async() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|
||||||
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 3>> = Forever::new();
|
static CHANNEL: Forever<Channel<CriticalSection, u32, 3>> = Forever::new();
|
||||||
let c = CHANNEL.put(Channel::new());
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, mut r) = split(c);
|
let (s, mut r) = split(c);
|
||||||
assert!(executor
|
assert!(executor
|
||||||
|
@ -834,7 +742,7 @@ mod tests {
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn sender_send_completes_if_capacity() {
|
async fn sender_send_completes_if_capacity() {
|
||||||
let mut c = Channel::<WithCriticalSections, u32, 1>::new();
|
let mut c = Channel::<CriticalSection, u32, 1>::new();
|
||||||
let (s, mut r) = split(&mut c);
|
let (s, mut r) = split(&mut c);
|
||||||
assert!(s.send(1).await.is_ok());
|
assert!(s.send(1).await.is_ok());
|
||||||
assert_eq!(r.recv().await, Some(1));
|
assert_eq!(r.recv().await, Some(1));
|
||||||
|
@ -842,7 +750,7 @@ mod tests {
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn sender_send_completes_if_closed() {
|
async fn sender_send_completes_if_closed() {
|
||||||
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
|
static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
|
||||||
let c = CHANNEL.put(Channel::new());
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, r) = split(c);
|
let (s, r) = split(c);
|
||||||
drop(r);
|
drop(r);
|
||||||
|
@ -856,7 +764,7 @@ mod tests {
|
||||||
async fn senders_sends_wait_until_capacity() {
|
async fn senders_sends_wait_until_capacity() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|
||||||
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
|
static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
|
||||||
let c = CHANNEL.put(Channel::new());
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s0, mut r) = split(c);
|
let (s0, mut r) = split(c);
|
||||||
assert!(s0.try_send(1).is_ok());
|
assert!(s0.try_send(1).is_ok());
|
||||||
|
@ -876,7 +784,7 @@ mod tests {
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn sender_close_completes_if_closing() {
|
async fn sender_close_completes_if_closing() {
|
||||||
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
|
static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
|
||||||
let c = CHANNEL.put(Channel::new());
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, mut r) = split(c);
|
let (s, mut r) = split(c);
|
||||||
r.close();
|
r.close();
|
||||||
|
@ -885,7 +793,7 @@ mod tests {
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn sender_close_completes_if_closed() {
|
async fn sender_close_completes_if_closed() {
|
||||||
static CHANNEL: Forever<Channel<WithCriticalSections, u32, 1>> = Forever::new();
|
static CHANNEL: Forever<Channel<CriticalSection, u32, 1>> = Forever::new();
|
||||||
let c = CHANNEL.put(Channel::new());
|
let c = CHANNEL.put(Channel::new());
|
||||||
let (s, r) = split(c);
|
let (s, r) = split(c);
|
||||||
drop(r);
|
drop(r);
|
||||||
|
|
|
@ -6,7 +6,8 @@
|
||||||
mod example_common;
|
mod example_common;
|
||||||
|
|
||||||
use defmt::unwrap;
|
use defmt::unwrap;
|
||||||
use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError, WithNoThreads};
|
use embassy::blocking_mutex::kind::Noop;
|
||||||
|
use embassy::channel::mpsc::{self, Channel, Sender, TryRecvError};
|
||||||
use embassy::executor::Spawner;
|
use embassy::executor::Spawner;
|
||||||
use embassy::time::{Duration, Timer};
|
use embassy::time::{Duration, Timer};
|
||||||
use embassy::util::Forever;
|
use embassy::util::Forever;
|
||||||
|
@ -19,10 +20,10 @@ enum LedState {
|
||||||
Off,
|
Off,
|
||||||
}
|
}
|
||||||
|
|
||||||
static CHANNEL: Forever<Channel<WithNoThreads, LedState, 1>> = Forever::new();
|
static CHANNEL: Forever<Channel<Noop, LedState, 1>> = Forever::new();
|
||||||
|
|
||||||
#[embassy::task(pool_size = 1)]
|
#[embassy::task(pool_size = 1)]
|
||||||
async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) {
|
async fn my_task(sender: Sender<'static, Noop, LedState, 1>) {
|
||||||
loop {
|
loop {
|
||||||
let _ = sender.send(LedState::On).await;
|
let _ = sender.send(LedState::On).await;
|
||||||
Timer::after(Duration::from_secs(1)).await;
|
Timer::after(Duration::from_secs(1)).await;
|
||||||
|
|
Loading…
Reference in a new issue