proc: better wait_process

This commit is contained in:
Mark Poliakov 2024-11-28 22:28:32 +02:00
parent 20272d3db0
commit 6017e1044a
13 changed files with 259 additions and 85 deletions

View File

@ -14,10 +14,7 @@ use alloc::rc::Rc;
use block::BlockAllocator;
use dir::DirectoryNode;
use file::FileNode;
use libk::vfs::{
impls::{fixed_path_symlink, fixed_symlink, FixedSymlink},
AccessToken, NodeRef,
};
use libk::vfs::{impls::fixed_path_symlink, AccessToken, NodeRef};
use tar::TarEntry;
use yggdrasil_abi::{
error::Error,

View File

@ -116,16 +116,37 @@ impl BoolEvent {
}
pub fn signal_saturating(&self) {
self.try_signal().ok();
if !self.state.swap(true, Ordering::Release) {
self.notify.wake_all();
}
// self.try_signal().ok();
}
pub fn try_signal(&self) -> Result<(), bool> {
self.state
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)?;
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed)?;
self.notify.wake_all();
Ok(())
}
pub async fn wait_reset(&self) {
poll_fn(|cx| {
self.notify.register(cx.waker());
if self
.state
.compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
// Event set and immediately reset
self.notify.remove(cx.waker());
Poll::Ready(())
} else {
Poll::Pending
}
})
.await
}
pub async fn wait(&self) {
poll_fn(|cx| {
self.notify.register(cx.waker());

View File

@ -1,10 +1,4 @@
use alloc::{
borrow::ToOwned,
string::String,
sync::{Arc, Weak},
vec,
vec::Vec,
};
use alloc::{borrow::ToOwned, string::String, sync::Arc, vec, vec::Vec};
use bytemuck::Pod;
use kernel_arch::task::{TaskContext, UserContextInfo};
use libk_mm::{
@ -23,7 +17,7 @@ use yggdrasil_abi::{
use crate::{
task::{
mem::ForeignPointer,
process::{Process, ProcessImage},
process::{Process, ProcessCreateInfo, ProcessImage},
thread::Thread,
TaskContextImpl,
},
@ -37,7 +31,7 @@ pub mod elf;
pub type LoadedProcess = (Arc<Process>, Arc<Thread>);
pub struct LoadOptions<'e, P: AsRef<Path>> {
pub parent: Option<Weak<Process>>,
pub parent: Option<Arc<Process>>,
pub group_id: ProcessGroupId,
pub path: P,
pub args: &'e [&'e str],
@ -247,14 +241,14 @@ where
P: AsRef<Path>,
{
let context = setup_context(options, &space, &image, args, envs)?;
let (process, main) = Process::new_with_main(
let info = ProcessCreateInfo {
name,
options.group_id,
options.parent.clone(),
Arc::new(space),
context,
Some(image),
);
group_id: options.group_id,
space: Arc::new(space),
image: Some(image),
};
let (process, main) = Process::spawn(options.parent.as_ref(), info);
Ok((process, main))
}
@ -343,7 +337,7 @@ pub fn load_into<P: AsRef<Path>>(
args: Vec<String>,
envs: Vec<String>,
) -> Result<(TaskContextImpl, ProcessImage), Error> {
let process = options.parent.as_ref().unwrap().upgrade().unwrap();
let process = options.parent.as_ref().ok_or(Error::InvalidOperation)?;
let mut io = process.io.lock();
// Have to make the Path owned, going to drop the address space from which it came
let path = options.path.as_ref().to_owned();

View File

@ -24,7 +24,7 @@ use libk_util::{
};
use yggdrasil_abi::{
error::Error,
process::{ExitCode, ProcessGroupId, ProcessId, Signal, ThreadSpawnOptions},
process::{ExitCode, ProcessGroupId, ProcessId, Signal, ThreadSpawnOptions, WaitFlags},
};
use crate::{
@ -65,6 +65,7 @@ pub struct ProcessInner {
session_terminal: Option<NodeRef>,
threads: Vec<Arc<Thread>>,
children: BTreeMap<ProcessId, Weak<Process>>,
mutexes: BTreeMap<usize, Arc<UserspaceMutex>>,
space: Option<Arc<ProcessAddressSpace>>,
image: Option<ProcessImage>,
@ -84,38 +85,52 @@ pub struct Process {
signal_entry: AtomicUsize,
pub(crate) exit: OneTimeEvent<ExitCode>,
pub(crate) child_exit_notify: BoolEvent,
/// Process I/O information
pub io: IrqSafeSpinlock<ProcessIo>,
}
pub struct ProcessCreateInfo<S: Into<String>> {
pub name: S,
pub group_id: ProcessGroupId,
pub space: Arc<ProcessAddressSpace>,
pub image: Option<ProcessImage>,
// Main thread
pub context: TaskContextImpl,
}
impl Process {
/// Creates a new process with given main thread
pub fn new_with_main<S: Into<String>>(
name: S,
group_id: ProcessGroupId,
parent: Option<Weak<Self>>,
space: Arc<ProcessAddressSpace>,
context: TaskContextImpl,
image: Option<ProcessImage>,
pub fn spawn<S: Into<String>>(
parent: Option<&Arc<Self>>,
info: ProcessCreateInfo<S>,
) -> (Arc<Self>, Arc<Thread>) {
let name = name.into();
let name = info.name.into();
let id = ProcessId::new();
let process = Arc::new(Self {
name: name.clone(),
id,
parent,
parent: parent.map(Arc::downgrade),
inner: IrqSafeRwLock::new(ProcessInner::new(
id,
info.group_id,
Some(info.space.clone()),
info.image,
)),
signal_entry: AtomicUsize::new(0),
inner: IrqSafeRwLock::new(ProcessInner::new(id, group_id, Some(space.clone()), image)),
child_exit_notify: BoolEvent::new(),
exit: OneTimeEvent::new(),
io: IrqSafeSpinlock::new(ProcessIo::new()),
});
// Add a child if parent specified
if let Some(parent) = parent {
parent.inner.write().register_child(&process);
}
// Create "main" thread
let thread = Thread::new_uthread(process.id, Some(name), space, context);
let thread = Thread::new_uthread(process.id, Some(name), info.space, info.context);
process.inner.write().register_thread(thread.clone());
MANAGER.register_process(process.clone());
@ -123,6 +138,39 @@ impl Process {
(process, thread)
}
// /// Creates a new process with given main thread
// pub fn new_with_main<S: Into<String>>(
// name: S,
// group_id: ProcessGroupId,
// parent: Option<Weak<Self>>,
// space: Arc<ProcessAddressSpace>,
// context: TaskContextImpl,
// image: Option<ProcessImage>,
// ) -> (Arc<Self>, Arc<Thread>) {
// let name = name.into();
// let id = ProcessId::new();
// let process = Arc::new(Self {
// name: name.clone(),
// id,
// parent,
// signal_entry: AtomicUsize::new(0),
// inner: IrqSafeRwLock::new(ProcessInner::new(id, group_id, Some(space.clone()), image)),
// exit: OneTimeEvent::new(),
// io: IrqSafeSpinlock::new(ProcessIo::new()),
// });
// // Create "main" thread
// let thread = Thread::new_uthread(process.id, Some(name), space, context);
// process.inner.write().register_thread(thread.clone());
// MANAGER.register_process(process.clone());
// (process, thread)
// }
pub fn create_group() -> ProcessGroupId {
static ID: AtomicU32 = AtomicU32::new(1);
let id = ID.fetch_add(1, Ordering::AcqRel);
@ -163,34 +211,35 @@ impl Process {
unsafe fn fork_inner<F: ForkFrame<Context = TaskContextImpl>>(
self: &Arc<Self>,
frame: &F,
_frame: &F,
) -> Result<ProcessId, Error> {
let src_inner = self.inner.read();
let new_space = src_inner.space.as_ref().unwrap().fork()?;
let new_context = frame.fork(new_space.as_address_with_asid())?;
todo!()
// let src_inner = self.inner.read();
// let new_space = src_inner.space.as_ref().unwrap().fork()?;
// let new_context = frame.fork(new_space.as_address_with_asid())?;
let (new_process, new_main) = Self::new_with_main(
&self.name,
self.group_id(),
Some(Arc::downgrade(self)),
Arc::new(new_space),
new_context,
src_inner.image.clone(),
);
// let (new_process, new_main) = Self::new_with_main(
// &self.name,
// self.group_id(),
// Some(Arc::downgrade(self)),
// Arc::new(new_space),
// new_context,
// src_inner.image.clone(),
// );
{
let mut dst_io = new_process.io.lock();
let src_io = self.io.lock();
// {
// let mut dst_io = new_process.io.lock();
// let src_io = self.io.lock();
dst_io.fork_from(&src_io)?;
}
// dst_io.fork_from(&src_io)?;
// }
new_process.inherit(self)?;
// new_process.inherit(self)?;
log::info!("Process::fork -> {:?}", new_process.id);
new_main.enqueue();
// log::info!("Process::fork -> {:?}", new_process.id);
// new_main.enqueue();
Ok(new_process.id)
// Ok(new_process.id)
}
/// Performs a "fork" operation on the process, creating an identical copy of it, cloning
@ -314,6 +363,67 @@ impl Process {
self.exit.is_signalled()
}
pub fn wait_for_child(&self, id: ProcessId, flags: WaitFlags) -> Result<ExitCode, Error> {
// TODO ESRCH analog
let child = self
.inner
.read()
.strong_child(id)
.ok_or(Error::DoesNotExist)?;
if let Some(status) = child.get_exit_status() {
log::debug!("Child {id} exited with status {status:?} (didn't block)");
// TODO remove child
return Ok(status);
}
if flags.contains(WaitFlags::NON_BLOCKING) {
return Err(Error::WouldBlock);
}
block! {
loop {
if let Some(status) = child.get_exit_status() {
log::debug!("Child {id} exited with status {status:?}", );
// TODO remove child
break status;
}
self.child_exit_notify.wait_reset().await;
}
}
}
pub fn wait_for_any_child(&self, flags: WaitFlags) -> Result<(ProcessId, ExitCode), Error> {
if let Some(child) = self.inner.read().any_exited_child() {
let id = child.id;
// unwrap ok: ProcessInner tells the child already exited
let status = child.get_exit_status().unwrap();
log::debug!("Child {id} exited with status {status:?} (didn't block)");
// TODO remove child
return Ok((id, status));
}
if flags.contains(WaitFlags::NON_BLOCKING) {
return Err(Error::WouldBlock);
}
block! {
loop {
if let Some(child) = self.inner.read().any_exited_child() {
let id = child.id;
// unwrap ok: ProcessInner tells the child already exited
let status = child.get_exit_status().unwrap();
log::debug!("Child {id} exited with status {status:?}", );
// TODO remove child
break (id, status);
}
self.child_exit_notify.wait_reset().await;
}
}
}
pub async fn wait_for_exit(&self) -> ExitCode {
self.exit.wait_copy().await
}
@ -323,7 +433,9 @@ impl Process {
self.io.lock().handle_exit();
inner.threads.clear();
if let Some(space) = inner.space.take() {
space.clear();
if let Err(err) = space.clear() {
log::error!("Address space cleanup error: {err:?}");
}
}
}
@ -415,6 +527,11 @@ impl Process {
log::debug!("Last thread of {} exited", self.id);
self.cleanup(inner);
self.exit.signal(code);
if let Some(parent) = self.parent.as_ref().and_then(Weak::upgrade) {
log::info!("{}: notify parent ({}) of exit", self.id, parent.id);
// Notify parent of child exit
parent.child_exit_notify.signal_saturating();
}
}
}
@ -477,6 +594,7 @@ impl ProcessInner {
group_id,
session_terminal: None,
threads: Vec::new(),
children: BTreeMap::new(),
mutexes: BTreeMap::new(),
image,
@ -491,6 +609,22 @@ impl ProcessInner {
self.threads.push(thread);
}
pub fn register_child(&mut self, child: &Arc<Process>) {
self.children.insert(child.id, Arc::downgrade(child));
}
pub fn strong_child(&self, id: ProcessId) -> Option<Arc<Process>> {
self.children.get(&id).and_then(Weak::upgrade)
}
pub fn any_exited_child(&self) -> Option<Arc<Process>> {
// TODO not really efficient, bleh
self.children
.values()
.filter_map(Weak::upgrade)
.find(|child| child.has_exited())
}
pub fn remove_thread(&mut self, id: ThreadId) -> bool {
let n = self.threads.len();
self.threads.retain(|t| t.id != id);

View File

@ -6,7 +6,7 @@ pub(crate) use abi::{
},
mem::{MappingFlags, MappingSource},
net::SocketType,
process::{Signal, SignalEntryData, SpawnOptions},
process::{Signal, SignalEntryData, SpawnOptions, WaitFlags},
system::SystemInfo,
};
use abi::{

View File

@ -5,8 +5,8 @@ use abi::{
io::DeviceRequest,
mem::{MappingFlags, MappingSource},
process::{
ExitCode, MutexOperation, ProcessGroupId, ProcessId, ProcessOption, Signal, SpawnFlags,
SpawnOption, SpawnOptions, ThreadOption, ThreadSpawnOptions,
ExitCode, MutexOperation, ProcessGroupId, ProcessId, ProcessOption, ProcessWait, Signal,
SpawnFlags, SpawnOption, SpawnOptions, ThreadOption, ThreadSpawnOptions, WaitFlags,
},
};
use alloc::sync::Arc;
@ -115,7 +115,7 @@ pub(crate) fn spawn_process(options: &SpawnOptions<'_>) -> Result<ProcessId, Err
// Setup a new process from the file
let load_options = LoadOptions {
group_id: process.group_id(),
parent: Some(Arc::downgrade(&process)),
parent: Some(process.clone()),
path: options.program,
args: options.arguments,
envs: options.arguments,
@ -184,10 +184,25 @@ pub(crate) fn spawn_process(options: &SpawnOptions<'_>) -> Result<ProcessId, Err
})
}
pub(crate) fn wait_process(pid: ProcessId, status: &mut ExitCode) -> Result<(), Error> {
let target = Process::get(pid).ok_or(Error::DoesNotExist)?;
*status = block!(target.wait_for_exit().await)?;
Ok(())
pub(crate) fn wait_process(
wait: &ProcessWait,
status: &mut ExitCode,
flags: WaitFlags,
) -> Result<ProcessId, Error> {
let thread = Thread::current();
let process = thread.process();
match wait {
&ProcessWait::Process(id) => {
*status = process.wait_for_child(id, flags)?;
Ok(id)
}
ProcessWait::Group(_id) => todo!(),
ProcessWait::AnyChild => {
let (id, exit) = process.wait_for_any_child(flags)?;
*status = exit;
Ok(id)
}
}
}
pub(crate) fn get_pid() -> ProcessId {
@ -222,6 +237,11 @@ pub(crate) fn nanosleep(
}
pub(crate) fn send_signal(pid: ProcessId, signal: Signal) -> Result<(), Error> {
// Debug is only issued by kernel for internal purposes of interrupting a thread when someone
// attaches to it
if signal == Signal::Debug {
return Err(Error::InvalidArgument);
}
let thread = Thread::current();
let target = Process::get(pid).ok_or(Error::DoesNotExist)?;
target.raise_signal(Some(thread.id), signal);
@ -301,9 +321,7 @@ pub(crate) fn wait_thread(id: u32) -> Result<(), Error> {
let this_thread = Thread::current();
let process = this_thread.process();
log::debug!("wait_thread({id})");
let result = block!(process.wait_for_thread(tid).await)?;
log::debug!("www -> {result:?}");
result
}

View File

@ -1,4 +1,4 @@
// vi:syntax=yggdrasil_abi:
// vi:syntax=yggdrasil-abi:
enum Signal(u32) {
/// Process has tried to perform an illegal memory operation
@ -21,6 +21,13 @@ newtype ProcessId(u32);
newtype ProcessGroupId(u32);
newtype ThreadId(u32);
#[default(_)]
bitfield WaitFlags(u32) {
/// Makes the wait call return immediately. If the requested query didn't match any exited
/// processes, will return Error::WouldBlock instead.
NON_BLOCKING: 1,
}
// Spawn
#[default(_)]
bitfield SpawnFlags(u32) {

View File

@ -15,6 +15,7 @@ extern {
type ExitCode = yggdrasil_abi::process::ExitCode;
type ThreadOption = yggdrasil_abi::process::ThreadOption;
type ProcessOption = yggdrasil_abi::process::ProcessOption;
type ProcessWait = yggdrasil_abi::process::ProcessWait;
type SocketAddr = core::net::SocketAddr;
type SocketOption = yggdrasil_abi::net::SocketOption;
@ -77,7 +78,7 @@ syscall get_process_group_id() -> ProcessGroupId;
syscall exit_process(code: ExitCode) -> !;
syscall spawn_process(options: &SpawnOptions<'_>) -> Result<ProcessId>;
syscall wait_process(pid: ProcessId, status: &mut ExitCode) -> Result<()>;
syscall wait_process(wait: &ProcessWait, status: &mut ExitCode, flags: WaitFlags) -> Result<ProcessId>;
syscall get_pid() -> ProcessId;
syscall get_tid() -> u32;

View File

@ -9,7 +9,7 @@ pub mod thread;
pub use crate::generated::{
ExecveOptions, ProcessGroupId, ProcessId, Signal, SignalEntryData, SpawnFlags, SpawnOptions,
ThreadId, ThreadSpawnOptions,
ThreadId, ThreadSpawnOptions, WaitFlags,
};
pub use exit::ExitCode;
pub use thread::ThreadOption;
@ -29,6 +29,13 @@ pub mod auxv {
pub const NULL: u64 = 0x00;
}
#[derive(Debug)]
pub enum ProcessWait {
Process(ProcessId),
Group(ProcessGroupId),
AnyChild,
}
#[derive(Debug)]
pub enum ProcessOption {
SignalEntry(usize),

View File

@ -5,8 +5,8 @@ use core::{mem::MaybeUninit, time::Duration};
use abi::error::Error;
pub use abi::process::{
auxv, AuxValue, AuxValueIter, ExecveOptions, ExitCode, MutexOperation, ProcessGroupId,
ProcessId, ProcessInfoElement, ProgramArgumentInner, Signal, SignalEntryData, SpawnFlags,
SpawnOption, SpawnOptions, StringArgIter, ThreadId, ThreadSpawnOptions,
ProcessId, ProcessInfoElement, ProcessWait, ProgramArgumentInner, Signal, SignalEntryData,
SpawnFlags, SpawnOption, SpawnOptions, StringArgIter, ThreadId, ThreadSpawnOptions, WaitFlags,
};
use crate::sys;

View File

@ -24,7 +24,7 @@ mod generated {
net::SocketType,
process::{
ExecveOptions, ProcessGroupId, ProcessId, ProcessOption, Signal, SignalEntryData,
SpawnOptions, ThreadSpawnOptions,
SpawnOptions, ThreadSpawnOptions, WaitFlags,
},
SyscallFunction,
};

View File

@ -1,4 +1,4 @@
#![feature(yggdrasil_os)]
#![feature(yggdrasil_os, never_type)]
use std::{
fmt,
@ -155,7 +155,7 @@ fn handle_message(msg: InitMsg) -> io::Result<()> {
}
}
fn main_loop(channel: MessageChannel) -> io::Result<()> {
fn main_loop(channel: MessageChannel) -> io::Result<!> {
let mut buf = [0; 1024];
loop {
@ -190,11 +190,7 @@ fn main() -> ExitCode {
}
}
match main_loop(channel) {
Ok(_) => ExitCode::SUCCESS,
Err(e) => {
debug_trace!("init: main_loop returned {}", e);
ExitCode::FAILURE
}
}
let Err(error) = main_loop(channel);
debug_trace!("init: main_loop returned {error}");
ExitCode::FAILURE
}

View File

@ -42,7 +42,6 @@ fn exec_script<P: AsRef<Path>>(path: P, arg: &str) -> Result<(), Error> {
yggdrasil_rt::debug_trace!("rc: {:?} {}", path, arg);
// TODO run those in parallel, if allowed
// TODO binfmt guessing
let mut process = Command::new(path).arg(arg).spawn()?;
if !process.wait()?.success() {