rt: fix OneTimeInit<T> init race

This commit is contained in:
Mark Poliakov 2023-09-06 10:57:02 +03:00
parent cb65a1cff2
commit 8ff58a48d2
11 changed files with 71 additions and 118 deletions

View File

@ -4,7 +4,7 @@ use core::{
mem::MaybeUninit,
ops::{Deref, DerefMut},
panic,
sync::atomic::{AtomicBool, Ordering},
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};
/// Statically-allocated "dynamic" vector
@ -17,32 +17,43 @@ pub struct StaticVector<T, const N: usize> {
#[repr(C)]
pub struct OneTimeInit<T> {
value: UnsafeCell<MaybeUninit<T>>,
state: AtomicBool,
state: AtomicUsize,
}
unsafe impl<T> Sync for OneTimeInit<T> {}
unsafe impl<T> Send for OneTimeInit<T> {}
impl<T> OneTimeInit<T> {
const STATE_UNINITIALIZED: usize = 0;
const STATE_INITIALIZING: usize = 1;
const STATE_INITIALIZED: usize = 2;
/// Wraps the value in an [OneTimeInit]
pub const fn new() -> Self {
Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicBool::new(false),
state: AtomicUsize::new(Self::STATE_UNINITIALIZED),
}
}
/// Returns `true` if the value has already been initialized
#[inline]
pub fn is_initialized(&self) -> bool {
self.state.load(Ordering::Acquire)
self.state.load(Ordering::Acquire) == Self::STATE_INITIALIZED
}
/// Sets the underlying value of the [OneTimeInit]. If already initialized, panics.
#[track_caller]
pub fn init(&self, value: T) {
// Transition to "initializing" state
if self
.state
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed)
.compare_exchange(
Self::STATE_UNINITIALIZED,
Self::STATE_INITIALIZING,
Ordering::Release,
Ordering::Relaxed,
)
.is_err()
{
panic!(
@ -54,13 +65,24 @@ impl<T> OneTimeInit<T> {
unsafe {
(*self.value.get()).write(value);
}
// Transition to "initialized" state. This must not fail
self.state
.compare_exchange(
Self::STATE_INITIALIZING,
Self::STATE_INITIALIZED,
Ordering::Release,
Ordering::Relaxed,
)
.unwrap();
}
/// Returns an immutable reference to the underlying value and panics if it hasn't yet been
/// initialized
#[track_caller]
pub fn get(&self) -> &T {
if !self.state.load(Ordering::Acquire) {
// TODO check for INITIALIZING state and wait until it becomes INITIALIZED?
if !self.is_initialized() {
panic!(
"{:?}: Attempt to dereference an uninitialized value",
panic::Location::caller()
@ -70,28 +92,10 @@ impl<T> OneTimeInit<T> {
unsafe { (*self.value.get()).assume_init_ref() }
}
#[track_caller]
pub fn or_init_with<F: FnOnce() -> T>(&self, init: F) -> &T {
if self
.state
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed)
.is_err()
{
// Already initialized
unsafe { (*self.value.get()).assume_init_ref() }
} else {
// Initialize
unsafe {
(*self.value.get()).write((init)());
(*self.value.get()).assume_init_ref()
}
}
}
/// Returns an immutable reference to the underlying value and [None] if the value hasn't yet
/// been initialized
pub fn try_get(&self) -> Option<&T> {
if self.state.load(Ordering::Acquire) {
if self.is_initialized() {
Some(self.get())
} else {
None

View File

@ -19,6 +19,7 @@ use crate::{
phys::{self, PageUsage},
ConvertAddress, KERNEL_VIRT_OFFSET,
},
task::runtime,
};
use super::smp::CPU_COUNT;
@ -91,6 +92,9 @@ unsafe extern "C" fn __x86_64_upper_entry() -> ! {
exception::init_exceptions(0);
// Initialize async executor queue
runtime::init_task_queue();
devfs::init();
// Initializes: local CPU, platform devices (timers/serials/etc), debug output

View File

@ -198,7 +198,10 @@ impl Architecture for X86_64 {
// CPU management
unsafe fn reset(&self) -> ! {
todo!()
Self::set_interrupt_mask(true);
loop {
Self::wait_for_interrupt();
}
}
unsafe fn send_ipi(&self, target: IpiDeliveryTarget, msg: CpuMessage) -> Result<(), Error> {

View File

@ -1,5 +1,5 @@
//! Console device interfaces
use core::mem::size_of;
use core::{mem::size_of, time::Duration};
use abi::{error::Error, primitive_enum};
use alloc::vec::Vec;
@ -13,7 +13,7 @@ use crate::{
ConvertAddress,
},
sync::IrqSafeSpinlock,
task::tasklet::TaskFlow,
task::runtime,
};
const CONSOLE_ROW_LEN: usize = 80;
@ -522,7 +522,10 @@ pub fn flush_consoles() {
}
/// Periodically flushes data from console buffers onto their displays
pub fn task_update_consoles() -> TaskFlow {
flush_consoles();
TaskFlow::Continue
pub async fn update_consoles_task() {
loop {
flush_consoles();
runtime::sleep(Duration::from_millis(20)).await;
}
}

View File

@ -1,10 +1,7 @@
use core::time::Duration;
use crate::task::{runtime, tasklet};
use crate::task::runtime;
pub fn tick(now: Duration) {
runtime::tick(now);
// TODO tasklets are no longer needed
tasklet::tick(now);
}

View File

@ -28,16 +28,12 @@ pub fn kinit() {
#[cfg(feature = "fb_console")]
{
use core::time::Duration;
use crate::{device::display::console::update_consoles_task, task::runtime};
use crate::device::display::console::task_update_consoles;
use crate::task::tasklet;
tasklet::add_periodic(
"update-console",
Duration::from_millis(15),
task_update_consoles,
);
runtime::spawn(async move {
update_consoles_task().await;
})
.expect("Could not start periodic console auto-flush task");
}
let root = match setup_root() {

View File

@ -17,7 +17,6 @@ pub mod context;
pub mod process;
pub mod runtime;
pub mod sched;
pub mod tasklet;
pub use context::{Cpu, TaskContext};

View File

@ -8,25 +8,21 @@ use crate::task::{process::Process, spawn_kernel_closure};
use super::{
task::Task,
task_queue::{TaskQueue, TASK_QUEUE},
task_queue::{self},
};
fn init_async_queue() -> TaskQueue {
TaskQueue::new(128)
}
pub fn enqueue(task: Arc<Task>) -> Result<(), Error> {
TASK_QUEUE.or_init_with(init_async_queue).enqueue(task)
task_queue::push_task(task)
}
pub fn spawn_async_worker(index: usize) -> Result<(), Error> {
let name = format!("[async-worker-{}]", index);
spawn_kernel_closure(name, move || {
let queue = TASK_QUEUE.or_init_with(init_async_queue);
// let queue = TASK_QUEUE.or_init_with(task_queue::init_async_queue);
loop {
let task = queue.block_pop().unwrap();
let task = task_queue::pop_task().unwrap(); // queue.block_pop().unwrap();
let mut future_slot = task.future.lock();
if let Some(mut future) = future_slot.take() {

View File

@ -15,6 +15,7 @@ mod task_queue;
mod waker;
pub use executor::{run_to_completion, spawn, spawn_async_worker};
pub use task_queue::init_task_queue;
pub use waker::QueueWaker;
pub struct PollFn<F> {

View File

@ -17,6 +17,7 @@ pub(super) struct TaskQueue {
impl TaskQueue {
pub fn new(task_capacity: usize) -> Self {
assert!(task_capacity > 0);
Self {
pending_workers: ArrayQueue::new(16),
task_queue: ArrayQueue::new(task_capacity),
@ -38,8 +39,7 @@ impl TaskQueue {
Ok(())
}
pub fn block_pop(&self) -> Result<Arc<Task>, Error> {
let _irq = IrqGuard::acquire();
pub fn dequeue(&self) -> Result<Arc<Task>, Error> {
let process = Process::current();
loop {
if let Some(task) = self.task_queue.pop() {
@ -49,7 +49,20 @@ impl TaskQueue {
if self.pending_workers.push(process.clone()).is_err() {
panic!("Pending worker queue overflow");
}
process.suspend();
}
}
}
pub fn init_task_queue() {
TASK_QUEUE.init(TaskQueue::new(128));
}
pub(super) fn push_task(task: Arc<Task>) -> Result<(), Error> {
TASK_QUEUE.get().enqueue(task)
}
pub(super) fn pop_task() -> Result<Arc<Task>, Error> {
TASK_QUEUE.get().dequeue()
}

View File

@ -1,63 +0,0 @@
//! Small deferred tasks which run in interrupt context
use alloc::{boxed::Box, vec::Vec};
use core::time::Duration;
use crate::sync::IrqSafeSpinlock;
/// Specifies whether a periodic task should be cancelled after its current iteration
#[derive(Debug, PartialEq)]
pub enum TaskFlow {
/// The task will be executed again later
Continue,
/// The task is finished and will be removed from the list
Cancel,
}
struct PeriodicTaskState {
last_run: Duration,
next_run: Duration,
interval: Duration,
f: Box<dyn Fn() -> TaskFlow>,
}
struct OneTimeTask {
run_time: Duration,
f: Box<dyn Fn()>,
}
static PERIODIC_TASKS: IrqSafeSpinlock<Vec<PeriodicTaskState>> = IrqSafeSpinlock::new(Vec::new());
static ONE_TIME_TASKS: IrqSafeSpinlock<Vec<OneTimeTask>> = IrqSafeSpinlock::new(Vec::new());
/// Setup a periodic task to run at a specified interval
pub fn add_periodic<F: Fn() -> TaskFlow + 'static>(name: &str, interval: Duration, f: F) {
debugln!("Schedule {:?} every {:?}", name, interval);
let f = Box::new(f);
PERIODIC_TASKS.lock().push(PeriodicTaskState {
interval,
last_run: Duration::ZERO,
next_run: Duration::ZERO,
f,
});
}
/// Updates the state of the tasklets, running the ones which meet their deadline conditions
pub fn tick(now: Duration) {
PERIODIC_TASKS.lock().retain_mut(|task| {
if now >= task.next_run {
task.last_run = now;
task.next_run = now + task.interval;
(task.f)() == TaskFlow::Continue
} else {
true
}
});
ONE_TIME_TASKS.lock().retain(|task| {
if now >= task.run_time {
(task.f)();
false
} else {
true
}
})
}