fix(pubsub): Pop messages which count is 0 after unsubscribe
This commit is contained in:
parent
2c9f289f40
commit
472df3fad6
1 changed files with 47 additions and 0 deletions
|
@ -371,6 +371,20 @@ impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubSta
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.skip(current_message_index)
|
.skip(current_message_index)
|
||||||
.for_each(|(_, counter)| *counter -= 1);
|
.for_each(|(_, counter)| *counter -= 1);
|
||||||
|
|
||||||
|
let mut wake_publishers = false;
|
||||||
|
while let Some((_, count)) = self.queue.front() {
|
||||||
|
if *count == 0 {
|
||||||
|
self.queue.pop_front().unwrap();
|
||||||
|
wake_publishers = true;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if wake_publishers {
|
||||||
|
self.publisher_wakers.wake();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -612,4 +626,37 @@ mod tests {
|
||||||
sub1.next_message().await;
|
sub1.next_message().await;
|
||||||
assert_eq!(pub0.space(), 4);
|
assert_eq!(pub0.space(), 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[futures_test::test]
|
||||||
|
async fn empty_channel_when_last_subscriber_is_dropped() {
|
||||||
|
let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
|
||||||
|
|
||||||
|
let pub0 = channel.publisher().unwrap();
|
||||||
|
let mut sub0 = channel.subscriber().unwrap();
|
||||||
|
let mut sub1 = channel.subscriber().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(4, pub0.space());
|
||||||
|
|
||||||
|
pub0.publish(1).await;
|
||||||
|
pub0.publish(2).await;
|
||||||
|
|
||||||
|
assert_eq!(2, channel.space());
|
||||||
|
|
||||||
|
assert_eq!(1, sub0.try_next_message_pure().unwrap());
|
||||||
|
assert_eq!(2, sub0.try_next_message_pure().unwrap());
|
||||||
|
|
||||||
|
assert_eq!(2, channel.space());
|
||||||
|
|
||||||
|
drop(sub0);
|
||||||
|
|
||||||
|
assert_eq!(2, channel.space());
|
||||||
|
|
||||||
|
assert_eq!(1, sub1.try_next_message_pure().unwrap());
|
||||||
|
|
||||||
|
assert_eq!(3, channel.space());
|
||||||
|
|
||||||
|
drop(sub1);
|
||||||
|
|
||||||
|
assert_eq!(4, channel.space());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue