proc: separate thread and process exit

This commit is contained in:
Mark Poliakov 2023-11-26 15:05:16 +02:00
parent 0dc2cfa159
commit 6eef11a4e4
8 changed files with 180 additions and 65 deletions

View File

@ -192,7 +192,7 @@ impl Device for Pl011 {
self.inner.init(IrqSafeSpinlock::new(inner)); self.inner.init(IrqSafeSpinlock::new(inner));
// debug::add_sink(self, LogLevel::Debug); debug::add_sink(self, LogLevel::Debug);
devfs::add_char_device(self, CharDeviceType::TtySerial)?; devfs::add_char_device(self, CharDeviceType::TtySerial)?;
Ok(()) Ok(())

View File

@ -1,7 +1,7 @@
//! Kernel main process implementation: filesystem initialization and userspace init start //! Kernel main process implementation: filesystem initialization and userspace init start
use abi::{ use abi::{
error::Error, error::Error,
io::{FileMode, OpenOptions, RawFd}, io::{OpenOptions, RawFd},
}; };
use memfs::MemoryFilesystem; use memfs::MemoryFilesystem;
use vfs::{Filesystem, IoContext, VnodeRef}; use vfs::{Filesystem, IoContext, VnodeRef};
@ -23,7 +23,7 @@ fn setup_root() -> Result<VnodeRef, Error> {
/// ///
/// This function is meant to be used as a kernel-space process after all the platform-specific /// This function is meant to be used as a kernel-space process after all the platform-specific
/// initialization has finished. /// initialization has finished.
pub fn kinit() { pub fn kinit() -> Result<(), Error> {
infoln!("In main"); infoln!("In main");
#[cfg(feature = "fb_console")] #[cfg(feature = "fb_console")]
@ -32,46 +32,39 @@ pub fn kinit() {
runtime::spawn(async move { runtime::spawn(async move {
update_consoles_task().await; update_consoles_task().await;
}) })?;
.expect("Could not start periodic console auto-flush task");
} }
let root = match setup_root() { let root = setup_root()?;
Ok(root) => root,
Err(err) => {
warnln!("Could not setup root from initrd: {:?}", err);
return;
}
};
let ioctx = IoContext::new(root); let ioctx = IoContext::new(root);
let node = ioctx.find(None, "/init", true, true).unwrap(); let node = ioctx.find(None, "/init", true, true)?;
let file = node.open(OpenOptions::READ).unwrap(); let file = node.open(OpenOptions::READ)?;
let devfs = devfs::root(); let devfs = devfs::root();
#[cfg(target_arch = "x86_64")] #[cfg(target_arch = "x86_64")]
let console = ioctx.find(Some(devfs.clone()), "tty0", true, true).unwrap(); let console = ioctx.find(Some(devfs.clone()), "tty0", true, true)?;
#[cfg(target_arch = "aarch64")] #[cfg(target_arch = "aarch64")]
let console = ioctx let console = ioctx.find(Some(devfs.clone()), "ttyS0", true, true)?;
.find(Some(devfs.clone()), "ttyS0", true, true) let stdin = console.open(OpenOptions::READ)?;
.unwrap(); let stdout = console.open(OpenOptions::WRITE)?;
let stdin = console.open(OpenOptions::READ).unwrap();
let stdout = console.open(OpenOptions::WRITE).unwrap();
let stderr = stdout.clone(); let stderr = stdout.clone();
{ {
// XXX // XXX
let (user_init, user_init_main) = let (user_init, user_init_main) =
proc::exec::load_elf("init", file, &["/init", "xxx"], &[]).unwrap(); proc::exec::load_elf("init", file, &["/init", "xxx"], &[])?;
let mut io = user_init.io.lock(); let mut io = user_init.io.lock();
io.set_ioctx(ioctx); io.set_ioctx(ioctx);
io.set_file(RawFd::STDIN, stdin).unwrap(); io.set_file(RawFd::STDIN, stdin)?;
io.set_file(RawFd::STDOUT, stdout).unwrap(); io.set_file(RawFd::STDOUT, stdout)?;
io.set_file(RawFd::STDERR, stderr).unwrap(); io.set_file(RawFd::STDERR, stderr)?;
drop(io); drop(io);
user_init.set_session_terminal(console); user_init.set_session_terminal(console);
user_init_main.enqueue_somewhere(); user_init_main.enqueue_somewhere();
} }
Ok(())
} }

View File

@ -381,12 +381,18 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error>
Ok(0) Ok(0)
} }
SyscallFunction::Exit => { SyscallFunction::ExitThread => {
let code = ExitCode::from(args[0] as i32); let code = ExitCode::from(args[0] as i32);
// TODO separate handlers for process exit and thread exit?
thread.exit(code); thread.exit(code);
// Process::current().exit(code); unreachable!()
panic!(); }
SyscallFunction::ExitProcess => {
// A bit different from thread exit: wait for other threads to finish and exit only
// after that
let code = ExitCode::from(args[0] as i32);
thread.exit_process(code);
unreachable!()
} }
SyscallFunction::SendSignal => { SyscallFunction::SendSignal => {
let pid = ProcessId::from(args[0] as u32); let pid = ProcessId::from(args[0] as u32);

View File

@ -1,5 +1,7 @@
//! Platform-specific task context manipulation interfaces //! Platform-specific task context manipulation interfaces
use core::fmt;
use abi::{arch::SavedFrame, error::Error, process::ExitCode}; use abi::{arch::SavedFrame, error::Error, process::ExitCode};
use alloc::boxed::Box; use alloc::boxed::Box;
use cfg_if::cfg_if; use cfg_if::cfg_if;
@ -16,6 +18,34 @@ cfg_if! {
} }
} }
pub trait Termination {
fn into_exit_code(self) -> ExitCode;
}
impl<T, E: fmt::Debug> Termination for Result<T, E> {
fn into_exit_code(self) -> ExitCode {
match self {
Ok(_) => ExitCode::SUCCESS,
Err(err) => {
warnln!("Kernel thread failed: {:?}", err);
ExitCode::Exited(1)
}
}
}
}
impl Termination for ExitCode {
fn into_exit_code(self) -> ExitCode {
self
}
}
impl Termination for () {
fn into_exit_code(self) -> ExitCode {
ExitCode::SUCCESS
}
}
/// Interface for task state save/restore mechanisms /// Interface for task state save/restore mechanisms
pub trait TaskFrame { pub trait TaskFrame {
/// Creates a "snapshot" of a exception/syscall frame /// Creates a "snapshot" of a exception/syscall frame
@ -81,17 +111,20 @@ pub trait TaskContextImpl: Sized {
unsafe fn switch(&self, from: &Self); unsafe fn switch(&self, from: &Self);
/// Constructs a safe wrapper process to execute a kernel-space closure /// Constructs a safe wrapper process to execute a kernel-space closure
fn kernel_closure<F: FnOnce() + Send + 'static>(f: F) -> Result<Self, Error> { fn kernel_closure<T: Termination, F: FnOnce() -> T + Send + 'static>(
extern "C" fn closure_wrapper<F: FnOnce() + Send + 'static>(closure_addr: usize) -> ! { f: F,
) -> Result<Self, Error> {
extern "C" fn closure_wrapper<T: Termination, F: FnOnce() -> T + Send + 'static>(
closure_addr: usize,
) -> ! {
let closure = unsafe { Box::from_raw(closure_addr as *mut F) }; let closure = unsafe { Box::from_raw(closure_addr as *mut F) };
closure(); let result = closure();
Thread::current().exit(result.into_exit_code());
Thread::current().exit(ExitCode::SUCCESS);
unreachable!(); unreachable!();
} }
let closure = Box::new(f); let closure = Box::new(f);
debugln!("closure: {:p}", closure); debugln!("closure: {:p}", closure);
Self::kernel(closure_wrapper::<F>, Box::into_raw(closure) as usize) Self::kernel(closure_wrapper::<T, F>, Box::into_raw(closure) as usize)
} }
} }

View File

@ -11,7 +11,10 @@ use crate::{
task::{sched::CpuQueue, thread::Thread}, task::{sched::CpuQueue, thread::Thread},
}; };
use self::{context::TaskContextImpl, process::Process}; use self::{
context::{TaskContextImpl, Termination},
process::Process,
};
pub mod context; pub mod context;
pub mod process; pub mod process;
@ -65,7 +68,7 @@ pub use context::{Cpu, TaskContext};
// pub static PROCESSES: IrqSafeSpinlock<ProcessList> = IrqSafeSpinlock::new(ProcessList::new()); // pub static PROCESSES: IrqSafeSpinlock<ProcessList> = IrqSafeSpinlock::new(ProcessList::new());
/// Creates a new kernel-space process to execute a closure and queues it to some CPU /// Creates a new kernel-space process to execute a closure and queues it to some CPU
pub fn spawn_kernel_closure<S: Into<String>, F: Fn() + Send + 'static>( pub fn spawn_kernel_closure<S: Into<String>, T: Termination, F: Fn() -> T + Send + 'static>(
name: S, name: S,
f: F, f: F,
) -> Result<(), Error> { ) -> Result<(), Error> {

View File

@ -359,6 +359,21 @@ impl Process {
pub fn get(id: ProcessId) -> Option<Arc<Self>> { pub fn get(id: ProcessId) -> Option<Arc<Self>> {
PROCESSES.lock().get(&id).cloned() PROCESSES.lock().get(&id).cloned()
} }
pub async fn terminate_others(&self, except: ThreadId) {
let mut inner = self.inner.lock();
for thread in inner.threads.iter() {
if thread.id() == except {
continue;
}
infoln!("Terminate thread {}", thread.id());
thread.terminate().await;
}
inner.threads.retain(|t| t.id() == except);
}
} }
impl fmt::Display for ProcessId { impl fmt::Display for ProcessId {

View File

@ -175,6 +175,10 @@ impl CpuQueue {
let current_t = current.and_then(Thread::get); let current_t = current.and_then(Thread::get);
if let Some(current_t) = current_t.as_ref() { if let Some(current_t) = current_t.as_ref() {
if current_t.state.load(Ordering::Acquire) == ThreadState::Terminated {
current_t.exit_notify.wake_all();
}
if current_t if current_t
.state .state
.compare_exchange( .compare_exchange(

View File

@ -2,7 +2,9 @@ use core::{
fmt, fmt,
mem::size_of, mem::size_of,
ops::Deref, ops::Deref,
pin::Pin,
sync::atomic::{AtomicU32, AtomicU64, Ordering}, sync::atomic::{AtomicU32, AtomicU64, Ordering},
task::{Context, Poll},
}; };
use abi::{ use abi::{
@ -15,16 +17,19 @@ use alloc::{
sync::Arc, sync::Arc,
}; };
use atomic_enum::atomic_enum; use atomic_enum::atomic_enum;
use futures_util::task::ArcWake; use futures_util::{task::ArcWake, Future};
use kernel_util::util::OneTimeInit; use kernel_util::util::OneTimeInit;
use crate::{ use crate::{
block,
mem::{process::ProcessAddressSpace, ForeignPointer}, mem::{process::ProcessAddressSpace, ForeignPointer},
sync::{IrqGuard, IrqSafeSpinlock}, sync::{IrqGuard, IrqSafeSpinlock},
task::{context::TaskContextImpl, Cpu}, task::{context::TaskContextImpl, Cpu},
}; };
use super::{context::TaskFrame, process::Process, sched::CpuQueue, TaskContext}; use super::{
context::TaskFrame, process::Process, runtime::QueueWaker, sched::CpuQueue, TaskContext,
};
/// Represents the states a thread can be at some point in time /// Represents the states a thread can be at some point in time
#[atomic_enum] #[atomic_enum]
@ -54,7 +59,6 @@ struct SignalEntry {
} }
struct ThreadInner { struct ThreadInner {
exit_status: i32,
queue: Option<&'static CpuQueue>, queue: Option<&'static CpuQueue>,
signal_entry: Option<SignalEntry>, signal_entry: Option<SignalEntry>,
@ -73,6 +77,7 @@ pub struct Thread {
space: Option<Arc<ProcessAddressSpace>>, space: Option<Arc<ProcessAddressSpace>>,
inner: IrqSafeSpinlock<ThreadInner>, inner: IrqSafeSpinlock<ThreadInner>,
pub(super) exit_notify: Arc<QueueWaker>,
} }
static THREADS: IrqSafeSpinlock<BTreeMap<ThreadId, Arc<Thread>>> = static THREADS: IrqSafeSpinlock<BTreeMap<ThreadId, Arc<Thread>>> =
@ -102,12 +107,13 @@ impl Thread {
space, space,
inner: IrqSafeSpinlock::new(ThreadInner { inner: IrqSafeSpinlock::new(ThreadInner {
exit_status: 0,
queue: None, queue: None,
signal_stack: VecDeque::new(), signal_stack: VecDeque::new(),
signal_entry: None, signal_entry: None,
}), }),
exit_notify: Arc::new(QueueWaker::new()),
}); });
THREADS.lock().insert(id, thread.clone()); THREADS.lock().insert(id, thread.clone());
@ -225,8 +231,8 @@ impl Thread {
assert_ne!(new_state, ThreadState::Ready); assert_ne!(new_state, ThreadState::Ready);
assert_ne!(new_state, ThreadState::Running); assert_ne!(new_state, ThreadState::Running);
let mut inner = self.inner.lock();
let current_state = self.state.swap(new_state, Ordering::SeqCst); let current_state = self.state.swap(new_state, Ordering::SeqCst);
let mut inner = self.inner.lock();
let proc_queue = inner.queue.take().unwrap(); let proc_queue = inner.queue.take().unwrap();
proc_queue.dequeue(self.id()); proc_queue.dequeue(self.id());
@ -271,24 +277,6 @@ impl Thread {
} }
// Thread inner // Thread inner
/// Handles the cleanup of an exited thread
pub fn handle_exit(&self) {
// Scheduler still holds a lock of this process?
// TODO cancel Wait if a process was killed while suspended?
let code = {
let inner = self.inner.lock();
let exit_status = ExitCode::from(inner.exit_status);
exit_status
};
if let Some(process) = self.process.as_ref() {
process.handle_thread_exit(self.id(), code);
}
// TODO WaitThread, notify any waiters we're done
// self.exit_waker.wake_all();
}
pub fn set_signal_entry(&self, entry: usize, stack: usize) { pub fn set_signal_entry(&self, entry: usize, stack: usize) {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.signal_entry.replace(SignalEntry { entry, stack }); inner.signal_entry.replace(SignalEntry { entry, stack });
@ -301,6 +289,32 @@ impl Thread {
self.clone().enqueue_somewhere(); self.clone().enqueue_somewhere();
} }
} }
pub fn terminate(self: &Arc<Self>) -> impl Future<Output = ()> {
struct F(Arc<Thread>);
impl Future for F {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let F(thread) = self.deref();
thread.exit_notify.register(cx.waker());
if thread.state.load(Ordering::Acquire) == ThreadState::Terminated {
thread.exit_notify.remove(cx.waker());
Poll::Ready(())
} else {
Poll::Pending
}
}
}
// Will not abort the execution: called from another thread
self.dequeue(ThreadState::Terminated);
F(self.clone())
}
} }
impl ArcWake for Thread { impl ArcWake for Thread {
@ -310,13 +324,60 @@ impl ArcWake for Thread {
} }
impl CurrentThread { impl CurrentThread {
/// Terminate the current process fn dequeue_terminate(&self, code: ExitCode) {
pub fn exit(&self, status: ExitCode) { self.state
self.inner.lock().exit_status = status.into(); .compare_exchange(
debugln!("Thread {} exited with code {:?}", self.id(), status); ThreadState::Running,
ThreadState::Terminated,
Ordering::AcqRel,
Ordering::Relaxed,
)
.unwrap();
self.handle_exit(); if let Some(process) = self.process.as_ref() {
self.dequeue(ThreadState::Terminated); process.handle_thread_exit(self.id(), code);
}
let mut inner = self.inner.lock();
let proc_queue = inner.queue.take().unwrap();
let queue = Cpu::local().queue();
assert_eq!(proc_queue.index(), queue.index());
drop(inner);
queue.dequeue(self.id());
unsafe { queue.yield_cpu() }
unreachable!()
}
/// Terminate the current thread
pub fn exit(&self, code: ExitCode) {
// May already have been terminated by process exit
if self.state.load(Ordering::Acquire) == ThreadState::Terminated {
return;
}
self.dequeue_terminate(code)
}
pub fn exit_process(&self, code: ExitCode) {
let _guard = IrqGuard::acquire();
let process = self
.process
.clone()
.expect("exit_process() called on a detached thread");
let p = process.clone();
block! {
p.terminate_others(self.id()).await;
}
.unwrap();
self.exit(code);
unreachable!();
} }
pub fn suspend(&self) -> Result<(), Error> { pub fn suspend(&self) -> Result<(), Error> {