proc: (wip) async/await runtime

Mark Poliakov 2023-09-05 16:17:48 +03:00
parent f88ca9b6d9
commit cb65a1cff2
24 changed files with 762 additions and 470 deletions

View File

@ -26,6 +26,8 @@ bitmap-font = { version = "0.3.0", optional = true }
embedded-graphics = { version = "0.8.0", optional = true }
log = "0.4.20"
futures-util = { version = "0.3.28", default-features = false, features = ["alloc", "async-await"] }
crossbeam-queue = { version = "0.3.8", default-features = false, features = ["alloc"] }
[dependencies.elf]
version = "0.7.2"

View File

@ -70,6 +70,24 @@ impl<T> OneTimeInit<T> {
unsafe { (*self.value.get()).assume_init_ref() }
}
#[track_caller]
pub fn or_init_with<F: FnOnce() -> T>(&self, init: F) -> &T {
if self
.state
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed)
.is_err()
{
// Already initialized
unsafe { (*self.value.get()).assume_init_ref() }
} else {
// Initialize
unsafe {
(*self.value.get()).write((init)());
(*self.value.get()).assume_init_ref()
}
}
}
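The new or_init_with gives OneTimeInit get-or-initialize semantics on top of the existing one-shot init. A minimal usage sketch, mirroring how the async runtime below lazily creates its global task queue (everything here except OneTimeInit::or_init_with is illustrative):

static QUEUE: OneTimeInit<TaskQueue> = OneTimeInit::new();

fn queue() -> &'static TaskQueue {
    // The first caller runs the closure and stores the result; every later
    // caller just receives a reference to the already-initialized value.
    QUEUE.or_init_with(|| TaskQueue::new(128))
}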
/// Returns an immutable reference to the underlying value, or [None] if the value hasn't yet
/// been initialized
pub fn try_get(&self) -> Option<&T> {

View File

@ -16,10 +16,9 @@ use tock_registers::{
use crate::{
arch::{x86_64::IrqNumber, Architecture, ARCHITECTURE},
device::timer,
mem::device::DeviceMemoryIo,
proc::wait,
sync::IrqSafeSpinlock,
task::tasklet,
};
register_bitfields! {
@ -169,8 +168,7 @@ impl InterruptHandler for Hpet {
Duration::from_millis(inner.tim0_counter)
};
wait::tick(now);
tasklet::tick(now);
timer::tick(now);
true
}

View File

@ -380,3 +380,6 @@ impl AddressSpace {
unsafe { (self.l0 as usize).physicalize() }
}
}
unsafe impl Send for AddressSpace {}
unsafe impl Sync for AddressSpace {}

View File

@ -11,6 +11,7 @@ pub mod bus;
pub mod display;
pub mod power;
pub mod serial;
pub mod timer;
pub mod tty;
static DEVICE_MANAGER: IrqSafeSpinlock<DeviceManager> = IrqSafeSpinlock::new(DeviceManager::new());

src/device/timer.rs (new file, +10 lines)
View File

@ -0,0 +1,10 @@
use core::time::Duration;
use crate::task::{runtime, tasklet};
pub fn tick(now: Duration) {
runtime::tick(now);
// TODO tasklets are no longer needed
tasklet::tick(now);
}

View File

@ -7,14 +7,15 @@ use abi::{
use device_api::serial::SerialDevice;
use crate::{
proc::wait::Wait,
sync::IrqSafeSpinlock,
task::{process::Process, ProcessId},
util::ring::AsyncRing,
};
#[cfg(feature = "fb_console")]
pub mod combined {
//! Combined console + keyboard terminal device
use crate::block;
use abi::{error::Error, io::DeviceRequest};
use device_api::{input::KeyboardConsumer, serial::SerialDevice};
use vfs::CharDevice;
@ -24,14 +25,13 @@ pub mod combined {
Device,
};
use super::{CharRing, TtyDevice};
use super::{TtyContext, TtyDevice};
// TODO rewrite this
/// Helper device to combine a display and a keyboard input into a single terminal
pub struct CombinedTerminal {
output: &'static (dyn DisplayConsole + Sync + 'static),
input_ring: CharRing<16>,
context: TtyContext,
}
impl CombinedTerminal {
@ -39,14 +39,14 @@ pub mod combined {
pub fn new(output: &'static FramebufferConsole) -> Self {
Self {
output,
input_ring: CharRing::new(),
context: TtyContext::new(),
}
}
}
impl TtyDevice<16> for CombinedTerminal {
fn ring(&self) -> &CharRing<16> {
&self.input_ring
impl TtyDevice for CombinedTerminal {
fn context(&self) -> &TtyContext {
&self.context
}
}
@ -73,7 +73,10 @@ pub mod combined {
impl CharDevice for CombinedTerminal {
fn read(&'static self, blocking: bool, data: &mut [u8]) -> Result<usize, Error> {
assert!(blocking);
self.line_read(data)
block! {
self.line_read(data).await
}
// self.line_read(data)
}
fn write(&self, blocking: bool, data: &[u8]) -> Result<usize, Error> {
@ -96,77 +99,74 @@ pub mod combined {
#[cfg(feature = "fb_console")]
pub use combined::CombinedTerminal;
struct CharRingInner<const N: usize> {
rd: usize,
wr: usize,
data: [u8; N],
flags: u8,
struct TtyContextInner {
config: TerminalOptions,
process_group: Option<ProcessId>,
}
/// Ring buffer for a character device. Handles reads, writes and channel notifications for a
/// terminal device.
pub struct CharRing<const N: usize> {
wait_read: Wait,
wait_write: Wait,
inner: IrqSafeSpinlock<CharRingInner<N>>,
config: IrqSafeSpinlock<TerminalOptions>,
pub struct TtyContext {
ring: AsyncRing<u8, 128>,
// input_queue: AsyncQueue<u8>,
inner: IrqSafeSpinlock<TtyContextInner>,
}
/// Terminal device interface
pub trait TtyDevice<const N: usize>: SerialDevice {
pub trait TtyDevice: SerialDevice {
/// Returns the ring buffer associated with the device
fn ring(&self) -> &CharRing<N>;
/// Returns `true` if data is ready to be read from or written to the terminal
fn is_ready(&self, write: bool) -> Result<bool, Error> {
let ring = self.ring();
if write {
todo!();
} else {
Ok(ring.is_readable())
}
}
fn context(&self) -> &TtyContext;
/// Sets the process group to which signals from this terminal should be delivered
fn set_signal_group(&self, id: ProcessId) {
self.ring().inner.lock().process_group = Some(id);
self.context().inner.lock().process_group.replace(id);
}
/// Sends a single byte to the terminal
fn line_send(&self, byte: u8) -> Result<(), Error> {
let config = self.ring().config.lock();
let cx = self.context();
let inner = cx.inner.lock();
if byte == b'\n' && config.output.contains(TerminalOutputOptions::NL_TO_CRNL) {
if byte == b'\n'
&& inner
.config
.output
.contains(TerminalOutputOptions::NL_TO_CRNL)
{
self.send(b'\r').ok();
}
drop(inner);
self.send(byte)
}
/// Receives a single byte from the terminal
fn recv_byte(&self, mut byte: u8) {
let ring = self.ring();
let config = ring.config.lock();
let cx = self.context();
let inner = cx.inner.lock();
if byte == b'\r' && config.input.contains(TerminalInputOptions::CR_TO_NL) {
if byte == b'\r' && inner.config.input.contains(TerminalInputOptions::CR_TO_NL) {
byte = b'\n';
}
if byte == b'\n' {
// TODO implement proper echo here
let _echo = config.line.contains(TerminalLineOptions::ECHO)
|| config
let _echo = inner.config.line.contains(TerminalLineOptions::ECHO)
|| inner
.config
.line
.contains(TerminalLineOptions::CANONICAL | TerminalLineOptions::ECHO_NL);
if config.output.contains(TerminalOutputOptions::NL_TO_CRNL) {
if inner
.config
.output
.contains(TerminalOutputOptions::NL_TO_CRNL)
{
self.send(b'\r').ok();
}
self.send(byte).ok();
} else if config.line.contains(TerminalLineOptions::ECHO) {
} else if inner.config.line.contains(TerminalLineOptions::ECHO) {
if byte.is_ascii_control() {
if byte != config.chars.erase && byte != config.chars.werase {
if byte != inner.config.chars.erase && byte != inner.config.chars.werase {
self.send(b'^').ok();
self.send(byte + 0x40).ok();
}
@ -176,10 +176,13 @@ pub trait TtyDevice<const N: usize>: SerialDevice {
}
// byte == config.chars.interrupt
if byte == config.chars.interrupt && config.line.contains(TerminalLineOptions::SIGNAL) {
drop(config);
let pgrp = ring.inner.lock().process_group;
if byte == inner.config.chars.interrupt
&& inner.config.line.contains(TerminalLineOptions::SIGNAL)
{
let pgrp = inner.process_group;
if let Some(pgrp) = pgrp {
drop(inner);
Process::signal_group(pgrp, Signal::Interrupted);
return;
} else {
@ -187,20 +190,22 @@ pub trait TtyDevice<const N: usize>: SerialDevice {
}
}
ring.putc(byte, false).ok();
drop(inner);
cx.putc(byte);
}
/// Reads and processes data from the terminal
fn line_read(&'static self, data: &mut [u8]) -> Result<usize, Error> {
let ring = self.ring();
let mut config = ring.config.lock();
async fn line_read(&self, data: &mut [u8]) -> Result<usize, Error> {
let cx = self.context();
let mut inner = cx.inner.lock();
if data.is_empty() {
return Ok(0);
}
if !config.is_canonical() {
let byte = ring.getc()?;
if !inner.config.is_canonical() {
drop(inner);
let byte = cx.getc().await;
data[0] = byte;
Ok(1)
} else {
@ -209,14 +214,14 @@ pub trait TtyDevice<const N: usize>: SerialDevice {
// Run until either end of buffer or return condition is reached
while rem != 0 {
drop(config);
let byte = ring.getc()?;
config = ring.config.lock();
drop(inner);
let byte = cx.getc().await;
inner = cx.inner.lock();
if config.is_canonical() {
if byte == config.chars.eof {
if inner.config.is_canonical() {
if byte == inner.config.chars.eof {
break;
} else if byte == config.chars.erase {
} else if byte == inner.config.chars.erase {
// Erase
if off != 0 {
self.raw_write(b"\x1b[D \x1b[D")?;
@ -225,7 +230,7 @@ pub trait TtyDevice<const N: usize>: SerialDevice {
}
continue;
} else if byte == config.chars.werase {
} else if byte == inner.config.chars.werase {
todo!()
}
}
@ -260,113 +265,69 @@ pub trait TtyDevice<const N: usize>: SerialDevice {
}
}
impl<const N: usize> CharRingInner<N> {
#[inline]
const fn is_readable(&self) -> bool {
if self.rd <= self.wr {
(self.wr - self.rd) > 0
} else {
(self.wr + (N - self.rd)) > 0
}
}
#[inline]
unsafe fn read_unchecked(&mut self) -> u8 {
let res = self.data[self.rd];
self.rd = (self.rd + 1) % N;
res
}
#[inline]
unsafe fn write_unchecked(&mut self, ch: u8) {
self.data[self.wr] = ch;
self.wr = (self.wr + 1) % N;
}
}
impl<const N: usize> CharRing<N> {
/// Constructs an empty ring buffer
pub const fn new() -> Self {
impl TtyContext {
pub fn new() -> Self {
Self {
inner: IrqSafeSpinlock::new(CharRingInner {
rd: 0,
wr: 0,
data: [0; N],
flags: 0,
ring: AsyncRing::new(0),
inner: IrqSafeSpinlock::new(TtyContextInner {
config: TerminalOptions::const_default(),
process_group: None,
}),
wait_read: Wait::new("char_ring_read"),
wait_write: Wait::new("char_ring_write"),
config: IrqSafeSpinlock::new(TerminalOptions::const_default()),
}
}
/// Returns `true` if the buffer has data to read
pub fn is_readable(&self) -> bool {
let inner = self.inner.lock();
let config = self.config.lock();
if config.is_canonical() {
let mut rd = inner.rd;
let mut count = 0usize;
loop {
let readable = if rd <= inner.wr {
(inner.wr - rd) > 0
} else {
(inner.wr + (N - rd)) > 0
};
if !readable {
break;
}
let byte = inner.data[rd];
if byte == b'\n' {
count += 1;
}
rd = (rd + 1) % N;
}
count != 0 || inner.flags != 0
} else {
inner.is_readable() || inner.flags != 0
}
pub fn putc(&self, ch: u8) {
self.ring.try_write(ch).unwrap();
}
/// Reads a single character from the buffer, blocking until available
pub fn getc(&'static self) -> Result<u8, Error> {
let mut lock = self.inner.lock();
loop {
if !lock.is_readable() && lock.flags == 0 {
drop(lock);
self.wait_read.wait(None)?;
lock = self.inner.lock();
} else {
break;
}
}
let byte = unsafe { lock.read_unchecked() };
drop(lock);
self.wait_write.wakeup_one();
// TODO WAIT_SELECT
Ok(byte)
}
/// Sends a single character to the buffer
pub fn putc(&self, ch: u8, blocking: bool) -> Result<(), Error> {
let mut lock = self.inner.lock();
if blocking {
todo!();
}
unsafe {
lock.write_unchecked(ch);
}
drop(lock);
self.wait_read.wakeup_one();
// TODO WAIT_SELECT
Ok(())
pub async fn getc(&self) -> u8 {
self.ring.read().await
}
}
// impl<const N: usize> CharRingInner<N> {
// }
//
// impl<const N: usize> CharRing<N> {
// /// Constructs an empty ring buffer
// pub const fn new() -> Self {
// Self {
// inner: IrqSafeSpinlock::new(CharRingInner {
// rd: 0,
// wr: 0,
// data: [0; N],
// flags: 0,
// process_group: None,
// }),
// // wait_read: Wait::new("char_ring_read"),
// // wait_write: Wait::new("char_ring_write"),
// config: IrqSafeSpinlock::new(TerminalOptions::const_default()),
// }
// }
//
// /// Reads a single character from the buffer, blocking until available
// pub fn getc(&'static self) -> Result<u8, Error> {
// todo!()
// // let mut lock = self.inner.lock();
// // loop {
// // if !lock.is_readable() && lock.flags == 0 {
// // drop(lock);
// // self.wait_read.wait(None)?;
// // lock = self.inner.lock();
// // } else {
// // break;
// // }
// // }
//
// // let byte = unsafe { lock.read_unchecked() };
// // drop(lock);
// // self.wait_write.wakeup_one();
// // // TODO WAIT_SELECT
// // Ok(byte)
// }
//
// /// Sends a single character to the buffer
// pub fn putc(&self, ch: u8, blocking: bool) -> Result<(), Error> {
// todo!()
// }
// }
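The net effect of the TtyContext rewrite is that the interrupt side and the reader side no longer share a Wait channel: recv_byte ends with cx.putc(byte), which pushes into an AsyncRing and wakes any reader parked in getc().await. A rough sketch of the two halves, assuming the trait and context added above (the function names are illustrative):

// Interrupt / producer side: run input processing and push into the ring
fn irq_byte_received<T: TtyDevice>(tty: &T, byte: u8) {
    tty.recv_byte(byte);
}

// Task / consumer side: suspend on the ring's QueueWaker until a byte arrives
async fn read_one<T: TtyDevice>(tty: &T) -> u8 {
    tty.context().getc().await
}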

View File

@ -12,20 +12,21 @@
let_chains,
linked_list_cursors,
rustc_private,
allocator_api
allocator_api,
async_fn_in_trait
)]
#![allow(clippy::new_without_default, clippy::fn_to_numeric_cast)]
#![warn(missing_docs)]
// #![warn(missing_docs)]
#![allow(missing_docs)]
#![no_std]
#![no_main]
use sync::SpinFence;
use task::spawn_kernel_closure;
use crate::{
arch::{Architecture, ArchitectureImpl, ARCHITECTURE},
mem::heap,
task::Cpu,
task::{spawn_kernel_closure, Cpu},
};
extern crate yggdrasil_abi as abi;

View File

@ -2,7 +2,7 @@
use core::mem::size_of;
use abi::error::Error;
use alloc::{rc::Rc, string::String};
use alloc::{string::String, sync::Arc};
use vfs::FileRef;
use crate::{
@ -75,7 +75,7 @@ fn setup_binary<S: Into<String>>(
space: AddressSpace,
entry: usize,
args: &[&str],
) -> Result<Rc<Process>, Error> {
) -> Result<Arc<Process>, Error> {
const USER_STACK_PAGES: usize = 16;
let virt_stack_base = 0x3000000;
@ -123,7 +123,7 @@ pub fn load_elf<S: Into<String>>(
name: S,
file: FileRef,
args: &[&str],
) -> Result<Rc<Process>, Error> {
) -> Result<Arc<Process>, Error> {
let space = AddressSpace::new_empty()?;
let elf_entry = proc::elf::load_elf_from_file(&space, file)?;

View File

@ -3,4 +3,3 @@
pub mod elf;
pub mod exec;
pub mod io;
pub mod wait;

View File

@ -1,183 +0,0 @@
//! Wait channel implementation
use core::time::Duration;
use abi::error::Error;
use alloc::{collections::LinkedList, rc::Rc};
use crate::{
arch::{Architecture, ARCHITECTURE},
sync::IrqSafeSpinlock,
task::process::{Process, ProcessState},
};
/// Defines whether the wait channel is available for a specific task
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WaitStatus {
/// Wait on the channel was interrupted
Interrupted,
/// Channel did not yet signal availability
Pending,
/// Channel has data available
Done,
}
/// Wait notification channel
pub struct Wait {
queue: IrqSafeSpinlock<LinkedList<Rc<Process>>>,
// Used for tracing waits
#[allow(dead_code)]
name: &'static str,
}
struct Timeout {
process: Rc<Process>,
#[allow(dead_code)]
deadline: Duration,
}
/// Common notification channel for tasks waiting on process exits
pub static PROCESS_EXIT_WAIT: Wait = Wait::new("process-exit");
impl Wait {
/// Constructs a new wait notification channel
pub const fn new(name: &'static str) -> Self {
Self {
name,
queue: IrqSafeSpinlock::new(LinkedList::new()),
}
}
/// Wakes up tasks waiting for availability on this channel, but no more than `limit`
pub fn wakeup_some(&self, mut limit: usize) -> usize {
let mut queue = self.queue.lock();
let mut count = 0;
while limit != 0 && !queue.is_empty() {
let proc = queue.pop_front().unwrap();
{
let mut tick_lock = TICK_LIST.lock();
let mut cursor = tick_lock.cursor_front_mut();
while let Some(item) = cursor.current() {
if proc.id() == item.process.id() {
cursor.remove_current();
break;
} else {
cursor.move_next();
}
}
drop(tick_lock);
unsafe {
proc.set_wait_status(WaitStatus::Done);
}
if proc.state() == ProcessState::Suspended {
proc.enqueue_somewhere();
}
}
limit -= 1;
count += 1;
}
count
}
/// Wakes up all tasks waiting on this channel
pub fn wakeup_all(&self) {
self.wakeup_some(usize::MAX);
}
/// Wakes up a single task waiting on this channel
pub fn wakeup_one(&self) {
self.wakeup_some(1);
}
/// Suspends the task until either the deadline is reached or this channel signals availability
pub fn wait(&'static self, deadline: Option<Duration>) -> Result<(), Error> {
let process = Process::current();
let mut queue_lock = self.queue.lock();
queue_lock.push_back(process.clone());
unsafe {
process.setup_wait(self);
}
if let Some(deadline) = deadline {
TICK_LIST.lock().push_back(Timeout {
process: process.clone(),
deadline,
});
}
loop {
match process.wait_status() {
WaitStatus::Pending => (),
WaitStatus::Done => return Ok(()),
WaitStatus::Interrupted => return Err(Error::InvalidArgument),
}
drop(queue_lock);
process.suspend();
queue_lock = self.queue.lock();
if let Some(deadline) = deadline {
let now = ARCHITECTURE.monotonic_timer().monotonic_timestamp()?;
if now > deadline {
let mut cursor = queue_lock.cursor_front_mut();
while let Some(item) = cursor.current() {
if item.id() == process.id() {
cursor.remove_current();
return Err(Error::TimedOut);
} else {
cursor.move_next();
}
}
// Most likely the process was killed by a signal
}
}
}
}
}
static TICK_LIST: IrqSafeSpinlock<LinkedList<Timeout>> = IrqSafeSpinlock::new(LinkedList::new());
/// Suspends current task until given deadline
pub fn sleep(timeout: Duration, remaining: &mut Duration) -> Result<(), Error> {
static SLEEP_NOTIFY: Wait = Wait::new("sleep");
let now = ARCHITECTURE.monotonic_timer().monotonic_timestamp()?;
let deadline = now + timeout;
match SLEEP_NOTIFY.wait(Some(deadline)) {
// Just what we expected
Err(Error::TimedOut) => {
*remaining = Duration::ZERO;
Ok(())
}
Ok(_) => panic!("This should not happen"),
Err(e) => Err(e),
}
}
/// Updates all pending timeouts and wakes up the tasks that have reached theirs
pub fn tick(now: Duration) {
let mut list = TICK_LIST.lock();
let mut cursor = list.cursor_front_mut();
while let Some(item) = cursor.current() {
if now > item.deadline {
let t = cursor.remove_current().unwrap();
if t.process.state() == ProcessState::Suspended {
t.process.enqueue_somewhere();
}
} else {
cursor.move_next();
}
}
}

View File

@ -15,16 +15,11 @@ use yggdrasil_abi::{
};
use crate::{
fs,
block, fs,
mem::table::{MapAttributes, VirtualMemoryManager},
proc::{
self,
io::ProcessIo,
wait::{self, PROCESS_EXIT_WAIT},
// wait::{self, PROCESS_EXIT_WAIT},
},
proc::{self, io::ProcessIo},
sync::IrqSafeSpinlockGuard,
task::{process::Process, ProcessId},
task::{process::Process, runtime, ProcessId},
};
mod arg;
@ -66,9 +61,10 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error>
let seconds = args[0];
let nanos = args[1] as u32;
let duration = Duration::new(seconds, nanos);
let mut remaining = Duration::ZERO;
wait::sleep(duration, &mut remaining).unwrap();
block! {
runtime::sleep(duration).await
};
Ok(0)
}
@ -321,18 +317,11 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error>
let target = Process::get(pid).ok_or(Error::DoesNotExist)?;
loop {
if let Some(exit_status) = target.get_exit_status() {
*status = exit_status;
break Ok(0);
}
*status = block! {
Process::wait_for_exit(target).await
};
// Suspend and wait for signal
match PROCESS_EXIT_WAIT.wait(Some(Duration::from_secs(3))) {
Ok(()) | Err(Error::TimedOut) => (),
Err(_) => todo!(),
}
}
Ok(0)
}
SyscallFunction::SendSignal => {
let pid = args[0] as u32;

View File

@ -3,7 +3,7 @@
#![allow(dead_code)]
use abi::error::Error;
use alloc::{rc::Rc, string::String, vec::Vec};
use alloc::{string::String, sync::Arc, vec::Vec};
use crate::{
arch::{Architecture, ArchitectureImpl},
@ -15,6 +15,7 @@ use self::{context::TaskContextImpl, process::Process};
pub mod context;
pub mod process;
pub mod runtime;
pub mod sched;
pub mod tasklet;
@ -25,7 +26,7 @@ pub type ProcessId = usize;
/// Wrapper structure to hold all the system's processes
pub struct ProcessList {
data: Vec<(ProcessId, Rc<Process>)>,
data: Vec<(ProcessId, Arc<Process>)>,
last_process_id: ProcessId,
}
@ -44,7 +45,7 @@ impl ProcessList {
///
/// Only meant to be called from inside the Process impl, as this function does not perform any
/// accounting information updates.
pub unsafe fn push(&mut self, process: Rc<Process>) -> ProcessId {
pub unsafe fn push(&mut self, process: Arc<Process>) -> ProcessId {
self.last_process_id += 1;
debugln!("Insert process with ID {}", self.last_process_id);
self.data.push((self.last_process_id, process));
@ -52,7 +53,7 @@ impl ProcessList {
}
/// Looks up a process by its ID
pub fn get(&self, id: ProcessId) -> Option<&Rc<Process>> {
pub fn get(&self, id: ProcessId) -> Option<&Arc<Process>> {
self.data
.iter()
.find_map(|(i, p)| if *i == id { Some(p) } else { None })
@ -80,12 +81,10 @@ pub fn init() -> Result<(), Error> {
// Create a queue for each CPU
sched::init_queues(Vec::from_iter((0..cpu_count).map(CpuQueue::new)));
// spawn_kernel_closure(move || loop {
// debugln!("B");
// for _ in 0..100000 {
// core::hint::spin_loop();
// }
// })?;
// Spawn async workers
(0..cpu_count).for_each(|index| {
runtime::spawn_async_worker(index).unwrap();
});
Ok(())
}

View File

@ -2,30 +2,33 @@
use core::{
mem::size_of,
ops::Deref,
pin::Pin,
sync::atomic::{AtomicU32, Ordering},
task::{Context, Poll},
};
use abi::{
error::Error,
process::{ExitCode, Signal, SignalEntryData},
};
use alloc::{collections::VecDeque, rc::Rc, string::String};
use alloc::{collections::VecDeque, string::String, sync::Arc};
use atomic_enum::atomic_enum;
use futures_util::{task::ArcWake, Future};
use kernel_util::util::OneTimeInit;
use vfs::VnodeRef;
use crate::{
arch::{Architecture, ArchitectureImpl},
mem::{table::AddressSpace, ForeignPointer},
proc::{
io::ProcessIo,
wait::{Wait, WaitStatus, PROCESS_EXIT_WAIT},
},
proc::io::ProcessIo,
sync::{IrqGuard, IrqSafeSpinlock},
task::context::TaskContextImpl,
};
use super::{context::TaskFrame, sched::CpuQueue, Cpu, ProcessId, TaskContext, PROCESSES};
use super::{
context::TaskFrame, runtime::QueueWaker, sched::CpuQueue, Cpu, ProcessId, TaskContext,
PROCESSES,
};
/// Represents the states a process can be at some point in time
#[atomic_enum]
@ -49,9 +52,6 @@ pub struct SignalEntry {
}
struct ProcessInner {
// XXX
pending_wait: Option<&'static Wait>,
wait_status: WaitStatus,
exit_status: i32,
session_id: ProcessId,
@ -74,12 +74,15 @@ pub struct Process {
cpu_id: AtomicU32,
space: Option<AddressSpace>,
inner: IrqSafeSpinlock<ProcessInner>,
exit_waker: QueueWaker,
/// I/O state of the task
pub io: IrqSafeSpinlock<ProcessIo>,
}
/// Guard type that provides [Process] operations only available for current processes
pub struct CurrentProcess(Rc<Process>);
pub struct CurrentProcess(Arc<Process>);
impl Process {
/// Creates a process from raw architecture-specific [TaskContext].
@ -91,8 +94,8 @@ impl Process {
name: S,
space: Option<AddressSpace>,
normal_context: TaskContext,
) -> Rc<Self> {
let this = Rc::new(Self {
) -> Arc<Self> {
let this = Arc::new(Self {
normal_context,
id: OneTimeInit::new(),
@ -101,10 +104,9 @@ impl Process {
cpu_id: AtomicU32::new(0),
space,
exit_waker: QueueWaker::new(),
inner: IrqSafeSpinlock::new(ProcessInner {
// XXX
pending_wait: None,
wait_status: WaitStatus::Done,
exit_status: 0,
session_id: 0,
@ -219,7 +221,7 @@ impl Process {
/// # Panics
///
/// Currently, the code will panic if the process is queued/executing on any queue.
pub fn enqueue_somewhere(self: Rc<Self>) -> usize {
pub fn enqueue_somewhere(self: Arc<Self>) -> usize {
// Doesn't have to be precise, so even if something changes, we can still be rebalanced
// to another CPU
let (index, queue) = CpuQueue::least_loaded().unwrap();
@ -234,13 +236,16 @@ impl Process {
/// # Panics
///
/// Currently, the code will panic if the process is queued/executing on any queue.
pub fn enqueue_to(self: Rc<Self>, queue: &'static CpuQueue) {
pub fn enqueue_to(self: Arc<Self>, queue: &'static CpuQueue) {
let _irq = IrqGuard::acquire();
{
let mut inner = self.inner.lock();
let old_queue = inner.queue.replace(queue);
assert!(old_queue.is_none());
if old_queue.is_some() {
// Already in some queue
return;
}
}
let current_state = self.state.swap(ProcessState::Ready, Ordering::SeqCst);
@ -299,21 +304,6 @@ impl Process {
self.dequeue(ProcessState::Suspended);
}
/// Returns current wait status of the task
pub fn wait_status(&self) -> WaitStatus {
self.inner.lock().wait_status
}
/// Updates the wait status for the task.
///
/// # Safety
///
/// This function is only meant to be called on waiting tasks, otherwise atomicity is not
/// guaranteed.
pub unsafe fn set_wait_status(&self, status: WaitStatus) {
self.inner.lock().wait_status = status;
}
/// Returns an exit code if the process exited, [None] if it didn't
pub fn get_exit_status(&self) -> Option<ExitCode> {
if self.state() == ProcessState::Terminated {
@ -330,7 +320,7 @@ impl Process {
}
/// Returns a process by its ID
pub fn get(pid: ProcessId) -> Option<Rc<Self>> {
pub fn get(pid: ProcessId) -> Option<Arc<Self>> {
PROCESSES.lock().get(pid).cloned()
}
@ -361,24 +351,25 @@ impl Process {
}
// Notify any waiters we're done
PROCESS_EXIT_WAIT.wakeup_all();
self.exit_waker.wake_all();
}
/// Raises a signal for the current process
pub fn raise_signal(self: &Rc<Self>, signal: Signal) {
{
let mut inner = self.inner.lock();
inner.wait_status = WaitStatus::Interrupted;
inner.signal_stack.push_back(signal);
}
pub fn raise_signal(self: &Arc<Self>, _signal: Signal) {
// XXX handle signals + async
todo!();
// {
// let mut inner = self.inner.lock();
// inner.signal_stack.push_back(signal);
// }
if self.state() == ProcessState::Suspended {
self.clone().enqueue_somewhere();
}
// if self.state() == ProcessState::Suspended {
// self.clone().enqueue_somewhere();
// }
}
/// Inherits the data from a parent process. Meant to be called from SpawnProcess handler.
pub fn inherit(&self, parent: &Rc<Process>) -> Result<(), Error> {
pub fn inherit(&self, parent: &Arc<Process>) -> Result<(), Error> {
let mut our_inner = self.inner.lock();
let their_inner = parent.inner.lock();
@ -401,6 +392,37 @@ impl Process {
}
}
}
pub fn wait_for_exit(process: Arc<Self>) -> impl Future<Output = ExitCode> {
struct ProcessExitFuture {
process: Arc<Process>,
}
impl Future for ProcessExitFuture {
type Output = ExitCode;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let process = &self.process;
process.exit_waker.register(cx.waker());
if let Some(exit_status) = process.get_exit_status() {
process.exit_waker.remove(cx.waker());
Poll::Ready(exit_status)
} else {
Poll::Pending
}
}
}
ProcessExitFuture { process }
}
}
impl ArcWake for Process {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.clone().enqueue_somewhere();
}
}
impl Drop for Process {
@ -415,25 +437,13 @@ impl CurrentProcess {
/// # Safety
///
/// Only meant to be called from [Process::current] or [CpuQueue::current_process].
pub unsafe fn new(inner: Rc<Process>) -> Self {
pub unsafe fn new(inner: Arc<Process>) -> Self {
// XXX
// assert_eq!(DAIF.read(DAIF::I), 1);
assert!(ArchitectureImpl::interrupt_mask());
Self(inner)
}
/// Sets up a pending wait for the process.
///
/// # Safety
///
/// This function is only meant to be called in no-IRQ context and when caller can guarantee
/// the task won't get scheduled to a CPU in such state.
pub unsafe fn setup_wait(&self, wait: &'static Wait) {
let mut inner = self.inner.lock();
inner.pending_wait.replace(wait);
inner.wait_status = WaitStatus::Pending;
}
/// Configures signal entry information for the process
pub fn set_signal_entry(&self, entry: usize, stack: usize) {
let mut inner = self.inner.lock();
@ -447,21 +457,6 @@ impl CurrentProcess {
self.handle_exit();
self.dequeue(ProcessState::Terminated);
// let current_state = self.state.swap(ProcessState::Terminated, Ordering::SeqCst);
// assert_eq!(current_state, ProcessState::Running);
// match current_state {
// ProcessState::Suspended => {
// todo!();
// }
// ProcessState::Ready => todo!(),
// ProcessState::Running => {
// self.handle_exit();
// unsafe { Cpu::local().queue().yield_cpu() }
// }
// ProcessState::Terminated => todo!(),
// }
}
/// Sets up a return frame to handle a pending signal, if any is present in the task's queue.
@ -519,7 +514,7 @@ impl CurrentProcess {
}
impl Deref for CurrentProcess {
type Target = Rc<Process>;
type Target = Arc<Process>;
fn deref(&self) -> &Self::Target {
&self.0

View File

@ -0,0 +1,63 @@
use core::task::{Context, Poll};
use abi::error::Error;
use alloc::{boxed::Box, format, sync::Arc};
use futures_util::{task::waker_ref, Future};
use crate::task::{process::Process, spawn_kernel_closure};
use super::{
task::Task,
task_queue::{TaskQueue, TASK_QUEUE},
};
fn init_async_queue() -> TaskQueue {
TaskQueue::new(128)
}
pub fn enqueue(task: Arc<Task>) -> Result<(), Error> {
TASK_QUEUE.or_init_with(init_async_queue).enqueue(task)
}
pub fn spawn_async_worker(index: usize) -> Result<(), Error> {
let name = format!("[async-worker-{}]", index);
spawn_kernel_closure(name, move || {
let queue = TASK_QUEUE.or_init_with(init_async_queue);
loop {
let task = queue.block_pop().unwrap();
let mut future_slot = task.future.lock();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);
if future.as_mut().poll(context).is_pending() {
*future_slot = Some(future);
}
}
}
})
}
pub fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) -> Result<(), Error> {
enqueue(Task::new(future))
}
pub fn run_to_completion<'a, T, F: Future<Output = T> + Send + 'a>(future: F) -> T {
let process = Process::current();
let mut future = Box::pin(future);
loop {
let waker = waker_ref(&process);
let context = &mut Context::from_waker(&waker);
match future.as_mut().poll(context) {
Poll::Ready(value) => break value,
Poll::Pending => {
process.suspend();
}
}
}
}
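Taken together, kernel code can now either hand a future to the worker pool or drive one to completion inline on the current task. A usage sketch under the API above (module paths abbreviated, futures illustrative):

fn demo() -> Result<(), Error> {
    // Detached: queued on TASK_QUEUE and polled by one of the async workers
    runtime::spawn(async {
        runtime::sleep(Duration::from_millis(10)).await;
    })?;

    // Inline: polled on the current process, which suspends between wake-ups
    let answer = runtime::run_to_completion(async { 2 + 2 });
    assert_eq!(answer, 4);
    Ok(())
}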

View File

@ -0,0 +1,33 @@
#[macro_export]
macro_rules! block {
($($stmt:tt)*) => {
$crate::task::runtime::run_to_completion(alloc::boxed::Box::pin(async move {
$($stmt)*
}))
};
}
#[macro_export]
macro_rules! any {
($fut0:ident = $pat0:pat => $body0:expr, $fut1:ident = $pat1:pat => $body1:expr) => {
use futures_util::Future;
let mut pin0 = alloc::boxed::Box::pin($fut0);
let mut pin1 = alloc::boxed::Box::pin($fut1);
$crate::task::runtime::poll_fn(move |cx| {
match (pin0.as_mut().poll(cx), pin1.as_mut().poll(cx)) {
(core::task::Poll::Ready($pat0), _) => {
$body0;
core::task::Poll::Ready(())
}
(_, core::task::Poll::Ready($pat1)) => {
$body1;
core::task::Poll::Ready(())
}
(_, _) => core::task::Poll::Pending,
}
})
.await;
};
}
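For reference, block! is the bridge from synchronous kernel code into the new runtime: it wraps its body in an async move block, pins it on the heap and feeds it to run_to_completion on the current process. The Nanosleep handler changed in this commit, for example, expands roughly to (sketch):

// block! { runtime::sleep(duration).await } becomes, approximately:
crate::task::runtime::run_to_completion(alloc::boxed::Box::pin(async move {
    runtime::sleep(duration).await
}));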

src/task/runtime/mod.rs (new file, +78 lines)
View File

@ -0,0 +1,78 @@
use core::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures_util::Future;
use crate::arch::{Architecture, ARCHITECTURE};
mod executor;
mod macros;
mod task;
mod task_queue;
mod waker;
pub use executor::{run_to_completion, spawn, spawn_async_worker};
pub use waker::QueueWaker;
pub struct PollFn<F> {
f: F,
}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context) -> Poll<T>,
{
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
(unsafe { &mut self.get_unchecked_mut().f })(cx)
}
}
pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context) -> Poll<T>,
{
PollFn { f }
}
pub static SLEEP_WAKER: QueueWaker = QueueWaker::new();
pub fn sleep(duration: Duration) -> impl Future<Output = ()> {
struct SleepFuture {
deadline: Duration,
}
impl Future for SleepFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
SLEEP_WAKER.register(cx.waker());
let now = ARCHITECTURE
.monotonic_timer()
.monotonic_timestamp()
.unwrap();
if now >= self.deadline {
SLEEP_WAKER.remove(cx.waker());
Poll::Ready(())
} else {
Poll::Pending
}
}
}
let now = ARCHITECTURE
.monotonic_timer()
.monotonic_timestamp()
.unwrap();
let deadline = now + duration;
SleepFuture { deadline }
}
pub fn tick(_now: Duration) {
SLEEP_WAKER.wake_all();
}
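poll_fn mirrors the helper of the same name from futures-util and is what the any! macro above builds on: it turns a closure over a Context into an anonymous future. A small self-contained sketch (meant to run inside an async block, with core::task::Poll in scope):

let mut polls = 0;
let total = poll_fn(move |cx| {
    polls += 1;
    if polls < 3 {
        // Not done yet: ask to be polled again, then yield
        cx.waker().wake_by_ref();
        Poll::Pending
    } else {
        Poll::Ready(polls)
    }
})
.await;
assert_eq!(total, 3);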

src/task/runtime/task.rs (new file, +23 lines)
View File

@ -0,0 +1,23 @@
use alloc::sync::Arc;
use futures_util::{future::BoxFuture, task::ArcWake, Future, FutureExt};
use crate::sync::IrqSafeSpinlock;
use super::executor;
pub struct Task {
pub(super) future: IrqSafeSpinlock<Option<BoxFuture<'static, ()>>>,
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
executor::enqueue(arc_self.clone()).unwrap();
}
}
impl Task {
pub fn new<F: Future<Output = ()> + Send + 'static>(future: F) -> Arc<Self> {
let future = IrqSafeSpinlock::new(Some(future.boxed()));
Arc::new(Self { future })
}
}

View File

@ -0,0 +1,55 @@
use abi::error::Error;
use alloc::sync::Arc;
use crossbeam_queue::ArrayQueue;
use kernel_util::util::OneTimeInit;
use crate::{sync::IrqGuard, task::process::Process};
use super::task::Task;
pub(super) static TASK_QUEUE: OneTimeInit<TaskQueue> = OneTimeInit::new();
pub(super) struct TaskQueue {
// Queue of workers waiting for an item
pending_workers: ArrayQueue<Arc<Process>>,
task_queue: ArrayQueue<Arc<Task>>,
}
impl TaskQueue {
pub fn new(task_capacity: usize) -> Self {
Self {
pending_workers: ArrayQueue::new(16),
task_queue: ArrayQueue::new(task_capacity),
}
}
fn wakeup_one(&self) {
if let Some(worker) = self.pending_workers.pop() {
worker.enqueue_somewhere();
}
}
pub fn enqueue(&self, task: Arc<Task>) -> Result<(), Error> {
let _irq = IrqGuard::acquire();
if self.task_queue.push(task).is_err() {
todo!();
}
self.wakeup_one();
Ok(())
}
pub fn block_pop(&self) -> Result<Arc<Task>, Error> {
let _irq = IrqGuard::acquire();
let process = Process::current();
loop {
if let Some(task) = self.task_queue.pop() {
return Ok(task);
}
if self.pending_workers.push(process.clone()).is_err() {
panic!("Pending worker queue overflow");
}
process.suspend();
}
}
}

src/task/runtime/waker.rs (new file, +63 lines)
View File

@ -0,0 +1,63 @@
use core::task::Waker;
use alloc::collections::VecDeque;
use crate::sync::IrqSafeSpinlock;
pub struct QueueWaker {
queue: IrqSafeSpinlock<VecDeque<Waker>>,
}
impl QueueWaker {
pub const fn new() -> Self {
Self {
queue: IrqSafeSpinlock::new(VecDeque::new()),
}
}
pub fn register(&self, waker: &Waker) {
let mut queue = self.queue.lock();
if queue.iter().find(|other| other.will_wake(waker)).is_some() {
return;
}
queue.push_back(waker.clone());
}
pub fn remove(&self, waker: &Waker) -> bool {
let mut queue = self.queue.lock();
let mut index = 0;
let mut removed = false;
while index < queue.len() {
if queue[index].will_wake(waker) {
removed = true;
queue.remove(index);
}
index += 1;
}
removed
}
pub fn wake_some(&self, limit: usize) -> usize {
let mut queue = self.queue.lock();
let mut count = 0;
while count < limit && let Some(item) = queue.pop_front() {
item.wake();
count += 1;
}
count
}
pub fn wake_one(&self) -> bool {
self.wake_some(1) != 0
}
pub fn wake_all(&self) -> usize {
self.wake_some(usize::MAX)
}
}
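QueueWaker is the primitive the rest of this commit leans on: SleepFuture, Process::wait_for_exit and AsyncRing::read all follow the same register-then-check discipline so that a wake landing between the check and a later registration cannot be lost. A generic sketch of the pattern (EVENT_WAKER and event_has_fired are illustrative, not part of this commit):

use core::{pin::Pin, task::{Context, Poll}};
use futures_util::Future;

static EVENT_WAKER: QueueWaker = QueueWaker::new();

struct EventFuture;

impl Future for EventFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Register before testing the condition...
        EVENT_WAKER.register(cx.waker());
        // event_has_fired(): hypothetical condition set by the producer side
        if event_has_fired() {
            // ...and deregister once it holds, so stale wakers don't pile up
            EVENT_WAKER.remove(cx.waker());
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}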

View File

@ -1,7 +1,7 @@
//! Per-CPU queue implementation
// use aarch64_cpu::registers::CNTPCT_EL0;
use alloc::{collections::VecDeque, rc::Rc, vec::Vec};
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
use cfg_if::cfg_if;
use kernel_util::util::OneTimeInit;
@ -32,9 +32,9 @@ pub struct CpuQueueStats {
/// Per-CPU queue's inner data, normally resides under a lock
pub struct CpuQueueInner {
/// Current process, None if idling
pub current: Option<Rc<Process>>,
pub current: Option<Arc<Process>>,
/// LIFO queue for processes waiting for execution
pub queue: VecDeque<Rc<Process>>,
pub queue: VecDeque<Arc<Process>>,
/// CPU time usage statistics
pub stats: CpuQueueStats,
@ -80,7 +80,7 @@ impl CpuQueueInner {
/// Picks a next task for execution, skipping (dropping) those that were suspended. May return
/// None if the queue is empty or no valid task was found, in which case the scheduler should
/// go idle.
pub fn next_ready_task(&mut self) -> Option<Rc<Process>> {
pub fn next_ready_task(&mut self) -> Option<Arc<Process>> {
while !self.queue.is_empty() {
let task = self.queue.pop_front().unwrap();
@ -103,7 +103,7 @@ impl CpuQueueInner {
/// Returns an iterator over all the processes in the queue plus the currently running process,
/// if there is one.
pub fn iter(&self) -> impl Iterator<Item = &Rc<Process>> {
pub fn iter(&self) -> impl Iterator<Item = &Arc<Process>> {
Iterator::chain(self.queue.iter(), self.current.iter())
}
}
@ -134,7 +134,7 @@ impl CpuQueue {
/// Only meant to be called from [crate::task::enter()] function.
pub unsafe fn enter(&self) -> ! {
assert!(ArchitectureImpl::interrupt_mask());
// Start from idle thread to avoid having a Rc stuck here without getting dropped
// Start from idle thread to avoid having an Arc stuck here without getting dropped
// let t = CNTPCT_EL0.get();
// self.lock().stats.measure_time = t;
self.idle.enter()
@ -173,18 +173,18 @@ impl CpuQueue {
inner.current = next.clone();
// Can drop the lock, we hold current and next Rc's
// Can drop the lock, we hold current and next Arc's
drop(inner);
let (from, _from_rc) = if let Some(current) = current.as_ref() {
(current.current_context(), Rc::strong_count(current))
(current.current_context(), Arc::strong_count(current))
} else {
(&self.idle, 0)
};
let (to, _to_rc) = if let Some(next) = next.as_ref() {
next.set_running(Cpu::local_id());
(next.current_context(), Rc::strong_count(next))
(next.current_context(), Arc::strong_count(next))
} else {
(&self.idle, 0)
};
@ -216,7 +216,7 @@ impl CpuQueue {
///
/// Only meant to be called from Process impl. The function does not set any process accounting
/// information, which may lead to invalid states.
pub unsafe fn enqueue(&self, p: Rc<Process>) {
pub unsafe fn enqueue(&self, p: Arc<Process>) {
let mut inner = self.inner.lock();
assert!(ArchitectureImpl::interrupt_mask());
assert_eq!(p.state(), ProcessState::Ready);
@ -300,6 +300,10 @@ impl CpuQueue {
queues.iter().enumerate().min_by_key(|(_, q)| q.len())
}
pub fn index(&self) -> usize {
self.index
}
}
/// Initializes the global queue list

View File

@ -5,6 +5,25 @@ use yggdrasil_abi::error::Error;
use crate::arch::{Architecture, ARCHITECTURE};
pub mod queue;
pub mod ring;
pub trait ResultIterator<T, E> {
fn collect_error(self) -> Option<E>;
}
impl<T, E, I: Iterator<Item = Result<T, E>>> ResultIterator<T, E> for I {
fn collect_error(self) -> Option<E> {
for item in self {
match item {
Err(e) => return Some(e),
_ => (),
}
}
None
}
}
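collect_error folds an iterator of Results into the first error encountered, discarding the successes. A small sketch (values and error variant illustrative):

fn first_failure() -> Result<(), Error> {
    let attempts = [Ok(1u32), Err(Error::InvalidArgument), Ok(3)];
    if let Some(e) = attempts.into_iter().collect_error() {
        // The first Err wins; Ok items are ignored
        return Err(e);
    }
    Ok(())
}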
/// Performs a busy-loop sleep until the specified duration has passed
pub fn polling_sleep(duration: Duration) -> Result<(), Error> {
let timer = ARCHITECTURE.monotonic_timer();

src/util/queue.rs (new file, +64 lines)
View File

@ -0,0 +1,64 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use alloc::sync::Arc;
use crossbeam_queue::ArrayQueue;
use futures_util::Future;
use crate::task::runtime::QueueWaker;
pub struct AsyncQueue<T> {
waker: Arc<QueueWaker>,
queue: Arc<ArrayQueue<T>>,
}
impl<T> AsyncQueue<T> {
pub fn new(capacity: usize) -> Self {
Self {
waker: Arc::new(QueueWaker::new()),
queue: Arc::new(ArrayQueue::new(capacity)),
}
}
pub fn send(&self, value: T) -> Result<(), T> {
let result = self.queue.push(value);
if result.is_ok() {
self.waker.wake_one();
}
result
}
pub fn recv(&self) -> impl Future<Output = T> {
struct AsyncQueueRecvFuture<T> {
waker: Arc<QueueWaker>,
queue: Arc<ArrayQueue<T>>,
}
impl<T> Future for AsyncQueueRecvFuture<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(value) = self.queue.pop() {
return Poll::Ready(value);
}
self.waker.register(cx.waker());
match self.queue.pop() {
Some(value) => {
self.waker.remove(cx.waker());
Poll::Ready(value)
}
None => Poll::Pending,
}
}
}
AsyncQueueRecvFuture {
queue: self.queue.clone(),
waker: self.waker.clone(),
}
}
}
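AsyncQueue pairs a lock-free ArrayQueue with a QueueWaker: send is non-blocking and hands the element back if the queue is full, while recv returns a future that parks the caller until something arrives. A short usage sketch (capacity and element type illustrative, inside an async context):

let queue: AsyncQueue<u8> = AsyncQueue::new(16);

// Producer side: push and wake one pending receiver
if queue.send(b'a').is_err() {
    // Queue full: the rejected element is returned to the caller
}

// Consumer side: suspends until send() delivers an element
let byte = queue.recv().await;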

src/util/ring.rs (new file, +97 lines)
View File

@ -0,0 +1,97 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use abi::error::Error;
use alloc::sync::Arc;
use futures_util::Future;
use crate::{sync::IrqSafeSpinlock, task::runtime::QueueWaker};
struct Inner<T, const N: usize> {
rd: usize,
wr: usize,
data: [T; N],
}
pub struct AsyncRing<T, const N: usize> {
inner: Arc<IrqSafeSpinlock<Inner<T, N>>>,
read_waker: Arc<QueueWaker>,
}
impl<T: Copy, const N: usize> Inner<T, N> {
#[inline]
const fn is_readable(&self) -> bool {
if self.rd <= self.wr {
(self.wr - self.rd) > 0
} else {
(self.wr + (N - self.rd)) > 0
}
}
#[inline]
unsafe fn read_unchecked(&mut self) -> T {
let res = self.data[self.rd];
self.rd = (self.rd + 1) % N;
res
}
#[inline]
unsafe fn write_unchecked(&mut self, ch: T) {
self.data[self.wr] = ch;
self.wr = (self.wr + 1) % N;
}
}
impl<T: Copy, const N: usize> AsyncRing<T, N> {
pub fn new(value: T) -> Self {
Self {
inner: Arc::new(IrqSafeSpinlock::new(Inner {
rd: 0,
wr: 0,
data: [value; N],
})),
read_waker: Arc::new(QueueWaker::new()),
}
}
pub fn try_write(&self, item: T) -> Result<(), Error> {
let mut lock = self.inner.lock();
unsafe {
lock.write_unchecked(item);
}
drop(lock);
self.read_waker.wake_one();
Ok(())
}
pub fn read(&self) -> impl Future<Output = T> {
struct ReadFuture<T: Copy, const N: usize> {
inner: Arc<IrqSafeSpinlock<Inner<T, N>>>,
read_waker: Arc<QueueWaker>,
}
impl<T: Copy, const N: usize> Future for ReadFuture<T, N> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.read_waker.register(cx.waker());
let mut inner = self.inner.lock();
if inner.is_readable() {
self.read_waker.remove(cx.waker());
Poll::Ready(unsafe { inner.read_unchecked() })
} else {
Poll::Pending
}
}
}
ReadFuture {
inner: self.inner.clone(),
read_waker: self.read_waker.clone(),
}
}
}
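AsyncRing is the same idea specialized for the TTY path: a fixed-size ring under an IrqSafeSpinlock plus a QueueWaker for readers, which is exactly what TtyContext::putc/getc wrap. A minimal sketch (capacity as used by TtyContext, inside an async context):

let ring: AsyncRing<u8, 128> = AsyncRing::new(0);

// Producer (e.g. TtyContext::putc from recv_byte): write, then wake one reader
ring.try_write(b'x').ok();

// Consumer (TtyContext::getc): stays Pending until the ring is readable
let byte = ring.read().await;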