proc: shared memory + scheduler rework

This commit is contained in:
Mark Poliakov 2024-01-02 14:01:33 +02:00
parent ae7ba554d4
commit 69c73454c1
20 changed files with 732 additions and 524 deletions

View File

@ -7,7 +7,7 @@ use core::{
use yggdrasil_abi::error::Error;
use crate::api::{__allocate_contiguous_pages, __free_page, __physicalize};
use crate::api::{self, __allocate_contiguous_pages, __free_page, __physicalize};
use self::address::{AsPhysicalAddress, PhysicalAddress};
@ -254,3 +254,7 @@ impl<T: ?Sized + fmt::Display> fmt::Display for PageBox<T> {
unsafe impl<T: ?Sized + Send> Send for PageBox<T> {}
unsafe impl<T: ?Sized + Sync> Sync for PageBox<T> {}
pub fn allocate_page() -> Result<PhysicalAddress, Error> {
unsafe { api::__allocate_page() }
}

View File

@ -6,6 +6,7 @@ use yggdrasil_abi::error::Error;
pub mod fence;
pub mod guard;
pub mod mutex;
pub mod spin_rwlock;
pub mod spinlock;
pub use fence::SpinFence;

View File

@ -0,0 +1,162 @@
use core::{
cell::UnsafeCell,
ops::{Deref, DerefMut},
sync::atomic::{AtomicUsize, Ordering},
};
use yggdrasil_abi::error::Error;
use super::{IrqGuard, LockMethod};
struct RwLockInner {
value: AtomicUsize,
}
pub struct IrqSafeRwLock<T> {
value: UnsafeCell<T>,
inner: RwLockInner,
}
pub struct IrqSafeRwLockReadGuard<'a, T> {
lock: &'a IrqSafeRwLock<T>,
guard: IrqGuard,
}
pub struct IrqSafeRwLockWriteGuard<'a, T> {
lock: &'a IrqSafeRwLock<T>,
guard: IrqGuard,
}
impl RwLockInner {
const LOCKED_READ: usize = 1 << 2;
const LOCKED_WRITE: usize = 1;
const fn new() -> Self {
Self {
value: AtomicUsize::new(0),
}
}
#[inline]
fn acquire_read_raw(&self) -> usize {
let value = self.value.fetch_add(Self::LOCKED_READ, Ordering::Acquire);
if value > usize::MAX / 2 {
self.value.fetch_sub(Self::LOCKED_READ, Ordering::Relaxed);
panic!("Too many read locks acquired");
}
value
}
#[inline]
fn try_acquire_read(&self) -> bool {
let value = self.acquire_read_raw();
let acquired = value & Self::LOCKED_WRITE != Self::LOCKED_WRITE;
if !acquired {
unsafe {
self.release_read();
}
}
acquired
}
#[inline]
fn try_acquire_write(&self) -> bool {
self.value
.compare_exchange(0, Self::LOCKED_WRITE, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
#[inline]
fn acquire_read(&self) {
while !self.try_acquire_read() {
core::hint::spin_loop();
}
}
#[inline]
fn acquire_write(&self) {
while !self.try_acquire_write() {
core::hint::spin_loop();
}
}
#[inline]
unsafe fn release_read(&self) {
self.value.fetch_sub(Self::LOCKED_READ, Ordering::Release);
}
#[inline]
unsafe fn release_write(&self) {
self.value.fetch_and(!Self::LOCKED_WRITE, Ordering::Release);
}
}
impl<T> IrqSafeRwLock<T> {
pub const fn new(value: T) -> Self {
Self {
value: UnsafeCell::new(value),
inner: RwLockInner::new(),
}
}
pub fn read(&self) -> IrqSafeRwLockReadGuard<T> {
let guard = IrqGuard::acquire();
self.inner.acquire_read();
IrqSafeRwLockReadGuard { lock: self, guard }
}
pub fn write(&self) -> IrqSafeRwLockWriteGuard<T> {
let guard = IrqGuard::acquire();
self.inner.acquire_write();
IrqSafeRwLockWriteGuard { lock: self, guard }
}
unsafe fn release_read(&self) {
self.inner.release_read();
}
unsafe fn release_write(&self) {
self.inner.release_write();
}
}
unsafe impl<T> Sync for IrqSafeRwLock<T> {}
unsafe impl<T> Send for IrqSafeRwLock<T> {}
impl<'a, T> Deref for IrqSafeRwLockReadGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.lock.value.get() }
}
}
impl<'a, T> Drop for IrqSafeRwLockReadGuard<'a, T> {
fn drop(&mut self) {
unsafe { self.lock.release_read() }
}
}
impl<'a, T> Deref for IrqSafeRwLockWriteGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.lock.value.get() }
}
}
impl<'a, T> DerefMut for IrqSafeRwLockWriteGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.lock.value.get() }
}
}
impl<'a, T> Drop for IrqSafeRwLockWriteGuard<'a, T> {
fn drop(&mut self) {
unsafe { self.lock.release_write() }
}
}

View File

@ -24,7 +24,7 @@ use crate::{
device::{BlockDeviceWrapper, CharDeviceWrapper},
node::NodeRef,
traits::{Read, Seek, Write},
FdPoll, FileReadiness,
FdPoll, FileReadiness, SharedMemory,
};
use self::{
@ -64,6 +64,7 @@ pub enum File {
AnonymousPipe(PipeEnd),
Poll(FdPoll),
Channel(ChannelDescriptor),
SharedMemory(Arc<SharedMemory>),
}
/// Contains a per-process fd -> FileRef map
@ -92,6 +93,12 @@ impl File {
Arc::new(Self::Channel(channel))
}
/// Creates a buffer of shared memory and associates a [File] with it
pub fn new_shared_memory(size: usize) -> Result<Arc<Self>, Error> {
let shm = SharedMemory::new(size)?;
Ok(Arc::new(Self::SharedMemory(Arc::new(shm))))
}
pub(crate) fn directory(node: NodeRef, position: DirectoryOpenPosition) -> Arc<Self> {
let position = IrqSafeSpinlock::new(position.into());
Arc::new(Self::Directory(DirectoryFile { node, position }))
@ -166,6 +173,7 @@ impl File {
pub fn send(&self) -> Result<Arc<Self>, Error> {
match self {
Self::Regular(file) => Ok(Arc::new(Self::Regular(file.clone()))),
Self::SharedMemory(shm) => Ok(Arc::new(Self::SharedMemory(shm.clone()))),
_ => Err(Error::InvalidOperation),
}
}
@ -185,7 +193,7 @@ impl File {
Self::Regular(file) => Some(&file.node),
Self::Block(file) => Some(&file.node),
Self::Char(file) => Some(&file.node),
Self::AnonymousPipe(_) | Self::Poll(_) | Self::Channel(_) => None,
_ => None,
}
}
@ -222,6 +230,7 @@ impl PageProvider for File {
fn get_page(&self, offset: u64) -> Result<PhysicalAddress, Error> {
match self {
Self::Block(f) => f.device.0.get_page(offset),
Self::SharedMemory(f) => f.get_page(offset),
_ => Err(Error::InvalidOperation),
}
}
@ -229,6 +238,7 @@ impl PageProvider for File {
fn release_page(&self, offset: u64, phys: PhysicalAddress) -> Result<(), Error> {
match self {
Self::Block(f) => f.device.0.release_page(offset, phys),
Self::SharedMemory(f) => f.release_page(offset, phys),
_ => Err(Error::InvalidOperation),
}
}
@ -245,6 +255,7 @@ impl Read for File {
Self::Poll(_) => Err(Error::InvalidOperation),
// TODO maybe allow reading messages from Channels?
Self::Channel(_) => Err(Error::InvalidOperation),
Self::SharedMemory(_) => Err(Error::InvalidOperation),
Self::Directory(_) => Err(Error::IsADirectory),
}
}
@ -261,6 +272,7 @@ impl Write for File {
Self::Poll(_) => Err(Error::InvalidOperation),
// TODO maybe allow writing messages to Channels?
Self::Channel(_) => Err(Error::InvalidOperation),
Self::SharedMemory(_) => Err(Error::InvalidOperation),
Self::Directory(_) => Err(Error::IsADirectory),
}
}
@ -271,10 +283,8 @@ impl Seek for File {
match self {
Self::Regular(file) => Ok(*file.position.lock()),
Self::Block(file) => Ok(*file.position.lock()),
Self::Char(_) | Self::AnonymousPipe(_) | Self::Poll(_) | Self::Channel(_) => {
Err(Error::InvalidOperation)
}
Self::Directory(_) => Err(Error::IsADirectory),
_ => Err(Error::InvalidOperation),
}
}
@ -282,10 +292,8 @@ impl Seek for File {
match self {
Self::Regular(file) => file.seek(from),
Self::Block(file) => file.seek(from),
Self::Char(_) | Self::AnonymousPipe(_) | Self::Poll(_) | Self::Channel(_) => {
Err(Error::InvalidOperation)
}
Self::Directory(_) => Err(Error::IsADirectory),
_ => Err(Error::InvalidOperation),
}
}
}
@ -314,6 +322,7 @@ impl fmt::Debug for File {
Self::AnonymousPipe(_) => f.debug_struct("AnonymousPipe").finish_non_exhaustive(),
Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
Self::Channel(_) => f.debug_struct("Channel").finish_non_exhaustive(),
Self::SharedMemory(_) => f.debug_struct("SharedMemory").finish_non_exhaustive(),
}
}
}

View File

@ -17,6 +17,7 @@ pub(crate) mod ioctx;
pub(crate) mod node;
pub(crate) mod path;
pub(crate) mod poll;
pub(crate) mod shared_memory;
pub(crate) mod traits;
pub use channel::MessagePayload;
@ -28,4 +29,5 @@ pub use node::{
RegularImpl, SymlinkImpl,
};
pub use poll::FdPoll;
pub use shared_memory::SharedMemory;
pub use traits::{FileReadiness, Read, Seek, Write};

View File

@ -0,0 +1,38 @@
use alloc::vec::Vec;
use kernel_util::mem::{address::PhysicalAddress, allocate_page, PageProvider};
use yggdrasil_abi::error::Error;
/// Shared memory VFS object
pub struct SharedMemory {
pages: Vec<PhysicalAddress>,
}
impl SharedMemory {
/// Creates a new buffer of shared memory
pub fn new(size: usize) -> Result<SharedMemory, Error> {
assert_eq!(size & 0xFFF, 0);
let page_count = size / 0x1000;
let pages = (0..page_count)
.map(|_| allocate_page())
.collect::<Result<_, _>>()?;
Ok(Self { pages })
}
}
impl PageProvider for SharedMemory {
fn get_page(&self, offset: u64) -> Result<PhysicalAddress, Error> {
// TODO: magic numbers
let index = (offset / 0x1000) as usize;
self.pages
.get(index)
.copied()
.ok_or(Error::InvalidMemoryOperation)
}
fn release_page(&self, _offset: u64, _phys: PhysicalAddress) -> Result<(), Error> {
// TODO track get/release?
Ok(())
}
}

View File

@ -37,7 +37,7 @@ use kernel_util::{
use crate::{
mem::phys::PhysicalMemoryRegion,
task::{sched::CpuQueue, Cpu},
task::{sched::CpuQueue, thread::ThreadId, Cpu},
};
cfg_if! {
@ -242,11 +242,17 @@ pub trait CpuAccess: Sized {
/// Safe wrapper for accessing local CPU
type Local: LocalCpuAccess<Self>;
/// Returns the local CPU wrapper
fn local() -> Self::Local;
/// Returns the local CPU wrapper or None, if not yet initialized:
fn try_local() -> Option<Self::Local>;
/// Returns the local CPU wrapper
fn local() -> Self::Local {
Self::try_local().expect("Local CPU has not yet been initialized")
}
/// Returns the ID of the local processor or 0 if the processor has not yet been initialized
fn local_id() -> u32;
/// Initializes the CPU's scheduling queue.
///
/// # Panics
@ -254,11 +260,23 @@ pub trait CpuAccess: Sized {
/// Will panic, if the initialization has already been performed.
fn init_queue(&mut self, queue: &'static CpuQueue);
/// Returns the CPU's scheduling queue
fn queue(&self) -> &'static CpuQueue;
/// Returns the CPU's scheduling queue or None if it has not yet been initialized
fn get_queue(&self) -> Option<&'static CpuQueue>;
/// Returns the CPU's scheduling queue or panics if it has not yet been initialized
fn queue(&self) -> &'static CpuQueue {
self.get_queue()
.expect("CPU's queue has not yet been initialized")
}
/// Returns the CPU index
fn id(&self) -> u32;
/// Returns current thread ID or none if idle
fn current_thread_id(&self) -> Option<ThreadId>;
/// Update the current thread ID
unsafe fn set_current_thread_id(&mut self, id: Option<ThreadId>);
}
// External API for architecture specifics

View File

@ -100,7 +100,10 @@ unsafe extern "C" fn local_timer_irq_handler(frame: *mut IrqFrame) {
let cpu = Cpu::local();
// Clear interrupt before switching, because otherwise we won't receive the next one
cpu.local_apic().clear_interrupt();
cpu.queue().yield_cpu();
if let Some(queue) = cpu.get_queue() {
queue.yield_cpu();
}
if let Some(thread) = Thread::get_current() {
thread.handle_pending_signals(frame);

View File

@ -120,8 +120,6 @@ pub extern "C" fn __x86_64_ap_entry() -> ! {
// Still not initialized: GDT, IDT, CPU features, syscall, kernel_gs_base
infoln!("cpu{} initializing", cpu_id);
unsafe {
// Cpu::init_local(LocalApic::new(), cpu_id as u32);
// syscall::init_syscall();

View File

@ -17,7 +17,7 @@ use crate::{
x86_64::{cpuid, gdt, registers::MSR_IA32_KERNEL_GS_BASE, syscall},
CpuAccess, CpuMessage, LocalCpuAccess,
},
task::sched::CpuQueue,
task::{sched::CpuQueue, thread::ThreadId},
};
use super::{apic::local::LocalApic, smp::CPU_COUNT};
@ -35,6 +35,7 @@ pub struct Cpu {
id: u32,
local_apic: LocalApic,
current_thread_id: Option<ThreadId>,
queue: OneTimeInit<&'static CpuQueue>,
}
@ -79,8 +80,6 @@ impl Cpu {
///
/// Only meant to be called once per each CPU during their init.
pub(super) unsafe fn init_local(local_apic: LocalApic, id: u32) {
infoln!("Initialize CPU with id {}", id);
// Initialize CPU features
cpuid::enable_features();
@ -93,6 +92,7 @@ impl Cpu {
id,
local_apic,
current_thread_id: None,
queue: OneTimeInit::new(),
});
let this = Box::into_raw(this);
@ -101,6 +101,8 @@ impl Cpu {
MSR_IA32_KERNEL_GS_BASE.set(this as u64);
core::arch::asm!("wbinvd; swapgs");
infoln!("cpu{} initialized", id);
syscall::init_syscall();
}
@ -140,10 +142,7 @@ impl LocalCpu {
impl CpuAccess for Cpu {
type Local = LocalCpu;
fn local() -> Self::Local {
Self::try_local().unwrap()
}
#[inline]
fn try_local() -> Option<Self::Local> {
let mut addr: usize;
let guard = IrqGuard::acquire();
@ -154,16 +153,36 @@ impl CpuAccess for Cpu {
}
}
#[inline]
fn id(&self) -> u32 {
self.id
}
#[inline]
fn current_thread_id(&self) -> Option<ThreadId> {
self.current_thread_id
}
#[inline]
unsafe fn set_current_thread_id(&mut self, id: Option<ThreadId>) {
self.current_thread_id = id;
}
#[inline]
fn local_id() -> u32 {
if let Some(local) = Self::try_local() {
local.id()
} else {
0
}
}
fn init_queue(&mut self, queue: &'static CpuQueue) {
self.queue.init(queue);
}
fn queue(&self) -> &'static CpuQueue {
self.queue.get()
fn get_queue(&self) -> Option<&'static CpuQueue> {
self.queue.try_get().copied()
}
}

View File

@ -286,7 +286,7 @@ static mut IDT: [Entry; SIZE] = [Entry::NULL; SIZE];
fn user_exception_inner(kind: ExceptionKind, frame: &ExceptionFrame) {
let thread = Thread::current();
warnln!("{:?} in {} {:?}", kind, thread.id(), thread.name());
warnln!("{:?} in {} {:?}", kind, thread.id, thread.name);
warnln!("CS:RIP = {:#x}:{:#x}", frame.cs, frame.rip);
warnln!("SS:RSP = {:#x}:{:#x}", frame.ss, frame.rsp);

View File

@ -125,8 +125,9 @@ fn syscall_inner(frame: &mut SyscallFrame) {
extern "C" fn __x86_64_syscall_handler(frame: *mut SyscallFrame) {
let frame = unsafe { &mut *frame };
let thread = Thread::current();
syscall_inner(frame);
let thread = Thread::current();
unsafe {
thread.handle_pending_signals(frame);
}

View File

@ -64,7 +64,7 @@ macro_rules! log_print_raw {
macro_rules! log_print {
($level:expr, $($args:tt)+) => {
log_print_raw!($level, "cpu{}:{}:{}: {}", /* $crate::task::Cpu::local_id() */ 0, file!(), line!(), format_args!($($args)+))
log_print_raw!($level, "cpu{}:{}:{}: {}", <$crate::task::Cpu as $crate::arch::CpuAccess>::local_id(), file!(), line!(), format_args!($($args)+))
};
}

View File

@ -46,8 +46,6 @@ pub fn kinit() -> Result<(), Error> {
let mut ioctx = IoContext::new(root);
let devfs = devfs::root();
{
let (user_init, user_init_main) =
proc::exec::load(&mut ioctx, "/init", &["/init", "xxx"], &[])?;
@ -56,7 +54,7 @@ pub fn kinit() -> Result<(), Error> {
io.set_ioctx(ioctx);
drop(io);
user_init_main.enqueue_somewhere();
user_init_main.enqueue();
}
Ok(())

View File

@ -209,6 +209,11 @@ fn kernel_physical_memory_region() -> PhysicalMemoryRegion {
PhysicalMemoryRegion { base, size }
}
#[no_mangle]
fn __allocate_page() -> Result<PhysicalAddress, Error> {
alloc_page()
}
#[no_mangle]
fn __allocate_contiguous_pages(count: usize) -> Result<PhysicalAddress, Error> {
alloc_pages_contiguous(count)

View File

@ -63,7 +63,7 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error>
match func {
SyscallFunction::DebugTrace => {
let pid = process.id();
let tid = thread.id();
let tid = thread.id;
let arg = arg_user_str(args[0] as usize, args[1] as usize)?;
@ -422,6 +422,16 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error>
}
})
}
SyscallFunction::CreateSharedMemory => {
let size = args[0] as usize;
let size = size.page_align_up::<L3>();
run_with_io(process, |mut io| {
let file = File::new_shared_memory(size)?;
let fd = io.files.place_file(file, true)?;
Ok(fd.0 as usize)
})
}
// Process management
SyscallFunction::SpawnProcess => {
let options = arg_user_ref::<SpawnOptions>(args[0] as usize)?;
@ -475,7 +485,7 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error>
}
drop(child_io);
child_main.enqueue_somewhere();
child_main.enqueue();
Ok(pid as _)
})
@ -503,8 +513,7 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error>
}
SyscallFunction::ExitThread => {
let code = ExitCode::from(args[0] as i32);
thread.exit(code);
unreachable!()
thread.exit(code)
}
SyscallFunction::ExitProcess => {
// A bit different from thread exit: wait for other threads to finish and exit only

View File

@ -27,7 +27,7 @@ pub fn spawn_kernel_closure<S: Into<String>, T: Termination, F: Fn() -> T + Send
f: F,
) -> Result<(), Error> {
let thread = Thread::new_kthread(name, TaskContext::kernel_closure(f)?);
thread.enqueue_somewhere();
thread.enqueue();
Ok(())
}

View File

@ -224,11 +224,11 @@ impl Process {
tls_address,
)?;
let thread = Thread::new_uthread(self.clone(), space, context);
let id = thread.id();
let id = thread.id;
self.inner.lock().threads.push(thread.clone());
thread.enqueue_somewhere();
thread.enqueue();
Ok(id)
}
@ -335,7 +335,7 @@ impl Process {
// TODO make this cleaner
let old_len = inner.threads.len();
inner.threads.retain(|t| t.id() != thread);
inner.threads.retain(|t| t.id != thread);
assert_ne!(inner.threads.len(), old_len);
let last_thread = inner.threads.is_empty();
@ -396,15 +396,15 @@ impl Process {
let mut inner = self.inner.lock();
for thread in inner.threads.iter() {
if thread.id() == except {
if thread.id == except {
continue;
}
infoln!("Terminate thread {}", thread.id());
infoln!("Terminate thread {}", thread.id);
thread.terminate().await;
}
inner.threads.retain(|t| t.id() == except);
inner.threads.retain(|t| t.id == except);
}
}

View File

@ -4,7 +4,11 @@ use core::sync::atomic::Ordering;
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
use cfg_if::cfg_if;
use kernel_util::{sync::IrqSafeSpinlock, util::OneTimeInit};
use crossbeam_queue::SegQueue;
use kernel_util::{
sync::{IrqGuard, IrqSafeSpinlock},
util::OneTimeInit,
};
use crate::{
arch::{Architecture, ArchitectureImpl, CpuAccess},
@ -17,32 +21,184 @@ use super::{
Cpu, TaskContext,
};
/// Per-CPU statistics
#[derive(Default)]
pub struct CpuQueueStats {
/// Ticks spent idling
pub idle_time: u64,
/// Ticks spent running CPU tasks
pub cpu_time: u64,
/// Time since last measurement
measure_time: u64,
}
struct CpuQueueInner {
current: Option<ThreadId>,
queue: VecDeque<ThreadId>,
stats: CpuQueueStats,
}
// /// Per-CPU statistics
// #[derive(Default)]
// pub struct CpuQueueStats {
// /// Ticks spent idling
// pub idle_time: u64,
// /// Ticks spent running CPU tasks
// pub cpu_time: u64,
//
// /// Time since last measurement
// measure_time: u64,
// }
/// Per-CPU queue
pub struct CpuQueue {
inner: IrqSafeSpinlock<CpuQueueInner>,
idle: TaskContext,
queue: SegQueue<ThreadId>,
index: usize,
idle: TaskContext,
}
static QUEUES: OneTimeInit<Vec<CpuQueue>> = OneTimeInit::new();
impl CpuQueue {
/// Creates a new [CpuQueue] for CPU with given `index`
pub fn new(index: usize) -> Self {
let idle = TaskContext::kernel(__idle, Cpu::local().id() as usize)
.expect("Could not construct an idle task");
Self {
queue: SegQueue::new(),
index,
idle,
}
}
/// Returns the queue's associated CPU index
#[inline]
pub fn index(&self) -> usize {
self.index
}
/// "Enters" the scheduler by selecting a first task to execute
pub unsafe fn enter(&self) -> ! {
let _guard = IrqGuard::acquire();
self.idle.enter()
}
/// "Pushes" a thread to the queue for execution
pub fn push(&self, tid: ThreadId) {
self.queue.push(tid);
}
fn pop(&self) -> (Option<Arc<Thread>>, Option<ThreadId>) {
while let Some(id) = self.queue.pop() {
let Some(thread) = Thread::get(id) else {
continue;
};
let mut sched = thread.sched.lock();
assert!(sched.in_queue);
assert!(core::ptr::eq(self, sched.queue.unwrap()));
match sched.state {
ThreadState::Ready => {
sched.state = ThreadState::Running;
drop(sched);
return (Some(thread), Some(id));
}
ThreadState::Running => {
panic!("Unexpected state: Running ({:?})", id);
}
ThreadState::Terminated => {
sched.in_queue = false;
sched.queue = None;
thread.set_terminated();
}
ThreadState::Suspended => {
sched.queue = None;
sched.in_queue = false;
}
}
}
(None, None)
}
/// Selects a new thread from the queue and performs a context switch if necessary
pub unsafe fn yield_cpu(&self) -> bool {
assert!(ArchitectureImpl::interrupt_mask());
let mut cpu = Cpu::local();
let current_id = cpu.current_thread_id();
let current = current_id.and_then(Thread::get);
if let Some(current) = current.as_ref() {
let mut sched = current.sched.lock();
let q = sched.queue.unwrap();
assert!(core::ptr::eq(q, self));
assert!(sched.in_queue);
match sched.state {
ThreadState::Ready => {
panic!("Thread {:?} was running, but is marked Ready", current.id);
}
ThreadState::Running => {
sched.state = ThreadState::Ready;
self.queue.push(current.id);
}
ThreadState::Terminated => {
sched.in_queue = false;
sched.queue = None;
current.set_terminated();
}
ThreadState::Suspended => {
sched.queue = None;
sched.in_queue = false;
}
}
}
let (next, next_id) = self.pop();
let current_ctx = if let Some(current) = current.as_ref() {
&current.context
} else {
&self.idle
};
let next_ctx = if let Some(next) = next.as_ref() {
&next.context
} else {
&self.idle
};
cpu.set_current_thread_id(next_id);
if !core::ptr::eq(current_ctx, next_ctx) {
// Perform the switch
next_ctx.switch(current_ctx);
true
} else {
false
}
}
/// Returns a queue for given CPU `index`
pub fn for_cpu(index: usize) -> &'static CpuQueue {
&QUEUES.get()[index]
}
/// Returns a queue for the local CPU
pub fn local() -> &'static CpuQueue {
Cpu::local().queue()
}
/// Returns the preferred queue for a thread to be scheduled into. Takes affinity mask into
/// account when picking a queue.
pub fn for_affinity_mask(mask: u64) -> &'static CpuQueue {
debug_assert_ne!(mask, 0);
QUEUES
.get()
.iter()
.filter(|c| mask & (1 << c.index) != 0)
.min_by_key(|c| c.queue.len())
.unwrap()
}
/// Returns `true` if the queue is local to the current CPU
pub fn is_local(&self) -> bool {
assert!(ArchitectureImpl::interrupt_mask());
core::ptr::eq(Self::local(), self)
}
}
#[naked]
extern "C" fn __idle(_x: usize) -> ! {
@ -63,206 +219,7 @@ extern "C" fn __idle(_x: usize) -> ! {
}
}
impl CpuQueueStats {
/// Reset the stats to zero values
pub fn reset(&mut self) {
self.cpu_time = 0;
self.idle_time = 0;
}
}
impl CpuQueueInner {
/// Picks a next task for execution, skipping (dropping) those that were suspended. May return
/// None if the queue is empty or no valid task was found, in which case the scheduler should
/// go idle.
pub fn next_ready_task(&mut self) -> Option<Arc<Thread>> {
#[allow(clippy::never_loop)]
while !self.queue.is_empty() {
let task = self.queue.pop_front().unwrap();
let task_t = Thread::get(task).unwrap();
match task_t.state.load(Ordering::Acquire) {
ThreadState::Ready => {
return Some(task_t);
}
e => panic!(
"Unexpected thread state in CpuQueue: {:?} ({} {:?}, cpu_id={})",
e,
task_t.id(),
task_t.name(),
1234 // task_t.cpu_id()
),
}
}
None
}
/// Returns an iterator over all the threads in the queue plus the currently running thread,
/// if there is one.
pub fn iter(&self) -> impl Iterator<Item = &ThreadId> {
Iterator::chain(self.queue.iter(), self.current.iter())
}
}
impl CpuQueue {
/// Constructs an empty queue with its own idle task
pub fn new(index: usize) -> Self {
let idle = TaskContext::kernel(__idle, Cpu::local().id() as usize)
.expect("Could not construct an idle task");
Self {
inner: {
IrqSafeSpinlock::new(CpuQueueInner {
current: None,
queue: VecDeque::new(),
stats: CpuQueueStats::default(),
})
},
idle,
index,
}
}
/// Starts queue execution on current CPU.
///
/// # Safety
///
/// Only meant to be called from [crate::task::enter()] function.
pub unsafe fn enter(&self) -> ! {
assert!(ArchitectureImpl::interrupt_mask());
// Start from idle thread to avoid having a Arc stuck here without getting dropped
// let t = CNTPCT_EL0.get();
// self.lock().stats.measure_time = t;
self.idle.enter()
}
/// Yields CPU execution to the next task in queue (or idle task if there aren't any).
///
/// # Safety
///
/// The function is only meant to be called from kernel threads (e.g. if they want to yield
/// CPU execution to wait for something) or interrupt handlers.
pub unsafe fn yield_cpu(&self) {
// Will also disable interrupts in this section
let cpu = Cpu::local();
let mut inner = self.inner.lock();
let current = inner.current;
let current_t = current.and_then(Thread::get);
if let Some(current_t) = current_t.as_ref() {
if current_t.state.load(Ordering::Acquire) == ThreadState::Terminated {
current_t.exit_notify.wake_all();
}
if current_t
.state
.compare_exchange(
ThreadState::Running,
ThreadState::Ready,
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_ok()
{
inner.queue.push_back(current_t.id());
}
}
let next_t = inner.next_ready_task();
inner.current = next_t.as_deref().map(Thread::id);
// Can drop the lock, we hold current and next Arc's
drop(inner);
let (from, _from_rc) = if let Some(current_t) = current_t.as_ref() {
(current_t.context(), Arc::strong_count(current_t))
} else {
(&self.idle, 0)
};
let (to, _to_rc) = if let Some(next_t) = next_t.as_ref() {
next_t.set_running(cpu.id());
(next_t.context(), Arc::strong_count(next_t))
} else {
(&self.idle, 0)
};
assert!(ArchitectureImpl::interrupt_mask());
to.switch(from)
}
/// Pushes the process to the back of the execution queue.
///
/// # Safety
///
/// Only meant to be called from Process impl. The function does not set any process accounting
/// information, which may lead to invalid states.
pub unsafe fn enqueue(&self, tid: ThreadId) {
let mut inner = self.inner.lock();
assert!(ArchitectureImpl::interrupt_mask());
// assert_eq!(p.state(), ProcessState::Ready);
inner.queue.push_back(tid);
}
/// Removes thread with given TID from the exeuction queue.
pub fn dequeue(&self, tid: ThreadId) {
assert!(ArchitectureImpl::interrupt_mask());
let mut inner = self.inner.lock();
inner.queue.retain(|&p| p != tid)
}
/// Returns the queue length at this moment.
///
/// # Note
///
/// This value may immediately change.
pub fn len(&self) -> usize {
self.inner.lock().queue.len()
}
/// Returns `true` if the queue is empty at the moment.
///
/// # Note
///
/// This may immediately change.
pub fn is_empty(&self) -> bool {
self.inner.lock().queue.is_empty()
}
/// Returns the current [ThreadId], or [None] if idle
pub fn current_id(&self) -> Option<ThreadId> {
self.inner.lock().current
}
/// Returns a queue for given CPU index
pub fn for_cpu(id: usize) -> &'static CpuQueue {
&QUEUES.get()[id]
}
/// Returns an iterator over all queues of the system
#[inline]
pub fn all() -> impl Iterator<Item = &'static CpuQueue> {
QUEUES.get().iter()
}
/// Picks a queue with the least amount of tasks in it
pub fn least_loaded() -> Option<(usize, &'static CpuQueue)> {
let queues = QUEUES.get();
queues.iter().enumerate().min_by_key(|(_, q)| q.len())
}
/// Returns the CPU index of the queue
pub fn index(&self) -> usize {
self.index
}
}
static QUEUES: OneTimeInit<Vec<CpuQueue>> = OneTimeInit::new();
/// Initializes the global queue list
pub fn init_queues(queues: Vec<CpuQueue>) {

View File

@ -1,3 +1,5 @@
// TODO XXX TODO XXX
#![allow(missing_docs)]
//! Thread data structures and management
use core::{
@ -5,7 +7,7 @@ use core::{
mem::size_of,
ops::Deref,
pin::Pin,
sync::atomic::{AtomicU64, Ordering},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
task::{Context, Poll},
};
@ -13,17 +15,14 @@ use abi::{
error::Error,
process::{ExitCode, Signal, SignalEntryData},
};
use alloc::{
collections::{BTreeMap, VecDeque},
string::String,
sync::Arc,
};
use alloc::{collections::BTreeMap, string::String, sync::Arc};
use atomic_enum::atomic_enum;
use futures_util::{task::ArcWake, Future};
use crossbeam_queue::SegQueue;
use futures_util::Future;
use kernel_util::{
block,
runtime::QueueWaker,
sync::{IrqGuard, IrqSafeSpinlock},
sync::{spin_rwlock::IrqSafeRwLock, IrqGuard, IrqSafeSpinlock},
};
use crate::{
@ -61,35 +60,90 @@ pub enum ThreadId {
#[repr(C)]
pub struct CurrentThread(Arc<Thread>, IrqGuard);
#[derive(Debug)]
#[repr(transparent)]
pub struct ThreadAffinity(AtomicU64);
struct SignalEntry {
entry: usize,
stack: usize,
}
struct ThreadInner {
queue: Option<&'static CpuQueue>,
signal_entry: Option<SignalEntry>,
signal_stack: VecDeque<Signal>,
}
struct GlobalThreadList {
data: BTreeMap<ThreadId, Arc<Thread>>,
}
pub struct ThreadSchedulingInfo {
pub state: ThreadState,
pub in_queue: bool,
pub queue: Option<&'static CpuQueue>,
}
static THREADS: IrqSafeRwLock<GlobalThreadList> = IrqSafeRwLock::new(GlobalThreadList::new());
/// Describes a single thread within the system
pub struct Thread {
context: TaskContext,
name: Option<String>,
id: ThreadId,
pub(super) state: AtomicThreadState,
pub id: ThreadId,
pub name: Option<String>,
pub sched: IrqSafeSpinlock<ThreadSchedulingInfo>,
pub context: TaskContext,
process: Option<Arc<Process>>,
space: Option<Arc<ProcessAddressSpace>>,
inner: IrqSafeSpinlock<ThreadInner>,
signal_queue: SegQueue<Signal>,
pub(super) terminated: AtomicBool,
pub(super) exit_notify: Arc<QueueWaker>,
pub affinity: ThreadAffinity,
}
static THREADS: IrqSafeSpinlock<BTreeMap<ThreadId, Arc<Thread>>> =
IrqSafeSpinlock::new(BTreeMap::new());
impl GlobalThreadList {
pub const fn new() -> Self {
Self {
data: BTreeMap::new(),
}
}
#[inline]
pub fn get(&self, id: ThreadId) -> Option<&Arc<Thread>> {
self.data.get(&id)
}
pub fn insert(&mut self, thread: Arc<Thread>) {
let id = thread.id;
debug_assert!(!self.data.contains_key(&id));
self.data.insert(id, thread);
}
}
impl ThreadAffinity {
pub const ANY_CPU: u64 = u64::MAX;
pub const fn any_cpu() -> Self {
Self(AtomicU64::new(Self::ANY_CPU))
}
pub const fn only_cpu(index: usize) -> Self {
Self(AtomicU64::new(1 << index))
}
#[inline]
pub fn get(&self) -> u64 {
self.0.load(Ordering::Relaxed)
}
#[inline]
pub fn set(&self, value: u64) {
debug_assert_ne!(value, 0);
self.0.store(value, Ordering::Relaxed);
}
}
impl Thread {
fn new(
@ -100,30 +154,40 @@ impl Thread {
context: TaskContext,
) -> Arc<Self> {
let thread = Arc::new(Self {
context,
name,
id,
state: AtomicThreadState::new(ThreadState::Suspended),
name,
sched: IrqSafeSpinlock::new(ThreadSchedulingInfo {
state: ThreadState::Suspended,
in_queue: false,
queue: None,
}),
context,
process,
space,
inner: IrqSafeSpinlock::new(ThreadInner {
queue: None,
signal_stack: VecDeque::new(),
signal_entry: None,
}),
inner: IrqSafeSpinlock::new(ThreadInner { signal_entry: None }),
signal_queue: SegQueue::new(),
exit_notify: Arc::new(QueueWaker::new()),
terminated: AtomicBool::new(false),
affinity: ThreadAffinity::any_cpu(),
});
THREADS.lock().insert(id, thread.clone());
THREADS.write().insert(thread.clone());
thread
}
/// Constructs a new kernel-space thread
pub fn new_kthread<S: Into<String>>(name: S, context: TaskContext) -> Arc<Self> {
Self::new(
ThreadId::next_kernel(),
Some(name.into()),
None,
None,
context,
)
}
/// Constructs a new user-space thread
pub fn new_uthread(
parent: Arc<Process>,
@ -139,53 +203,151 @@ impl Thread {
)
}
/// Constructs a new kernel-space thread
pub fn new_kthread<S: Into<String>>(name: S, context: TaskContext) -> Arc<Self> {
Self::new(
ThreadId::next_kernel(),
Some(name.into()),
None,
None,
context,
)
// Get/Set
/// Updates the thread affinity to run on a specific set of CPUs
pub fn set_affinity(&self, affinity: u64) {
self.affinity.set(affinity);
}
// Info
/// Returns the thread's ID
pub fn id(&self) -> ThreadId {
self.id
/// Updates the thread signal entry/stack information
pub fn set_signal_entry(&self, entry: usize, stack: usize) {
let mut inner = self.inner.lock();
inner.signal_entry.replace(SignalEntry { entry, stack });
}
/// Returns the thread's name, if set
pub fn name(&self) -> Option<&String> {
self.name.as_ref()
}
/// Returns the thread's [TaskContext]
pub fn context(&self) -> &TaskContext {
&self.context
}
/// Returns the thread's [ProcessAddressSpace] reference.
///
/// # Panics
///
/// Will panic if the thread has no associated address space.
pub fn address_space(&self) -> &Arc<ProcessAddressSpace> {
/// Returns the thread address space (usually provided by its parent process). If none exists,
/// panics.
pub fn address_space(&self) -> &ProcessAddressSpace {
self.space.as_ref().unwrap()
}
/// Returns the thread's parent [Process] reference.
///
/// # Panics
///
/// Will panic if the thread has no process associated (i.e. it's a kernel thread).
/// Returns the thread's parent process, panics if there's none
pub fn process(&self) -> &Arc<Process> {
self.process.as_ref().unwrap()
}
// Queue operation
/// Updates the thread's terminated status and wakes up any other threads waiting for it to
/// exit
pub fn set_terminated(&self) {
if !self.terminated.swap(true, Ordering::Release) {
self.exit_notify.wake_all();
}
}
// Signals
/// Pushes a signal to the thread's signal queue
pub fn raise_signal(&self, signal: Signal) {
self.signal_queue.push(signal);
}
// Scheduling
/// Changes thread state to "Ready" and inserts it into given `queue`, if it's not yet in one
pub fn enqueue_to(&self, queue: &'static CpuQueue) {
let mut sched = self.sched.lock();
assert_ne!(sched.state, ThreadState::Terminated);
match sched.state {
ThreadState::Running | ThreadState::Ready => {
assert!(sched.in_queue);
}
ThreadState::Suspended => {
sched.state = ThreadState::Ready;
if !sched.in_queue {
assert!(sched.queue.is_none());
sched.in_queue = true;
sched.queue = Some(queue);
queue.push(self.id);
}
}
ThreadState::Terminated => panic!("Cannot enqueue a terminated thread"),
}
}
/// Changes thread state to `state` and removes it from its queue. If the thread is currently
/// running on local CPU, will yield control to the next thread
pub fn dequeue(&self, state: ThreadState) {
let mut sched = self.sched.lock();
debug_assert_ne!(state, ThreadState::Running);
debug_assert_ne!(state, ThreadState::Ready);
let old_state = sched.state;
sched.state = state;
if let Some(queue) = sched.queue {
match (queue.is_local(), old_state) {
(true, ThreadState::Running) => unsafe {
debug_assert!(sched.in_queue);
drop(sched);
queue.yield_cpu();
},
(false, ThreadState::Running) => {
debugln!("deq remote {:?}", self.id);
debug_assert!(sched.in_queue);
}
(_, ThreadState::Ready) => {
debug_assert!(sched.in_queue);
}
(_, ThreadState::Suspended | ThreadState::Terminated) => {
todo!()
}
}
} else {
assert!(!sched.in_queue);
assert_ne!(old_state, ThreadState::Running);
assert_ne!(old_state, ThreadState::Ready);
}
}
/// Inserts the thread as "Ready" into the best queue (based on affinity and queue load)
pub fn enqueue(&self) {
let queue = CpuQueue::for_affinity_mask(self.affinity.get());
self.enqueue_to(queue);
}
fn is_terminated(&self) -> bool {
self.terminated.load(Ordering::Acquire)
}
/// Waits until thread is terminated and removed from scheduling queues
pub fn wait_for_exit(self: &Arc<Self>) -> impl Future<Output = ()> {
struct F(Arc<Thread>);
impl Future for F {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let F(thread) = self.deref();
thread.exit_notify.register(cx.waker());
if thread.is_terminated() {
thread.exit_notify.remove(cx.waker());
Poll::Ready(())
} else {
Poll::Pending
}
}
}
infoln!("wait_for_exit: pending {:?}", self.id);
F(self.clone())
}
/// Requests thread termination and blocks until said thread finishes fully
pub async fn terminate(self: &Arc<Self>) {
// Will not abort the execution: called from another thread
self.dequeue(ThreadState::Terminated);
self.wait_for_exit().await
}
/// Returns the current thread on the CPU.
///
@ -201,204 +363,30 @@ impl Thread {
pub fn get_current() -> Option<CurrentThread> {
// IrqGuard is held throughout
let cpu = Cpu::local();
let thread = cpu.queue().current_id().and_then(Self::get);
let thread = cpu.current_thread_id().and_then(Self::get);
thread.map(|t| CurrentThread(t, cpu.into_guard()))
}
/// Enqueues the thread onto any (usually the least loaded) CPU queue and returns its index.
/// See [Thread::enqueue_to].
pub fn enqueue_somewhere(&self) -> usize {
// Doesn't have to be precise, so even if something changes, we can still be rebalanced
// to another CPU
let (index, queue) = CpuQueue::least_loaded().unwrap();
self.enqueue_to(queue);
index
}
/// Enqueues the thread onto the specific CPU's queue.
///
/// # Panics
///
/// Will panic if the process is in some weird state while being queued.
pub fn enqueue_to(&self, queue: &'static CpuQueue) {
let _irq = IrqGuard::acquire();
{
let mut inner = self.inner.lock();
let old_queue = inner.queue.replace(queue);
if old_queue.is_some() {
// Already in some queue
return;
}
}
match self.state.compare_exchange(
ThreadState::Suspended,
ThreadState::Ready,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Err(ThreadState::Terminated) => {
// Process might've been killed while `await`ing in a `block!`
debugln!(
"Thread {} {:?} already terminated, dropping",
self.id(),
self.name()
);
}
Err(state) => {
todo!("Unexpected process state when enqueueing: {:?}", state)
}
Ok(_) => unsafe {
queue.enqueue(self.id);
},
}
}
// TODO maybe separate dequeue for current and "other" threads
fn dequeue(&self, new_state: ThreadState) {
let _irq = IrqGuard::acquire();
assert_ne!(new_state, ThreadState::Ready);
assert_ne!(new_state, ThreadState::Running);
let current_state = self.state.swap(new_state, Ordering::SeqCst);
let mut inner = self.inner.lock();
let proc_queue = inner.queue.take().unwrap();
proc_queue.dequeue(self.id());
match current_state {
// NOTE: I'm not sure if the process could've been queued between the store and this
// but most likely not (if I'm not that bad with atomics)
// Do nothing, its queue will just drop the process
ThreadState::Ready => (),
// Do nothing, not in a queue already
ThreadState::Suspended => (),
ThreadState::Terminated => panic!("Thread is terminated"),
ThreadState::Running => {
let queue = Cpu::local().queue();
if core::ptr::eq(queue, proc_queue) {
drop(inner);
// Suspending a process running on local CPU
unsafe { queue.yield_cpu() }
} else {
todo!();
}
}
}
}
/// Marks the thread as running and sets its "current" CPU index.
///
/// # Safety
///
/// Only meant to be called from within the scheduler.
pub unsafe fn set_running(&self, _cpu: u32) {
self.state.store(ThreadState::Running, Ordering::Release);
}
// Accounting
/// Returns the thread with given [ThreadId], if it exists
pub fn get(id: ThreadId) -> Option<Arc<Thread>> {
THREADS.lock().get(&id).cloned()
}
// Thread inner
/// Changes the thread's signal entry point information
pub fn set_signal_entry(&self, entry: usize, stack: usize) {
let mut inner = self.inner.lock();
inner.signal_entry.replace(SignalEntry { entry, stack });
}
/// Pushes a [Signal] onto the thread's signal stack.
///
/// When executed on a current thread, the signal is guaranteed to be handled exactly before
/// returning to user context (i.e. from syscall). Otherwise, the signal handling order and
/// whether it will be delivered at all is not guaranteed.
pub fn raise_signal(self: &Arc<Self>, signal: Signal) {
self.inner.lock().signal_stack.push_back(signal);
if self.state.load(Ordering::Acquire) == ThreadState::Suspended {
self.clone().enqueue_somewhere();
}
}
/// Requests thread termination and blocks until said thread finishes fully:
pub fn terminate(self: &Arc<Self>) -> impl Future<Output = ()> {
struct F(Arc<Thread>);
impl Future for F {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let F(thread) = self.deref();
thread.exit_notify.register(cx.waker());
if thread.state.load(Ordering::Acquire) == ThreadState::Terminated {
thread.exit_notify.remove(cx.waker());
Poll::Ready(())
} else {
Poll::Pending
}
}
}
// Will not abort the execution: called from another thread
self.dequeue(ThreadState::Terminated);
F(self.clone())
}
}
impl ArcWake for Thread {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.clone().enqueue_somewhere();
THREADS.read().get(id).cloned()
}
}
impl CurrentThread {
fn dequeue_terminate(&self, code: ExitCode) {
self.state
.compare_exchange(
ThreadState::Running,
ThreadState::Terminated,
Ordering::AcqRel,
Ordering::Relaxed,
)
.unwrap();
/// Terminate the current thread
pub fn exit(&self, code: ExitCode) -> ! {
if let Some(process) = self.process.as_ref() {
process.handle_thread_exit(self.id(), code);
process.handle_thread_exit(self.id, code);
}
let queue = Cpu::local().queue();
queue.dequeue(self.id());
unsafe { queue.yield_cpu() }
self.dequeue(ThreadState::Terminated);
unreachable!()
}
/// Terminate the current thread
pub fn exit(&self, code: ExitCode) {
// May already have been terminated by process exit
if self.state.load(Ordering::Acquire) == ThreadState::Terminated {
return;
}
self.dequeue_terminate(code)
}
// TODO: test multithreaded process exit
/// Terminate the parent process of the thread, including all other threads and the current
/// thread itself
pub fn exit_process(&self, code: ExitCode) {
pub fn exit_process(&self, code: ExitCode) -> ! {
let _guard = IrqGuard::acquire();
let process = self
@ -409,20 +397,17 @@ impl CurrentThread {
let p = process.clone();
block! {
p.terminate_others(self.id()).await;
p.terminate_others(self.id).await;
}
.unwrap();
self.exit(code);
unreachable!();
self.exit(code)
}
/// Suspends the current thread until it is waken up again. Guaranteed to happen immediately.
pub fn suspend(&self) -> Result<(), Error> {
self.dequeue(ThreadState::Suspended);
let inner = self.inner.lock();
if !inner.signal_stack.is_empty() {
if !self.signal_queue.is_empty() {
return Err(Error::Interrupted);
}
@ -440,9 +425,9 @@ impl CurrentThread {
return;
}
let mut inner = self.inner.lock();
if let Some(signal) = self.signal_queue.pop() {
let inner = self.inner.lock();
if let Some(signal) = inner.signal_stack.pop_front() {
let Some(entry) = inner.signal_entry.as_ref() else {
todo!();
};
@ -546,7 +531,7 @@ fn __create_kthread(
#[no_mangle]
fn __enqueue(t: &Arc<Thread>) {
t.enqueue_somewhere();
t.enqueue();
}
#[no_mangle]
@ -562,11 +547,10 @@ fn __suspend_current(t: &CurrentThread) -> Result<(), Error> {
#[no_mangle]
fn __exit_current(t: &CurrentThread, code: ExitCode) -> ! {
t.exit(code);
unreachable!();
}
#[no_mangle]
unsafe fn __yield() {
let _irq = IrqGuard::acquire();
Cpu::local().queue().yield_cpu()
Cpu::local().queue().yield_cpu();
}