Removed the closing state as it was not required
This commit is contained in:
parent
a247fa4f2c
commit
d86892ca56
1 changed files with 13 additions and 22 deletions
|
@ -390,7 +390,6 @@ struct ChannelState<T, const N: usize> {
|
|||
read_pos: usize,
|
||||
write_pos: usize,
|
||||
full: bool,
|
||||
closing: bool,
|
||||
closed: bool,
|
||||
receiver_registered: bool,
|
||||
senders_registered: u32,
|
||||
|
@ -407,7 +406,6 @@ impl<T, const N: usize> ChannelState<T, N> {
|
|||
read_pos: 0,
|
||||
write_pos: 0,
|
||||
full: false,
|
||||
closing: false,
|
||||
closed: false,
|
||||
receiver_registered: false,
|
||||
senders_registered: 0,
|
||||
|
@ -528,25 +526,18 @@ where
|
|||
fn try_recv_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryRecvError> {
|
||||
let mut state = &mut self.state;
|
||||
self.mutex.lock(|_| {
|
||||
if !state.closed {
|
||||
if state.read_pos != state.write_pos || state.full {
|
||||
if state.full {
|
||||
state.full = false;
|
||||
state.senders_waker.wake();
|
||||
}
|
||||
let message =
|
||||
unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() };
|
||||
state.read_pos = (state.read_pos + 1) % state.buf.len();
|
||||
Ok(message)
|
||||
} else if !state.closing {
|
||||
cx.into_iter()
|
||||
.for_each(|cx| Self::set_receiver_waker(&mut state, &cx.waker()));
|
||||
Err(TryRecvError::Empty)
|
||||
} else {
|
||||
state.closed = true;
|
||||
if state.read_pos != state.write_pos || state.full {
|
||||
if state.full {
|
||||
state.full = false;
|
||||
state.senders_waker.wake();
|
||||
Err(TryRecvError::Closed)
|
||||
}
|
||||
let message = unsafe { (state.buf[state.read_pos]).assume_init_mut().get().read() };
|
||||
state.read_pos = (state.read_pos + 1) % state.buf.len();
|
||||
Ok(message)
|
||||
} else if !state.closed {
|
||||
cx.into_iter()
|
||||
.for_each(|cx| Self::set_receiver_waker(&mut state, &cx.waker()));
|
||||
Err(TryRecvError::Empty)
|
||||
} else {
|
||||
Err(TryRecvError::Closed)
|
||||
}
|
||||
|
@ -588,7 +579,7 @@ where
|
|||
let state = &mut self.state;
|
||||
self.mutex.lock(|_| {
|
||||
state.receiver_waker.wake();
|
||||
state.closing = true;
|
||||
state.closed = true;
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -599,7 +590,7 @@ where
|
|||
fn is_closed_with_context(&mut self, cx: Option<&mut Context<'_>>) -> bool {
|
||||
let mut state = &mut self.state;
|
||||
self.mutex.lock(|_| {
|
||||
if state.closing || state.closed {
|
||||
if state.closed {
|
||||
cx.into_iter()
|
||||
.for_each(|cx| Self::set_senders_waker(&mut state, &cx.waker()));
|
||||
true
|
||||
|
@ -642,7 +633,7 @@ where
|
|||
state.senders_registered -= 1;
|
||||
if state.senders_registered == 0 {
|
||||
state.receiver_waker.wake();
|
||||
state.closing = true;
|
||||
state.closed = true;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue