From e78784d96d80a36d1f5a994d345fdb1530b40c38 Mon Sep 17 00:00:00 2001 From: Mark Poliakov <mark@alnyan.me> Date: Sun, 10 Dec 2023 23:01:39 +0200 Subject: [PATCH] refactor: move async runtime into kernel-util --- lib/kernel-util/Cargo.toml | 3 + lib/kernel-util/src/api.rs | 24 ++++- lib/kernel-util/src/lib.rs | 6 +- lib/kernel-util/src/runtime/executor.rs | 65 ++++++++++++ lib/kernel-util/src/runtime/macros.rs | 8 ++ lib/kernel-util/src/runtime/mod.rs | 54 ++++++++++ lib/kernel-util/src/runtime/task.rs | 46 ++++++++ lib/kernel-util/src/runtime/task_queue.rs | 70 +++++++++++++ lib/kernel-util/src/runtime/waker.rs | 72 +++++++++++++ lib/kernel-util/src/thread.rs | 102 ++++++++++++++++++ src/arch/mod.rs | 7 ++ src/arch/x86_64/boot/mod.rs | 2 +- src/arch/x86_64/peripherals/i8253.rs | 15 ++- src/device/display/console.rs | 4 +- src/device/nvme/mod.rs | 2 +- src/device/nvme/queue.rs | 3 +- src/device/timer.rs | 9 -- src/device/tty.rs | 3 +- src/init.rs | 3 +- src/syscall/mod.rs | 14 +-- src/task/context.rs | 35 +------ src/task/mod.rs | 5 +- src/task/process.rs | 3 +- src/task/runtime/mod.rs | 122 +++++++++------------- src/task/sync.rs | 3 +- src/task/thread.rs | 45 +++++++- src/util/queue.rs | 3 +- src/util/ring.rs | 4 +- 28 files changed, 571 insertions(+), 161 deletions(-) create mode 100644 lib/kernel-util/src/runtime/executor.rs create mode 100644 lib/kernel-util/src/runtime/macros.rs create mode 100644 lib/kernel-util/src/runtime/mod.rs create mode 100644 lib/kernel-util/src/runtime/task.rs create mode 100644 lib/kernel-util/src/runtime/task_queue.rs create mode 100644 lib/kernel-util/src/runtime/waker.rs create mode 100644 lib/kernel-util/src/thread.rs diff --git a/lib/kernel-util/Cargo.toml b/lib/kernel-util/Cargo.toml index 81ac1c42..781b9149 100644 --- a/lib/kernel-util/Cargo.toml +++ b/lib/kernel-util/Cargo.toml @@ -7,4 +7,7 @@ edition = "2021" [dependencies] yggdrasil-abi = { git = "https://git.alnyan.me/yggdrasil/yggdrasil-abi.git" } + log = "0.4.20" +futures-util = { version = "0.3.28", default-features = false, features = ["alloc", "async-await"] } +crossbeam-queue = { version = "0.3.8", default-features = false, features = ["alloc"] } diff --git a/lib/kernel-util/src/api.rs b/lib/kernel-util/src/api.rs index aa43441f..b391d89c 100644 --- a/lib/kernel-util/src/api.rs +++ b/lib/kernel-util/src/api.rs @@ -1,6 +1,12 @@ -use yggdrasil_abi::error::Error; +use core::time::Duration; -use crate::mem::{address::PhysicalAddress, device::RawDeviceMemoryMapping}; +use alloc::{string::String, sync::Arc}; +use yggdrasil_abi::{error::Error, process::ExitCode}; + +use crate::{ + mem::{address::PhysicalAddress, device::RawDeviceMemoryMapping}, + thread::{CurrentThread, Thread}, +}; extern "Rust" { pub fn __acquire_irq_guard() -> bool; @@ -19,4 +25,18 @@ extern "Rust" { count: usize, ) -> Result<RawDeviceMemoryMapping, Error>; pub fn __unmap_device_pages(mapping: &RawDeviceMemoryMapping); + + // SAFETY: Both the kernel-side and api-side Threads are sized and Arc<___> has the same value + // for them + pub fn __create_kthread( + name: String, + func: extern "C" fn(usize) -> !, + arg: usize, + ) -> Result<Arc<Thread>, Error>; + pub fn __enqueue(t: &Arc<Thread>); + pub fn __current_thread() -> CurrentThread; + pub fn __suspend_current(t: &CurrentThread) -> Result<(), Error>; + pub fn __exit_current(t: &CurrentThread, code: ExitCode) -> !; + + pub fn __monotonic_timestamp() -> Result<Duration, Error>; } diff --git a/lib/kernel-util/src/lib.rs b/lib/kernel-util/src/lib.rs index 7f83491a..573a8066 100644 --- a/lib/kernel-util/src/lib.rs +++ b/lib/kernel-util/src/lib.rs @@ -5,7 +5,9 @@ const_trait_impl, effects, slice_ptr_get, - strict_provenance + strict_provenance, + never_type, + let_chains )] extern crate alloc; @@ -13,7 +15,9 @@ extern crate alloc; pub(crate) mod api; pub mod mem; +pub mod runtime; pub mod sync; +pub mod thread; pub mod util; #[repr(C)] diff --git a/lib/kernel-util/src/runtime/executor.rs b/lib/kernel-util/src/runtime/executor.rs new file mode 100644 index 00000000..cd3295d1 --- /dev/null +++ b/lib/kernel-util/src/runtime/executor.rs @@ -0,0 +1,65 @@ +use core::task::{Context, Poll}; + +use alloc::{boxed::Box, format, sync::Arc}; +use futures_util::{task::waker_ref, Future}; +use yggdrasil_abi::error::Error; + +use crate::thread::Thread; + +use super::{ + task::{Task, Termination}, + task_queue, +}; + +/// Pushes a task into the executor's queue +pub fn enqueue(task: Arc<Task>) -> Result<(), Error> { + task_queue::push_task(task) +} + +/// Spawns a background worker to execute the tasks from the global queue +pub fn spawn_async_worker(index: usize) -> Result<(), Error> { + let name = format!("[async-worker-{}]", index); + + Thread::spawn(name, move || { + loop { + let task = task_queue::pop_task().unwrap(); // queue.block_pop().unwrap(); + let mut future_slot = task.future.lock(); + + if let Some(mut future) = future_slot.take() { + let waker = waker_ref(&task); + let context = &mut Context::from_waker(&waker); + + if future.as_mut().poll(context).is_pending() { + *future_slot = Some(future); + } + } + } + }) +} + +/// Creates a new task for the [Future] and queues it for execution in background +pub fn spawn<T: Termination, F: Future<Output = T> + Send + 'static>( + future: F, +) -> Result<(), Error> { + enqueue(Task::new(future)) +} + +/// Runs a [Future] to its completion on the current thread +pub fn run_to_completion<'a, T, F: Future<Output = T> + Send + 'a>(future: F) -> Result<T, Error> { + let thread = Thread::current(); + let mut future = Box::pin(future); + + loop { + let waker = waker_ref(&thread); + let context = &mut Context::from_waker(&waker); + + match future.as_mut().poll(context) { + Poll::Ready(value) => break Ok(value), + Poll::Pending => { + if let Err(error) = thread.suspend() { + break Err(error); + } + } + } + } +} diff --git a/lib/kernel-util/src/runtime/macros.rs b/lib/kernel-util/src/runtime/macros.rs new file mode 100644 index 00000000..00c7da9b --- /dev/null +++ b/lib/kernel-util/src/runtime/macros.rs @@ -0,0 +1,8 @@ +#[macro_export] +macro_rules! block { + ($($stmt:tt)*) => { + $crate::runtime::run_to_completion(alloc::boxed::Box::pin(async move { + $($stmt)* + })) + }; +} diff --git a/lib/kernel-util/src/runtime/mod.rs b/lib/kernel-util/src/runtime/mod.rs new file mode 100644 index 00000000..d9e93dd0 --- /dev/null +++ b/lib/kernel-util/src/runtime/mod.rs @@ -0,0 +1,54 @@ +use core::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +#[macro_use] +mod macros; + +mod executor; +mod task; +mod task_queue; +mod waker; + +pub use executor::{run_to_completion, spawn, spawn_async_worker}; +use futures_util::Future; +pub use task_queue::init_task_queue; +pub use waker::QueueWaker; + +use crate::api; + +static SLEEP_WAKER: QueueWaker = QueueWaker::new(); + +/// Suspends the task until given duration passes +pub fn sleep(duration: Duration) -> impl Future<Output = ()> { + struct SleepFuture { + deadline: Duration, + } + + impl Future for SleepFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + SLEEP_WAKER.register(cx.waker()); + let now = unsafe { api::__monotonic_timestamp() }.unwrap(); + if now >= self.deadline { + SLEEP_WAKER.remove(cx.waker()); + Poll::Ready(()) + } else { + Poll::Pending + } + } + } + + let now = unsafe { api::__monotonic_timestamp() }.unwrap(); + let deadline = now + duration; + + SleepFuture { deadline } +} + +/// Updates the runtime's time +pub fn tick(_now: Duration) { + SLEEP_WAKER.wake_all(); +} diff --git a/lib/kernel-util/src/runtime/task.rs b/lib/kernel-util/src/runtime/task.rs new file mode 100644 index 00000000..1d5602a1 --- /dev/null +++ b/lib/kernel-util/src/runtime/task.rs @@ -0,0 +1,46 @@ +use core::fmt; + +use alloc::sync::Arc; +use futures_util::{future::BoxFuture, task::ArcWake, Future, FutureExt}; + +use crate::sync::IrqSafeSpinlock; + +use super::executor; + +pub trait Termination { + fn print(&self); +} + +pub struct Task { + pub(super) future: IrqSafeSpinlock<Option<BoxFuture<'static, ()>>>, +} + +impl ArcWake for Task { + fn wake_by_ref(arc_self: &Arc<Self>) { + executor::enqueue(arc_self.clone()).unwrap(); + } +} + +impl Task { + pub fn new<T: Termination, F: Future<Output = T> + Send + 'static>(future: F) -> Arc<Self> { + let future = IrqSafeSpinlock::new(Some( + async move { + future.await.print(); + } + .boxed(), + )); + Arc::new(Self { future }) + } +} + +impl Termination for () { + fn print(&self) {} +} + +impl<T, E: fmt::Debug> Termination for Result<T, E> { + fn print(&self) { + if let Err(error) = self { + log::error!("A task finished with an error: {:?}", error); + } + } +} diff --git a/lib/kernel-util/src/runtime/task_queue.rs b/lib/kernel-util/src/runtime/task_queue.rs new file mode 100644 index 00000000..18e036f2 --- /dev/null +++ b/lib/kernel-util/src/runtime/task_queue.rs @@ -0,0 +1,70 @@ +use alloc::sync::Arc; +use crossbeam_queue::ArrayQueue; +use yggdrasil_abi::error::Error; + +use crate::{sync::IrqGuard, thread::Thread, util::OneTimeInit}; + +use super::task::Task; + +pub(super) static TASK_QUEUE: OneTimeInit<TaskQueue> = OneTimeInit::new(); + +pub(super) struct TaskQueue { + // Queue of workers waiting for an item + pending_workers: ArrayQueue<Arc<Thread>>, + task_queue: ArrayQueue<Arc<Task>>, +} + +impl TaskQueue { + pub fn new(task_capacity: usize) -> Self { + assert!(task_capacity > 0); + Self { + pending_workers: ArrayQueue::new(16), + task_queue: ArrayQueue::new(task_capacity), + } + } + + fn wakeup_one(&self) { + if let Some(worker) = self.pending_workers.pop() { + worker.enqueue(); + } + } + + pub fn enqueue(&self, task: Arc<Task>) -> Result<(), Error> { + let _irq = IrqGuard::acquire(); + if self.task_queue.push(task).is_err() { + todo!(); + } + self.wakeup_one(); + Ok(()) + } + + pub fn dequeue(&self) -> Result<Arc<Task>, Error> { + let thread = Thread::current(); + // assert!(ArchitectureImpl::interrupt_mask()); + loop { + if let Some(task) = self.task_queue.pop() { + return Ok(task); + } + + if self.pending_workers.push(thread.clone()).is_err() { + panic!("Pending worker queue overflow"); + } + + // This must not fail. Signals must not be raised. + thread.suspend().unwrap(); + } + } +} + +/// Initializes the global async/await task queue +pub fn init_task_queue() { + TASK_QUEUE.init(TaskQueue::new(128)); +} + +pub(super) fn push_task(task: Arc<Task>) -> Result<(), Error> { + TASK_QUEUE.get().enqueue(task) +} + +pub(super) fn pop_task() -> Result<Arc<Task>, Error> { + TASK_QUEUE.get().dequeue() +} diff --git a/lib/kernel-util/src/runtime/waker.rs b/lib/kernel-util/src/runtime/waker.rs new file mode 100644 index 00000000..24b3df55 --- /dev/null +++ b/lib/kernel-util/src/runtime/waker.rs @@ -0,0 +1,72 @@ +use core::task::Waker; + +use alloc::collections::VecDeque; + +use crate::sync::IrqSafeSpinlock; + +/// Async/await primitive to suspend and wake up tasks waiting on some shared resource +pub struct QueueWaker { + queue: IrqSafeSpinlock<VecDeque<Waker>>, +} + +impl QueueWaker { + /// Constructs an empty [QueueWaker] + pub const fn new() -> Self { + Self { + queue: IrqSafeSpinlock::new(VecDeque::new()), + } + } + + /// Registers a [Waker] reference to be waken up by this [QueueWaker] + pub fn register(&self, waker: &Waker) { + let mut queue = self.queue.lock(); + + if queue.iter().any(|other| other.will_wake(waker)) { + return; + } + + queue.push_back(waker.clone()); + } + + /// Removes a [Waker] reference from this [QueueWaker] + pub fn remove(&self, waker: &Waker) -> bool { + let mut queue = self.queue.lock(); + let mut index = 0; + let mut removed = false; + + while index < queue.len() { + if queue[index].will_wake(waker) { + removed = true; + queue.remove(index); + } + index += 1; + } + + removed + } + + /// Wakes up up to `limit` tasks waiting on this queue + pub fn wake_some(&self, limit: usize) -> usize { + let mut queue = self.queue.lock(); + let mut count = 0; + + while count < limit + && let Some(item) = queue.pop_front() + { + item.wake(); + count += 1; + } + + count + } + + /// Wakes up a single task waiting on this queue + pub fn wake_one(&self) -> bool { + self.wake_some(1) != 0 + } + + /// Wakes up all tasks waiting on this queue + pub fn wake_all(&self) -> usize { + self.wake_some(usize::MAX) + } +} diff --git a/lib/kernel-util/src/thread.rs b/lib/kernel-util/src/thread.rs new file mode 100644 index 00000000..384fb36d --- /dev/null +++ b/lib/kernel-util/src/thread.rs @@ -0,0 +1,102 @@ +use core::{fmt, ops::Deref}; + +use alloc::{boxed::Box, string::String, sync::Arc}; +use futures_util::task::ArcWake; +use yggdrasil_abi::{error::Error, process::ExitCode}; + +use crate::{api, sync::IrqGuard}; + +#[repr(C)] +pub(crate) struct Thread(!); +#[repr(C)] +pub(crate) struct CurrentThread(Arc<Thread>, IrqGuard); + +/// Conversion trait to allow multiple kernel closure return types +pub trait Termination { + /// Converts the closure return type into [ExitCode] + fn into_exit_code(self) -> ExitCode; +} + +impl<T, E: fmt::Debug> Termination for Result<T, E> { + fn into_exit_code(self) -> ExitCode { + match self { + Ok(_) => ExitCode::SUCCESS, + Err(err) => { + log::warn!("Kernel thread failed: {:?}", err); + ExitCode::Exited(1) + } + } + } +} + +impl Termination for ExitCode { + fn into_exit_code(self) -> ExitCode { + self + } +} + +impl Termination for () { + fn into_exit_code(self) -> ExitCode { + ExitCode::SUCCESS + } +} + +impl Thread { + pub fn spawn<S: Into<String>, F: FnOnce() + Send + 'static>( + name: S, + f: F, + ) -> Result<(), Error> { + extern "C" fn closure_wrapper<F: FnOnce() + Send + 'static>(closure_addr: usize) -> ! { + let closure = unsafe { Box::from_raw(closure_addr as *mut F) }; + let result = closure(); + Thread::current().exit(result.into_exit_code()); + unreachable!(); + } + + let closure = Box::new(f); + log::debug!("closure: {:p}", closure); + let thread = unsafe { + api::__create_kthread( + name.into(), + closure_wrapper::<F>, + Box::into_raw(closure) as usize, + ) + }?; + + thread.enqueue(); + + Ok(()) + } + + pub fn current() -> CurrentThread { + unsafe { api::__current_thread() } + } + + pub fn enqueue(self: &Arc<Self>) { + unsafe { api::__enqueue(self) } + } +} + +impl CurrentThread { + pub fn suspend(&self) -> Result<(), Error> { + unsafe { api::__suspend_current(self) } + } + + pub fn exit(&self, code: ExitCode) { + unsafe { api::__exit_current(self, code) } + } +} + +impl ArcWake for Thread { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.clone().enqueue() + } +} + +impl Deref for CurrentThread { + type Target = Arc<Thread>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/src/arch/mod.rs b/src/arch/mod.rs index d599433d..f16783a4 100644 --- a/src/arch/mod.rs +++ b/src/arch/mod.rs @@ -1,5 +1,7 @@ //! Provides architecture/platform-specific implementation details +use core::time::Duration; + use abi::error::Error; /// Returns an absolute address to the given symbol @@ -264,3 +266,8 @@ fn __map_device_pages( fn __unmap_device_pages(mapping: &RawDeviceMemoryMapping) { unsafe { ARCHITECTURE.unmap_device_memory(mapping) } } + +#[no_mangle] +fn __monotonic_timestamp() -> Result<Duration, Error> { + ARCHITECTURE.monotonic_timer().monotonic_timestamp() +} diff --git a/src/arch/x86_64/boot/mod.rs b/src/arch/x86_64/boot/mod.rs index c15f29eb..c41b561b 100644 --- a/src/arch/x86_64/boot/mod.rs +++ b/src/arch/x86_64/boot/mod.rs @@ -1,6 +1,7 @@ //! x86-64 boot and entry functions use core::{arch::global_asm, sync::atomic::Ordering}; +use kernel_util::runtime; use tock_registers::interfaces::Writeable; use yboot_proto::{ v1::{FramebufferOption, MemoryMap}, @@ -12,7 +13,6 @@ use crate::{ fs::devfs, kernel_main, kernel_secondary_main, mem::KERNEL_VIRT_OFFSET, - task::runtime, }; use super::{cpuid::init_cpuid, exception, ARCHITECTURE}; diff --git a/src/arch/x86_64/peripherals/i8253.rs b/src/arch/x86_64/peripherals/i8253.rs index 3eab2a46..3a94e1cc 100644 --- a/src/arch/x86_64/peripherals/i8253.rs +++ b/src/arch/x86_64/peripherals/i8253.rs @@ -2,17 +2,14 @@ use core::time::Duration; use abi::error::Error; use device_api::{interrupt::InterruptHandler, timer::MonotonicTimestampProviderDevice, Device}; -use kernel_util::sync::IrqSafeSpinlock; +use kernel_util::{runtime, sync::IrqSafeSpinlock}; -use crate::{ - arch::{ - x86_64::{ - intrinsics::{IoPort, IoPortAccess}, - IrqNumber, - }, - Architecture, ARCHITECTURE, +use crate::arch::{ + x86_64::{ + intrinsics::{IoPort, IoPortAccess}, + IrqNumber, }, - task::runtime, + Architecture, ARCHITECTURE, }; const FREQUENCY: u32 = 1193180; diff --git a/src/device/display/console.rs b/src/device/display/console.rs index 68548e22..0823aadb 100644 --- a/src/device/display/console.rs +++ b/src/device/display/console.rs @@ -5,9 +5,9 @@ use core::time::Duration; use abi::{error::Error, primitive_enum}; use alloc::{vec, vec::Vec}; use bitflags::bitflags; -use kernel_util::{sync::IrqSafeSpinlock, util::StaticVector}; +use kernel_util::{runtime, sync::IrqSafeSpinlock, util::StaticVector}; -use crate::{debug::DebugSink, task::runtime}; +use crate::debug::DebugSink; const CONSOLE_ROW_LEN: usize = 80; const MAX_CSI_ARGS: usize = 8; diff --git a/src/device/nvme/mod.rs b/src/device/nvme/mod.rs index 0e0ac415..f54a3288 100644 --- a/src/device/nvme/mod.rs +++ b/src/device/nvme/mod.rs @@ -9,6 +9,7 @@ use kernel_util::{ address::{FromRaw, IntoRaw, PhysicalAddress}, device::{DeviceMemoryIo, DeviceMemoryIoMut}, }, + runtime, sync::IrqSafeSpinlock, util::OneTimeInit, }; @@ -31,7 +32,6 @@ use crate::{ drive::NvmeDrive, queue::{CompletionQueueEntry, SubmissionQueueEntry}, }, - task::runtime, }; use self::{ diff --git a/src/device/nvme/queue.rs b/src/device/nvme/queue.rs index 3d027954..1381c481 100644 --- a/src/device/nvme/queue.rs +++ b/src/device/nvme/queue.rs @@ -17,12 +17,11 @@ use kernel_util::{ address::{AsPhysicalAddress, IntoRaw, PhysicalAddress}, PageBox, }, + runtime::QueueWaker, sync::IrqSafeSpinlock, }; use static_assertions::const_assert; -use crate::task::runtime::QueueWaker; - use super::{ command::{Command, Request}, error::NvmeError, diff --git a/src/device/timer.rs b/src/device/timer.rs index a24002bc..17f1690a 100644 --- a/src/device/timer.rs +++ b/src/device/timer.rs @@ -1,10 +1 @@ //! Timer device utilities - -use core::time::Duration; - -use crate::task::runtime; - -/// Global system timer tick handler -pub fn tick(now: Duration) { - runtime::tick(now); -} diff --git a/src/device/tty.rs b/src/device/tty.rs index 826d942f..dc2b5da9 100644 --- a/src/device/tty.rs +++ b/src/device/tty.rs @@ -15,12 +15,13 @@ use crate::{ #[cfg(feature = "fb_console")] pub mod combined { //! Combined console + keyboard terminal device - use crate::{block, task::process::ProcessId}; + use crate::task::process::ProcessId; use abi::{ error::Error, io::{DeviceRequest, TerminalSize}, }; use device_api::{input::KeyboardConsumer, serial::SerialDevice}; + use kernel_util::block; use vfs::CharDevice; // use vfs::CharDevice; diff --git a/src/init.rs b/src/init.rs index 9579d0ed..619de99a 100644 --- a/src/init.rs +++ b/src/init.rs @@ -28,7 +28,8 @@ pub fn kinit() -> Result<(), Error> { #[cfg(feature = "fb_console")] { - use crate::{device::display::console::update_consoles_task, task::runtime}; + use crate::device::display::console::update_consoles_task; + use kernel_util::runtime; runtime::spawn(async move { update_consoles_task().await; diff --git a/src/syscall/mod.rs b/src/syscall/mod.rs index 7d8b459d..a881a1b5 100644 --- a/src/syscall/mod.rs +++ b/src/syscall/mod.rs @@ -8,19 +8,17 @@ use abi::{ syscall::SyscallFunction, }; use alloc::sync::Arc; -use kernel_util::sync::IrqSafeSpinlockGuard; +use kernel_util::{block, runtime, sync::IrqSafeSpinlockGuard}; use vfs::{IoContext, NodeRef, Read, Seek, Write}; use yggdrasil_abi::{error::SyscallResult, io::MountOptions}; use crate::{ - block, debug::LogLevel, fs, mem::{phys, table::MapAttributes}, proc::{self, io::ProcessIo, random}, task::{ process::{Process, ProcessId}, - runtime, thread::Thread, }, }; @@ -134,10 +132,12 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error> let file = io.ioctx().open(at, path, opts, mode)?; // TODO NO_CTTY? - if process.session_terminal().is_none() && - let Some(node) = file.node() && node.is_terminal() { - debugln!("Session terminal set for #{}: {}", process.id(), path); - process.set_session_terminal(node.clone()); + if process.session_terminal().is_none() + && let Some(node) = file.node() + && node.is_terminal() + { + debugln!("Session terminal set for #{}: {}", process.id(), path); + process.set_session_terminal(node.clone()); } let fd = io.place_file(file)?; diff --git a/src/task/context.rs b/src/task/context.rs index ecac1854..e3a7e23f 100644 --- a/src/task/context.rs +++ b/src/task/context.rs @@ -1,10 +1,9 @@ //! Platform-specific task context manipulation interfaces -use core::fmt; - -use abi::{arch::SavedFrame, error::Error, process::ExitCode}; +use abi::{arch::SavedFrame, error::Error}; use alloc::boxed::Box; use cfg_if::cfg_if; +use kernel_util::thread::Termination; use crate::task::thread::Thread; @@ -18,36 +17,6 @@ cfg_if! { } } -/// Conversion trait to allow multiple kernel closure return types -pub trait Termination { - /// Converts the closure return type into [ExitCode] - fn into_exit_code(self) -> ExitCode; -} - -impl<T, E: fmt::Debug> Termination for Result<T, E> { - fn into_exit_code(self) -> ExitCode { - match self { - Ok(_) => ExitCode::SUCCESS, - Err(err) => { - warnln!("Kernel thread failed: {:?}", err); - ExitCode::Exited(1) - } - } - } -} - -impl Termination for ExitCode { - fn into_exit_code(self) -> ExitCode { - self - } -} - -impl Termination for () { - fn into_exit_code(self) -> ExitCode { - ExitCode::SUCCESS - } -} - /// Interface for task state save/restore mechanisms pub trait TaskFrame { /// Creates a "snapshot" of a exception/syscall frame diff --git a/src/task/mod.rs b/src/task/mod.rs index 09a24826..6e21f0b4 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -4,18 +4,17 @@ use abi::error::Error; use alloc::{string::String, vec::Vec}; -use kernel_util::sync::SpinFence; +use kernel_util::{runtime, sync::SpinFence, thread::Termination}; use crate::{ arch::{Architecture, ArchitectureImpl}, task::{sched::CpuQueue, thread::Thread}, }; -use self::context::{TaskContextImpl, Termination}; +use self::context::TaskContextImpl; pub mod context; pub mod process; -pub mod runtime; pub mod sched; pub mod sync; pub mod thread; diff --git a/src/task/process.rs b/src/task/process.rs index 55897ace..261e02e8 100644 --- a/src/task/process.rs +++ b/src/task/process.rs @@ -14,7 +14,7 @@ use abi::{ }; use alloc::{collections::BTreeMap, string::String, sync::Arc, vec::Vec}; use futures_util::Future; -use kernel_util::sync::IrqSafeSpinlock; +use kernel_util::{runtime::QueueWaker, sync::IrqSafeSpinlock}; use vfs::NodeRef; use crate::{ @@ -24,7 +24,6 @@ use crate::{ }; use super::{ - runtime::QueueWaker, sync::UserspaceMutex, thread::{Thread, ThreadId}, TaskContext, diff --git a/src/task/runtime/mod.rs b/src/task/runtime/mod.rs index b20bfa02..e40af2a1 100644 --- a/src/task/runtime/mod.rs +++ b/src/task/runtime/mod.rs @@ -1,5 +1,51 @@ //! Async/await runtime implementation +// use core::{ +// pin::Pin, +// task::{Context, Poll}, +// time::Duration, +// }; +// +// use futures_util::Future; +// +// use crate::arch::{Architecture, ARCHITECTURE}; +// +// mod executor; +// mod macros; +// mod task; +// mod task_queue; +// mod waker; +// +// pub use executor::{run_to_completion, spawn, spawn_async_worker}; +// pub use task_queue::init_task_queue; +// pub use waker::QueueWaker; +// +// /// [Future] implementation that wraps a poll-function +// pub struct PollFn<F> { +// f: F, +// } +// +// impl<T, F> Future for PollFn<F> +// where +// F: FnMut(&mut Context) -> Poll<T>, +// { +// type Output = T; +// +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { +// #[allow(clippy::needless_borrow)] +// (unsafe { &mut self.get_unchecked_mut().f })(cx) +// } +// } +// +// /// Constructs a [PollFn] from given poll-function +// pub fn poll_fn<T, F>(f: F) -> PollFn<F> +// where +// F: FnMut(&mut Context) -> Poll<T>, +// { +// PollFn { f } +// } +// + use core::{ pin::Pin, task::{Context, Poll}, @@ -7,80 +53,6 @@ use core::{ }; use futures_util::Future; +use kernel_util::runtime::QueueWaker; use crate::arch::{Architecture, ARCHITECTURE}; - -mod executor; -mod macros; -mod task; -mod task_queue; -mod waker; - -pub use executor::{run_to_completion, spawn, spawn_async_worker}; -pub use task_queue::init_task_queue; -pub use waker::QueueWaker; - -/// [Future] implementation that wraps a poll-function -pub struct PollFn<F> { - f: F, -} - -impl<T, F> Future for PollFn<F> -where - F: FnMut(&mut Context) -> Poll<T>, -{ - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - #[allow(clippy::needless_borrow)] - (unsafe { &mut self.get_unchecked_mut().f })(cx) - } -} - -/// Constructs a [PollFn] from given poll-function -pub fn poll_fn<T, F>(f: F) -> PollFn<F> -where - F: FnMut(&mut Context) -> Poll<T>, -{ - PollFn { f } -} - -static SLEEP_WAKER: QueueWaker = QueueWaker::new(); - -/// Suspends the task until given duration passes -pub fn sleep(duration: Duration) -> impl Future<Output = ()> { - struct SleepFuture { - deadline: Duration, - } - - impl Future for SleepFuture { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - SLEEP_WAKER.register(cx.waker()); - let now = ARCHITECTURE - .monotonic_timer() - .monotonic_timestamp() - .unwrap(); - if now >= self.deadline { - SLEEP_WAKER.remove(cx.waker()); - Poll::Ready(()) - } else { - Poll::Pending - } - } - } - - let now = ARCHITECTURE - .monotonic_timer() - .monotonic_timestamp() - .unwrap(); - let deadline = now + duration; - - SleepFuture { deadline } -} - -/// Updates the runtime's time -pub fn tick(_now: Duration) { - SLEEP_WAKER.wake_all(); -} diff --git a/src/task/sync.rs b/src/task/sync.rs index fcc0fb5a..3776b154 100644 --- a/src/task/sync.rs +++ b/src/task/sync.rs @@ -8,8 +8,7 @@ use core::{ use alloc::sync::Arc; use futures_util::Future; - -use super::runtime::QueueWaker; +use kernel_util::runtime::QueueWaker; /// User-space mutex (like BSD/Linux's futex) data structure pub struct UserspaceMutex { diff --git a/src/task/thread.rs b/src/task/thread.rs index 9aa9d51b..cfc9b0d0 100644 --- a/src/task/thread.rs +++ b/src/task/thread.rs @@ -20,17 +20,18 @@ use alloc::{ }; use atomic_enum::atomic_enum; use futures_util::{task::ArcWake, Future}; -use kernel_util::sync::{IrqGuard, IrqSafeSpinlock}; +use kernel_util::{ + block, + runtime::QueueWaker, + sync::{IrqGuard, IrqSafeSpinlock}, +}; use crate::{ - block, mem::{process::ProcessAddressSpace, ForeignPointer}, task::{context::TaskContextImpl, Cpu}, }; -use super::{ - context::TaskFrame, process::Process, runtime::QueueWaker, sched::CpuQueue, TaskContext, -}; +use super::{context::TaskFrame, process::Process, sched::CpuQueue, TaskContext}; /// Represents the states a thread can be at some point in time #[atomic_enum] @@ -56,6 +57,7 @@ pub enum ThreadId { } /// Wrapper which guarantees the thread referred to is the current one on the current CPU +#[repr(C)] pub struct CurrentThread(Arc<Thread>, IrqGuard); struct SignalEntry { @@ -545,3 +547,36 @@ impl fmt::Display for ThreadId { } } } + +// External API + +#[no_mangle] +fn __create_kthread( + name: String, + func: extern "C" fn(usize) -> !, + arg: usize, +) -> Result<Arc<Thread>, Error> { + let context = TaskContext::kernel(func, arg)?; + Ok(Thread::new_kthread(name, context)) +} + +#[no_mangle] +fn __enqueue(t: &Arc<Thread>) { + t.enqueue_somewhere(); +} + +#[no_mangle] +fn __current_thread() -> CurrentThread { + Thread::current() +} + +#[no_mangle] +fn __suspend_current(t: &CurrentThread) -> Result<(), Error> { + t.suspend() +} + +#[no_mangle] +fn __exit_current(t: &CurrentThread, code: ExitCode) -> ! { + t.exit(code); + unreachable!(); +} diff --git a/src/util/queue.rs b/src/util/queue.rs index fe888221..67969503 100644 --- a/src/util/queue.rs +++ b/src/util/queue.rs @@ -8,8 +8,7 @@ use core::{ use alloc::sync::Arc; use crossbeam_queue::ArrayQueue; use futures_util::Future; - -use crate::task::runtime::QueueWaker; +use kernel_util::runtime::QueueWaker; /// Asynchronous queue pub struct AsyncQueue<T> { diff --git a/src/util/ring.rs b/src/util/ring.rs index 15ad752a..c9605d44 100644 --- a/src/util/ring.rs +++ b/src/util/ring.rs @@ -8,9 +8,7 @@ use core::{ use abi::error::Error; use alloc::sync::Arc; use futures_util::Future; -use kernel_util::sync::IrqSafeSpinlock; - -use crate::task::runtime::QueueWaker; +use kernel_util::{runtime::QueueWaker, sync::IrqSafeSpinlock}; /// Ring buffer base pub struct RingBuffer<T, const N: usize> {