From 69c73454c182682d5ec132d7f0e0d3bfcb4652f9 Mon Sep 17 00:00:00 2001 From: Mark Poliakov Date: Tue, 2 Jan 2024 14:01:33 +0200 Subject: [PATCH] proc: shared memory + scheduler rework --- lib/kernel-util/src/mem/mod.rs | 6 +- lib/kernel-util/src/sync/mod.rs | 1 + lib/kernel-util/src/sync/spin_rwlock.rs | 162 ++++++++ lib/vfs/src/file/mod.rs | 25 +- lib/vfs/src/lib.rs | 2 + lib/vfs/src/shared_memory.rs | 38 ++ src/arch/mod.rs | 28 +- src/arch/x86_64/apic/mod.rs | 5 +- src/arch/x86_64/boot/mod.rs | 2 - src/arch/x86_64/cpu.rs | 37 +- src/arch/x86_64/exception.rs | 2 +- src/arch/x86_64/syscall.rs | 3 +- src/debug.rs | 2 +- src/init.rs | 4 +- src/mem/phys/mod.rs | 5 + src/syscall/mod.rs | 17 +- src/task/mod.rs | 2 +- src/task/process.rs | 12 +- src/task/sched.rs | 399 +++++++++---------- src/task/thread.rs | 504 ++++++++++++------------ 20 files changed, 732 insertions(+), 524 deletions(-) create mode 100644 lib/kernel-util/src/sync/spin_rwlock.rs create mode 100644 lib/vfs/src/shared_memory.rs diff --git a/lib/kernel-util/src/mem/mod.rs b/lib/kernel-util/src/mem/mod.rs index 972d56bc..bb96ffef 100644 --- a/lib/kernel-util/src/mem/mod.rs +++ b/lib/kernel-util/src/mem/mod.rs @@ -7,7 +7,7 @@ use core::{ use yggdrasil_abi::error::Error; -use crate::api::{__allocate_contiguous_pages, __free_page, __physicalize}; +use crate::api::{self, __allocate_contiguous_pages, __free_page, __physicalize}; use self::address::{AsPhysicalAddress, PhysicalAddress}; @@ -254,3 +254,7 @@ impl fmt::Display for PageBox { unsafe impl Send for PageBox {} unsafe impl Sync for PageBox {} + +pub fn allocate_page() -> Result { + unsafe { api::__allocate_page() } +} diff --git a/lib/kernel-util/src/sync/mod.rs b/lib/kernel-util/src/sync/mod.rs index b9df129b..3b8870eb 100644 --- a/lib/kernel-util/src/sync/mod.rs +++ b/lib/kernel-util/src/sync/mod.rs @@ -6,6 +6,7 @@ use yggdrasil_abi::error::Error; pub mod fence; pub mod guard; pub mod mutex; +pub mod spin_rwlock; pub mod spinlock; pub use fence::SpinFence; diff --git a/lib/kernel-util/src/sync/spin_rwlock.rs b/lib/kernel-util/src/sync/spin_rwlock.rs new file mode 100644 index 00000000..64e6f368 --- /dev/null +++ b/lib/kernel-util/src/sync/spin_rwlock.rs @@ -0,0 +1,162 @@ +use core::{ + cell::UnsafeCell, + ops::{Deref, DerefMut}, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use yggdrasil_abi::error::Error; + +use super::{IrqGuard, LockMethod}; + +struct RwLockInner { + value: AtomicUsize, +} + +pub struct IrqSafeRwLock { + value: UnsafeCell, + inner: RwLockInner, +} + +pub struct IrqSafeRwLockReadGuard<'a, T> { + lock: &'a IrqSafeRwLock, + guard: IrqGuard, +} + +pub struct IrqSafeRwLockWriteGuard<'a, T> { + lock: &'a IrqSafeRwLock, + guard: IrqGuard, +} + +impl RwLockInner { + const LOCKED_READ: usize = 1 << 2; + const LOCKED_WRITE: usize = 1; + + const fn new() -> Self { + Self { + value: AtomicUsize::new(0), + } + } + + #[inline] + fn acquire_read_raw(&self) -> usize { + let value = self.value.fetch_add(Self::LOCKED_READ, Ordering::Acquire); + + if value > usize::MAX / 2 { + self.value.fetch_sub(Self::LOCKED_READ, Ordering::Relaxed); + panic!("Too many read locks acquired"); + } + + value + } + + #[inline] + fn try_acquire_read(&self) -> bool { + let value = self.acquire_read_raw(); + let acquired = value & Self::LOCKED_WRITE != Self::LOCKED_WRITE; + + if !acquired { + unsafe { + self.release_read(); + } + } + + acquired + } + + #[inline] + fn try_acquire_write(&self) -> bool { + self.value + .compare_exchange(0, Self::LOCKED_WRITE, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + } + + #[inline] + fn acquire_read(&self) { + while !self.try_acquire_read() { + core::hint::spin_loop(); + } + } + + #[inline] + fn acquire_write(&self) { + while !self.try_acquire_write() { + core::hint::spin_loop(); + } + } + + #[inline] + unsafe fn release_read(&self) { + self.value.fetch_sub(Self::LOCKED_READ, Ordering::Release); + } + + #[inline] + unsafe fn release_write(&self) { + self.value.fetch_and(!Self::LOCKED_WRITE, Ordering::Release); + } +} + +impl IrqSafeRwLock { + pub const fn new(value: T) -> Self { + Self { + value: UnsafeCell::new(value), + inner: RwLockInner::new(), + } + } + + pub fn read(&self) -> IrqSafeRwLockReadGuard { + let guard = IrqGuard::acquire(); + self.inner.acquire_read(); + IrqSafeRwLockReadGuard { lock: self, guard } + } + + pub fn write(&self) -> IrqSafeRwLockWriteGuard { + let guard = IrqGuard::acquire(); + self.inner.acquire_write(); + IrqSafeRwLockWriteGuard { lock: self, guard } + } + + unsafe fn release_read(&self) { + self.inner.release_read(); + } + + unsafe fn release_write(&self) { + self.inner.release_write(); + } +} + +unsafe impl Sync for IrqSafeRwLock {} +unsafe impl Send for IrqSafeRwLock {} + +impl<'a, T> Deref for IrqSafeRwLockReadGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.lock.value.get() } + } +} + +impl<'a, T> Drop for IrqSafeRwLockReadGuard<'a, T> { + fn drop(&mut self) { + unsafe { self.lock.release_read() } + } +} + +impl<'a, T> Deref for IrqSafeRwLockWriteGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.lock.value.get() } + } +} + +impl<'a, T> DerefMut for IrqSafeRwLockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.lock.value.get() } + } +} + +impl<'a, T> Drop for IrqSafeRwLockWriteGuard<'a, T> { + fn drop(&mut self) { + unsafe { self.lock.release_write() } + } +} diff --git a/lib/vfs/src/file/mod.rs b/lib/vfs/src/file/mod.rs index 4f0ad75b..44b0efc0 100644 --- a/lib/vfs/src/file/mod.rs +++ b/lib/vfs/src/file/mod.rs @@ -24,7 +24,7 @@ use crate::{ device::{BlockDeviceWrapper, CharDeviceWrapper}, node::NodeRef, traits::{Read, Seek, Write}, - FdPoll, FileReadiness, + FdPoll, FileReadiness, SharedMemory, }; use self::{ @@ -64,6 +64,7 @@ pub enum File { AnonymousPipe(PipeEnd), Poll(FdPoll), Channel(ChannelDescriptor), + SharedMemory(Arc), } /// Contains a per-process fd -> FileRef map @@ -92,6 +93,12 @@ impl File { Arc::new(Self::Channel(channel)) } + /// Creates a buffer of shared memory and associates a [File] with it + pub fn new_shared_memory(size: usize) -> Result, Error> { + let shm = SharedMemory::new(size)?; + Ok(Arc::new(Self::SharedMemory(Arc::new(shm)))) + } + pub(crate) fn directory(node: NodeRef, position: DirectoryOpenPosition) -> Arc { let position = IrqSafeSpinlock::new(position.into()); Arc::new(Self::Directory(DirectoryFile { node, position })) @@ -166,6 +173,7 @@ impl File { pub fn send(&self) -> Result, Error> { match self { Self::Regular(file) => Ok(Arc::new(Self::Regular(file.clone()))), + Self::SharedMemory(shm) => Ok(Arc::new(Self::SharedMemory(shm.clone()))), _ => Err(Error::InvalidOperation), } } @@ -185,7 +193,7 @@ impl File { Self::Regular(file) => Some(&file.node), Self::Block(file) => Some(&file.node), Self::Char(file) => Some(&file.node), - Self::AnonymousPipe(_) | Self::Poll(_) | Self::Channel(_) => None, + _ => None, } } @@ -222,6 +230,7 @@ impl PageProvider for File { fn get_page(&self, offset: u64) -> Result { match self { Self::Block(f) => f.device.0.get_page(offset), + Self::SharedMemory(f) => f.get_page(offset), _ => Err(Error::InvalidOperation), } } @@ -229,6 +238,7 @@ impl PageProvider for File { fn release_page(&self, offset: u64, phys: PhysicalAddress) -> Result<(), Error> { match self { Self::Block(f) => f.device.0.release_page(offset, phys), + Self::SharedMemory(f) => f.release_page(offset, phys), _ => Err(Error::InvalidOperation), } } @@ -245,6 +255,7 @@ impl Read for File { Self::Poll(_) => Err(Error::InvalidOperation), // TODO maybe allow reading messages from Channels? Self::Channel(_) => Err(Error::InvalidOperation), + Self::SharedMemory(_) => Err(Error::InvalidOperation), Self::Directory(_) => Err(Error::IsADirectory), } } @@ -261,6 +272,7 @@ impl Write for File { Self::Poll(_) => Err(Error::InvalidOperation), // TODO maybe allow writing messages to Channels? Self::Channel(_) => Err(Error::InvalidOperation), + Self::SharedMemory(_) => Err(Error::InvalidOperation), Self::Directory(_) => Err(Error::IsADirectory), } } @@ -271,10 +283,8 @@ impl Seek for File { match self { Self::Regular(file) => Ok(*file.position.lock()), Self::Block(file) => Ok(*file.position.lock()), - Self::Char(_) | Self::AnonymousPipe(_) | Self::Poll(_) | Self::Channel(_) => { - Err(Error::InvalidOperation) - } Self::Directory(_) => Err(Error::IsADirectory), + _ => Err(Error::InvalidOperation), } } @@ -282,10 +292,8 @@ impl Seek for File { match self { Self::Regular(file) => file.seek(from), Self::Block(file) => file.seek(from), - Self::Char(_) | Self::AnonymousPipe(_) | Self::Poll(_) | Self::Channel(_) => { - Err(Error::InvalidOperation) - } Self::Directory(_) => Err(Error::IsADirectory), + _ => Err(Error::InvalidOperation), } } } @@ -314,6 +322,7 @@ impl fmt::Debug for File { Self::AnonymousPipe(_) => f.debug_struct("AnonymousPipe").finish_non_exhaustive(), Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(), Self::Channel(_) => f.debug_struct("Channel").finish_non_exhaustive(), + Self::SharedMemory(_) => f.debug_struct("SharedMemory").finish_non_exhaustive(), } } } diff --git a/lib/vfs/src/lib.rs b/lib/vfs/src/lib.rs index 44665a0a..3e3ca8e1 100644 --- a/lib/vfs/src/lib.rs +++ b/lib/vfs/src/lib.rs @@ -17,6 +17,7 @@ pub(crate) mod ioctx; pub(crate) mod node; pub(crate) mod path; pub(crate) mod poll; +pub(crate) mod shared_memory; pub(crate) mod traits; pub use channel::MessagePayload; @@ -28,4 +29,5 @@ pub use node::{ RegularImpl, SymlinkImpl, }; pub use poll::FdPoll; +pub use shared_memory::SharedMemory; pub use traits::{FileReadiness, Read, Seek, Write}; diff --git a/lib/vfs/src/shared_memory.rs b/lib/vfs/src/shared_memory.rs new file mode 100644 index 00000000..5ab1d387 --- /dev/null +++ b/lib/vfs/src/shared_memory.rs @@ -0,0 +1,38 @@ +use alloc::vec::Vec; +use kernel_util::mem::{address::PhysicalAddress, allocate_page, PageProvider}; +use yggdrasil_abi::error::Error; + +/// Shared memory VFS object +pub struct SharedMemory { + pages: Vec, +} + +impl SharedMemory { + /// Creates a new buffer of shared memory + pub fn new(size: usize) -> Result { + assert_eq!(size & 0xFFF, 0); + let page_count = size / 0x1000; + + let pages = (0..page_count) + .map(|_| allocate_page()) + .collect::>()?; + + Ok(Self { pages }) + } +} + +impl PageProvider for SharedMemory { + fn get_page(&self, offset: u64) -> Result { + // TODO: magic numbers + let index = (offset / 0x1000) as usize; + self.pages + .get(index) + .copied() + .ok_or(Error::InvalidMemoryOperation) + } + + fn release_page(&self, _offset: u64, _phys: PhysicalAddress) -> Result<(), Error> { + // TODO track get/release? + Ok(()) + } +} diff --git a/src/arch/mod.rs b/src/arch/mod.rs index 8f21ee51..9641c84c 100644 --- a/src/arch/mod.rs +++ b/src/arch/mod.rs @@ -37,7 +37,7 @@ use kernel_util::{ use crate::{ mem::phys::PhysicalMemoryRegion, - task::{sched::CpuQueue, Cpu}, + task::{sched::CpuQueue, thread::ThreadId, Cpu}, }; cfg_if! { @@ -242,11 +242,17 @@ pub trait CpuAccess: Sized { /// Safe wrapper for accessing local CPU type Local: LocalCpuAccess; - /// Returns the local CPU wrapper - fn local() -> Self::Local; /// Returns the local CPU wrapper or None, if not yet initialized: fn try_local() -> Option; + /// Returns the local CPU wrapper + fn local() -> Self::Local { + Self::try_local().expect("Local CPU has not yet been initialized") + } + + /// Returns the ID of the local processor or 0 if the processor has not yet been initialized + fn local_id() -> u32; + /// Initializes the CPU's scheduling queue. /// /// # Panics @@ -254,11 +260,23 @@ pub trait CpuAccess: Sized { /// Will panic, if the initialization has already been performed. fn init_queue(&mut self, queue: &'static CpuQueue); - /// Returns the CPU's scheduling queue - fn queue(&self) -> &'static CpuQueue; + /// Returns the CPU's scheduling queue or None if it has not yet been initialized + fn get_queue(&self) -> Option<&'static CpuQueue>; + + /// Returns the CPU's scheduling queue or panics if it has not yet been initialized + fn queue(&self) -> &'static CpuQueue { + self.get_queue() + .expect("CPU's queue has not yet been initialized") + } /// Returns the CPU index fn id(&self) -> u32; + + /// Returns current thread ID or none if idle + fn current_thread_id(&self) -> Option; + + /// Update the current thread ID + unsafe fn set_current_thread_id(&mut self, id: Option); } // External API for architecture specifics diff --git a/src/arch/x86_64/apic/mod.rs b/src/arch/x86_64/apic/mod.rs index 982422e0..d816d388 100644 --- a/src/arch/x86_64/apic/mod.rs +++ b/src/arch/x86_64/apic/mod.rs @@ -100,7 +100,10 @@ unsafe extern "C" fn local_timer_irq_handler(frame: *mut IrqFrame) { let cpu = Cpu::local(); // Clear interrupt before switching, because otherwise we won't receive the next one cpu.local_apic().clear_interrupt(); - cpu.queue().yield_cpu(); + + if let Some(queue) = cpu.get_queue() { + queue.yield_cpu(); + } if let Some(thread) = Thread::get_current() { thread.handle_pending_signals(frame); diff --git a/src/arch/x86_64/boot/mod.rs b/src/arch/x86_64/boot/mod.rs index f7b891ca..4667b1ae 100644 --- a/src/arch/x86_64/boot/mod.rs +++ b/src/arch/x86_64/boot/mod.rs @@ -120,8 +120,6 @@ pub extern "C" fn __x86_64_ap_entry() -> ! { // Still not initialized: GDT, IDT, CPU features, syscall, kernel_gs_base - infoln!("cpu{} initializing", cpu_id); - unsafe { // Cpu::init_local(LocalApic::new(), cpu_id as u32); // syscall::init_syscall(); diff --git a/src/arch/x86_64/cpu.rs b/src/arch/x86_64/cpu.rs index f8689381..13bbecb5 100644 --- a/src/arch/x86_64/cpu.rs +++ b/src/arch/x86_64/cpu.rs @@ -17,7 +17,7 @@ use crate::{ x86_64::{cpuid, gdt, registers::MSR_IA32_KERNEL_GS_BASE, syscall}, CpuAccess, CpuMessage, LocalCpuAccess, }, - task::sched::CpuQueue, + task::{sched::CpuQueue, thread::ThreadId}, }; use super::{apic::local::LocalApic, smp::CPU_COUNT}; @@ -35,6 +35,7 @@ pub struct Cpu { id: u32, local_apic: LocalApic, + current_thread_id: Option, queue: OneTimeInit<&'static CpuQueue>, } @@ -79,8 +80,6 @@ impl Cpu { /// /// Only meant to be called once per each CPU during their init. pub(super) unsafe fn init_local(local_apic: LocalApic, id: u32) { - infoln!("Initialize CPU with id {}", id); - // Initialize CPU features cpuid::enable_features(); @@ -93,6 +92,7 @@ impl Cpu { id, local_apic, + current_thread_id: None, queue: OneTimeInit::new(), }); let this = Box::into_raw(this); @@ -101,6 +101,8 @@ impl Cpu { MSR_IA32_KERNEL_GS_BASE.set(this as u64); core::arch::asm!("wbinvd; swapgs"); + infoln!("cpu{} initialized", id); + syscall::init_syscall(); } @@ -140,10 +142,7 @@ impl LocalCpu { impl CpuAccess for Cpu { type Local = LocalCpu; - fn local() -> Self::Local { - Self::try_local().unwrap() - } - + #[inline] fn try_local() -> Option { let mut addr: usize; let guard = IrqGuard::acquire(); @@ -154,16 +153,36 @@ impl CpuAccess for Cpu { } } + #[inline] fn id(&self) -> u32 { self.id } + #[inline] + fn current_thread_id(&self) -> Option { + self.current_thread_id + } + + #[inline] + unsafe fn set_current_thread_id(&mut self, id: Option) { + self.current_thread_id = id; + } + + #[inline] + fn local_id() -> u32 { + if let Some(local) = Self::try_local() { + local.id() + } else { + 0 + } + } + fn init_queue(&mut self, queue: &'static CpuQueue) { self.queue.init(queue); } - fn queue(&self) -> &'static CpuQueue { - self.queue.get() + fn get_queue(&self) -> Option<&'static CpuQueue> { + self.queue.try_get().copied() } } diff --git a/src/arch/x86_64/exception.rs b/src/arch/x86_64/exception.rs index 7ebff9d7..71f3e4df 100644 --- a/src/arch/x86_64/exception.rs +++ b/src/arch/x86_64/exception.rs @@ -286,7 +286,7 @@ static mut IDT: [Entry; SIZE] = [Entry::NULL; SIZE]; fn user_exception_inner(kind: ExceptionKind, frame: &ExceptionFrame) { let thread = Thread::current(); - warnln!("{:?} in {} {:?}", kind, thread.id(), thread.name()); + warnln!("{:?} in {} {:?}", kind, thread.id, thread.name); warnln!("CS:RIP = {:#x}:{:#x}", frame.cs, frame.rip); warnln!("SS:RSP = {:#x}:{:#x}", frame.ss, frame.rsp); diff --git a/src/arch/x86_64/syscall.rs b/src/arch/x86_64/syscall.rs index fc46a106..7675b6b4 100644 --- a/src/arch/x86_64/syscall.rs +++ b/src/arch/x86_64/syscall.rs @@ -125,8 +125,9 @@ fn syscall_inner(frame: &mut SyscallFrame) { extern "C" fn __x86_64_syscall_handler(frame: *mut SyscallFrame) { let frame = unsafe { &mut *frame }; - let thread = Thread::current(); syscall_inner(frame); + + let thread = Thread::current(); unsafe { thread.handle_pending_signals(frame); } diff --git a/src/debug.rs b/src/debug.rs index a692daa5..57b3b14d 100644 --- a/src/debug.rs +++ b/src/debug.rs @@ -64,7 +64,7 @@ macro_rules! log_print_raw { macro_rules! log_print { ($level:expr, $($args:tt)+) => { - log_print_raw!($level, "cpu{}:{}:{}: {}", /* $crate::task::Cpu::local_id() */ 0, file!(), line!(), format_args!($($args)+)) + log_print_raw!($level, "cpu{}:{}:{}: {}", <$crate::task::Cpu as $crate::arch::CpuAccess>::local_id(), file!(), line!(), format_args!($($args)+)) }; } diff --git a/src/init.rs b/src/init.rs index 289ee5e8..c0f6a092 100644 --- a/src/init.rs +++ b/src/init.rs @@ -46,8 +46,6 @@ pub fn kinit() -> Result<(), Error> { let mut ioctx = IoContext::new(root); - let devfs = devfs::root(); - { let (user_init, user_init_main) = proc::exec::load(&mut ioctx, "/init", &["/init", "xxx"], &[])?; @@ -56,7 +54,7 @@ pub fn kinit() -> Result<(), Error> { io.set_ioctx(ioctx); drop(io); - user_init_main.enqueue_somewhere(); + user_init_main.enqueue(); } Ok(()) diff --git a/src/mem/phys/mod.rs b/src/mem/phys/mod.rs index 283e829a..3f760516 100644 --- a/src/mem/phys/mod.rs +++ b/src/mem/phys/mod.rs @@ -209,6 +209,11 @@ fn kernel_physical_memory_region() -> PhysicalMemoryRegion { PhysicalMemoryRegion { base, size } } +#[no_mangle] +fn __allocate_page() -> Result { + alloc_page() +} + #[no_mangle] fn __allocate_contiguous_pages(count: usize) -> Result { alloc_pages_contiguous(count) diff --git a/src/syscall/mod.rs b/src/syscall/mod.rs index 6a206946..688355ad 100644 --- a/src/syscall/mod.rs +++ b/src/syscall/mod.rs @@ -63,7 +63,7 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result match func { SyscallFunction::DebugTrace => { let pid = process.id(); - let tid = thread.id(); + let tid = thread.id; let arg = arg_user_str(args[0] as usize, args[1] as usize)?; @@ -422,6 +422,16 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result } }) } + SyscallFunction::CreateSharedMemory => { + let size = args[0] as usize; + let size = size.page_align_up::(); + + run_with_io(process, |mut io| { + let file = File::new_shared_memory(size)?; + let fd = io.files.place_file(file, true)?; + Ok(fd.0 as usize) + }) + } // Process management SyscallFunction::SpawnProcess => { let options = arg_user_ref::(args[0] as usize)?; @@ -475,7 +485,7 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result } drop(child_io); - child_main.enqueue_somewhere(); + child_main.enqueue(); Ok(pid as _) }) @@ -503,8 +513,7 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result } SyscallFunction::ExitThread => { let code = ExitCode::from(args[0] as i32); - thread.exit(code); - unreachable!() + thread.exit(code) } SyscallFunction::ExitProcess => { // A bit different from thread exit: wait for other threads to finish and exit only diff --git a/src/task/mod.rs b/src/task/mod.rs index fa1491d4..a81006a9 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -27,7 +27,7 @@ pub fn spawn_kernel_closure, T: Termination, F: Fn() -> T + Send f: F, ) -> Result<(), Error> { let thread = Thread::new_kthread(name, TaskContext::kernel_closure(f)?); - thread.enqueue_somewhere(); + thread.enqueue(); Ok(()) } diff --git a/src/task/process.rs b/src/task/process.rs index 446c4284..39ae143c 100644 --- a/src/task/process.rs +++ b/src/task/process.rs @@ -224,11 +224,11 @@ impl Process { tls_address, )?; let thread = Thread::new_uthread(self.clone(), space, context); - let id = thread.id(); + let id = thread.id; self.inner.lock().threads.push(thread.clone()); - thread.enqueue_somewhere(); + thread.enqueue(); Ok(id) } @@ -335,7 +335,7 @@ impl Process { // TODO make this cleaner let old_len = inner.threads.len(); - inner.threads.retain(|t| t.id() != thread); + inner.threads.retain(|t| t.id != thread); assert_ne!(inner.threads.len(), old_len); let last_thread = inner.threads.is_empty(); @@ -396,15 +396,15 @@ impl Process { let mut inner = self.inner.lock(); for thread in inner.threads.iter() { - if thread.id() == except { + if thread.id == except { continue; } - infoln!("Terminate thread {}", thread.id()); + infoln!("Terminate thread {}", thread.id); thread.terminate().await; } - inner.threads.retain(|t| t.id() == except); + inner.threads.retain(|t| t.id == except); } } diff --git a/src/task/sched.rs b/src/task/sched.rs index 3c839fcb..b9eba858 100644 --- a/src/task/sched.rs +++ b/src/task/sched.rs @@ -4,7 +4,11 @@ use core::sync::atomic::Ordering; use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; use cfg_if::cfg_if; -use kernel_util::{sync::IrqSafeSpinlock, util::OneTimeInit}; +use crossbeam_queue::SegQueue; +use kernel_util::{ + sync::{IrqGuard, IrqSafeSpinlock}, + util::OneTimeInit, +}; use crate::{ arch::{Architecture, ArchitectureImpl, CpuAccess}, @@ -17,32 +21,184 @@ use super::{ Cpu, TaskContext, }; -/// Per-CPU statistics -#[derive(Default)] -pub struct CpuQueueStats { - /// Ticks spent idling - pub idle_time: u64, - /// Ticks spent running CPU tasks - pub cpu_time: u64, - - /// Time since last measurement - measure_time: u64, -} - -struct CpuQueueInner { - current: Option, - queue: VecDeque, - stats: CpuQueueStats, -} +// /// Per-CPU statistics +// #[derive(Default)] +// pub struct CpuQueueStats { +// /// Ticks spent idling +// pub idle_time: u64, +// /// Ticks spent running CPU tasks +// pub cpu_time: u64, +// +// /// Time since last measurement +// measure_time: u64, +// } /// Per-CPU queue pub struct CpuQueue { - inner: IrqSafeSpinlock, - idle: TaskContext, + queue: SegQueue, index: usize, + idle: TaskContext, } -static QUEUES: OneTimeInit> = OneTimeInit::new(); +impl CpuQueue { + /// Creates a new [CpuQueue] for CPU with given `index` + pub fn new(index: usize) -> Self { + let idle = TaskContext::kernel(__idle, Cpu::local().id() as usize) + .expect("Could not construct an idle task"); + + Self { + queue: SegQueue::new(), + index, + idle, + } + } + + /// Returns the queue's associated CPU index + #[inline] + pub fn index(&self) -> usize { + self.index + } + + /// "Enters" the scheduler by selecting a first task to execute + pub unsafe fn enter(&self) -> ! { + let _guard = IrqGuard::acquire(); + + self.idle.enter() + } + + /// "Pushes" a thread to the queue for execution + pub fn push(&self, tid: ThreadId) { + self.queue.push(tid); + } + + fn pop(&self) -> (Option>, Option) { + while let Some(id) = self.queue.pop() { + let Some(thread) = Thread::get(id) else { + continue; + }; + + let mut sched = thread.sched.lock(); + + assert!(sched.in_queue); + assert!(core::ptr::eq(self, sched.queue.unwrap())); + + match sched.state { + ThreadState::Ready => { + sched.state = ThreadState::Running; + drop(sched); + return (Some(thread), Some(id)); + } + ThreadState::Running => { + panic!("Unexpected state: Running ({:?})", id); + } + ThreadState::Terminated => { + sched.in_queue = false; + sched.queue = None; + thread.set_terminated(); + } + ThreadState::Suspended => { + sched.queue = None; + sched.in_queue = false; + } + } + } + + (None, None) + } + + /// Selects a new thread from the queue and performs a context switch if necessary + pub unsafe fn yield_cpu(&self) -> bool { + assert!(ArchitectureImpl::interrupt_mask()); + + let mut cpu = Cpu::local(); + + let current_id = cpu.current_thread_id(); + let current = current_id.and_then(Thread::get); + + if let Some(current) = current.as_ref() { + let mut sched = current.sched.lock(); + + let q = sched.queue.unwrap(); + assert!(core::ptr::eq(q, self)); + assert!(sched.in_queue); + + match sched.state { + ThreadState::Ready => { + panic!("Thread {:?} was running, but is marked Ready", current.id); + } + ThreadState::Running => { + sched.state = ThreadState::Ready; + + self.queue.push(current.id); + } + ThreadState::Terminated => { + sched.in_queue = false; + sched.queue = None; + + current.set_terminated(); + } + ThreadState::Suspended => { + sched.queue = None; + sched.in_queue = false; + } + } + } + + let (next, next_id) = self.pop(); + + let current_ctx = if let Some(current) = current.as_ref() { + ¤t.context + } else { + &self.idle + }; + + let next_ctx = if let Some(next) = next.as_ref() { + &next.context + } else { + &self.idle + }; + + cpu.set_current_thread_id(next_id); + + if !core::ptr::eq(current_ctx, next_ctx) { + // Perform the switch + next_ctx.switch(current_ctx); + + true + } else { + false + } + } + + /// Returns a queue for given CPU `index` + pub fn for_cpu(index: usize) -> &'static CpuQueue { + &QUEUES.get()[index] + } + + /// Returns a queue for the local CPU + pub fn local() -> &'static CpuQueue { + Cpu::local().queue() + } + + /// Returns the preferred queue for a thread to be scheduled into. Takes affinity mask into + /// account when picking a queue. + pub fn for_affinity_mask(mask: u64) -> &'static CpuQueue { + debug_assert_ne!(mask, 0); + + QUEUES + .get() + .iter() + .filter(|c| mask & (1 << c.index) != 0) + .min_by_key(|c| c.queue.len()) + .unwrap() + } + + /// Returns `true` if the queue is local to the current CPU + pub fn is_local(&self) -> bool { + assert!(ArchitectureImpl::interrupt_mask()); + core::ptr::eq(Self::local(), self) + } +} #[naked] extern "C" fn __idle(_x: usize) -> ! { @@ -63,206 +219,7 @@ extern "C" fn __idle(_x: usize) -> ! { } } -impl CpuQueueStats { - /// Reset the stats to zero values - pub fn reset(&mut self) { - self.cpu_time = 0; - self.idle_time = 0; - } -} - -impl CpuQueueInner { - /// Picks a next task for execution, skipping (dropping) those that were suspended. May return - /// None if the queue is empty or no valid task was found, in which case the scheduler should - /// go idle. - pub fn next_ready_task(&mut self) -> Option> { - #[allow(clippy::never_loop)] - while !self.queue.is_empty() { - let task = self.queue.pop_front().unwrap(); - let task_t = Thread::get(task).unwrap(); - - match task_t.state.load(Ordering::Acquire) { - ThreadState::Ready => { - return Some(task_t); - } - e => panic!( - "Unexpected thread state in CpuQueue: {:?} ({} {:?}, cpu_id={})", - e, - task_t.id(), - task_t.name(), - 1234 // task_t.cpu_id() - ), - } - } - - None - } - - /// Returns an iterator over all the threads in the queue plus the currently running thread, - /// if there is one. - pub fn iter(&self) -> impl Iterator { - Iterator::chain(self.queue.iter(), self.current.iter()) - } -} - -impl CpuQueue { - /// Constructs an empty queue with its own idle task - pub fn new(index: usize) -> Self { - let idle = TaskContext::kernel(__idle, Cpu::local().id() as usize) - .expect("Could not construct an idle task"); - - Self { - inner: { - IrqSafeSpinlock::new(CpuQueueInner { - current: None, - queue: VecDeque::new(), - stats: CpuQueueStats::default(), - }) - }, - idle, - index, - } - } - - /// Starts queue execution on current CPU. - /// - /// # Safety - /// - /// Only meant to be called from [crate::task::enter()] function. - pub unsafe fn enter(&self) -> ! { - assert!(ArchitectureImpl::interrupt_mask()); - // Start from idle thread to avoid having a Arc stuck here without getting dropped - // let t = CNTPCT_EL0.get(); - // self.lock().stats.measure_time = t; - self.idle.enter() - } - - /// Yields CPU execution to the next task in queue (or idle task if there aren't any). - /// - /// # Safety - /// - /// The function is only meant to be called from kernel threads (e.g. if they want to yield - /// CPU execution to wait for something) or interrupt handlers. - pub unsafe fn yield_cpu(&self) { - // Will also disable interrupts in this section - let cpu = Cpu::local(); - - let mut inner = self.inner.lock(); - - let current = inner.current; - let current_t = current.and_then(Thread::get); - - if let Some(current_t) = current_t.as_ref() { - if current_t.state.load(Ordering::Acquire) == ThreadState::Terminated { - current_t.exit_notify.wake_all(); - } - - if current_t - .state - .compare_exchange( - ThreadState::Running, - ThreadState::Ready, - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_ok() - { - inner.queue.push_back(current_t.id()); - } - } - - let next_t = inner.next_ready_task(); - - inner.current = next_t.as_deref().map(Thread::id); - - // Can drop the lock, we hold current and next Arc's - drop(inner); - - let (from, _from_rc) = if let Some(current_t) = current_t.as_ref() { - (current_t.context(), Arc::strong_count(current_t)) - } else { - (&self.idle, 0) - }; - - let (to, _to_rc) = if let Some(next_t) = next_t.as_ref() { - next_t.set_running(cpu.id()); - (next_t.context(), Arc::strong_count(next_t)) - } else { - (&self.idle, 0) - }; - - assert!(ArchitectureImpl::interrupt_mask()); - to.switch(from) - } - - /// Pushes the process to the back of the execution queue. - /// - /// # Safety - /// - /// Only meant to be called from Process impl. The function does not set any process accounting - /// information, which may lead to invalid states. - pub unsafe fn enqueue(&self, tid: ThreadId) { - let mut inner = self.inner.lock(); - assert!(ArchitectureImpl::interrupt_mask()); - // assert_eq!(p.state(), ProcessState::Ready); - - inner.queue.push_back(tid); - } - - /// Removes thread with given TID from the exeuction queue. - pub fn dequeue(&self, tid: ThreadId) { - assert!(ArchitectureImpl::interrupt_mask()); - - let mut inner = self.inner.lock(); - inner.queue.retain(|&p| p != tid) - } - - /// Returns the queue length at this moment. - /// - /// # Note - /// - /// This value may immediately change. - pub fn len(&self) -> usize { - self.inner.lock().queue.len() - } - - /// Returns `true` if the queue is empty at the moment. - /// - /// # Note - /// - /// This may immediately change. - pub fn is_empty(&self) -> bool { - self.inner.lock().queue.is_empty() - } - - /// Returns the current [ThreadId], or [None] if idle - pub fn current_id(&self) -> Option { - self.inner.lock().current - } - - /// Returns a queue for given CPU index - pub fn for_cpu(id: usize) -> &'static CpuQueue { - &QUEUES.get()[id] - } - - /// Returns an iterator over all queues of the system - #[inline] - pub fn all() -> impl Iterator { - QUEUES.get().iter() - } - - /// Picks a queue with the least amount of tasks in it - pub fn least_loaded() -> Option<(usize, &'static CpuQueue)> { - let queues = QUEUES.get(); - - queues.iter().enumerate().min_by_key(|(_, q)| q.len()) - } - - /// Returns the CPU index of the queue - pub fn index(&self) -> usize { - self.index - } -} +static QUEUES: OneTimeInit> = OneTimeInit::new(); /// Initializes the global queue list pub fn init_queues(queues: Vec) { diff --git a/src/task/thread.rs b/src/task/thread.rs index 5beb802f..fe12c438 100644 --- a/src/task/thread.rs +++ b/src/task/thread.rs @@ -1,3 +1,5 @@ +// TODO XXX TODO XXX +#![allow(missing_docs)] //! Thread data structures and management use core::{ @@ -5,7 +7,7 @@ use core::{ mem::size_of, ops::Deref, pin::Pin, - sync::atomic::{AtomicU64, Ordering}, + sync::atomic::{AtomicBool, AtomicU64, Ordering}, task::{Context, Poll}, }; @@ -13,17 +15,14 @@ use abi::{ error::Error, process::{ExitCode, Signal, SignalEntryData}, }; -use alloc::{ - collections::{BTreeMap, VecDeque}, - string::String, - sync::Arc, -}; +use alloc::{collections::BTreeMap, string::String, sync::Arc}; use atomic_enum::atomic_enum; -use futures_util::{task::ArcWake, Future}; +use crossbeam_queue::SegQueue; +use futures_util::Future; use kernel_util::{ block, runtime::QueueWaker, - sync::{IrqGuard, IrqSafeSpinlock}, + sync::{spin_rwlock::IrqSafeRwLock, IrqGuard, IrqSafeSpinlock}, }; use crate::{ @@ -61,35 +60,90 @@ pub enum ThreadId { #[repr(C)] pub struct CurrentThread(Arc, IrqGuard); +#[derive(Debug)] +#[repr(transparent)] +pub struct ThreadAffinity(AtomicU64); + struct SignalEntry { entry: usize, stack: usize, } struct ThreadInner { - queue: Option<&'static CpuQueue>, - signal_entry: Option, - signal_stack: VecDeque, } +struct GlobalThreadList { + data: BTreeMap>, +} + +pub struct ThreadSchedulingInfo { + pub state: ThreadState, + + pub in_queue: bool, + pub queue: Option<&'static CpuQueue>, +} + +static THREADS: IrqSafeRwLock = IrqSafeRwLock::new(GlobalThreadList::new()); + /// Describes a single thread within the system pub struct Thread { - context: TaskContext, - name: Option, - id: ThreadId, - - pub(super) state: AtomicThreadState, + pub id: ThreadId, + pub name: Option, + pub sched: IrqSafeSpinlock, + pub context: TaskContext, process: Option>, space: Option>, inner: IrqSafeSpinlock, + signal_queue: SegQueue, + pub(super) terminated: AtomicBool, pub(super) exit_notify: Arc, + pub affinity: ThreadAffinity, } -static THREADS: IrqSafeSpinlock>> = - IrqSafeSpinlock::new(BTreeMap::new()); +impl GlobalThreadList { + pub const fn new() -> Self { + Self { + data: BTreeMap::new(), + } + } + + #[inline] + pub fn get(&self, id: ThreadId) -> Option<&Arc> { + self.data.get(&id) + } + + pub fn insert(&mut self, thread: Arc) { + let id = thread.id; + debug_assert!(!self.data.contains_key(&id)); + self.data.insert(id, thread); + } +} + +impl ThreadAffinity { + pub const ANY_CPU: u64 = u64::MAX; + + pub const fn any_cpu() -> Self { + Self(AtomicU64::new(Self::ANY_CPU)) + } + + pub const fn only_cpu(index: usize) -> Self { + Self(AtomicU64::new(1 << index)) + } + + #[inline] + pub fn get(&self) -> u64 { + self.0.load(Ordering::Relaxed) + } + + #[inline] + pub fn set(&self, value: u64) { + debug_assert_ne!(value, 0); + self.0.store(value, Ordering::Relaxed); + } +} impl Thread { fn new( @@ -100,30 +154,40 @@ impl Thread { context: TaskContext, ) -> Arc { let thread = Arc::new(Self { - context, - name, id, - - state: AtomicThreadState::new(ThreadState::Suspended), - + name, + sched: IrqSafeSpinlock::new(ThreadSchedulingInfo { + state: ThreadState::Suspended, + in_queue: false, + queue: None, + }), + context, process, space, - inner: IrqSafeSpinlock::new(ThreadInner { - queue: None, - - signal_stack: VecDeque::new(), - signal_entry: None, - }), - + inner: IrqSafeSpinlock::new(ThreadInner { signal_entry: None }), + signal_queue: SegQueue::new(), exit_notify: Arc::new(QueueWaker::new()), + terminated: AtomicBool::new(false), + affinity: ThreadAffinity::any_cpu(), }); - THREADS.lock().insert(id, thread.clone()); + THREADS.write().insert(thread.clone()); thread } + /// Constructs a new kernel-space thread + pub fn new_kthread>(name: S, context: TaskContext) -> Arc { + Self::new( + ThreadId::next_kernel(), + Some(name.into()), + None, + None, + context, + ) + } + /// Constructs a new user-space thread pub fn new_uthread( parent: Arc, @@ -139,53 +203,151 @@ impl Thread { ) } - /// Constructs a new kernel-space thread - pub fn new_kthread>(name: S, context: TaskContext) -> Arc { - Self::new( - ThreadId::next_kernel(), - Some(name.into()), - None, - None, - context, - ) + // Get/Set + + /// Updates the thread affinity to run on a specific set of CPUs + pub fn set_affinity(&self, affinity: u64) { + self.affinity.set(affinity); } - // Info - - /// Returns the thread's ID - pub fn id(&self) -> ThreadId { - self.id + /// Updates the thread signal entry/stack information + pub fn set_signal_entry(&self, entry: usize, stack: usize) { + let mut inner = self.inner.lock(); + inner.signal_entry.replace(SignalEntry { entry, stack }); } - /// Returns the thread's name, if set - pub fn name(&self) -> Option<&String> { - self.name.as_ref() - } - - /// Returns the thread's [TaskContext] - pub fn context(&self) -> &TaskContext { - &self.context - } - - /// Returns the thread's [ProcessAddressSpace] reference. - /// - /// # Panics - /// - /// Will panic if the thread has no associated address space. - pub fn address_space(&self) -> &Arc { + /// Returns the thread address space (usually provided by its parent process). If none exists, + /// panics. + pub fn address_space(&self) -> &ProcessAddressSpace { self.space.as_ref().unwrap() } - /// Returns the thread's parent [Process] reference. - /// - /// # Panics - /// - /// Will panic if the thread has no process associated (i.e. it's a kernel thread). + /// Returns the thread's parent process, panics if there's none pub fn process(&self) -> &Arc { self.process.as_ref().unwrap() } - // Queue operation + /// Updates the thread's terminated status and wakes up any other threads waiting for it to + /// exit + pub fn set_terminated(&self) { + if !self.terminated.swap(true, Ordering::Release) { + self.exit_notify.wake_all(); + } + } + + // Signals + + /// Pushes a signal to the thread's signal queue + pub fn raise_signal(&self, signal: Signal) { + self.signal_queue.push(signal); + } + + // Scheduling + + /// Changes thread state to "Ready" and inserts it into given `queue`, if it's not yet in one + pub fn enqueue_to(&self, queue: &'static CpuQueue) { + let mut sched = self.sched.lock(); + + assert_ne!(sched.state, ThreadState::Terminated); + + match sched.state { + ThreadState::Running | ThreadState::Ready => { + assert!(sched.in_queue); + } + ThreadState::Suspended => { + sched.state = ThreadState::Ready; + + if !sched.in_queue { + assert!(sched.queue.is_none()); + + sched.in_queue = true; + sched.queue = Some(queue); + + queue.push(self.id); + } + } + ThreadState::Terminated => panic!("Cannot enqueue a terminated thread"), + } + } + + /// Changes thread state to `state` and removes it from its queue. If the thread is currently + /// running on local CPU, will yield control to the next thread + pub fn dequeue(&self, state: ThreadState) { + let mut sched = self.sched.lock(); + + debug_assert_ne!(state, ThreadState::Running); + debug_assert_ne!(state, ThreadState::Ready); + + let old_state = sched.state; + sched.state = state; + + if let Some(queue) = sched.queue { + match (queue.is_local(), old_state) { + (true, ThreadState::Running) => unsafe { + debug_assert!(sched.in_queue); + drop(sched); + queue.yield_cpu(); + }, + (false, ThreadState::Running) => { + debugln!("deq remote {:?}", self.id); + debug_assert!(sched.in_queue); + } + (_, ThreadState::Ready) => { + debug_assert!(sched.in_queue); + } + (_, ThreadState::Suspended | ThreadState::Terminated) => { + todo!() + } + } + } else { + assert!(!sched.in_queue); + assert_ne!(old_state, ThreadState::Running); + assert_ne!(old_state, ThreadState::Ready); + } + } + + /// Inserts the thread as "Ready" into the best queue (based on affinity and queue load) + pub fn enqueue(&self) { + let queue = CpuQueue::for_affinity_mask(self.affinity.get()); + self.enqueue_to(queue); + } + + fn is_terminated(&self) -> bool { + self.terminated.load(Ordering::Acquire) + } + + /// Waits until thread is terminated and removed from scheduling queues + pub fn wait_for_exit(self: &Arc) -> impl Future { + struct F(Arc); + + impl Future for F { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let F(thread) = self.deref(); + + thread.exit_notify.register(cx.waker()); + + if thread.is_terminated() { + thread.exit_notify.remove(cx.waker()); + Poll::Ready(()) + } else { + Poll::Pending + } + } + } + + infoln!("wait_for_exit: pending {:?}", self.id); + F(self.clone()) + } + + /// Requests thread termination and blocks until said thread finishes fully + pub async fn terminate(self: &Arc) { + // Will not abort the execution: called from another thread + self.dequeue(ThreadState::Terminated); + + self.wait_for_exit().await + } /// Returns the current thread on the CPU. /// @@ -201,204 +363,30 @@ impl Thread { pub fn get_current() -> Option { // IrqGuard is held throughout let cpu = Cpu::local(); - let thread = cpu.queue().current_id().and_then(Self::get); + let thread = cpu.current_thread_id().and_then(Self::get); thread.map(|t| CurrentThread(t, cpu.into_guard())) } - /// Enqueues the thread onto any (usually the least loaded) CPU queue and returns its index. - /// See [Thread::enqueue_to]. - pub fn enqueue_somewhere(&self) -> usize { - // Doesn't have to be precise, so even if something changes, we can still be rebalanced - // to another CPU - let (index, queue) = CpuQueue::least_loaded().unwrap(); - - self.enqueue_to(queue); - - index - } - - /// Enqueues the thread onto the specific CPU's queue. - /// - /// # Panics - /// - /// Will panic if the process is in some weird state while being queued. - pub fn enqueue_to(&self, queue: &'static CpuQueue) { - let _irq = IrqGuard::acquire(); - - { - let mut inner = self.inner.lock(); - let old_queue = inner.queue.replace(queue); - if old_queue.is_some() { - // Already in some queue - return; - } - } - - match self.state.compare_exchange( - ThreadState::Suspended, - ThreadState::Ready, - Ordering::SeqCst, - Ordering::Relaxed, - ) { - Err(ThreadState::Terminated) => { - // Process might've been killed while `await`ing in a `block!` - debugln!( - "Thread {} {:?} already terminated, dropping", - self.id(), - self.name() - ); - } - Err(state) => { - todo!("Unexpected process state when enqueueing: {:?}", state) - } - Ok(_) => unsafe { - queue.enqueue(self.id); - }, - } - } - - // TODO maybe separate dequeue for current and "other" threads - fn dequeue(&self, new_state: ThreadState) { - let _irq = IrqGuard::acquire(); - assert_ne!(new_state, ThreadState::Ready); - assert_ne!(new_state, ThreadState::Running); - - let current_state = self.state.swap(new_state, Ordering::SeqCst); - let mut inner = self.inner.lock(); - let proc_queue = inner.queue.take().unwrap(); - - proc_queue.dequeue(self.id()); - - match current_state { - // NOTE: I'm not sure if the process could've been queued between the store and this - // but most likely not (if I'm not that bad with atomics) - // Do nothing, its queue will just drop the process - ThreadState::Ready => (), - // Do nothing, not in a queue already - ThreadState::Suspended => (), - ThreadState::Terminated => panic!("Thread is terminated"), - ThreadState::Running => { - let queue = Cpu::local().queue(); - - if core::ptr::eq(queue, proc_queue) { - drop(inner); - // Suspending a process running on local CPU - unsafe { queue.yield_cpu() } - } else { - todo!(); - } - } - } - } - - /// Marks the thread as running and sets its "current" CPU index. - /// - /// # Safety - /// - /// Only meant to be called from within the scheduler. - pub unsafe fn set_running(&self, _cpu: u32) { - self.state.store(ThreadState::Running, Ordering::Release); - } - - // Accounting - - /// Returns the thread with given [ThreadId], if it exists pub fn get(id: ThreadId) -> Option> { - THREADS.lock().get(&id).cloned() - } - - // Thread inner - - /// Changes the thread's signal entry point information - pub fn set_signal_entry(&self, entry: usize, stack: usize) { - let mut inner = self.inner.lock(); - inner.signal_entry.replace(SignalEntry { entry, stack }); - } - - /// Pushes a [Signal] onto the thread's signal stack. - /// - /// When executed on a current thread, the signal is guaranteed to be handled exactly before - /// returning to user context (i.e. from syscall). Otherwise, the signal handling order and - /// whether it will be delivered at all is not guaranteed. - pub fn raise_signal(self: &Arc, signal: Signal) { - self.inner.lock().signal_stack.push_back(signal); - - if self.state.load(Ordering::Acquire) == ThreadState::Suspended { - self.clone().enqueue_somewhere(); - } - } - - /// Requests thread termination and blocks until said thread finishes fully: - pub fn terminate(self: &Arc) -> impl Future { - struct F(Arc); - - impl Future for F { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let F(thread) = self.deref(); - - thread.exit_notify.register(cx.waker()); - - if thread.state.load(Ordering::Acquire) == ThreadState::Terminated { - thread.exit_notify.remove(cx.waker()); - Poll::Ready(()) - } else { - Poll::Pending - } - } - } - - // Will not abort the execution: called from another thread - self.dequeue(ThreadState::Terminated); - - F(self.clone()) - } -} - -impl ArcWake for Thread { - fn wake_by_ref(arc_self: &Arc) { - arc_self.clone().enqueue_somewhere(); + THREADS.read().get(id).cloned() } } impl CurrentThread { - fn dequeue_terminate(&self, code: ExitCode) { - self.state - .compare_exchange( - ThreadState::Running, - ThreadState::Terminated, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .unwrap(); - + /// Terminate the current thread + pub fn exit(&self, code: ExitCode) -> ! { if let Some(process) = self.process.as_ref() { - process.handle_thread_exit(self.id(), code); + process.handle_thread_exit(self.id, code); } - - let queue = Cpu::local().queue(); - - queue.dequeue(self.id()); - unsafe { queue.yield_cpu() } - + self.dequeue(ThreadState::Terminated); unreachable!() } - /// Terminate the current thread - pub fn exit(&self, code: ExitCode) { - // May already have been terminated by process exit - if self.state.load(Ordering::Acquire) == ThreadState::Terminated { - return; - } - - self.dequeue_terminate(code) - } - + // TODO: test multithreaded process exit /// Terminate the parent process of the thread, including all other threads and the current /// thread itself - pub fn exit_process(&self, code: ExitCode) { + pub fn exit_process(&self, code: ExitCode) -> ! { let _guard = IrqGuard::acquire(); let process = self @@ -409,20 +397,17 @@ impl CurrentThread { let p = process.clone(); block! { - p.terminate_others(self.id()).await; + p.terminate_others(self.id).await; } .unwrap(); - self.exit(code); - unreachable!(); + self.exit(code) } - /// Suspends the current thread until it is waken up again. Guaranteed to happen immediately. pub fn suspend(&self) -> Result<(), Error> { self.dequeue(ThreadState::Suspended); - let inner = self.inner.lock(); - if !inner.signal_stack.is_empty() { + if !self.signal_queue.is_empty() { return Err(Error::Interrupted); } @@ -440,9 +425,9 @@ impl CurrentThread { return; } - let mut inner = self.inner.lock(); + if let Some(signal) = self.signal_queue.pop() { + let inner = self.inner.lock(); - if let Some(signal) = inner.signal_stack.pop_front() { let Some(entry) = inner.signal_entry.as_ref() else { todo!(); }; @@ -546,7 +531,7 @@ fn __create_kthread( #[no_mangle] fn __enqueue(t: &Arc) { - t.enqueue_somewhere(); + t.enqueue(); } #[no_mangle] @@ -562,11 +547,10 @@ fn __suspend_current(t: &CurrentThread) -> Result<(), Error> { #[no_mangle] fn __exit_current(t: &CurrentThread, code: ExitCode) -> ! { t.exit(code); - unreachable!(); } #[no_mangle] unsafe fn __yield() { let _irq = IrqGuard::acquire(); - Cpu::local().queue().yield_cpu() + Cpu::local().queue().yield_cpu(); }