vfs: add basic non-blocking operations

This commit is contained in:
Mark Poliakov 2024-08-08 16:52:34 +03:00
parent 5c090f7a38
commit b131d2b52a
9 changed files with 207 additions and 76 deletions

2
Cargo.lock generated
View File

@ -1997,6 +1997,7 @@ dependencies = [
name = "ygg_driver_input"
version = "0.1.0"
dependencies = [
"async-trait",
"libk",
"libk-mm",
"libk-util",
@ -2152,6 +2153,7 @@ dependencies = [
"acpi",
"acpi-system",
"aml",
"async-trait",
"atomic_enum",
"bitflags 2.6.0",
"bytemuck",

View File

@ -45,6 +45,7 @@ log = "0.4.22"
futures-util = { version = "0.3.30", default-features = false, features = ["alloc", "async-await"] }
crossbeam-queue = { version = "0.3.11", default-features = false, features = ["alloc"] }
bytemuck = { version = "1.16.1", features = ["derive"] }
async-trait = "0.1.81"
[dependencies.elf]
version = "0.7.2"

View File

@ -8,3 +8,5 @@ yggdrasil-abi = { path = "../../../lib/abi" }
libk-util = { path = "../../libk/libk-util" }
libk-mm = { path = "../../libk/libk-mm" }
libk = { path = "../../libk" }
async-trait = "0.1.81"

View File

@ -4,10 +4,9 @@ extern crate alloc;
use core::task::{Context, Poll};
use libk::{
block,
vfs::{CharDevice, FileReadiness},
};
use alloc::boxed::Box;
use async_trait::async_trait;
use libk::vfs::{CharDevice, FileReadiness};
use libk_util::ring::LossyRingQueue;
use yggdrasil_abi::{
error::Error,
@ -22,13 +21,26 @@ impl FileReadiness for KeyboardDevice {
}
}
#[async_trait]
impl CharDevice for KeyboardDevice {
fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
async fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
if buf.len() < 4 {
return Ok(0);
}
let ev = block!(INPUT_QUEUE.read().await)?;
let ev = INPUT_QUEUE.read().await;
buf[..4].copy_from_slice(&ev.as_bytes());
Ok(4)
}
fn read_nonblocking(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
if buf.len() < 4 {
return Ok(0);
}
let ev = INPUT_QUEUE.try_read().ok_or(Error::WouldBlock)?;
buf[..4].copy_from_slice(&ev.as_bytes());

View File

@ -1,3 +1,5 @@
use alloc::boxed::Box;
use async_trait::async_trait;
use yggdrasil_abi::{error::Error, io::DeviceRequest};
use crate::vfs::{
@ -8,16 +10,28 @@ use crate::vfs::{
/// Character device interface
#[allow(unused)]
#[async_trait]
pub trait CharDevice: FileReadiness + Sync {
/// Reads data from the device
fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
/// Reads data from the device, blocking until operation has completed
async fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
Err(Error::NotImplemented)
}
/// Writes the data to the device
fn write(&'static self, buf: &[u8]) -> Result<usize, Error> {
/// Writes data to the devices, blocking until operation has completed
async fn write(&'static self, buf: &[u8]) -> Result<usize, Error> {
Err(Error::NotImplemented)
}
/// Reads data from the device, returns [Error::WouldBlock] if no data is available
fn read_nonblocking(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
Err(Error::WouldBlock)
}
/// Writes data to the device, returns [Error::WouldBlock] if data cannot yet be written
fn write_nonblocking(&'static self, buf: &[u8]) -> Result<usize, Error> {
Err(Error::WouldBlock)
}
/// Returns `true` if the device can be read from
fn is_readable(&self) -> bool {
true

View File

@ -1,3 +1,5 @@
use core::sync::atomic::{AtomicBool, Ordering};
use libk_util::sync::IrqSafeSpinlock;
use yggdrasil_abi::{error::Error, io::SeekFrom};
@ -19,6 +21,7 @@ pub struct CharFile {
pub(super) node: NodeRef,
pub(super) read: bool,
pub(super) write: bool,
pub(super) blocking: AtomicBool,
}
impl BlockFile {
@ -73,18 +76,38 @@ impl BlockFile {
impl CharFile {
pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
if self.read {
self.device.0.read(buf)
if !self.read {
return Err(Error::InvalidOperation);
}
if self.blocking.load(Ordering::Acquire) {
block!(self.device.0.read(buf).await)?
} else {
Err(Error::InvalidOperation)
self.device.0.read_nonblocking(buf)
}
}
// if self.read {
// self.device.0.read(buf)
// } else {
// Err(Error::InvalidOperation)
// }
// }
pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
if self.write {
self.device.0.write(buf)
if !self.write {
return Err(Error::InvalidOperation);
}
if self.blocking.load(Ordering::Acquire) {
block!(self.device.0.write(buf).await)?
} else {
Err(Error::ReadOnly)
self.device.0.write_nonblocking(buf)
}
}
// if self.write {
// self.device.0.write(buf)
// } else {
// Err(Error::ReadOnly)
// }
// }
}

View File

@ -2,13 +2,16 @@ use core::{
any::Any,
fmt,
mem::MaybeUninit,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll},
};
use alloc::{
boxed::Box,
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};
use async_trait::async_trait;
use libk_mm::{address::PhysicalAddress, table::MapAttributes, PageProvider};
use libk_util::sync::IrqSafeSpinlock;
use yggdrasil_abi::{
@ -75,8 +78,23 @@ pub enum File {
Timer(TimerFile),
Channel(ChannelDescriptor),
SharedMemory(Arc<SharedMemory>),
PtySlave(Arc<PseudoTerminalSlave>, NodeRef),
PtyMaster(Arc<PseudoTerminalMaster>, NodeRef),
PtySlave(TerminalHalfWrapper<PseudoTerminalSlave>),
PtyMaster(TerminalHalfWrapper<PseudoTerminalMaster>),
}
#[async_trait]
pub trait TerminalHalf {
async fn read(&self, buf: &mut [u8]) -> Result<usize, Error>;
fn read_nonblocking(&self, buf: &mut [u8]) -> Result<usize, Error>;
fn write(&self, buf: &[u8]) -> Result<usize, Error>;
fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>>;
fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error>;
}
pub struct TerminalHalfWrapper<T: TerminalHalf> {
blocking: AtomicBool,
half: Arc<T>,
node: NodeRef,
}
/// Contains a per-process fd -> FileRef map
@ -121,8 +139,16 @@ impl File {
let slave = Arc::new(slave);
let (master_node, slave_node) = Node::pseudo_terminal_nodes(master.clone(), slave.clone());
Ok((
Arc::new(Self::PtyMaster(master, master_node)),
Arc::new(Self::PtySlave(slave, slave_node)),
Arc::new(Self::PtyMaster(TerminalHalfWrapper {
blocking: AtomicBool::new(true),
half: master,
node: master_node,
})),
Arc::new(Self::PtySlave(TerminalHalfWrapper {
blocking: AtomicBool::new(true),
half: slave,
node: slave_node,
})),
))
}
@ -217,6 +243,7 @@ impl File {
node,
read,
write,
blocking: AtomicBool::new(true),
})))
}
@ -227,12 +254,9 @@ impl File {
Self::Block(_) => todo!(),
Self::Regular(file) => Ok(Arc::new(Self::Regular(file.clone()))),
Self::SharedMemory(shm) => Ok(Arc::new(Self::SharedMemory(shm.clone()))),
Self::PtySlave(pt, pt_node) => {
Ok(Arc::new(Self::PtySlave(pt.clone(), pt_node.clone())))
}
Self::PtyMaster(pt, pt_node) => {
Ok(Arc::new(Self::PtyMaster(pt.clone(), pt_node.clone())))
}
Self::PtySlave(half) => Ok(Arc::new(Self::PtySlave(half.clone()))),
Self::PtyMaster(half) => Ok(Arc::new(Self::PtyMaster(half.clone()))),
_ => {
log::info!("Invalid file send(): {:?}", self);
Err(Error::InvalidOperation)
@ -255,8 +279,8 @@ impl File {
Self::Regular(file) => Some(&file.node),
Self::Block(file) => Some(&file.node),
Self::Char(file) => Some(&file.node),
Self::PtyMaster(_, node) => Some(node),
Self::PtySlave(_, node) => Some(node),
Self::PtyMaster(half) => Some(&half.node),
Self::PtySlave(half) => Some(&half.node),
_ => None,
}
}
@ -267,8 +291,8 @@ impl File {
Self::Char(f) => f.device.0.poll_read(cx),
Self::Channel(ch) => ch.poll_read(cx),
Self::Poll(ch) => ch.poll_read(cx),
Self::PtyMaster(f, _) => f.poll_read(cx),
Self::PtySlave(f, _) => f.poll_read(cx),
Self::PtyMaster(half) => half.half.poll_read(cx),
Self::PtySlave(half) => half.half.poll_read(cx),
Self::PacketSocket(sock) => sock.poll_read(cx),
Self::StreamSocket(sock) => sock.poll_read(cx),
Self::ListenerSocket(sock) => sock.poll_read(cx),
@ -283,8 +307,8 @@ impl File {
match self {
Self::Char(f) => f.device.0.device_request(req),
Self::Block(f) => f.device.0.device_request(req),
Self::PtySlave(f, _) => f.device_request(req),
Self::PtyMaster(f, _) => f.device_request(req),
Self::PtySlave(half) => half.half.device_request(req),
Self::PtyMaster(half) => half.half.device_request(req),
_ => Err(Error::InvalidOperation),
}
}
@ -392,8 +416,8 @@ impl Read for File {
Self::Block(file) => file.read(buf),
Self::Char(file) => file.read(buf),
Self::AnonymousPipe(pipe) => pipe.read(buf),
Self::PtySlave(pt, _) => pt.read(buf),
Self::PtyMaster(pt, _) => pt.read(buf),
Self::PtySlave(half) => half.read(buf),
Self::PtyMaster(half) => half.read(buf),
// TODO maybe allow reading trigger count?
Self::Timer(_) => Err(Error::InvalidOperation),
// TODO maybe allow reading FDs from poll channels as if they were regular streams?
@ -417,8 +441,8 @@ impl Write for File {
Self::Block(file) => file.write(buf),
Self::Char(file) => file.write(buf),
Self::AnonymousPipe(pipe) => pipe.write(buf),
Self::PtySlave(pt, _) => pt.write(buf),
Self::PtyMaster(pt, _) => pt.write(buf),
Self::PtySlave(half) => half.write(buf),
Self::PtyMaster(half) => half.write(buf),
Self::Timer(timer) => timer.write(buf),
// TODO maybe allow adding FDs to poll channels this way
Self::Poll(_) => Err(Error::InvalidOperation),
@ -479,8 +503,8 @@ impl fmt::Debug for File {
Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
Self::Channel(_) => f.debug_struct("Channel").finish_non_exhaustive(),
Self::SharedMemory(_) => f.debug_struct("SharedMemory").finish_non_exhaustive(),
Self::PtySlave(_, _) => f.debug_struct("PtySlave").finish_non_exhaustive(),
Self::PtyMaster(_, _) => f.debug_struct("PtyMaster").finish_non_exhaustive(),
Self::PtySlave(_) => f.debug_struct("PtySlave").finish_non_exhaustive(),
Self::PtyMaster(_) => f.debug_struct("PtyMaster").finish_non_exhaustive(),
Self::PacketSocket(sock) => f
.debug_struct("PacketSocket")
.field("local", &sock.local_address())
@ -500,6 +524,30 @@ impl fmt::Debug for File {
}
}
impl<T: TerminalHalf> TerminalHalfWrapper<T> {
pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
if self.blocking.load(Ordering::Acquire) {
block!(self.half.read(buf).await)?
} else {
self.half.read_nonblocking(buf)
}
}
pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
self.half.write(buf)
}
}
impl<T: TerminalHalf> Clone for TerminalHalfWrapper<T> {
fn clone(&self) -> Self {
Self {
half: self.half.clone(),
node: self.node.clone(),
blocking: AtomicBool::new(self.blocking.load(Ordering::Acquire)),
}
}
}
impl FileSet {
/// Creates an empty [FileSet]
pub fn new() -> Self {

View File

@ -6,14 +6,18 @@ use core::{
task::{Context, Poll},
};
use alloc::sync::Arc;
use alloc::{boxed::Box, sync::Arc};
use async_trait::async_trait;
use libk_util::{ring::LossyRingQueue, sync::spin_rwlock::IrqSafeRwLock};
use yggdrasil_abi::{
error::Error,
io::{DeviceRequest, TerminalOptions, TerminalSize},
};
use super::terminal::{self, Terminal, TerminalInput, TerminalOutput};
use super::{
file::TerminalHalf,
terminal::{self, Terminal, TerminalInput, TerminalOutput},
};
const CAPACITY: usize = 8192;
@ -66,17 +70,22 @@ impl PtyOutput {
}
}
pub fn read_blocking(&self, buffer: &mut [u8]) -> Result<usize, Error> {
pub async fn read(&self, buffer: &mut [u8]) -> Result<usize, Error> {
if self.shutdown.load(Ordering::Acquire) {
return Ok(0);
}
if let Some(mut lock) = self.ring.try_read_lock() {
let count = terminal::read_all(&mut lock, buffer, None);
Ok(count)
} else {
todo!()
let mut lock = self.ring.read_lock().await;
Ok(terminal::read_all(&mut lock, buffer, None))
}
pub fn read_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, Error> {
if self.shutdown.load(Ordering::Acquire) {
return Ok(0);
}
let mut lock = self.ring.try_read_lock().ok_or(Error::WouldBlock)?;
Ok(terminal::read_all(&mut lock, buffer, None))
}
}
@ -88,49 +97,52 @@ pub struct PseudoTerminalSlave(Arc<Terminal<PtyOutput>>);
#[derive(Clone)]
pub struct PseudoTerminalMaster(Arc<Terminal<PtyOutput>>);
impl PseudoTerminalSlave {
/// Reads from the master-to-slave half of the PTY
pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
self.0.read_from_input(buf)
#[async_trait]
impl TerminalHalf for PseudoTerminalSlave {
async fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
self.0.read_from_input(buf).await
}
/// Writes to the slave-to-master half of the PTY
pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
fn read_nonblocking(&self, buf: &mut [u8]) -> Result<usize, Error> {
self.0.read_from_input_nonblocking(buf)
}
fn write(&self, buf: &[u8]) -> Result<usize, Error> {
self.0.write_to_output(buf)
}
/// Polls PTY read readiness
pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
self.0.input().poll_read(cx).map(Ok)
}
/// Performs a device-specific request to the PTY
pub fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error> {
fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error> {
self.0.handle_device_request(req)
}
}
impl PseudoTerminalMaster {
#[async_trait]
impl TerminalHalf for PseudoTerminalMaster {
/// Reads from the slave-to-master half of the PTY
pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
self.0.output().read_blocking(buf)
async fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
self.0.output().read(buf).await
}
/// Writes to the master-to-slave half of the PTY
pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
fn read_nonblocking(&self, buf: &mut [u8]) -> Result<usize, Error> {
self.0.output().read_nonblocking(buf)
}
fn write(&self, buf: &[u8]) -> Result<usize, Error> {
for &byte in buf {
self.0.write_to_input(byte);
}
Ok(buf.len())
}
/// Polls PTY read readiness
pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
self.0.output().poll_read(cx).map(Ok)
}
/// Performs a device-specific request to the PTY
pub fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error> {
fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error> {
self.0.handle_device_request(req)
}
}

View File

@ -4,6 +4,7 @@ use core::{
};
use alloc::boxed::Box;
use async_trait::async_trait;
use libk_util::{
ring::{LossyRingQueue, RingBuffer},
sync::{spin_rwlock::IrqSafeRwLock, IrqSafeSpinlock},
@ -84,12 +85,22 @@ impl<O: TerminalOutput> Terminal<O> {
*self.config.read()
}
pub fn read_from_input(&self, buffer: &mut [u8]) -> Result<usize, Error> {
#[inline]
pub async fn read_from_input(&self, buffer: &mut [u8]) -> Result<usize, Error> {
let eof = {
let config = self.config.read();
config.is_canonical().then_some(config.chars.eof)
};
self.input.read_blocking(buffer, eof)
self.input.read(buffer, eof).await
}
#[inline]
pub fn read_from_input_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, Error> {
let eof = {
let config = self.config.read();
config.is_canonical().then_some(config.chars.eof)
};
self.input.read_nonblocking(buffer, eof)
}
pub fn putc_to_output(&self, byte: u8) -> Result<(), Error> {
@ -205,12 +216,9 @@ impl TerminalInput {
Ok(read_all(&mut lock, buffer, eof))
}
pub fn read_blocking(&self, buffer: &mut [u8], eof: Option<u8>) -> Result<usize, Error> {
if let Some(mut lock) = self.ready_ring.try_read_lock() {
Ok(read_all(&mut lock, buffer, eof))
} else {
block!(self.read(buffer, eof).await)?
}
pub fn read_nonblocking(&self, buffer: &mut [u8], eof: Option<u8>) -> Result<usize, Error> {
let mut lock = self.ready_ring.try_read_lock().ok_or(Error::WouldBlock)?;
Ok(read_all(&mut lock, buffer, eof))
}
pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<()> {
@ -261,13 +269,22 @@ impl InputBuffer {
}
}
#[async_trait]
impl<O: TerminalOutput> CharDevice for Terminal<O> {
fn write(&'static self, buf: &[u8]) -> Result<usize, Error> {
async fn write(&'static self, buf: &[u8]) -> Result<usize, Error> {
self.write_to_output(buf)
}
fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
self.read_from_input(buf)
fn write_nonblocking(&'static self, buf: &[u8]) -> Result<usize, Error> {
self.write_to_output(buf)
}
async fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
self.read_from_input(buf).await
}
fn read_nonblocking(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
self.read_from_input_nonblocking(buf)
}
fn is_readable(&self) -> bool {