From 6eef11a4e488494f3a159463e25cefd8336b4b6e Mon Sep 17 00:00:00 2001 From: Mark Poliakov Date: Sun, 26 Nov 2023 15:05:16 +0200 Subject: [PATCH] proc: separate thread and process exit --- src/device/serial/pl011.rs | 2 +- src/init.rs | 39 +++++------- src/syscall/mod.rs | 14 +++-- src/task/context.rs | 45 ++++++++++++-- src/task/mod.rs | 7 ++- src/task/process.rs | 15 +++++ src/task/sched.rs | 4 ++ src/task/thread.rs | 119 ++++++++++++++++++++++++++++--------- 8 files changed, 180 insertions(+), 65 deletions(-) diff --git a/src/device/serial/pl011.rs b/src/device/serial/pl011.rs index 8fe55aac..22136890 100644 --- a/src/device/serial/pl011.rs +++ b/src/device/serial/pl011.rs @@ -192,7 +192,7 @@ impl Device for Pl011 { self.inner.init(IrqSafeSpinlock::new(inner)); - // debug::add_sink(self, LogLevel::Debug); + debug::add_sink(self, LogLevel::Debug); devfs::add_char_device(self, CharDeviceType::TtySerial)?; Ok(()) diff --git a/src/init.rs b/src/init.rs index dfdcdbdc..37bc02cc 100644 --- a/src/init.rs +++ b/src/init.rs @@ -1,7 +1,7 @@ //! Kernel main process implementation: filesystem initialization and userspace init start use abi::{ error::Error, - io::{FileMode, OpenOptions, RawFd}, + io::{OpenOptions, RawFd}, }; use memfs::MemoryFilesystem; use vfs::{Filesystem, IoContext, VnodeRef}; @@ -23,7 +23,7 @@ fn setup_root() -> Result { /// /// This function is meant to be used as a kernel-space process after all the platform-specific /// initialization has finished. -pub fn kinit() { +pub fn kinit() -> Result<(), Error> { infoln!("In main"); #[cfg(feature = "fb_console")] @@ -32,46 +32,39 @@ pub fn kinit() { runtime::spawn(async move { update_consoles_task().await; - }) - .expect("Could not start periodic console auto-flush task"); + })?; } - let root = match setup_root() { - Ok(root) => root, - Err(err) => { - warnln!("Could not setup root from initrd: {:?}", err); - return; - } - }; + let root = setup_root()?; let ioctx = IoContext::new(root); - let node = ioctx.find(None, "/init", true, true).unwrap(); - let file = node.open(OpenOptions::READ).unwrap(); + let node = ioctx.find(None, "/init", true, true)?; + let file = node.open(OpenOptions::READ)?; let devfs = devfs::root(); #[cfg(target_arch = "x86_64")] - let console = ioctx.find(Some(devfs.clone()), "tty0", true, true).unwrap(); + let console = ioctx.find(Some(devfs.clone()), "tty0", true, true)?; #[cfg(target_arch = "aarch64")] - let console = ioctx - .find(Some(devfs.clone()), "ttyS0", true, true) - .unwrap(); - let stdin = console.open(OpenOptions::READ).unwrap(); - let stdout = console.open(OpenOptions::WRITE).unwrap(); + let console = ioctx.find(Some(devfs.clone()), "ttyS0", true, true)?; + let stdin = console.open(OpenOptions::READ)?; + let stdout = console.open(OpenOptions::WRITE)?; let stderr = stdout.clone(); { // XXX let (user_init, user_init_main) = - proc::exec::load_elf("init", file, &["/init", "xxx"], &[]).unwrap(); + proc::exec::load_elf("init", file, &["/init", "xxx"], &[])?; let mut io = user_init.io.lock(); io.set_ioctx(ioctx); - io.set_file(RawFd::STDIN, stdin).unwrap(); - io.set_file(RawFd::STDOUT, stdout).unwrap(); - io.set_file(RawFd::STDERR, stderr).unwrap(); + io.set_file(RawFd::STDIN, stdin)?; + io.set_file(RawFd::STDOUT, stdout)?; + io.set_file(RawFd::STDERR, stderr)?; drop(io); user_init.set_session_terminal(console); user_init_main.enqueue_somewhere(); } + + Ok(()) } diff --git a/src/syscall/mod.rs b/src/syscall/mod.rs index a3fe19e4..d26de8cb 100644 --- a/src/syscall/mod.rs +++ b/src/syscall/mod.rs @@ -381,12 +381,18 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result Ok(0) } - SyscallFunction::Exit => { + SyscallFunction::ExitThread => { let code = ExitCode::from(args[0] as i32); - // TODO separate handlers for process exit and thread exit? thread.exit(code); - // Process::current().exit(code); - panic!(); + unreachable!() + } + SyscallFunction::ExitProcess => { + // A bit different from thread exit: wait for other threads to finish and exit only + // after that + let code = ExitCode::from(args[0] as i32); + + thread.exit_process(code); + unreachable!() } SyscallFunction::SendSignal => { let pid = ProcessId::from(args[0] as u32); diff --git a/src/task/context.rs b/src/task/context.rs index c6c36c14..96fff708 100644 --- a/src/task/context.rs +++ b/src/task/context.rs @@ -1,5 +1,7 @@ //! Platform-specific task context manipulation interfaces +use core::fmt; + use abi::{arch::SavedFrame, error::Error, process::ExitCode}; use alloc::boxed::Box; use cfg_if::cfg_if; @@ -16,6 +18,34 @@ cfg_if! { } } +pub trait Termination { + fn into_exit_code(self) -> ExitCode; +} + +impl Termination for Result { + fn into_exit_code(self) -> ExitCode { + match self { + Ok(_) => ExitCode::SUCCESS, + Err(err) => { + warnln!("Kernel thread failed: {:?}", err); + ExitCode::Exited(1) + } + } + } +} + +impl Termination for ExitCode { + fn into_exit_code(self) -> ExitCode { + self + } +} + +impl Termination for () { + fn into_exit_code(self) -> ExitCode { + ExitCode::SUCCESS + } +} + /// Interface for task state save/restore mechanisms pub trait TaskFrame { /// Creates a "snapshot" of a exception/syscall frame @@ -81,17 +111,20 @@ pub trait TaskContextImpl: Sized { unsafe fn switch(&self, from: &Self); /// Constructs a safe wrapper process to execute a kernel-space closure - fn kernel_closure(f: F) -> Result { - extern "C" fn closure_wrapper(closure_addr: usize) -> ! { + fn kernel_closure T + Send + 'static>( + f: F, + ) -> Result { + extern "C" fn closure_wrapper T + Send + 'static>( + closure_addr: usize, + ) -> ! { let closure = unsafe { Box::from_raw(closure_addr as *mut F) }; - closure(); - - Thread::current().exit(ExitCode::SUCCESS); + let result = closure(); + Thread::current().exit(result.into_exit_code()); unreachable!(); } let closure = Box::new(f); debugln!("closure: {:p}", closure); - Self::kernel(closure_wrapper::, Box::into_raw(closure) as usize) + Self::kernel(closure_wrapper::, Box::into_raw(closure) as usize) } } diff --git a/src/task/mod.rs b/src/task/mod.rs index f472bdc6..1f7658aa 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -11,7 +11,10 @@ use crate::{ task::{sched::CpuQueue, thread::Thread}, }; -use self::{context::TaskContextImpl, process::Process}; +use self::{ + context::{TaskContextImpl, Termination}, + process::Process, +}; pub mod context; pub mod process; @@ -65,7 +68,7 @@ pub use context::{Cpu, TaskContext}; // pub static PROCESSES: IrqSafeSpinlock = IrqSafeSpinlock::new(ProcessList::new()); /// Creates a new kernel-space process to execute a closure and queues it to some CPU -pub fn spawn_kernel_closure, F: Fn() + Send + 'static>( +pub fn spawn_kernel_closure, T: Termination, F: Fn() -> T + Send + 'static>( name: S, f: F, ) -> Result<(), Error> { diff --git a/src/task/process.rs b/src/task/process.rs index 5bae9a4a..4156bab2 100644 --- a/src/task/process.rs +++ b/src/task/process.rs @@ -359,6 +359,21 @@ impl Process { pub fn get(id: ProcessId) -> Option> { PROCESSES.lock().get(&id).cloned() } + + pub async fn terminate_others(&self, except: ThreadId) { + let mut inner = self.inner.lock(); + + for thread in inner.threads.iter() { + if thread.id() == except { + continue; + } + + infoln!("Terminate thread {}", thread.id()); + thread.terminate().await; + } + + inner.threads.retain(|t| t.id() == except); + } } impl fmt::Display for ProcessId { diff --git a/src/task/sched.rs b/src/task/sched.rs index 174a5520..f36b12b4 100644 --- a/src/task/sched.rs +++ b/src/task/sched.rs @@ -175,6 +175,10 @@ impl CpuQueue { let current_t = current.and_then(Thread::get); if let Some(current_t) = current_t.as_ref() { + if current_t.state.load(Ordering::Acquire) == ThreadState::Terminated { + current_t.exit_notify.wake_all(); + } + if current_t .state .compare_exchange( diff --git a/src/task/thread.rs b/src/task/thread.rs index ec52067e..1719db56 100644 --- a/src/task/thread.rs +++ b/src/task/thread.rs @@ -2,7 +2,9 @@ use core::{ fmt, mem::size_of, ops::Deref, + pin::Pin, sync::atomic::{AtomicU32, AtomicU64, Ordering}, + task::{Context, Poll}, }; use abi::{ @@ -15,16 +17,19 @@ use alloc::{ sync::Arc, }; use atomic_enum::atomic_enum; -use futures_util::task::ArcWake; +use futures_util::{task::ArcWake, Future}; use kernel_util::util::OneTimeInit; use crate::{ + block, mem::{process::ProcessAddressSpace, ForeignPointer}, sync::{IrqGuard, IrqSafeSpinlock}, task::{context::TaskContextImpl, Cpu}, }; -use super::{context::TaskFrame, process::Process, sched::CpuQueue, TaskContext}; +use super::{ + context::TaskFrame, process::Process, runtime::QueueWaker, sched::CpuQueue, TaskContext, +}; /// Represents the states a thread can be at some point in time #[atomic_enum] @@ -54,7 +59,6 @@ struct SignalEntry { } struct ThreadInner { - exit_status: i32, queue: Option<&'static CpuQueue>, signal_entry: Option, @@ -73,6 +77,7 @@ pub struct Thread { space: Option>, inner: IrqSafeSpinlock, + pub(super) exit_notify: Arc, } static THREADS: IrqSafeSpinlock>> = @@ -102,12 +107,13 @@ impl Thread { space, inner: IrqSafeSpinlock::new(ThreadInner { - exit_status: 0, queue: None, signal_stack: VecDeque::new(), signal_entry: None, }), + + exit_notify: Arc::new(QueueWaker::new()), }); THREADS.lock().insert(id, thread.clone()); @@ -225,8 +231,8 @@ impl Thread { assert_ne!(new_state, ThreadState::Ready); assert_ne!(new_state, ThreadState::Running); - let mut inner = self.inner.lock(); let current_state = self.state.swap(new_state, Ordering::SeqCst); + let mut inner = self.inner.lock(); let proc_queue = inner.queue.take().unwrap(); proc_queue.dequeue(self.id()); @@ -271,24 +277,6 @@ impl Thread { } // Thread inner - /// Handles the cleanup of an exited thread - pub fn handle_exit(&self) { - // Scheduler still holds a lock of this process? - // TODO cancel Wait if a process was killed while suspended? - let code = { - let inner = self.inner.lock(); - let exit_status = ExitCode::from(inner.exit_status); - exit_status - }; - - if let Some(process) = self.process.as_ref() { - process.handle_thread_exit(self.id(), code); - } - - // TODO WaitThread, notify any waiters we're done - // self.exit_waker.wake_all(); - } - pub fn set_signal_entry(&self, entry: usize, stack: usize) { let mut inner = self.inner.lock(); inner.signal_entry.replace(SignalEntry { entry, stack }); @@ -301,6 +289,32 @@ impl Thread { self.clone().enqueue_somewhere(); } } + + pub fn terminate(self: &Arc) -> impl Future { + struct F(Arc); + + impl Future for F { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let F(thread) = self.deref(); + + thread.exit_notify.register(cx.waker()); + + if thread.state.load(Ordering::Acquire) == ThreadState::Terminated { + thread.exit_notify.remove(cx.waker()); + Poll::Ready(()) + } else { + Poll::Pending + } + } + } + + // Will not abort the execution: called from another thread + self.dequeue(ThreadState::Terminated); + + F(self.clone()) + } } impl ArcWake for Thread { @@ -310,13 +324,60 @@ impl ArcWake for Thread { } impl CurrentThread { - /// Terminate the current process - pub fn exit(&self, status: ExitCode) { - self.inner.lock().exit_status = status.into(); - debugln!("Thread {} exited with code {:?}", self.id(), status); + fn dequeue_terminate(&self, code: ExitCode) { + self.state + .compare_exchange( + ThreadState::Running, + ThreadState::Terminated, + Ordering::AcqRel, + Ordering::Relaxed, + ) + .unwrap(); - self.handle_exit(); - self.dequeue(ThreadState::Terminated); + if let Some(process) = self.process.as_ref() { + process.handle_thread_exit(self.id(), code); + } + + let mut inner = self.inner.lock(); + let proc_queue = inner.queue.take().unwrap(); + let queue = Cpu::local().queue(); + assert_eq!(proc_queue.index(), queue.index()); + + drop(inner); + + queue.dequeue(self.id()); + unsafe { queue.yield_cpu() } + + unreachable!() + } + + /// Terminate the current thread + pub fn exit(&self, code: ExitCode) { + // May already have been terminated by process exit + if self.state.load(Ordering::Acquire) == ThreadState::Terminated { + return; + } + + self.dequeue_terminate(code) + } + + pub fn exit_process(&self, code: ExitCode) { + let _guard = IrqGuard::acquire(); + + let process = self + .process + .clone() + .expect("exit_process() called on a detached thread"); + + let p = process.clone(); + + block! { + p.terminate_others(self.id()).await; + } + .unwrap(); + + self.exit(code); + unreachable!(); } pub fn suspend(&self) -> Result<(), Error> {