feature: add threads (WIP)

This commit is contained in:
Mark Poliakov 2021-11-17 13:05:51 +02:00
parent 6bb4f38edc
commit adb95ac52e
16 changed files with 746 additions and 367 deletions

1
Cargo.lock generated
View File

@ -250,6 +250,7 @@ checksum = "1230ec65f13e0f9b28d789da20d2d419511893ea9dac2c1f4ef67b8b14e5da80"
name = "user"
version = "0.1.0"
dependencies = [
"lazy_static",
"libusr",
]

View File

@ -4,7 +4,7 @@ use crate::arch::machine;
use crate::debug::Level;
use crate::dev::irq::{IntController, IrqContext};
use crate::mem;
use crate::proc::{sched, Process};
use crate::proc::{sched, Thread, Process};
use crate::syscall;
use cortex_a::registers::{ESR_EL1, FAR_EL1};
use libsys::{abi, signal::Signal};
@ -90,7 +90,8 @@ extern "C" fn __aa64_exc_sync_handler(exc: &mut ExceptionFrame) {
let far = FAR_EL1.get() as usize;
if far < mem::KERNEL_OFFSET && sched::is_ready() {
let proc = Process::current();
let thread = Thread::current();
let proc = thread.owner().unwrap();
if proc
.manipulate_space(|space| space.try_cow_copy(far))
@ -98,7 +99,7 @@ extern "C" fn __aa64_exc_sync_handler(exc: &mut ExceptionFrame) {
{
// Kill program
dump_data_abort(Level::Error, esr, far as u64);
proc.enter_signal(Signal::SegmentationFault);
proc.enter_fault_signal(thread, Signal::SegmentationFault);
}
unsafe {
@ -138,6 +139,11 @@ extern "C" fn __aa64_exc_sync_handler(exc: &mut ExceptionFrame) {
_ => {}
}
if sched::is_ready() {
let thread = Thread::current();
errorln!("Unhandled exception in thread {}, {:?}", thread.id(), thread.owner().map(|e| e.id()));
}
errorln!(
"Unhandled exception at ELR={:#018x}, ESR={:#010x}",
exc.elr_el1,

View File

@ -3,7 +3,7 @@
use crate::config::{ConfigKey, CONFIG};
use crate::fs::{devfs, MemfsBlockAlloc};
use crate::mem;
use crate::proc::{elf, Process};
use crate::proc::{wait, elf, Process};
use libsys::stat::{FileDescriptor, OpenFlags};
use memfs::Ramfs;
use vfs::{Filesystem, Ioctx};

View File

@ -6,8 +6,11 @@ use alloc::collections::BTreeMap;
use libsys::proc::Pid;
pub mod elf;
pub mod thread;
pub use thread::{Thread, ThreadRef, State as ThreadState};
pub(self) use thread::Context;
pub mod process;
pub use process::{Process, ProcessRef, State as ProcessState};
pub use process::{Process, ProcessRef, ProcessState};
pub mod io;
pub use io::ProcessIo;
@ -52,6 +55,9 @@ pub fn process(id: Pid) -> ProcessRef {
pub(self) static PROCESSES: IrqSafeSpinLock<BTreeMap<Pid, ProcessRef>> =
IrqSafeSpinLock::new(BTreeMap::new());
pub(self) static THREADS: IrqSafeSpinLock<BTreeMap<u32, ThreadRef>> =
IrqSafeSpinLock::new(BTreeMap::new());
/// Sets up initial process and enters it.
///
/// See [Scheduler::enter]
@ -61,6 +67,6 @@ pub(self) static PROCESSES: IrqSafeSpinLock<BTreeMap<Pid, ProcessRef>> =
/// Unsafe: May only be called once.
pub unsafe fn enter() -> ! {
SCHED.init();
SCHED.enqueue(Process::new_kernel(init::init_fn, 0).unwrap().id());
Process::new_kernel(init::init_fn, 0).unwrap().enqueue();
SCHED.enter();
}

View File

@ -5,50 +5,45 @@ use crate::mem::{
phys::{self, PageUsage},
virt::{MapAttributes, Space},
};
use crate::proc::{wait::Wait, ProcessIo, PROCESSES, SCHED};
use crate::sync::IrqSafeSpinLock;
use alloc::rc::Rc;
use crate::proc::{
wait::Wait, Context, ProcessIo, Thread, ThreadRef, ThreadState, PROCESSES, SCHED, THREADS,
};
use crate::sync::{IrqSafeSpinLock, IrqSafeSpinLockGuard};
use alloc::{rc::Rc, vec::Vec};
use core::cell::UnsafeCell;
use core::sync::atomic::{AtomicU32, Ordering};
use libsys::{error::Errno, signal::Signal, proc::{ExitCode, Pid}};
pub use crate::arch::platform::context::{self, Context};
use libsys::{
error::Errno,
proc::{ExitCode, Pid},
signal::Signal,
};
/// Wrapper type for a process struct reference
pub type ProcessRef = Rc<Process>;
/// List of possible process states
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum State {
/// Process is ready to be executed and/or is scheduled for it
Ready,
/// Process is currently running or is in system call/interrupt handler
Running,
pub enum ProcessState {
/// Process is alive
Active,
/// Process has finished execution and is waiting to be reaped
Finished,
/// Process is waiting for some external event
Waiting,
}
struct ProcessInner {
space: Option<&'static mut Space>,
state: State,
state: ProcessState,
id: Pid,
wait_flag: bool,
exit: Option<ExitCode>,
signal_entry: usize,
signal_stack: usize,
threads: Vec<u32>,
}
/// Structure describing an operating system process
#[allow(dead_code)]
pub struct Process {
ctx: UnsafeCell<Context>,
signal_ctx: UnsafeCell<Context>,
inner: IrqSafeSpinLock<ProcessInner>,
exit_wait: Wait,
signal_state: AtomicU32,
signal_pending: AtomicU32,
/// Process I/O context
pub io: IrqSafeSpinLock<ProcessIo>,
}
@ -57,9 +52,53 @@ impl Process {
const USTACK_VIRT_TOP: usize = 0x100000000;
const USTACK_PAGES: usize = 4;
/// Returns currently executing process
#[inline]
pub fn id(&self) -> Pid {
self.inner.lock().id
}
#[inline]
pub fn current() -> ProcessRef {
SCHED.current_process()
Thread::current().owner().unwrap()
}
#[inline]
pub fn manipulate_space<F>(&self, f: F) -> Result<(), Errno>
where
F: FnOnce(&mut Space) -> Result<(), Errno>,
{
f(self.inner.lock().space.as_mut().unwrap())
}
pub fn new_kernel(entry: extern "C" fn(usize) -> !, arg: usize) -> Result<ProcessRef, Errno> {
let id = new_kernel_pid();
let thread = Thread::new_kernel(Some(id), entry, arg)?;
let mut inner = ProcessInner {
threads: Vec::new(),
id,
exit: None,
space: None,
state: ProcessState::Active,
};
inner.threads.push(thread.id());
let res = Rc::new(Self {
exit_wait: Wait::new(),
io: IrqSafeSpinLock::new(ProcessIo::new()),
signal_state: AtomicU32::new(0),
inner: IrqSafeSpinLock::new(inner),
});
debugln!("New kernel process: {:?}", id);
let prev = PROCESSES.lock().insert(id, res.clone());
assert!(prev.is_none());
Ok(res)
}
pub fn enqueue(&self) {
let inner = self.inner.lock();
for &tid in inner.threads.iter() {
SCHED.enqueue(tid);
}
}
/// Returns process (if any) to which `pid` refers
@ -69,201 +108,55 @@ impl Process {
/// Sets a pending signal for a process
pub fn set_signal(&self, signal: Signal) {
let lock = self.inner.lock();
let mut lock = self.inner.lock();
let main_thread = Thread::get(lock.threads[0]).unwrap();
match lock.state {
State::Running => {
drop(lock);
self.enter_signal(signal);
// TODO check that `signal` is not a fault signal
// it is illegal to call this function with
// fault signals
match main_thread.state() {
ThreadState::Running => {
Process::enter_signal_on(lock, main_thread, signal);
}
State::Waiting => {
ThreadState::Waiting => {
// TODO abort whatever the process is waiting for
todo!()
}
State::Ready => {
ThreadState::Ready => {
todo!()
}
State::Finished => {
ThreadState::Finished => {
// TODO report error back
todo!()
}
}
}
/// Switches current thread back from signal handler
pub fn return_from_signal(&self) {
if self.signal_pending.load(Ordering::Acquire) == 0 {
panic!("TODO handle cases when returning from no signal");
}
self.signal_pending.store(0, Ordering::Release);
let src_ctx = self.signal_ctx.get();
let dst_ctx = self.ctx.get();
assert_eq!(self.inner.lock().state, State::Running);
unsafe {
(&mut *src_ctx).switch(&mut *dst_ctx);
}
fn enter_signal_on(mut inner: IrqSafeSpinLockGuard<ProcessInner>, thread: ThreadRef, signal: Signal) {
let ttbr0 =
inner.space.as_mut().unwrap().address_phys() | ((inner.id.asid() as usize) << 48);
drop(inner);
thread.enter_signal(signal, ttbr0);
}
/// Switches current thread to a signal handler
pub fn enter_signal(&self, signal: Signal) {
if self
.signal_pending
.compare_exchange_weak(0, signal as u32, Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
panic!("Already handling a signal (maybe handle this case)");
pub fn enter_fault_signal(&self, thread: ThreadRef, signal: Signal) {
let lock = self.inner.lock();
Process::enter_signal_on(lock, thread, signal);
}
pub fn new_user_thread(&self, entry: usize, stack: usize, arg: usize) -> Result<u32, Errno> {
let mut lock = self.inner.lock();
let signal_ctx = unsafe { &mut *self.signal_ctx.get() };
let dst_id = lock.id;
let dst_space_phys = lock.space.as_mut().unwrap().address_phys();
let dst_ttbr0 = dst_space_phys | ((dst_id.asid() as usize) << 48);
let space_phys = lock.space.as_mut().unwrap().address_phys();
let ttbr0 = space_phys | ((lock.id.asid() as usize) << 48);
debugln!(
"Signal entry: pc={:#x}, sp={:#x}, ttbr0={:#x}",
lock.signal_entry,
lock.signal_stack,
dst_ttbr0
);
assert_eq!(lock.state, State::Running);
let thread = Thread::new_user(lock.id, entry, stack, arg, ttbr0)?;
let tid = thread.id();
lock.threads.push(tid);
SCHED.enqueue(tid);
unsafe {
signal_ctx.setup_signal_entry(
lock.signal_entry,
signal as usize,
dst_ttbr0,
lock.signal_stack,
);
}
let src_ctx = self.ctx.get();
drop(lock);
unsafe {
(&mut *src_ctx).switch(signal_ctx);
}
}
/// Sets up values needed for signal entry
pub fn setup_signal_context(&self, entry: usize, stack: usize) {
let mut lock = self.inner.lock();
lock.signal_entry = entry;
lock.signal_stack = stack;
}
/// Schedules an initial thread for execution
///
/// # Safety
///
/// Unsafe: only allowed to be called once, repeated calls
/// will generate undefined behavior
pub unsafe fn enter(proc: ProcessRef) -> ! {
// FIXME use some global lock to guarantee atomicity of thread entry?
proc.inner.lock().state = State::Running;
proc.current_context().enter()
}
/// Executes a function allowing mutation of the process address space
#[inline]
pub fn manipulate_space<F: FnOnce(&mut Space) -> Result<(), Errno>>(
&self,
f: F,
) -> Result<(), Errno> {
f(self.inner.lock().space.as_mut().unwrap())
}
#[allow(clippy::mut_from_ref)]
fn current_context(&self) -> &mut Context {
if self.signal_pending.load(Ordering::Acquire) != 0 {
unsafe { &mut *self.signal_ctx.get() }
} else {
unsafe { &mut *self.ctx.get() }
}
}
/// Schedules a next thread for execution
///
/// # Safety
///
/// Unsafe:
///
/// * Does not ensure src and dst threads are not the same thread
/// * Does not ensure src is actually current context
pub unsafe fn switch(src: ProcessRef, dst: ProcessRef, discard: bool) {
{
let mut src_lock = src.inner.lock();
let mut dst_lock = dst.inner.lock();
if !discard {
assert_eq!(src_lock.state, State::Running);
src_lock.state = State::Ready;
}
assert!(dst_lock.state == State::Ready || dst_lock.state == State::Waiting);
dst_lock.state = State::Running;
}
let src_ctx = src.current_context();
let dst_ctx = dst.current_context();
(&mut *src_ctx).switch(&mut *dst_ctx);
}
/// Suspends current process with a "waiting" status
pub fn enter_wait(&self) {
let drop = {
let mut lock = self.inner.lock();
let drop = lock.state == State::Running;
lock.state = State::Waiting;
SCHED.dequeue(lock.id);
drop
};
if drop {
SCHED.switch(true);
}
}
/// Changes process wait condition status
pub fn set_wait_flag(&self, v: bool) {
self.inner.lock().wait_flag = v;
}
/// Returns `true` if process wait condition has not been reached
pub fn wait_flag(&self) -> bool {
self.inner.lock().wait_flag
}
/// Returns the process ID
pub fn id(&self) -> Pid {
self.inner.lock().id
}
/// Creates a new kernel process
pub fn new_kernel(entry: extern "C" fn(usize) -> !, arg: usize) -> Result<ProcessRef, Errno> {
let id = new_kernel_pid();
let res = Rc::new(Self {
ctx: UnsafeCell::new(Context::kernel(entry as usize, arg)),
signal_ctx: UnsafeCell::new(Context::empty()),
io: IrqSafeSpinLock::new(ProcessIo::new()),
exit_wait: Wait::new(),
signal_state: AtomicU32::new(0),
signal_pending: AtomicU32::new(0),
inner: IrqSafeSpinLock::new(ProcessInner {
signal_entry: 0,
signal_stack: 0,
id,
exit: None,
space: None,
wait_flag: false,
state: State::Ready,
}),
});
debugln!("New kernel process: {:?}", id);
assert!(PROCESSES.lock().insert(id, res.clone()).is_none());
Ok(res)
Ok(tid)
}
/// Creates a "fork" of the process, cloning its address space and
@ -277,40 +170,53 @@ impl Process {
let dst_space_phys = (dst_space as *mut _ as usize) - mem::KERNEL_OFFSET;
let dst_ttbr0 = dst_space_phys | ((dst_id.asid() as usize) << 48);
let mut threads = Vec::new();
let tid = Thread::fork(Some(dst_id), frame, dst_ttbr0)?.id();
threads.push(tid);
let dst = Rc::new(Self {
ctx: UnsafeCell::new(Context::fork(frame, dst_ttbr0)),
signal_ctx: UnsafeCell::new(Context::empty()),
io: IrqSafeSpinLock::new(src_io.fork()?),
exit_wait: Wait::new(),
io: IrqSafeSpinLock::new(src_io.fork()?),
signal_state: AtomicU32::new(0),
signal_pending: AtomicU32::new(0),
inner: IrqSafeSpinLock::new(ProcessInner {
signal_entry: 0,
signal_stack: 0,
id: dst_id,
threads,
exit: None,
space: Some(dst_space),
state: State::Ready,
wait_flag: false,
state: ProcessState::Active,
id: dst_id,
}),
});
debugln!("Process {:?} forked into {:?}", src_inner.id, dst_id);
assert!(PROCESSES.lock().insert(dst_id, dst).is_none());
SCHED.enqueue(dst_id);
SCHED.enqueue(tid);
Ok(dst_id)
}
// TODO a way to terminate a single thread?
/// Terminates a process.
pub fn exit<I: Into<ExitCode>>(&self, status: I) {
pub fn exit<I: Into<ExitCode>>(status: I) {
unsafe {
asm!("msr daifclr, #0xF");
}
let status = status.into();
let drop = {
let mut lock = self.inner.lock();
let drop = lock.state == State::Running;
let thread = Thread::current();
let process = thread.owner().unwrap();
let mut lock = process.inner.lock();
infoln!("Process {:?} is exiting: {:?}", lock.id, status);
assert!(lock.exit.is_none());
lock.exit = Some(status);
lock.state = State::Finished;
lock.state = ProcessState::Finished;
for &tid in lock.threads.iter() {
debugln!("Dequeue {:?}", tid);
Thread::get(tid).unwrap().terminate();
SCHED.dequeue(tid);
}
SCHED.debug();
if let Some(space) = lock.space.take() {
unsafe {
@ -319,21 +225,18 @@ impl Process {
}
}
self.io.lock().handle_exit();
process.io.lock().handle_exit();
SCHED.dequeue(lock.id);
drop
};
self.exit_wait.wakeup_all();
if drop {
drop(lock);
process.exit_wait.wakeup_all();
SCHED.switch(true);
panic!("This code should never run");
}
}
fn collect(&self) -> Option<ExitCode> {
let lock = self.inner.lock();
if lock.state == State::Finished {
if lock.state == ProcessState::Finished {
lock.exit
} else {
None
@ -370,33 +273,36 @@ impl Process {
asm!("msr daifset, #2");
}
let proc = SCHED.current_process();
let mut lock = proc.inner.lock();
if lock.id.is_kernel() {
let mut proc_lock = PROCESSES.lock();
let old_pid = lock.id;
assert!(
proc_lock.remove(&old_pid).is_some(),
"Failed to downgrade kernel process (remove kernel pid)"
);
lock.id = new_user_pid();
debugln!(
"Process downgrades from kernel to user: {:?} -> {:?}",
old_pid,
lock.id
);
assert!(proc_lock.insert(lock.id, proc.clone()).is_none());
unsafe {
SCHED.hack_current_pid(lock.id);
let proc = Process::current();
let mut process_lock = proc.inner.lock();
if process_lock.threads.len() != 1 {
todo!();
}
let thread = Thread::get(process_lock.threads[0]).unwrap();
if process_lock.id.is_kernel() {
let mut processes = PROCESSES.lock();
let old_pid = process_lock.id;
let new_pid = new_user_pid();
debugln!("Downgrading process {:?} -> {:?}", old_pid, new_pid);
let r = processes.remove(&old_pid);
assert!(r.is_some());
process_lock.id = new_pid;
let r = processes.insert(new_pid, proc.clone());
assert!(r.is_none());
} else {
// Invalidate user ASID
let input = (lock.id.asid() as usize) << 48;
let input = (process_lock.id.asid() as usize) << 48;
unsafe {
asm!("tlbi aside1, {}", in(reg) input);
}
}
thread.set_owner(process_lock.id);
proc.io.lock().handle_cloexec();
let new_space = Space::alloc_empty()?;
@ -419,22 +325,20 @@ impl Process {
debugln!("Will now enter at {:#x}", entry);
// TODO drop old address space
lock.space = Some(new_space);
process_lock.space = Some(new_space);
unsafe {
// TODO drop old context
let ctx = proc.ctx.get();
let ctx = thread.ctx.get();
ctx.write(Context::user(
entry,
arg,
new_space_phys | ((lock.id.asid() as usize) << 48),
new_space_phys | ((process_lock.id.asid() as usize) << 48),
Self::USTACK_VIRT_TOP,
));
assert_eq!(lock.state, State::Running);
drop(lock);
drop(process_lock);
(*ctx).enter();
}

View File

@ -1,13 +1,13 @@
//!
use crate::proc::{Pid, Process, ProcessRef, PROCESSES};
use crate::proc::{Pid, Process, ProcessRef, Thread, ThreadRef, PROCESSES, THREADS};
use crate::sync::IrqSafeSpinLock;
use crate::util::InitOnce;
use alloc::{collections::VecDeque, rc::Rc};
struct SchedulerInner {
queue: VecDeque<Pid>,
idle: Option<Pid>,
current: Option<Pid>,
queue: VecDeque<u32>,
idle: Option<u32>,
current: Option<u32>,
}
/// Process scheduler state and queues
@ -23,7 +23,7 @@ impl SchedulerInner {
current: None,
};
this.idle = Some(Process::new_kernel(idle_fn, 0).unwrap().id());
this.idle = Some(Thread::new_kernel(None, idle_fn, 0).unwrap().id());
this
}
@ -39,13 +39,21 @@ impl Scheduler {
}
/// Schedules a thread for execution
pub fn enqueue(&self, pid: Pid) {
self.inner.get().lock().queue.push_back(pid);
pub fn enqueue(&self, tid: u32) {
self.inner.get().lock().queue.push_back(tid);
}
/// Removes given `pid` from execution queue
pub fn dequeue(&self, pid: Pid) {
self.inner.get().lock().queue.retain(|&p| p != pid)
/// Removes given `tid` from execution queue
pub fn dequeue(&self, tid: u32) {
self.inner.get().lock().queue.retain(|&p| p != tid)
}
pub fn debug(&self) {
let lock = self.inner.get().lock();
debugln!("Scheduler queue:");
for &tid in lock.queue.iter() {
debugln!("TID: {:?}", tid);
}
}
/// Performs initial process entry.
@ -63,11 +71,11 @@ impl Scheduler {
};
inner.current = Some(id);
PROCESSES.lock().get(&id).unwrap().clone()
THREADS.lock().get(&id).unwrap().clone()
};
asm!("msr daifset, #2");
Process::enter(thread)
Thread::enter(thread)
}
/// This hack is required to be called from execve() when downgrading current
@ -76,8 +84,14 @@ impl Scheduler {
/// # Safety
///
/// Unsafe: only allowed to be called from Process::execve()
pub unsafe fn hack_current_pid(&self, new: Pid) {
self.inner.get().lock().current = Some(new);
pub unsafe fn hack_current_tid(&self, old: u32, new: u32) {
let mut lock = self.inner.get().lock();
match lock.current {
Some(t) if t == old => {
lock.current = Some(new);
}
_ => {}
}
}
/// Switches to the next task scheduled for execution. If there're
@ -87,7 +101,7 @@ impl Scheduler {
let mut inner = self.inner.get().lock();
let current = inner.current.unwrap();
if !discard && current != Pid::IDLE {
if !discard && current != 0 {
// Put the process into the back of the queue
inner.queue.push_back(current);
}
@ -100,7 +114,7 @@ impl Scheduler {
inner.current = Some(next);
let (from, to) = {
let lock = PROCESSES.lock();
let lock = THREADS.lock();
(
lock.get(&current).unwrap().clone(),
lock.get(&next).unwrap().clone(),
@ -113,17 +127,23 @@ impl Scheduler {
if !Rc::ptr_eq(&from, &to) {
unsafe {
asm!("msr daifset, #2");
Process::switch(from, to, discard);
Thread::switch(from, to, discard);
}
}
}
/// Returns a Rc-reference to currently running process
pub fn current_process(&self) -> ProcessRef {
pub fn current_thread(&self) -> ThreadRef {
let inner = self.inner.get().lock();
let current = inner.current.unwrap();
PROCESSES.lock().get(&current).unwrap().clone()
let id = inner.current.unwrap();
THREADS.lock().get(&id).unwrap().clone()
}
// /// Returns a Rc-reference to currently running process
// pub fn current_process(&self) -> ProcessRef {
// let inner = self.inner.get().lock();
// let current = inner.current.unwrap();
// PROCESSES.lock().get(&current).unwrap().clone()
// }
}
/// Returns `true` if the scheduler has been initialized

325
kernel/src/proc/thread.rs Normal file
View File

@ -0,0 +1,325 @@
use crate::arch::aarch64::exception::ExceptionFrame;
use crate::proc::{wait::Wait, Process, ProcessRef, SCHED, THREADS};
use crate::sync::IrqSafeSpinLock;
use alloc::{rc::Rc, vec::Vec};
use core::cell::UnsafeCell;
use core::sync::atomic::{AtomicU32, Ordering};
use libsys::{error::Errno, proc::Pid, signal::Signal};
pub use crate::arch::platform::context::{self, Context};
pub type ThreadRef = Rc<Thread>;
/// List of possible process states
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum State {
/// Process is ready to be executed and/or is scheduled for it
Ready,
/// Process is currently running or is in system call/interrupt handler
Running,
/// Process has finished execution and is waiting to be reaped
Finished,
/// Process is waiting for some external event
Waiting,
}
struct ThreadInner {
id: u32,
state: State,
owner: Option<Pid>,
pending_wait: Option<&'static Wait>,
wait_flag: bool,
signal_entry: usize,
signal_stack: usize,
}
pub struct Thread {
inner: IrqSafeSpinLock<ThreadInner>,
pub(super) ctx: UnsafeCell<Context>,
signal_ctx: UnsafeCell<Context>,
signal_pending: AtomicU32,
}
impl Thread {
#[inline]
pub fn current() -> ThreadRef {
SCHED.current_thread()
}
#[inline]
pub fn get(tid: u32) -> Option<ThreadRef> {
THREADS.lock().get(&tid).cloned()
}
#[inline]
pub fn owner(&self) -> Option<ProcessRef> {
self.inner.lock().owner.and_then(Process::get)
}
/// Creates a new kernel process
pub fn new_kernel(
owner: Option<Pid>,
entry: extern "C" fn(usize) -> !,
arg: usize,
) -> Result<ThreadRef, Errno> {
let id = new_tid();
let res = Rc::new(Self {
ctx: UnsafeCell::new(Context::kernel(entry as usize, arg)),
signal_ctx: UnsafeCell::new(Context::empty()),
signal_pending: AtomicU32::new(0),
inner: IrqSafeSpinLock::new(ThreadInner {
signal_entry: 0,
signal_stack: 0,
id,
owner,
pending_wait: None,
wait_flag: false,
state: State::Ready,
}),
});
debugln!("New kernel thread: {:?}", id);
assert!(THREADS.lock().insert(id, res.clone()).is_none());
Ok(res)
}
/// Creates a new userspace process
pub fn new_user(
owner: Pid,
entry: usize,
stack: usize,
arg: usize,
ttbr0: usize,
) -> Result<ThreadRef, Errno> {
let id = new_tid();
let res = Rc::new(Self {
ctx: UnsafeCell::new(Context::user(entry, arg, ttbr0, stack)),
signal_ctx: UnsafeCell::new(Context::empty()),
signal_pending: AtomicU32::new(0),
inner: IrqSafeSpinLock::new(ThreadInner {
signal_entry: 0,
signal_stack: 0,
id,
owner: Some(owner),
pending_wait: None,
wait_flag: false,
state: State::Ready,
}),
});
debugln!("New userspace thread: {:?}", id);
assert!(THREADS.lock().insert(id, res.clone()).is_none());
Ok(res)
}
pub fn fork(
owner: Option<Pid>,
frame: &ExceptionFrame,
ttbr0: usize,
) -> Result<ThreadRef, Errno> {
let id = new_tid();
let res = Rc::new(Self {
ctx: UnsafeCell::new(Context::fork(frame, ttbr0)),
signal_ctx: UnsafeCell::new(Context::empty()),
signal_pending: AtomicU32::new(0),
inner: IrqSafeSpinLock::new(ThreadInner {
signal_entry: 0,
signal_stack: 0,
id,
owner,
pending_wait: None,
wait_flag: false,
state: State::Ready,
}),
});
debugln!("Forked new user thread: {:?}", id);
assert!(THREADS.lock().insert(id, res.clone()).is_none());
Ok(res)
}
#[inline]
pub fn id(&self) -> u32 {
self.inner.lock().id
}
/// Schedules an initial thread for execution
///
/// # Safety
///
/// Unsafe: only allowed to be called once, repeated calls
/// will generate undefined behavior
pub unsafe fn enter(thread: ThreadRef) -> ! {
// FIXME use some global lock to guarantee atomicity of thread entry?
thread.inner.lock().state = State::Running;
thread.current_context().enter()
}
/// Schedules a next thread for execution
///
/// # Safety
///
/// Unsafe:
///
/// * Does not ensure src and dst threads are not the same thread
/// * Does not ensure src is actually current context
pub unsafe fn switch(src: ThreadRef, dst: ThreadRef, discard: bool) {
{
let mut src_lock = src.inner.lock();
let mut dst_lock = dst.inner.lock();
if !discard {
assert_eq!(src_lock.state, State::Running);
src_lock.state = State::Ready;
}
assert!(dst_lock.state == State::Ready || dst_lock.state == State::Waiting);
dst_lock.state = State::Running;
}
let src_ctx = src.current_context();
let dst_ctx = dst.current_context();
(&mut *src_ctx).switch(&mut *dst_ctx);
}
#[allow(clippy::mut_from_ref)]
fn current_context(&self) -> &mut Context {
if self.signal_pending.load(Ordering::Acquire) != 0 {
unsafe { &mut *self.signal_ctx.get() }
} else {
unsafe { &mut *self.ctx.get() }
}
}
/// Suspends current process with a "waiting" status
pub fn enter_wait(&self) {
let drop = {
let mut lock = self.inner.lock();
let drop = lock.state == State::Running;
lock.state = State::Waiting;
SCHED.dequeue(lock.id);
drop
};
if drop {
SCHED.switch(true);
}
}
/// Changes process wait condition status
pub fn setup_wait(&self, wait: *const Wait) {
let mut lock = self.inner.lock();
// FIXME this is not cool
lock.pending_wait = Some(unsafe { &*wait });
lock.wait_flag = true;
}
pub fn set_wait_reached(&self) {
let mut lock = self.inner.lock();
lock.wait_flag = false;
}
pub fn reset_wait(&self) {
let mut lock = self.inner.lock();
lock.pending_wait = None;
}
/// Returns `true` if process wait condition has not been reached
pub fn wait_flag(&self) -> bool {
self.inner.lock().wait_flag
}
/// Switches current thread back from signal handler
pub fn return_from_signal(&self) {
if self.signal_pending.load(Ordering::Acquire) == 0 {
panic!("TODO handle cases when returning from no signal");
}
self.signal_pending.store(0, Ordering::Release);
let src_ctx = self.signal_ctx.get();
let dst_ctx = self.ctx.get();
assert_eq!(self.inner.lock().state, State::Running);
unsafe {
(&mut *src_ctx).switch(&mut *dst_ctx);
}
}
#[inline]
pub fn state(&self) -> State {
self.inner.lock().state
}
pub fn set_owner(&self, pid: Pid) {
self.inner.lock().owner = Some(pid);
}
/// Sets up values needed for signal entry
pub fn set_signal_entry(&self, entry: usize, stack: usize) {
let mut lock = self.inner.lock();
lock.signal_entry = entry;
lock.signal_stack = stack;
}
/// Switches process main thread to a signal handler
pub fn enter_signal(&self, signal: Signal, ttbr0: usize) {
if self
.signal_pending
.compare_exchange_weak(0, signal as u32, Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
panic!("Already handling a signal (maybe handle this case)");
}
let mut lock = self.inner.lock();
if lock.signal_entry == 0 || lock.signal_stack == 0 {
todo!();
}
let signal_ctx = unsafe { &mut *self.signal_ctx.get() };
let src_ctx = self.ctx.get();
debugln!(
"Signal entry: tid={}, pc={:#x}, sp={:#x}, ttbr0={:#x}",
lock.id,
lock.signal_entry,
lock.signal_stack,
ttbr0
);
assert_eq!(lock.state, State::Running);
unsafe {
signal_ctx.setup_signal_entry(lock.signal_entry, signal as usize, ttbr0, lock.signal_stack);
}
drop(lock);
unsafe {
(&mut *src_ctx).switch(signal_ctx);
}
}
pub fn terminate(&self) {
let mut lock = self.inner.lock();
lock.state = State::Finished;
let tid = lock.id;
let wait = lock.pending_wait.take();
drop(lock);
if let Some(wait) = wait {
wait.abort(tid);
}
}
}
impl Drop for Thread {
fn drop(&mut self) {
debugln!("Dropping process {:?}", self.id());
}
}
pub fn new_tid() -> u32 {
static LAST: AtomicU32 = AtomicU32::new(1);
let id = LAST.fetch_add(1, Ordering::Relaxed);
assert!(id < 256, "Out of user TIDs");
id
}

View File

@ -2,7 +2,7 @@
use crate::arch::machine;
use crate::dev::timer::TimestampSource;
use crate::proc::{self, sched::SCHED, Process, ProcessRef};
use crate::proc::{self, sched::SCHED, Process, Thread, ThreadRef};
use crate::sync::IrqSafeSpinLock;
use alloc::collections::LinkedList;
use core::time::Duration;
@ -11,11 +11,11 @@ use libsys::{error::Errno, stat::FdSet, proc::Pid};
/// Wait channel structure. Contains a queue of processes
/// waiting for some event to happen.
pub struct Wait {
queue: IrqSafeSpinLock<LinkedList<Pid>>,
queue: IrqSafeSpinLock<LinkedList<u32>>,
}
struct Timeout {
pid: Pid,
tid: u32,
deadline: Duration,
}
@ -30,9 +30,9 @@ pub fn tick() {
while let Some(item) = cursor.current() {
if time > item.deadline {
let pid = item.pid;
let tid = item.tid;
cursor.remove_current();
SCHED.enqueue(pid);
SCHED.enqueue(tid);
} else {
cursor.move_next();
}
@ -56,50 +56,51 @@ pub fn sleep(timeout: Duration, remaining: &mut Duration) -> Result<(), Errno> {
}
pub fn select(
proc: ProcessRef,
thread: ThreadRef,
mut rfds: Option<&mut FdSet>,
mut wfds: Option<&mut FdSet>,
timeout: Option<Duration>,
) -> Result<usize, Errno> {
// TODO support wfds
if wfds.is_some() || rfds.is_none() {
todo!();
}
let read = rfds.as_deref().map(FdSet::clone);
let write = wfds.as_deref().map(FdSet::clone);
rfds.as_deref_mut().map(FdSet::reset);
wfds.as_deref_mut().map(FdSet::reset);
// // TODO support wfds
// if wfds.is_some() || rfds.is_none() {
// todo!();
// }
// let read = rfds.as_deref().map(FdSet::clone);
// let write = wfds.as_deref().map(FdSet::clone);
// rfds.as_deref_mut().map(FdSet::reset);
// wfds.as_deref_mut().map(FdSet::reset);
let deadline = timeout.map(|v| v + machine::local_timer().timestamp().unwrap());
let mut io = proc.io.lock();
// let deadline = timeout.map(|v| v + machine::local_timer().timestamp().unwrap());
// let mut io = proc.io.lock();
loop {
if let Some(read) = &read {
for fd in read.iter() {
let file = io.file(fd)?;
if file.borrow().is_ready(false)? {
rfds.as_mut().unwrap().set(fd);
return Ok(1);
}
}
}
if let Some(write) = &write {
for fd in write.iter() {
let file = io.file(fd)?;
if file.borrow().is_ready(true)? {
wfds.as_mut().unwrap().set(fd);
return Ok(1);
}
}
}
// loop {
// if let Some(read) = &read {
// for fd in read.iter() {
// let file = io.file(fd)?;
// if file.borrow().is_ready(false)? {
// rfds.as_mut().unwrap().set(fd);
// return Ok(1);
// }
// }
// }
// if let Some(write) = &write {
// for fd in write.iter() {
// let file = io.file(fd)?;
// if file.borrow().is_ready(true)? {
// wfds.as_mut().unwrap().set(fd);
// return Ok(1);
// }
// }
// }
// Suspend
match WAIT_SELECT.wait(deadline) {
Err(Errno::TimedOut) => return Ok(0),
Err(e) => return Err(e),
Ok(_) => {}
}
}
// // Suspend
// match WAIT_SELECT.wait(deadline) {
// Err(Errno::TimedOut) => return Ok(0),
// Err(e) => return Err(e),
// Ok(_) => {}
// }
// }
}
impl Wait {
@ -115,12 +116,12 @@ impl Wait {
let mut queue = self.queue.lock();
let mut count = 0;
while limit != 0 && !queue.is_empty() {
let pid = queue.pop_front();
if let Some(pid) = pid {
let tid = queue.pop_front();
if let Some(tid) = tid {
let mut tick_lock = TICK_LIST.lock();
let mut cursor = tick_lock.cursor_front_mut();
while let Some(item) = cursor.current() {
if pid == item.pid {
if tid == item.tid {
cursor.remove_current();
break;
} else {
@ -129,8 +130,8 @@ impl Wait {
}
drop(tick_lock);
proc::process(pid).set_wait_flag(false);
SCHED.enqueue(pid);
Thread::get(tid).unwrap().set_wait_reached();
SCHED.enqueue(tid);
}
limit -= 1;
@ -149,29 +150,54 @@ impl Wait {
self.wakeup_some(1);
}
pub fn abort(&self, tid: u32) {
let mut queue = self.queue.lock();
let mut tick_lock = TICK_LIST.lock();
let mut cursor = tick_lock.cursor_front_mut();
while let Some(item) = cursor.current() {
if tid == item.tid {
cursor.remove_current();
break;
} else {
cursor.move_next();
}
}
let mut cursor = queue.cursor_front_mut();
while let Some(item) = cursor.current() {
if tid == *item {
cursor.remove_current();
break;
} else {
cursor.move_next();
}
}
}
/// Suspends current process until event is signalled or
/// (optional) deadline is reached
pub fn wait(&self, deadline: Option<Duration>) -> Result<(), Errno> {
let proc = Process::current();
let thread = Thread::current();
//let deadline = timeout.map(|t| machine::local_timer().timestamp().unwrap() + t);
let mut queue_lock = self.queue.lock();
queue_lock.push_back(proc.id());
proc.set_wait_flag(true);
queue_lock.push_back(thread.id());
thread.setup_wait(self);
if let Some(deadline) = deadline {
TICK_LIST.lock().push_back(Timeout {
pid: proc.id(),
tid: thread.id(),
deadline,
});
}
loop {
if !proc.wait_flag() {
if !thread.wait_flag() {
return Ok(());
}
drop(queue_lock);
proc.enter_wait();
thread.enter_wait();
queue_lock = self.queue.lock();
if let Some(deadline) = deadline {
@ -179,7 +205,7 @@ impl Wait {
let mut cursor = queue_lock.cursor_front_mut();
while let Some(&mut item) = cursor.current() {
if proc.id() == item {
if thread.id() == item {
cursor.remove_current();
break;
} else {

View File

@ -2,7 +2,7 @@
use crate::arch::platform::exception::ExceptionFrame;
use crate::debug::Level;
use crate::proc::{elf, wait, Process, ProcessIo};
use crate::proc::{self, elf, wait, Process, ProcessIo, Thread};
use core::mem::size_of;
use core::ops::DerefMut;
use core::time::Duration;
@ -55,7 +55,7 @@ pub fn syscall(num: usize, args: &[usize]) -> Result<usize, Errno> {
match num {
// Process management system calls
abi::SYS_EXIT => {
Process::current().exit(args[0] as i32);
Process::exit(args[0] as i32);
unreachable!();
}
@ -174,13 +174,11 @@ pub fn syscall(num: usize, args: &[usize]) -> Result<usize, Errno> {
res.map(|_| 0)
}
abi::SYS_EX_SIGNAL => {
let proc = Process::current();
proc.setup_signal_context(args[0], args[1]);
Thread::current().set_signal_entry(args[0], args[1]);
Ok(0)
}
abi::SYS_EX_SIGRETURN => {
let proc = Process::current();
proc.return_from_signal();
Thread::current().return_from_signal();
panic!("This code won't run");
}
abi::SYS_EX_KILL => {
@ -196,6 +194,19 @@ pub fn syscall(num: usize, args: &[usize]) -> Result<usize, Errno> {
};
Ok(0)
}
abi::SYS_EX_CLONE => {
let entry = args[0];
let stack = args[1];
let arg = args[2];
Process::current()
.new_user_thread(entry, stack, arg)
.map(|e| e as usize)
}
abi::SYS_EX_YIELD => {
proc::switch();
Ok(0)
},
abi::SYS_SELECT => {
let rfds = validate_user_ptr_struct_option::<FdSet>(args[0])?;
@ -206,15 +217,15 @@ pub fn syscall(num: usize, args: &[usize]) -> Result<usize, Errno> {
Some(Duration::from_nanos(args[2] as u64))
};
let proc = Process::current();
wait::select(proc, rfds, wfds, timeout)
wait::select(Thread::current(), rfds, wfds, timeout)
}
_ => {
let proc = Process::current();
let thread = Thread::current();
let proc = thread.owner().unwrap();
errorln!("Undefined system call: {}", num);
proc.enter_signal(Signal::InvalidSystemCall);
todo!()
proc.enter_fault_signal(thread, Signal::InvalidSystemCall);
Err(Errno::InvalidArgument)
}
}
}

View File

@ -4,6 +4,8 @@ pub const SYS_EX_NANOSLEEP: usize = 129;
pub const SYS_EX_SIGNAL: usize = 130;
pub const SYS_EX_SIGRETURN: usize = 131;
pub const SYS_EX_KILL: usize = 132;
pub const SYS_EX_CLONE: usize = 133;
pub const SYS_EX_YIELD: usize = 134;
pub const SYS_EXIT: usize = 1;
pub const SYS_READ: usize = 2;

View File

@ -271,6 +271,32 @@ pub fn sys_ex_kill(pid: SignalDestination, signum: Signal) -> Result<(), Errno>
})
}
#[inline(always)]
pub fn sys_ex_clone(entry: usize, stack: usize, arg: usize) -> Result<usize, Errno> {
Errno::from_syscall(unsafe {
syscall!(
abi::SYS_EX_CLONE,
argn!(entry),
argn!(stack),
argn!(arg)
)
})
}
#[inline(always)]
pub fn sys_ex_yield() {
unsafe {
syscall!(abi::SYS_EX_YIELD);
}
}
#[inline(always)]
pub fn sys_ex_undefined() {
unsafe {
syscall!(0);
}
}
#[inline(always)]
pub fn sys_select(
read_fds: Option<&mut FdSet>,

View File

@ -47,6 +47,6 @@ impl Errno {
impl From<usize> for Errno {
fn from(u: usize) -> Errno {
todo!()
unsafe { core::mem::transmute(u as u32) }
}
}

View File

@ -13,9 +13,16 @@ pub mod os;
pub mod sys;
pub mod sync;
use sys::Signal;
#[inline(never)]
extern "C" fn _signal_handler(arg: sys::Signal) -> ! {
extern "C" fn _signal_handler(arg: Signal) -> ! {
trace!("Entered signal handler: arg={:?}", arg);
match arg {
Signal::Interrupt | Signal::SegmentationFault =>
loop {},
_ => todo!()
}
sys::sys_ex_sigreturn();
}

View File

@ -1,4 +1,5 @@
pub use libsys::signal::{Signal, SignalDestination};
pub use libsys::proc::ExitCode;
pub use libsys::termios;
pub use libsys::calls::*;
pub use libsys::stat::{self, FileDescriptor};
@ -28,7 +29,7 @@ impl RawMutex {
#[inline]
pub unsafe fn lock(&self) {
while !self.try_lock() {
asm!("nop");
sys_ex_yield();
}
}

View File

@ -15,3 +15,4 @@ path = "src/shell/main.rs"
[dependencies]
libusr = { path = "../libusr" }
lazy_static = { version = "*", features = ["spin_no_std"] }

View File

@ -3,24 +3,67 @@
#[macro_use]
extern crate libusr;
#[macro_use]
extern crate lazy_static;
use libusr::io::{self, Read};
use libusr::sys::{Signal, SignalDestination};
use libusr::sync::Mutex;
static mut THREAD_STACK: [u8; 8192] = [0; 8192];
static mut THREAD_SIGNAL_STACK: [u8; 8192] = [0; 8192];
lazy_static! {
static ref MUTEX: Mutex<()> = Mutex::new(());
}
fn sleep(ns: u64) {
let mut rem = [0; 2];
libusr::sys::sys_ex_nanosleep(ns, &mut rem).unwrap();
}
fn fn0_signal(arg: Signal) {
trace!("fn0_signal");
unsafe {
libusr::sys::sys_exit(libusr::sys::ExitCode::from(0));
}
}
fn fn0(_arg: usize) {
unsafe {
libusr::sys::sys_ex_signal(fn0_signal as usize, THREAD_SIGNAL_STACK.as_mut_ptr().add(8192) as usize);
}
unsafe {
core::ptr::read_volatile(0x1234 as *const u32);
}
loop {}
//loop {
// sleep(100_000_000);
// println!("Tick from B");
// {
// let lock = MUTEX.lock();
// sleep(1_000_000_000);
// }
//}
}
fn do_fault() {
unsafe {
core::ptr::read_volatile(0x1238 as *const u32);
}
}
#[no_mangle]
fn main() -> i32 {
let mut buf = [0; 512];
let mut stdin = io::stdin();
eprintln!("stderr test");
loop {
let count = stdin.read(&mut buf).unwrap();
if count == 0 {
break;
}
let line = core::str::from_utf8(&buf[..count]).unwrap();
println!("{:?}", line);
unsafe {
libusr::sys::sys_ex_clone(fn0 as usize, THREAD_STACK.as_mut_ptr().add(8192) as usize, 0);
}
sleep(1_000_000_000);
do_fault();
loop {}
0
}