Added example and some defmt
This commit is contained in:
parent
1eec7e69f1
commit
78c546f356
2 changed files with 108 additions and 0 deletions
|
@ -370,6 +370,7 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
|
|||
|
||||
/// Error type for the [PubSubChannel]
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
||||
pub enum Error {
|
||||
/// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
|
||||
/// the capacity of the channels must be increased.
|
||||
|
@ -404,6 +405,7 @@ pub trait PubSubBehavior<T> {
|
|||
|
||||
/// The result of the subscriber wait procedure
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
|
||||
pub enum WaitResult<T> {
|
||||
/// The subscriber did not receive all messages and lagged by the given amount of messages.
|
||||
/// (This is the amount of messages that were missed)
|
||||
|
|
106
examples/nrf/src/bin/pubsub.rs
Normal file
106
examples/nrf/src/bin/pubsub.rs
Normal file
|
@ -0,0 +1,106 @@
|
|||
#![no_std]
|
||||
#![no_main]
|
||||
#![feature(type_alias_impl_trait)]
|
||||
|
||||
use defmt::unwrap;
|
||||
use embassy::blocking_mutex::raw::ThreadModeRawMutex;
|
||||
use embassy::channel::pubsub::{DynSubscriber, PubSubChannel, Subscriber};
|
||||
use embassy::executor::Spawner;
|
||||
use embassy::time::{Duration, Timer};
|
||||
use {defmt_rtt as _, panic_probe as _};
|
||||
|
||||
/// Create the message bus. It has a queue of 4, supports 3 subscribers and 1 publisher
|
||||
static MESSAGE_BUS: PubSubChannel<ThreadModeRawMutex, Message, 4, 3, 1> = PubSubChannel::new();
|
||||
|
||||
#[derive(Clone, defmt::Format)]
|
||||
enum Message {
|
||||
A,
|
||||
B,
|
||||
C,
|
||||
}
|
||||
|
||||
#[embassy::main]
|
||||
async fn main(spawner: Spawner, _p: embassy_nrf::Peripherals) {
|
||||
defmt::info!("Hello World!");
|
||||
|
||||
// It's good to set up the subscribers before publishing anything.
|
||||
// A subscriber will only yield messages that have been published after its creation.
|
||||
|
||||
spawner.must_spawn(fast_logger(unwrap!(MESSAGE_BUS.subscriber())));
|
||||
spawner.must_spawn(slow_logger(unwrap!(MESSAGE_BUS.dyn_subscriber())));
|
||||
spawner.must_spawn(slow_logger_pure(unwrap!(MESSAGE_BUS.dyn_subscriber())));
|
||||
|
||||
// Get a publisher
|
||||
let message_publisher = unwrap!(MESSAGE_BUS.publisher());
|
||||
// We can't get more (normal) publishers
|
||||
// We can have an infinite amount of immediate publishers. They can't await a publish, only do an immediate publish
|
||||
defmt::assert!(MESSAGE_BUS.publisher().is_err());
|
||||
|
||||
let mut index = 0;
|
||||
loop {
|
||||
Timer::after(Duration::from_millis(500)).await;
|
||||
|
||||
let message = match index % 3 {
|
||||
0 => Message::A,
|
||||
1 => Message::B,
|
||||
2..=u32::MAX => Message::C,
|
||||
};
|
||||
|
||||
// We publish immediately and don't await anything.
|
||||
// If the queue is full, it will cause the oldest message to not be received by some/all subscribers
|
||||
message_publisher.publish_immediate(message);
|
||||
|
||||
// Try to comment out the last one and uncomment this line below.
|
||||
// The behaviour will change:
|
||||
// - The subscribers won't miss any messages any more
|
||||
// - Trying to publish now has some wait time when the queue is full
|
||||
|
||||
// message_publisher.publish(message).await;
|
||||
|
||||
index += 1;
|
||||
}
|
||||
}
|
||||
|
||||
/// A logger task that just awaits the messages it receives
|
||||
///
|
||||
/// This takes the generic `Subscriber`. This is most performant, but requires you to write down all of the generics
|
||||
#[embassy::task]
|
||||
async fn fast_logger(mut messages: Subscriber<'static, ThreadModeRawMutex, Message, 4, 3, 1>) {
|
||||
loop {
|
||||
let message = messages.next_message().await;
|
||||
defmt::info!("Received message at fast logger: {:?}", message);
|
||||
}
|
||||
}
|
||||
|
||||
/// A logger task that awaits the messages, but also does some other work.
|
||||
/// Because of this, depeding on how the messages were published, the subscriber might miss some messages
|
||||
///
|
||||
/// This takes the dynamic `DynSubscriber`. This is not as performant as the generic version, but let's you ignore some of the generics
|
||||
#[embassy::task]
|
||||
async fn slow_logger(mut messages: DynSubscriber<'static, Message>) {
|
||||
loop {
|
||||
// Do some work
|
||||
Timer::after(Duration::from_millis(2000)).await;
|
||||
|
||||
// If the publisher has used the `publish_immediate` function, then we may receive a lag message here
|
||||
let message = messages.next_message().await;
|
||||
defmt::info!("Received message at slow logger: {:?}", message);
|
||||
|
||||
// If the previous one was a lag message, then we should receive the next message here immediately
|
||||
let message = messages.next_message().await;
|
||||
defmt::info!("Received message at slow logger: {:?}", message);
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as `slow_logger` but it ignores lag results
|
||||
#[embassy::task]
|
||||
async fn slow_logger_pure(mut messages: DynSubscriber<'static, Message>) {
|
||||
loop {
|
||||
// Do some work
|
||||
Timer::after(Duration::from_millis(2000)).await;
|
||||
|
||||
// Instead of receiving lags here, we just ignore that and read the next message
|
||||
let message = messages.next_message_pure().await;
|
||||
defmt::info!("Received message at slow logger pure: {:?}", message);
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue