Add docs to zero-copy-channel
This commit is contained in:
parent
1eb03dc41a
commit
6e38b07642
1 changed files with 51 additions and 0 deletions
|
@ -1,3 +1,22 @@
|
||||||
|
//! A zero-copy queue for sending values between asynchronous tasks.
|
||||||
|
//!
|
||||||
|
//! It can be used concurrently by multiple producers (senders) and multiple
|
||||||
|
//! consumers (receivers), i.e. it is an "MPMC channel".
|
||||||
|
//!
|
||||||
|
//! Receivers are competing for messages. So a message that is received by
|
||||||
|
//! one receiver is not received by any other.
|
||||||
|
//!
|
||||||
|
//! This queue takes a Mutex type so that various
|
||||||
|
//! targets can be attained. For example, a ThreadModeMutex can be used
|
||||||
|
//! for single-core Cortex-M targets where messages are only passed
|
||||||
|
//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
|
||||||
|
//! can also be used for single-core targets where messages are to be
|
||||||
|
//! passed from exception mode e.g. out of an interrupt handler.
|
||||||
|
//!
|
||||||
|
//! This module provides a bounded channel that has a limit on the number of
|
||||||
|
//! messages that it can store, and if this limit is reached, trying to send
|
||||||
|
//! another message will result in an error being returned.
|
||||||
|
|
||||||
use core::cell::RefCell;
|
use core::cell::RefCell;
|
||||||
use core::future::poll_fn;
|
use core::future::poll_fn;
|
||||||
use core::marker::PhantomData;
|
use core::marker::PhantomData;
|
||||||
|
@ -7,6 +26,17 @@ use crate::blocking_mutex::raw::RawMutex;
|
||||||
use crate::blocking_mutex::Mutex;
|
use crate::blocking_mutex::Mutex;
|
||||||
use crate::waitqueue::WakerRegistration;
|
use crate::waitqueue::WakerRegistration;
|
||||||
|
|
||||||
|
/// A bounded zero-copy channel for communicating between asynchronous tasks
|
||||||
|
/// with backpressure.
|
||||||
|
///
|
||||||
|
/// The channel will buffer up to the provided number of messages. Once the
|
||||||
|
/// buffer is full, attempts to `send` new messages will wait until a message is
|
||||||
|
/// received from the channel.
|
||||||
|
///
|
||||||
|
/// All data sent will become available in the same order as it was sent.
|
||||||
|
///
|
||||||
|
/// The channel requires a buffer of recyclable elements. Writing to the channel is done through
|
||||||
|
/// an `&mut T`.
|
||||||
pub struct Channel<'a, M: RawMutex, T> {
|
pub struct Channel<'a, M: RawMutex, T> {
|
||||||
buf: *mut T,
|
buf: *mut T,
|
||||||
phantom: PhantomData<&'a mut T>,
|
phantom: PhantomData<&'a mut T>,
|
||||||
|
@ -14,6 +44,10 @@ pub struct Channel<'a, M: RawMutex, T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, M: RawMutex, T> Channel<'a, M, T> {
|
impl<'a, M: RawMutex, T> Channel<'a, M, T> {
|
||||||
|
/// Initialize a new [`Channel`].
|
||||||
|
///
|
||||||
|
/// The provided buffer will be used and reused by the channel's logic, and thus dictates the
|
||||||
|
/// channel's capacity.
|
||||||
pub fn new(buf: &'a mut [T]) -> Self {
|
pub fn new(buf: &'a mut [T]) -> Self {
|
||||||
let len = buf.len();
|
let len = buf.len();
|
||||||
assert!(len != 0);
|
assert!(len != 0);
|
||||||
|
@ -32,20 +66,27 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Creates a [`Sender`] and [`Receiver`] from an existing channel.
|
||||||
|
///
|
||||||
|
/// Further Senders and Receivers can be created through [`Sender::borrow`] and
|
||||||
|
/// [`Receiver::borrow`] respectively.
|
||||||
pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
|
pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
|
||||||
(Sender { channel: self }, Receiver { channel: self })
|
(Sender { channel: self }, Receiver { channel: self })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send-only access to a [`Channel`].
|
||||||
pub struct Sender<'a, M: RawMutex, T> {
|
pub struct Sender<'a, M: RawMutex, T> {
|
||||||
channel: &'a Channel<'a, M, T>,
|
channel: &'a Channel<'a, M, T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, M: RawMutex, T> Sender<'a, M, T> {
|
impl<'a, M: RawMutex, T> Sender<'a, M, T> {
|
||||||
|
/// Creates one further [`Sender`] over the same channel.
|
||||||
pub fn borrow(&mut self) -> Sender<'_, M, T> {
|
pub fn borrow(&mut self) -> Sender<'_, M, T> {
|
||||||
Sender { channel: self.channel }
|
Sender { channel: self.channel }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to send a value over the channel.
|
||||||
pub fn try_send(&mut self) -> Option<&mut T> {
|
pub fn try_send(&mut self) -> Option<&mut T> {
|
||||||
self.channel.state.lock(|s| {
|
self.channel.state.lock(|s| {
|
||||||
let s = &mut *s.borrow_mut();
|
let s = &mut *s.borrow_mut();
|
||||||
|
@ -56,6 +97,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to send a value over the channel.
|
||||||
pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
|
pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
|
||||||
self.channel.state.lock(|s| {
|
self.channel.state.lock(|s| {
|
||||||
let s = &mut *s.borrow_mut();
|
let s = &mut *s.borrow_mut();
|
||||||
|
@ -69,6 +111,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Asynchronously send a value over the channel.
|
||||||
pub async fn send(&mut self) -> &mut T {
|
pub async fn send(&mut self) -> &mut T {
|
||||||
let i = poll_fn(|cx| {
|
let i = poll_fn(|cx| {
|
||||||
self.channel.state.lock(|s| {
|
self.channel.state.lock(|s| {
|
||||||
|
@ -86,19 +129,24 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> {
|
||||||
unsafe { &mut *self.channel.buf.add(i) }
|
unsafe { &mut *self.channel.buf.add(i) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Notify the channel that the sending of the value has been finalized.
|
||||||
pub fn send_done(&mut self) {
|
pub fn send_done(&mut self) {
|
||||||
self.channel.state.lock(|s| s.borrow_mut().push_done())
|
self.channel.state.lock(|s| s.borrow_mut().push_done())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Receive-only access to a [`Channel`].
|
||||||
pub struct Receiver<'a, M: RawMutex, T> {
|
pub struct Receiver<'a, M: RawMutex, T> {
|
||||||
channel: &'a Channel<'a, M, T>,
|
channel: &'a Channel<'a, M, T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
|
impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
|
||||||
|
/// Creates one further [`Sender`] over the same channel.
|
||||||
pub fn borrow(&mut self) -> Receiver<'_, M, T> {
|
pub fn borrow(&mut self) -> Receiver<'_, M, T> {
|
||||||
Receiver { channel: self.channel }
|
Receiver { channel: self.channel }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to receive a value over the channel.
|
||||||
pub fn try_receive(&mut self) -> Option<&mut T> {
|
pub fn try_receive(&mut self) -> Option<&mut T> {
|
||||||
self.channel.state.lock(|s| {
|
self.channel.state.lock(|s| {
|
||||||
let s = &mut *s.borrow_mut();
|
let s = &mut *s.borrow_mut();
|
||||||
|
@ -109,6 +157,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to asynchronously receive a value over the channel.
|
||||||
pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
|
pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
|
||||||
self.channel.state.lock(|s| {
|
self.channel.state.lock(|s| {
|
||||||
let s = &mut *s.borrow_mut();
|
let s = &mut *s.borrow_mut();
|
||||||
|
@ -122,6 +171,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Asynchronously receive a value over the channel.
|
||||||
pub async fn receive(&mut self) -> &mut T {
|
pub async fn receive(&mut self) -> &mut T {
|
||||||
let i = poll_fn(|cx| {
|
let i = poll_fn(|cx| {
|
||||||
self.channel.state.lock(|s| {
|
self.channel.state.lock(|s| {
|
||||||
|
@ -139,6 +189,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
|
||||||
unsafe { &mut *self.channel.buf.add(i) }
|
unsafe { &mut *self.channel.buf.add(i) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Notify the channel that the receiving of the value has been finalized.
|
||||||
pub fn receive_done(&mut self) {
|
pub fn receive_done(&mut self) {
|
||||||
self.channel.state.lock(|s| s.borrow_mut().pop_done())
|
self.channel.state.lock(|s| s.borrow_mut().pop_done())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue