diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index faaf99dc6..5989e86ec 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -371,6 +371,20 @@ impl PubSubSta .iter_mut() .skip(current_message_index) .for_each(|(_, counter)| *counter -= 1); + + let mut wake_publishers = false; + while let Some((_, count)) = self.queue.front() { + if *count == 0 { + self.queue.pop_front().unwrap(); + wake_publishers = true; + } else { + break; + } + } + + if wake_publishers { + self.publisher_wakers.wake(); + } } } @@ -612,4 +626,37 @@ mod tests { sub1.next_message().await; assert_eq!(pub0.space(), 4); } + + #[futures_test::test] + async fn empty_channel_when_last_subscriber_is_dropped() { + let channel = PubSubChannel::::new(); + + let pub0 = channel.publisher().unwrap(); + let mut sub0 = channel.subscriber().unwrap(); + let mut sub1 = channel.subscriber().unwrap(); + + assert_eq!(4, pub0.space()); + + pub0.publish(1).await; + pub0.publish(2).await; + + assert_eq!(2, channel.space()); + + assert_eq!(1, sub0.try_next_message_pure().unwrap()); + assert_eq!(2, sub0.try_next_message_pure().unwrap()); + + assert_eq!(2, channel.space()); + + drop(sub0); + + assert_eq!(2, channel.space()); + + assert_eq!(1, sub1.try_next_message_pure().unwrap()); + + assert_eq!(3, channel.space()); + + drop(sub1); + + assert_eq!(4, channel.space()); + } }