Removed all unsafe

This commit is contained in:
Dion Dokter 2022-06-16 22:13:26 +02:00
parent a614a55c7d
commit 2a4cdd05fa

View file

@ -356,12 +356,11 @@ impl<'a, T: Clone> Drop for Subscriber<'a, T> {
impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> { impl<'a, T: Clone> futures::Stream for Subscriber<'a, T> {
type Item = T; type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = unsafe { self.get_unchecked_mut() }; let sub_index = self.subscriber_index;
match self
match this
.channel .channel
.get_message_with_context(&mut this.next_message_id, this.subscriber_index, Some(cx)) .get_message_with_context(&mut self.next_message_id, sub_index, Some(cx))
{ {
Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)), Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
Poll::Ready(WaitResult::Lagged(_)) => { Poll::Ready(WaitResult::Lagged(_)) => {
@ -487,18 +486,16 @@ pub struct PublisherWaitFuture<'s, 'a, T: Clone> {
impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> { impl<'s, 'a, T: Clone> Future for PublisherWaitFuture<'s, 'a, T> {
type Output = (); type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() }; let message = self.message.take().unwrap();
match self
let message = this.message.take().unwrap();
match this
.publisher .publisher
.channel .channel
.publish_with_context(message, this.publisher.publisher_index, Some(cx)) .publish_with_context(message, self.publisher.publisher_index, Some(cx))
{ {
Ok(()) => Poll::Ready(()), Ok(()) => Poll::Ready(()),
Err(message) => { Err(message) => {
this.message = Some(message); self.message = Some(message);
Poll::Pending Poll::Pending
} }
} }