From 6848f6c56d3b524facd2dd9b14a3ed6c7eadb96d Mon Sep 17 00:00:00 2001 From: Mark Poliakov Date: Thu, 14 Dec 2023 18:45:56 +0200 Subject: [PATCH] char/pipe: implement pipes --- lib/vfs/Cargo.toml | 1 + lib/vfs/src/file/mod.rs | 23 +++- lib/vfs/src/file/pipe.rs | 226 +++++++++++++++++++++++++++++++++++++++ src/proc/io.rs | 12 ++- src/syscall/mod.rs | 32 +++++- src/task/process.rs | 2 +- 6 files changed, 288 insertions(+), 8 deletions(-) create mode 100644 lib/vfs/src/file/pipe.rs diff --git a/lib/vfs/Cargo.toml b/lib/vfs/Cargo.toml index bdb44b29..0a292833 100644 --- a/lib/vfs/Cargo.toml +++ b/lib/vfs/Cargo.toml @@ -13,6 +13,7 @@ kernel-util = { path = "../kernel-util" } ygg_driver_block = { path = "../../driver/block/core" } log = "0.4.20" +futures-util = { version = "0.3.28", default-features = false, features = ["alloc", "async-await"] } [dev-dependencies] hosted-tests = { path = "../hosted-tests" } diff --git a/lib/vfs/src/file/mod.rs b/lib/vfs/src/file/mod.rs index 00f0b5c8..6ff08c9e 100644 --- a/lib/vfs/src/file/mod.rs +++ b/lib/vfs/src/file/mod.rs @@ -16,11 +16,13 @@ use crate::{ use self::{ device::{BlockFile, CharFile}, directory::DirectoryFile, + pipe::PipeEnd, regular::RegularFile, }; mod device; mod directory; +mod pipe; mod regular; /// Per-file optional instance data created when a regular file is opened @@ -45,9 +47,19 @@ pub enum File { Regular(RegularFile), Block(BlockFile), Char(CharFile), + AnonymousPipe(PipeEnd), } impl File { + /// Constructs a pipe pair, returning its `(read, write)` ends + pub fn new_pipe_pair(capacity: usize) -> (Arc, Arc) { + let (read, write) = PipeEnd::new_pair(capacity); + ( + Arc::new(Self::AnonymousPipe(read)), + Arc::new(Self::AnonymousPipe(write)), + ) + } + pub(crate) fn directory(node: NodeRef, position: DirectoryOpenPosition) -> Arc { let position = IrqSafeSpinlock::new(position.into()); Arc::new(Self::Directory(DirectoryFile { node, position })) @@ -133,6 +145,7 @@ impl File { Self::Regular(file) => Some(&file.node), Self::Block(file) => Some(&file.node), Self::Char(file) => Some(&file.node), + Self::AnonymousPipe(_) => None, } } } @@ -143,6 +156,7 @@ impl Read for File { Self::Regular(file) => file.read(buf), Self::Block(file) => file.read(buf), Self::Char(file) => file.read(buf), + Self::AnonymousPipe(pipe) => pipe.read(buf), Self::Directory(_) => Err(Error::IsADirectory), } } @@ -154,6 +168,7 @@ impl Write for File { Self::Regular(file) => file.write(buf), Self::Block(file) => file.write(buf), Self::Char(file) => file.write(buf), + Self::AnonymousPipe(pipe) => pipe.write(buf), Self::Directory(_) => Err(Error::IsADirectory), } } @@ -164,7 +179,7 @@ impl Seek for File { match self { Self::Regular(file) => Ok(*file.position.lock()), Self::Block(file) => Ok(*file.position.lock()), - Self::Char(_) => Err(Error::InvalidOperation), + Self::Char(_) | Self::AnonymousPipe(_) => Err(Error::InvalidOperation), Self::Directory(_) => Err(Error::IsADirectory), } } @@ -173,7 +188,7 @@ impl Seek for File { match self { Self::Regular(file) => file.seek(from), Self::Block(file) => file.seek(from), - Self::Char(_) => Err(Error::InvalidOperation), + Self::Char(_) | Self::AnonymousPipe(_) => Err(Error::InvalidOperation), Self::Directory(_) => Err(Error::IsADirectory), } } @@ -200,6 +215,7 @@ impl fmt::Debug for File { .field("write", &file.write) .finish_non_exhaustive(), Self::Directory(_) => f.debug_struct("DirectoryFile").finish_non_exhaustive(), + Self::AnonymousPipe(_) => f.debug_struct("AnonymousPipe").finish_non_exhaustive(), } } } @@ -210,6 +226,7 @@ mod tests { use std::sync::{Arc, Mutex}; use kernel_util::sync::IrqSafeSpinlock; + use ygg_driver_block::BlockDevice; use yggdrasil_abi::{ error::Error, io::{DirectoryEntry, FileType, OpenOptions, SeekFrom}, @@ -217,7 +234,7 @@ mod tests { }; use crate::{ - device::{BlockDevice, CharDevice}, + device::CharDevice, file::DirectoryOpenPosition, impls::const_value_node, node::{AccessToken, CommonImpl, DirectoryImpl, Node, NodeFlags, NodeRef, RegularImpl}, diff --git a/lib/vfs/src/file/pipe.rs b/lib/vfs/src/file/pipe.rs new file mode 100644 index 00000000..7e662344 --- /dev/null +++ b/lib/vfs/src/file/pipe.rs @@ -0,0 +1,226 @@ +use core::{ + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, + task::{Context, Poll}, +}; + +use alloc::{sync::Arc, vec, vec::Vec}; +use futures_util::{task::AtomicWaker, Future}; +use kernel_util::{block, sync::IrqSafeSpinlock}; +use yggdrasil_abi::error::Error; + +struct PipeInner { + data: Vec, + capacity: usize, + rd: usize, + wr: usize, +} + +pub struct Pipe { + inner: IrqSafeSpinlock, + shutdown: AtomicBool, + read_notify: AtomicWaker, + write_notify: AtomicWaker, +} + +pub enum PipeEnd { + Read(Arc), + Write(Arc), +} + +impl PipeInner { + pub fn new(capacity: usize) -> Self { + Self { + data: vec![0; capacity], + capacity, + rd: 0, + wr: 0, + } + } + + pub fn can_write(&self) -> bool { + (self.wr + 1) % self.capacity != self.rd + } + + pub fn can_read(&self) -> bool { + self.rd != self.wr + } + + pub unsafe fn write(&mut self, val: u8) { + self.data[self.wr] = val; + self.wr = (self.wr + 1) % self.capacity; + } + + pub unsafe fn read(&mut self) -> u8 { + let val = self.data[self.rd]; + self.rd = (self.rd + 1) % self.capacity; + val + } + + fn try_write(&mut self, val: u8) -> bool { + if self.can_write() { + unsafe { + self.write(val); + } + true + } else { + false + } + } + + fn try_read(&mut self) -> Option { + if self.can_read() { + Some(unsafe { self.read() }) + } else { + None + } + } +} + +impl Pipe { + pub fn new(capacity: usize) -> Self { + Self { + inner: IrqSafeSpinlock::new(PipeInner::new(capacity)), + shutdown: AtomicBool::new(false), + read_notify: AtomicWaker::new(), + write_notify: AtomicWaker::new(), + } + } + + pub fn shutdown(&self) { + self.shutdown.store(true, Ordering::Release); + self.read_notify.wake(); + self.write_notify.wake(); + } + + pub fn blocking_write(&self, val: u8) -> impl Future> + '_ { + struct F<'a> { + pipe: &'a Pipe, + val: u8, + } + + impl<'a> Future for F<'a> { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut lock = self.pipe.inner.lock(); + + // Try fast path before acquiring write notify to avoid unnecessary contention + if self.pipe.shutdown.load(Ordering::Acquire) { + // TODO BrokenPipe + return Poll::Ready(Err(Error::ReadOnly)); + } else if lock.try_write(self.val) { + self.pipe.read_notify.wake(); + return Poll::Ready(Ok(())); + } + + self.pipe.write_notify.register(cx.waker()); + + if self.pipe.shutdown.load(Ordering::Acquire) { + Poll::Ready(Err(Error::ReadOnly)) + } else if lock.try_write(self.val) { + self.pipe.read_notify.wake(); + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + } + + F { pipe: self, val } + } + + pub fn blocking_read(&self) -> impl Future> + '_ { + struct F<'a> { + pipe: &'a Pipe, + } + + impl<'a> Future for F<'a> { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut lock = self.pipe.inner.lock(); + + if let Some(val) = lock.try_read() { + self.pipe.write_notify.wake(); + return Poll::Ready(Some(val)); + } else if self.pipe.shutdown.load(Ordering::Acquire) { + return Poll::Ready(None); + } + + self.pipe.read_notify.register(cx.waker()); + + if let Some(val) = lock.try_read() { + Poll::Ready(Some(val)) + } else if self.pipe.shutdown.load(Ordering::Acquire) { + Poll::Ready(None) + } else { + Poll::Pending + } + } + } + + F { pipe: self } + } +} + +impl PipeEnd { + pub fn new_pair(capacity: usize) -> (PipeEnd, PipeEnd) { + let pipe = Arc::new(Pipe::new(capacity)); + let read = PipeEnd::Read(pipe.clone()); + let write = PipeEnd::Write(pipe); + + (read, write) + } + + pub fn read(&self, buf: &mut [u8]) -> Result { + let PipeEnd::Read(read) = self else { + return Err(Error::InvalidOperation); + }; + + block! { + let mut pos = 0; + let mut rem = buf.len(); + + while rem != 0 { + if let Some(val) = read.blocking_read().await { + buf[pos] = val; + pos += 1; + rem -= 1; + } else { + break; + } + } + + Ok(pos) + }? + } + + pub fn write(&self, buf: &[u8]) -> Result { + let PipeEnd::Write(write) = self else { + return Err(Error::InvalidOperation); + }; + + block! { + let mut pos = 0; + let mut rem = buf.len(); + + while rem != 0 { + write.blocking_write(buf[pos]).await?; + pos += 1; + rem -= 1; + } + + Ok(pos) + }? + } +} + +impl Drop for PipeEnd { + fn drop(&mut self) { + match self { + Self::Read(read) => read.shutdown(), + Self::Write(write) => write.shutdown(), + } + } +} diff --git a/src/proc/io.rs b/src/proc/io.rs index 160e56a9..cc156865 100644 --- a/src/proc/io.rs +++ b/src/proc/io.rs @@ -62,12 +62,20 @@ impl ProcessIo { /// Removes and closes a [FileRef] from the struct pub fn close_file(&mut self, fd: RawFd) -> Result<(), Error> { // Do nothing, file will be dropped and closed - self.files.remove(&fd).ok_or(Error::InvalidFile)?; - Ok(()) + if self.files.remove(&fd).is_some() { + Ok(()) + } else { + Err(Error::InvalidFile) + } } /// Removes all [FileRef]s from the struct which do not pass the `predicate` check pub fn retain bool>(&mut self, predicate: F) { self.files.retain(predicate); } + + /// Handles process exit by closing all of its files + pub fn handle_exit(&mut self) { + self.files.clear(); + } } diff --git a/src/syscall/mod.rs b/src/syscall/mod.rs index a881a1b5..98f25d35 100644 --- a/src/syscall/mod.rs +++ b/src/syscall/mod.rs @@ -9,7 +9,7 @@ use abi::{ }; use alloc::sync::Arc; use kernel_util::{block, runtime, sync::IrqSafeSpinlockGuard}; -use vfs::{IoContext, NodeRef, Read, Seek, Write}; +use vfs::{File, IoContext, NodeRef, Read, Seek, Write}; use yggdrasil_abi::{error::SyscallResult, io::MountOptions}; use crate::{ @@ -188,7 +188,17 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result let fd = RawFd(args[0] as u32); run_with_io(process, |mut io| { - io.close_file(fd)?; + let res = io.close_file(fd); + + match res { + Err(Error::InvalidFile) => { + warnln!("Double close of fd {:?} in process {}", fd, process.id()); + } + _ => (), + } + + res?; + Ok(0) }) } @@ -262,6 +272,24 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result SyscallFunction::RemoveDirectory => { todo!() } + SyscallFunction::CreatePipe => { + let ends: &mut [MaybeUninit; 2] = arg_user_mut(args[0] as usize)?; + + run_with_io(process, |mut io| { + let (read, write) = File::new_pipe_pair(256); + + let read_fd = io.place_file(read)?; + let write_fd = io.place_file(write)?; + + infoln!("Read end: {:?}", read_fd); + infoln!("Write end: {:?}", write_fd); + + ends[0].write(read_fd); + ends[1].write(write_fd); + + Ok(0) + }) + } // Process management SyscallFunction::SpawnProcess => { let options = arg_user_ref::(args[0] as usize)?; diff --git a/src/task/process.rs b/src/task/process.rs index 261e02e8..446c4284 100644 --- a/src/task/process.rs +++ b/src/task/process.rs @@ -346,7 +346,7 @@ impl Process { inner.state = ProcessState::Terminated(code); // XXX - // self.io.lock().handle_exit(); + self.io.lock().handle_exit(); drop(inner);