Merge #712
712: Add types for channel dynamic dispatch r=lulf a=lulf * Add internal DynamicChannel trait implemented by Channel that allows polling for internal state in a lock safe manner and does not require knowing the channel size. * Existing usage of Sender and Receiver is preserved and does not use dynamic dispatch. * Add DynamicSender and DynamicReceiver types that references the channel using the DynamicChannel trait and does not require the const generic channel size parameter. Having the ability not know the channel size is very convenient when you don't want to change all of your channel using code when tuning the size. With this change, existing usage can be kept, and those willing to pay the price for dynamic dispatch may do so. Co-authored-by: Ulf Lilleengen <lulf@redhat.com>
This commit is contained in:
commit
ac3986e40e
1 changed files with 178 additions and 6 deletions
|
@ -66,6 +66,48 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send-only access to a [`Channel`] without knowing channel size.
|
||||||
|
#[derive(Copy)]
|
||||||
|
pub struct DynamicSender<'ch, T> {
|
||||||
|
channel: &'ch dyn DynamicChannel<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, T> Clone for DynamicSender<'ch, T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
DynamicSender {
|
||||||
|
channel: self.channel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(s: Sender<'ch, M, T, N>) -> Self {
|
||||||
|
Self { channel: s.channel }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, T> DynamicSender<'ch, T> {
|
||||||
|
/// Sends a value.
|
||||||
|
///
|
||||||
|
/// See [`Channel::send()`]
|
||||||
|
pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
|
||||||
|
DynamicSendFuture {
|
||||||
|
channel: self.channel,
|
||||||
|
message: Some(message),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately send a message.
|
||||||
|
///
|
||||||
|
/// See [`Channel::send()`]
|
||||||
|
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
|
||||||
|
self.channel.try_send_with_context(message, None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Receive-only access to a [`Channel`].
|
/// Receive-only access to a [`Channel`].
|
||||||
#[derive(Copy)]
|
#[derive(Copy)]
|
||||||
pub struct Receiver<'ch, M, T, const N: usize>
|
pub struct Receiver<'ch, M, T, const N: usize>
|
||||||
|
@ -105,6 +147,47 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Receive-only access to a [`Channel`] without knowing channel size.
|
||||||
|
#[derive(Copy)]
|
||||||
|
pub struct DynamicReceiver<'ch, T> {
|
||||||
|
channel: &'ch dyn DynamicChannel<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
DynamicReceiver {
|
||||||
|
channel: self.channel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, T> DynamicReceiver<'ch, T> {
|
||||||
|
/// Receive the next value.
|
||||||
|
///
|
||||||
|
/// See [`Channel::recv()`].
|
||||||
|
pub fn recv(&self) -> DynamicRecvFuture<'_, T> {
|
||||||
|
DynamicRecvFuture {
|
||||||
|
channel: self.channel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately receive the next value.
|
||||||
|
///
|
||||||
|
/// See [`Channel::try_recv()`]
|
||||||
|
pub fn try_recv(&self) -> Result<T, TryRecvError> {
|
||||||
|
self.channel.try_recv_with_context(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(s: Receiver<'ch, M, T, N>) -> Self {
|
||||||
|
Self { channel: s.channel }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct RecvFuture<'ch, M, T, const N: usize>
|
pub struct RecvFuture<'ch, M, T, const N: usize>
|
||||||
where
|
where
|
||||||
M: RawMutex,
|
M: RawMutex,
|
||||||
|
@ -119,11 +202,25 @@ where
|
||||||
type Output = T;
|
type Output = T;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
||||||
self.channel
|
match self.channel.try_recv_with_context(Some(cx)) {
|
||||||
.lock(|c| match c.try_recv_with_context(Some(cx)) {
|
Ok(v) => Poll::Ready(v),
|
||||||
Ok(v) => Poll::Ready(v),
|
Err(TryRecvError::Empty) => Poll::Pending,
|
||||||
Err(TryRecvError::Empty) => Poll::Pending,
|
}
|
||||||
})
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DynamicRecvFuture<'ch, T> {
|
||||||
|
channel: &'ch dyn DynamicChannel<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, T> Future for DynamicRecvFuture<'ch, T> {
|
||||||
|
type Output = T;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
|
||||||
|
match self.channel.try_recv_with_context(Some(cx)) {
|
||||||
|
Ok(v) => Poll::Ready(v),
|
||||||
|
Err(TryRecvError::Empty) => Poll::Pending,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,7 +240,7 @@ where
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
match self.message.take() {
|
match self.message.take() {
|
||||||
Some(m) => match self.channel.lock(|c| c.try_send_with_context(m, Some(cx))) {
|
Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
|
||||||
Ok(..) => Poll::Ready(()),
|
Ok(..) => Poll::Ready(()),
|
||||||
Err(TrySendError::Full(m)) => {
|
Err(TrySendError::Full(m)) => {
|
||||||
self.message = Some(m);
|
self.message = Some(m);
|
||||||
|
@ -157,6 +254,40 @@ where
|
||||||
|
|
||||||
impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
|
impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
|
||||||
|
|
||||||
|
pub struct DynamicSendFuture<'ch, T> {
|
||||||
|
channel: &'ch dyn DynamicChannel<T>,
|
||||||
|
message: Option<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
match self.message.take() {
|
||||||
|
Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
|
||||||
|
Ok(..) => Poll::Ready(()),
|
||||||
|
Err(TrySendError::Full(m)) => {
|
||||||
|
self.message = Some(m);
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => panic!("Message cannot be None"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
|
||||||
|
|
||||||
|
trait DynamicChannel<T> {
|
||||||
|
fn try_send_with_context(
|
||||||
|
&self,
|
||||||
|
message: T,
|
||||||
|
cx: Option<&mut Context<'_>>,
|
||||||
|
) -> Result<(), TrySendError<T>>;
|
||||||
|
|
||||||
|
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError>;
|
||||||
|
}
|
||||||
|
|
||||||
/// Error returned by [`try_recv`](Channel::try_recv).
|
/// Error returned by [`try_recv`](Channel::try_recv).
|
||||||
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||||
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
||||||
|
@ -287,6 +418,18 @@ where
|
||||||
self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
|
self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||||
|
self.lock(|c| c.try_recv_with_context(cx))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_send_with_context(
|
||||||
|
&self,
|
||||||
|
m: T,
|
||||||
|
cx: Option<&mut Context<'_>>,
|
||||||
|
) -> Result<(), TrySendError<T>> {
|
||||||
|
self.lock(|c| c.try_send_with_context(m, cx))
|
||||||
|
}
|
||||||
|
|
||||||
/// Get a sender for this channel.
|
/// Get a sender for this channel.
|
||||||
pub fn sender(&self) -> Sender<'_, M, T, N> {
|
pub fn sender(&self) -> Sender<'_, M, T, N> {
|
||||||
Sender { channel: self }
|
Sender { channel: self }
|
||||||
|
@ -339,6 +482,25 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
|
||||||
|
/// tradeoff cost of dynamic dispatch.
|
||||||
|
impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn try_send_with_context(
|
||||||
|
&self,
|
||||||
|
m: T,
|
||||||
|
cx: Option<&mut Context<'_>>,
|
||||||
|
) -> Result<(), TrySendError<T>> {
|
||||||
|
Channel::try_send_with_context(self, m, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_recv_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||||
|
Channel::try_recv_with_context(self, cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use core::time::Duration;
|
use core::time::Duration;
|
||||||
|
@ -411,6 +573,16 @@ mod tests {
|
||||||
let _ = s1.clone();
|
let _ = s1.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dynamic_dispatch() {
|
||||||
|
let c = Channel::<NoopRawMutex, u32, 3>::new();
|
||||||
|
let s: DynamicSender<'_, u32> = c.sender().into();
|
||||||
|
let r: DynamicReceiver<'_, u32> = c.receiver().into();
|
||||||
|
|
||||||
|
assert!(s.try_send(1).is_ok());
|
||||||
|
assert_eq!(r.try_recv().unwrap(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn receiver_receives_given_try_send_async() {
|
async fn receiver_receives_given_try_send_async() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|
Loading…
Reference in a new issue