Clear owner on dequeue

This commit is contained in:
Dániel Buga 2024-12-16 21:17:31 +01:00
parent 6c12a967b0
commit 7ecbe7366a
No known key found for this signature in database
2 changed files with 112 additions and 24 deletions

View File

@ -256,16 +256,14 @@ impl Driver for EmbassyTimer {
unsafe { unsafe {
// If we have multiple queues, we have integrated timers and our own timer queue // If we have multiple queues, we have integrated timers and our own timer queue
// implementation. // implementation.
use embassy_executor::raw::{Executor as RawExecutor, TaskRef}; use embassy_executor::raw::Executor as RawExecutor;
use portable_atomic::{AtomicPtr, Ordering}; use portable_atomic::{AtomicPtr, Ordering};
let task = embassy_executor::raw::task_from_waker(waker); let task = embassy_executor::raw::task_from_waker(waker);
let mut executor = unsafe { // SAFETY: it is impossible to schedule a task that has not yet been spawned,
// SAFETY: it is impossible to schedule a task that has not yet been spawned, // so the executor is guaranteed to be set to a non-null value.
// so the executor is guaranteed to be set to a non-null value. let mut executor = task.executor().unwrap_unchecked() as *const RawExecutor;
task.executor().unwrap_unchecked() as *const RawExecutor
};
let owner = task let owner = task
.timer_queue_item() .timer_queue_item()

View File

@ -64,13 +64,119 @@ impl TimerQueue {
next_expiration = self.inner.lock(|q| adapter::dequeue(q, next_expiration)); next_expiration = self.inner.lock(|q| adapter::dequeue(q, next_expiration));
} }
} }
pub fn schedule_wake(&self, at: u64, waker: &core::task::Waker) {
if self.inner.lock(|q| q.schedule_wake(at, waker)) {
self.dispatch();
}
}
} }
#[cfg(integrated_timers)] #[cfg(integrated_timers)]
mod adapter { mod adapter {
use core::cell::RefCell; use core::{
cell::{Cell, RefCell},
cmp::min,
ptr,
task::Waker,
};
type Q = embassy_time_queue_driver::queue_integrated::Queue; use embassy_executor::raw::TaskRef;
use portable_atomic::{AtomicPtr, Ordering};
/// Copy of the embassy integrated timer queue, that clears the owner upon
/// dequeueing.
pub struct Q {
head: Cell<Option<TaskRef>>,
}
impl Q {
/// Creates a new timer queue.
pub const fn new() -> Self {
Self {
head: Cell::new(None),
}
}
/// Schedules a task to run at a specific time.
///
/// If this function returns `true`, the called should find the next
/// expiration time and set a new alarm for that time.
pub fn schedule_wake(&mut self, at: u64, waker: &Waker) -> bool {
let task = embassy_executor::raw::task_from_waker(waker);
let item = task.timer_queue_item();
if item.next.get().is_none() {
// If not in the queue, add it and update.
let prev = self.head.replace(Some(task));
item.next.set(if prev.is_none() {
Some(unsafe { TaskRef::dangling() })
} else {
prev
});
item.expires_at.set(at);
true
} else if at <= item.expires_at.get() {
// If expiration is sooner than previously set, update.
item.expires_at.set(at);
true
} else {
// Task does not need to be updated.
false
}
}
/// Dequeues expired timers and returns the next alarm time.
///
/// The provided callback will be called for each expired task. Tasks
/// that never expire will be removed, but the callback will not
/// be called.
pub fn next_expiration(&mut self, now: u64) -> u64 {
let mut next_expiration = u64::MAX;
self.retain(|p| {
let item = p.timer_queue_item();
let expires = item.expires_at.get();
if expires <= now {
// Timer expired, process task.
embassy_executor::raw::wake_task(p);
false
} else {
// Timer didn't yet expire, or never expires.
next_expiration = min(next_expiration, expires);
expires != u64::MAX
}
});
next_expiration
}
fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
let mut prev = &self.head;
while let Some(p) = prev.get() {
if unsafe { p == TaskRef::dangling() } {
// prev was the last item, stop
break;
}
let item = p.timer_queue_item();
if f(p) {
// Skip to next
prev = &item.next;
} else {
// Remove it
prev.set(item.next.get());
// Clear owner
unsafe {
// SAFETY: our payload is an AtomicPtr.
item.payload
.as_ref::<AtomicPtr<()>>()
.store(ptr::null_mut(), Ordering::Relaxed);
}
item.next.set(None);
}
}
}
}
/// A simple wrapper around a `Queue` to provide interior mutability. /// A simple wrapper around a `Queue` to provide interior mutability.
pub struct RefCellQueue { pub struct RefCellQueue {
@ -102,14 +208,6 @@ mod adapter {
pub(super) fn dequeue(q: &RawQueue, now: u64) -> u64 { pub(super) fn dequeue(q: &RawQueue, now: u64) -> u64 {
q.next_expiration(now) q.next_expiration(now)
} }
impl super::TimerQueue {
pub fn schedule_wake(&self, at: u64, task: &core::task::Waker) {
if self.inner.lock(|q| q.schedule_wake(at, task)) {
self.dispatch();
}
}
}
} }
#[cfg(generic_timers)] #[cfg(generic_timers)]
@ -150,12 +248,4 @@ mod adapter {
pub(super) fn dequeue(q: &RawQueue, now: u64) -> u64 { pub(super) fn dequeue(q: &RawQueue, now: u64) -> u64 {
q.next_expiration(now) q.next_expiration(now)
} }
impl super::TimerQueue {
pub fn schedule_wake(&self, at: u64, waker: &Waker) {
if self.inner.lock(|q| q.schedule_wake(at, waker)) {
self.dispatch();
}
}
}
} }