executor: miri fixes
This commit is contained in:
parent
bd6bab1625
commit
8d24cba72d
3 changed files with 26 additions and 32 deletions
|
@ -70,24 +70,6 @@ impl TaskHeader {
|
||||||
timer_queue_item: timer_queue::TimerQueueItem::new(),
|
timer_queue_item: timer_queue::TimerQueueItem::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) unsafe fn enqueue(&self) {
|
|
||||||
critical_section::with(|cs| {
|
|
||||||
let state = self.state.load(Ordering::Relaxed);
|
|
||||||
|
|
||||||
// If already scheduled, or if not started,
|
|
||||||
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark it as scheduled
|
|
||||||
self.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
|
|
||||||
|
|
||||||
// We have just marked the task as scheduled, so enqueue it.
|
|
||||||
let executor = &*self.executor.get();
|
|
||||||
executor.enqueue(cs, self as *const TaskHeader as *mut TaskHeader);
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Raw storage in which a task can be spawned.
|
/// Raw storage in which a task can be spawned.
|
||||||
|
@ -155,7 +137,7 @@ impl<F: Future + 'static> TaskStorage<F> {
|
||||||
// Initialize the task
|
// Initialize the task
|
||||||
self.raw.poll_fn.write(Self::poll);
|
self.raw.poll_fn.write(Self::poll);
|
||||||
self.future.write(future());
|
self.future.write(future());
|
||||||
NonNull::new_unchecked(&self.raw as *const TaskHeader as *mut TaskHeader)
|
NonNull::new_unchecked(self as *const TaskStorage<F> as *const TaskHeader as *mut TaskHeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn poll(p: NonNull<TaskHeader>) {
|
unsafe fn poll(p: NonNull<TaskHeader>) {
|
||||||
|
@ -323,7 +305,7 @@ impl Executor {
|
||||||
/// - `task` must be set up to run in this executor.
|
/// - `task` must be set up to run in this executor.
|
||||||
/// - `task` must NOT be already enqueued (in this executor or another one).
|
/// - `task` must NOT be already enqueued (in this executor or another one).
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
unsafe fn enqueue(&self, cs: CriticalSection, task: *mut TaskHeader) {
|
unsafe fn enqueue(&self, cs: CriticalSection, task: NonNull<TaskHeader>) {
|
||||||
if self.run_queue.enqueue(cs, task) {
|
if self.run_queue.enqueue(cs, task) {
|
||||||
(self.signal_fn)(self.signal_ctx)
|
(self.signal_fn)(self.signal_ctx)
|
||||||
}
|
}
|
||||||
|
@ -339,11 +321,10 @@ impl Executor {
|
||||||
/// In this case, the task's Future must be Send. This is because this is effectively
|
/// In this case, the task's Future must be Send. This is because this is effectively
|
||||||
/// sending the task to the executor thread.
|
/// sending the task to the executor thread.
|
||||||
pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
|
pub(super) unsafe fn spawn(&'static self, task: NonNull<TaskHeader>) {
|
||||||
let task = task.as_ref();
|
task.as_ref().executor.set(self);
|
||||||
task.executor.set(self);
|
|
||||||
|
|
||||||
critical_section::with(|cs| {
|
critical_section::with(|cs| {
|
||||||
self.enqueue(cs, task as *const _ as _);
|
self.enqueue(cs, task);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -366,9 +347,7 @@ impl Executor {
|
||||||
/// no `poll()` already running.
|
/// no `poll()` already running.
|
||||||
pub unsafe fn poll(&'static self) {
|
pub unsafe fn poll(&'static self) {
|
||||||
#[cfg(feature = "time")]
|
#[cfg(feature = "time")]
|
||||||
self.timer_queue.dequeue_expired(Instant::now(), |p| {
|
self.timer_queue.dequeue_expired(Instant::now(), |task| wake_task(task));
|
||||||
p.as_ref().enqueue();
|
|
||||||
});
|
|
||||||
|
|
||||||
self.run_queue.dequeue_all(|p| {
|
self.run_queue.dequeue_all(|p| {
|
||||||
let task = p.as_ref();
|
let task = p.as_ref();
|
||||||
|
@ -421,7 +400,22 @@ impl Executor {
|
||||||
///
|
///
|
||||||
/// `task` must be a valid task pointer obtained from [`task_from_waker`].
|
/// `task` must be a valid task pointer obtained from [`task_from_waker`].
|
||||||
pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
|
pub unsafe fn wake_task(task: NonNull<TaskHeader>) {
|
||||||
task.as_ref().enqueue();
|
critical_section::with(|cs| {
|
||||||
|
let header = task.as_ref();
|
||||||
|
let state = header.state.load(Ordering::Relaxed);
|
||||||
|
|
||||||
|
// If already scheduled, or if not started,
|
||||||
|
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark it as scheduled
|
||||||
|
header.state.store(state | STATE_RUN_QUEUED, Ordering::Relaxed);
|
||||||
|
|
||||||
|
// We have just marked the task as scheduled, so enqueue it.
|
||||||
|
let executor = &*header.executor.get();
|
||||||
|
executor.enqueue(cs, task);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "time")]
|
#[cfg(feature = "time")]
|
||||||
|
|
|
@ -46,10 +46,10 @@ impl RunQueue {
|
||||||
///
|
///
|
||||||
/// `item` must NOT be already enqueued in any queue.
|
/// `item` must NOT be already enqueued in any queue.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: *mut TaskHeader) -> bool {
|
pub(crate) unsafe fn enqueue(&self, _cs: CriticalSection, task: NonNull<TaskHeader>) -> bool {
|
||||||
let prev = self.head.load(Ordering::Relaxed);
|
let prev = self.head.load(Ordering::Relaxed);
|
||||||
(*task).run_queue_item.next.store(prev, Ordering::Relaxed);
|
task.as_ref().run_queue_item.next.store(prev, Ordering::Relaxed);
|
||||||
self.head.store(task, Ordering::Relaxed);
|
self.head.store(task.as_ptr(), Ordering::Relaxed);
|
||||||
prev.is_null()
|
prev.is_null()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ use core::mem;
|
||||||
use core::ptr::NonNull;
|
use core::ptr::NonNull;
|
||||||
use core::task::{RawWaker, RawWakerVTable, Waker};
|
use core::task::{RawWaker, RawWakerVTable, Waker};
|
||||||
|
|
||||||
use super::TaskHeader;
|
use super::{wake_task, TaskHeader};
|
||||||
|
|
||||||
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
|
const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, drop);
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ unsafe fn clone(p: *const ()) -> RawWaker {
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn wake(p: *const ()) {
|
unsafe fn wake(p: *const ()) {
|
||||||
(*(p as *mut TaskHeader)).enqueue()
|
wake_task(NonNull::new_unchecked(p as *mut TaskHeader))
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn drop(_: *const ()) {
|
unsafe fn drop(_: *const ()) {
|
||||||
|
|
Loading…
Reference in a new issue