From 78c546f3566d4289ebd9512fa3e0a03eee6fd87f Mon Sep 17 00:00:00 2001 From: Dion Dokter Date: Tue, 21 Jun 2022 15:47:20 +0200 Subject: [PATCH] Added example and some defmt --- embassy/src/channel/pubsub/mod.rs | 2 + examples/nrf/src/bin/pubsub.rs | 106 ++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 examples/nrf/src/bin/pubsub.rs diff --git a/embassy/src/channel/pubsub/mod.rs b/embassy/src/channel/pubsub/mod.rs index 12f0d24ab..9bfb845e0 100644 --- a/embassy/src/channel/pubsub/mod.rs +++ b/embassy/src/channel/pubsub/mod.rs @@ -370,6 +370,7 @@ impl PubSubSta /// Error type for the [PubSubChannel] #[derive(Debug, PartialEq, Clone)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum Error { /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or /// the capacity of the channels must be increased. @@ -404,6 +405,7 @@ pub trait PubSubBehavior { /// The result of the subscriber wait procedure #[derive(Debug, Clone, PartialEq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum WaitResult { /// The subscriber did not receive all messages and lagged by the given amount of messages. /// (This is the amount of messages that were missed) diff --git a/examples/nrf/src/bin/pubsub.rs b/examples/nrf/src/bin/pubsub.rs new file mode 100644 index 000000000..2c3a355c2 --- /dev/null +++ b/examples/nrf/src/bin/pubsub.rs @@ -0,0 +1,106 @@ +#![no_std] +#![no_main] +#![feature(type_alias_impl_trait)] + +use defmt::unwrap; +use embassy::blocking_mutex::raw::ThreadModeRawMutex; +use embassy::channel::pubsub::{DynSubscriber, PubSubChannel, Subscriber}; +use embassy::executor::Spawner; +use embassy::time::{Duration, Timer}; +use {defmt_rtt as _, panic_probe as _}; + +/// Create the message bus. It has a queue of 4, supports 3 subscribers and 1 publisher +static MESSAGE_BUS: PubSubChannel = PubSubChannel::new(); + +#[derive(Clone, defmt::Format)] +enum Message { + A, + B, + C, +} + +#[embassy::main] +async fn main(spawner: Spawner, _p: embassy_nrf::Peripherals) { + defmt::info!("Hello World!"); + + // It's good to set up the subscribers before publishing anything. + // A subscriber will only yield messages that have been published after its creation. + + spawner.must_spawn(fast_logger(unwrap!(MESSAGE_BUS.subscriber()))); + spawner.must_spawn(slow_logger(unwrap!(MESSAGE_BUS.dyn_subscriber()))); + spawner.must_spawn(slow_logger_pure(unwrap!(MESSAGE_BUS.dyn_subscriber()))); + + // Get a publisher + let message_publisher = unwrap!(MESSAGE_BUS.publisher()); + // We can't get more (normal) publishers + // We can have an infinite amount of immediate publishers. They can't await a publish, only do an immediate publish + defmt::assert!(MESSAGE_BUS.publisher().is_err()); + + let mut index = 0; + loop { + Timer::after(Duration::from_millis(500)).await; + + let message = match index % 3 { + 0 => Message::A, + 1 => Message::B, + 2..=u32::MAX => Message::C, + }; + + // We publish immediately and don't await anything. + // If the queue is full, it will cause the oldest message to not be received by some/all subscribers + message_publisher.publish_immediate(message); + + // Try to comment out the last one and uncomment this line below. + // The behaviour will change: + // - The subscribers won't miss any messages any more + // - Trying to publish now has some wait time when the queue is full + + // message_publisher.publish(message).await; + + index += 1; + } +} + +/// A logger task that just awaits the messages it receives +/// +/// This takes the generic `Subscriber`. This is most performant, but requires you to write down all of the generics +#[embassy::task] +async fn fast_logger(mut messages: Subscriber<'static, ThreadModeRawMutex, Message, 4, 3, 1>) { + loop { + let message = messages.next_message().await; + defmt::info!("Received message at fast logger: {:?}", message); + } +} + +/// A logger task that awaits the messages, but also does some other work. +/// Because of this, depeding on how the messages were published, the subscriber might miss some messages +/// +/// This takes the dynamic `DynSubscriber`. This is not as performant as the generic version, but let's you ignore some of the generics +#[embassy::task] +async fn slow_logger(mut messages: DynSubscriber<'static, Message>) { + loop { + // Do some work + Timer::after(Duration::from_millis(2000)).await; + + // If the publisher has used the `publish_immediate` function, then we may receive a lag message here + let message = messages.next_message().await; + defmt::info!("Received message at slow logger: {:?}", message); + + // If the previous one was a lag message, then we should receive the next message here immediately + let message = messages.next_message().await; + defmt::info!("Received message at slow logger: {:?}", message); + } +} + +/// Same as `slow_logger` but it ignores lag results +#[embassy::task] +async fn slow_logger_pure(mut messages: DynSubscriber<'static, Message>) { + loop { + // Do some work + Timer::after(Duration::from_millis(2000)).await; + + // Instead of receiving lags here, we just ignore that and read the next message + let message = messages.next_message_pure().await; + defmt::info!("Received message at slow logger pure: {:?}", message); + } +}