From b131d2b52aad8e6c1585fb6ea95ff855c5bedeea Mon Sep 17 00:00:00 2001
From: Mark Poliakov <mark@alnyan.me>
Date: Thu, 8 Aug 2024 16:52:34 +0300
Subject: [PATCH] vfs: add basic non-blocking operations

---
 Cargo.lock                         |  2 +
 kernel/Cargo.toml                  |  1 +
 kernel/driver/input/Cargo.toml     |  2 +
 kernel/driver/input/src/lib.rs     | 24 ++++++--
 kernel/libk/src/vfs/device.rs      | 22 +++++--
 kernel/libk/src/vfs/file/device.rs | 35 ++++++++++--
 kernel/libk/src/vfs/file/mod.rs    | 92 +++++++++++++++++++++++-------
 kernel/libk/src/vfs/pty.rs         | 66 ++++++++++++---------
 kernel/libk/src/vfs/terminal.rs    | 39 +++++++++----
 9 files changed, 207 insertions(+), 76 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f5388e20..5ee35c20 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1997,6 +1997,7 @@ dependencies = [
 name = "ygg_driver_input"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
  "libk",
  "libk-mm",
  "libk-util",
@@ -2152,6 +2153,7 @@ dependencies = [
  "acpi",
  "acpi-system",
  "aml",
+ "async-trait",
  "atomic_enum",
  "bitflags 2.6.0",
  "bytemuck",
diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml
index 811df81e..ce59130f 100644
--- a/kernel/Cargo.toml
+++ b/kernel/Cargo.toml
@@ -45,6 +45,7 @@ log = "0.4.22"
 futures-util = { version = "0.3.30", default-features = false, features = ["alloc", "async-await"] }
 crossbeam-queue = { version = "0.3.11", default-features = false, features = ["alloc"] }
 bytemuck = { version = "1.16.1", features = ["derive"] }
+async-trait = "0.1.81"
 
 [dependencies.elf]
 version = "0.7.2"
diff --git a/kernel/driver/input/Cargo.toml b/kernel/driver/input/Cargo.toml
index 17e06351..542c4966 100644
--- a/kernel/driver/input/Cargo.toml
+++ b/kernel/driver/input/Cargo.toml
@@ -8,3 +8,5 @@ yggdrasil-abi = { path = "../../../lib/abi" }
 libk-util = { path = "../../libk/libk-util" }
 libk-mm = { path = "../../libk/libk-mm" }
 libk = { path = "../../libk" }
+
+async-trait = "0.1.81"
diff --git a/kernel/driver/input/src/lib.rs b/kernel/driver/input/src/lib.rs
index 6ec5611b..c9c4447a 100644
--- a/kernel/driver/input/src/lib.rs
+++ b/kernel/driver/input/src/lib.rs
@@ -4,10 +4,9 @@ extern crate alloc;
 
 use core::task::{Context, Poll};
 
-use libk::{
-    block,
-    vfs::{CharDevice, FileReadiness},
-};
+use alloc::boxed::Box;
+use async_trait::async_trait;
+use libk::vfs::{CharDevice, FileReadiness};
 use libk_util::ring::LossyRingQueue;
 use yggdrasil_abi::{
     error::Error,
@@ -22,13 +21,26 @@ impl FileReadiness for KeyboardDevice {
     }
 }
 
+#[async_trait]
 impl CharDevice for KeyboardDevice {
-    fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
+    async fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
         if buf.len() < 4 {
             return Ok(0);
         }
 
-        let ev = block!(INPUT_QUEUE.read().await)?;
+        let ev = INPUT_QUEUE.read().await;
+
+        buf[..4].copy_from_slice(&ev.as_bytes());
+
+        Ok(4)
+    }
+
+    fn read_nonblocking(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
+        if buf.len() < 4 {
+            return Ok(0);
+        }
+
+        let ev = INPUT_QUEUE.try_read().ok_or(Error::WouldBlock)?;
 
         buf[..4].copy_from_slice(&ev.as_bytes());
 
diff --git a/kernel/libk/src/vfs/device.rs b/kernel/libk/src/vfs/device.rs
index b521009b..873c5206 100644
--- a/kernel/libk/src/vfs/device.rs
+++ b/kernel/libk/src/vfs/device.rs
@@ -1,3 +1,5 @@
+use alloc::boxed::Box;
+use async_trait::async_trait;
 use yggdrasil_abi::{error::Error, io::DeviceRequest};
 
 use crate::vfs::{
@@ -8,16 +10,28 @@ use crate::vfs::{
 
 /// Character device interface
 #[allow(unused)]
+#[async_trait]
 pub trait CharDevice: FileReadiness + Sync {
-    /// Reads data from the device
-    fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
+    /// Reads data from the device, blocking until operation has completed
+    async fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
         Err(Error::NotImplemented)
     }
-    /// Writes the data to the device
-    fn write(&'static self, buf: &[u8]) -> Result<usize, Error> {
+
+    /// Writes data to the devices, blocking until operation has completed
+    async fn write(&'static self, buf: &[u8]) -> Result<usize, Error> {
         Err(Error::NotImplemented)
     }
 
+    /// Reads data from the device, returns [Error::WouldBlock] if no data is available
+    fn read_nonblocking(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
+        Err(Error::WouldBlock)
+    }
+
+    /// Writes data to the device, returns [Error::WouldBlock] if data cannot yet be written
+    fn write_nonblocking(&'static self, buf: &[u8]) -> Result<usize, Error> {
+        Err(Error::WouldBlock)
+    }
+
     /// Returns `true` if the device can be read from
     fn is_readable(&self) -> bool {
         true
diff --git a/kernel/libk/src/vfs/file/device.rs b/kernel/libk/src/vfs/file/device.rs
index a993230f..e46b21a2 100644
--- a/kernel/libk/src/vfs/file/device.rs
+++ b/kernel/libk/src/vfs/file/device.rs
@@ -1,3 +1,5 @@
+use core::sync::atomic::{AtomicBool, Ordering};
+
 use libk_util::sync::IrqSafeSpinlock;
 use yggdrasil_abi::{error::Error, io::SeekFrom};
 
@@ -19,6 +21,7 @@ pub struct CharFile {
     pub(super) node: NodeRef,
     pub(super) read: bool,
     pub(super) write: bool,
+    pub(super) blocking: AtomicBool,
 }
 
 impl BlockFile {
@@ -73,18 +76,38 @@ impl BlockFile {
 
 impl CharFile {
     pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
-        if self.read {
-            self.device.0.read(buf)
+        if !self.read {
+            return Err(Error::InvalidOperation);
+        }
+
+        if self.blocking.load(Ordering::Acquire) {
+            block!(self.device.0.read(buf).await)?
         } else {
-            Err(Error::InvalidOperation)
+            self.device.0.read_nonblocking(buf)
         }
     }
+    //     if self.read {
+    //         self.device.0.read(buf)
+    //     } else {
+    //         Err(Error::InvalidOperation)
+    //     }
+    // }
 
     pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
-        if self.write {
-            self.device.0.write(buf)
+        if !self.write {
+            return Err(Error::InvalidOperation);
+        }
+
+        if self.blocking.load(Ordering::Acquire) {
+            block!(self.device.0.write(buf).await)?
         } else {
-            Err(Error::ReadOnly)
+            self.device.0.write_nonblocking(buf)
         }
     }
+    //     if self.write {
+    //         self.device.0.write(buf)
+    //     } else {
+    //         Err(Error::ReadOnly)
+    //     }
+    // }
 }
diff --git a/kernel/libk/src/vfs/file/mod.rs b/kernel/libk/src/vfs/file/mod.rs
index ee7833b1..4de50bce 100644
--- a/kernel/libk/src/vfs/file/mod.rs
+++ b/kernel/libk/src/vfs/file/mod.rs
@@ -2,13 +2,16 @@ use core::{
     any::Any,
     fmt,
     mem::MaybeUninit,
+    sync::atomic::{AtomicBool, Ordering},
     task::{Context, Poll},
 };
 
 use alloc::{
+    boxed::Box,
     collections::{btree_map::Entry, BTreeMap},
     sync::Arc,
 };
+use async_trait::async_trait;
 use libk_mm::{address::PhysicalAddress, table::MapAttributes, PageProvider};
 use libk_util::sync::IrqSafeSpinlock;
 use yggdrasil_abi::{
@@ -75,8 +78,23 @@ pub enum File {
     Timer(TimerFile),
     Channel(ChannelDescriptor),
     SharedMemory(Arc<SharedMemory>),
-    PtySlave(Arc<PseudoTerminalSlave>, NodeRef),
-    PtyMaster(Arc<PseudoTerminalMaster>, NodeRef),
+    PtySlave(TerminalHalfWrapper<PseudoTerminalSlave>),
+    PtyMaster(TerminalHalfWrapper<PseudoTerminalMaster>),
+}
+
+#[async_trait]
+pub trait TerminalHalf {
+    async fn read(&self, buf: &mut [u8]) -> Result<usize, Error>;
+    fn read_nonblocking(&self, buf: &mut [u8]) -> Result<usize, Error>;
+    fn write(&self, buf: &[u8]) -> Result<usize, Error>;
+    fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>>;
+    fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error>;
+}
+
+pub struct TerminalHalfWrapper<T: TerminalHalf> {
+    blocking: AtomicBool,
+    half: Arc<T>,
+    node: NodeRef,
 }
 
 /// Contains a per-process fd -> FileRef map
@@ -121,8 +139,16 @@ impl File {
         let slave = Arc::new(slave);
         let (master_node, slave_node) = Node::pseudo_terminal_nodes(master.clone(), slave.clone());
         Ok((
-            Arc::new(Self::PtyMaster(master, master_node)),
-            Arc::new(Self::PtySlave(slave, slave_node)),
+            Arc::new(Self::PtyMaster(TerminalHalfWrapper {
+                blocking: AtomicBool::new(true),
+                half: master,
+                node: master_node,
+            })),
+            Arc::new(Self::PtySlave(TerminalHalfWrapper {
+                blocking: AtomicBool::new(true),
+                half: slave,
+                node: slave_node,
+            })),
         ))
     }
 
@@ -217,6 +243,7 @@ impl File {
             node,
             read,
             write,
+            blocking: AtomicBool::new(true),
         })))
     }
 
@@ -227,12 +254,9 @@ impl File {
             Self::Block(_) => todo!(),
             Self::Regular(file) => Ok(Arc::new(Self::Regular(file.clone()))),
             Self::SharedMemory(shm) => Ok(Arc::new(Self::SharedMemory(shm.clone()))),
-            Self::PtySlave(pt, pt_node) => {
-                Ok(Arc::new(Self::PtySlave(pt.clone(), pt_node.clone())))
-            }
-            Self::PtyMaster(pt, pt_node) => {
-                Ok(Arc::new(Self::PtyMaster(pt.clone(), pt_node.clone())))
-            }
+
+            Self::PtySlave(half) => Ok(Arc::new(Self::PtySlave(half.clone()))),
+            Self::PtyMaster(half) => Ok(Arc::new(Self::PtyMaster(half.clone()))),
             _ => {
                 log::info!("Invalid file send(): {:?}", self);
                 Err(Error::InvalidOperation)
@@ -255,8 +279,8 @@ impl File {
             Self::Regular(file) => Some(&file.node),
             Self::Block(file) => Some(&file.node),
             Self::Char(file) => Some(&file.node),
-            Self::PtyMaster(_, node) => Some(node),
-            Self::PtySlave(_, node) => Some(node),
+            Self::PtyMaster(half) => Some(&half.node),
+            Self::PtySlave(half) => Some(&half.node),
             _ => None,
         }
     }
@@ -267,8 +291,8 @@ impl File {
             Self::Char(f) => f.device.0.poll_read(cx),
             Self::Channel(ch) => ch.poll_read(cx),
             Self::Poll(ch) => ch.poll_read(cx),
-            Self::PtyMaster(f, _) => f.poll_read(cx),
-            Self::PtySlave(f, _) => f.poll_read(cx),
+            Self::PtyMaster(half) => half.half.poll_read(cx),
+            Self::PtySlave(half) => half.half.poll_read(cx),
             Self::PacketSocket(sock) => sock.poll_read(cx),
             Self::StreamSocket(sock) => sock.poll_read(cx),
             Self::ListenerSocket(sock) => sock.poll_read(cx),
@@ -283,8 +307,8 @@ impl File {
         match self {
             Self::Char(f) => f.device.0.device_request(req),
             Self::Block(f) => f.device.0.device_request(req),
-            Self::PtySlave(f, _) => f.device_request(req),
-            Self::PtyMaster(f, _) => f.device_request(req),
+            Self::PtySlave(half) => half.half.device_request(req),
+            Self::PtyMaster(half) => half.half.device_request(req),
             _ => Err(Error::InvalidOperation),
         }
     }
@@ -392,8 +416,8 @@ impl Read for File {
             Self::Block(file) => file.read(buf),
             Self::Char(file) => file.read(buf),
             Self::AnonymousPipe(pipe) => pipe.read(buf),
-            Self::PtySlave(pt, _) => pt.read(buf),
-            Self::PtyMaster(pt, _) => pt.read(buf),
+            Self::PtySlave(half) => half.read(buf),
+            Self::PtyMaster(half) => half.read(buf),
             // TODO maybe allow reading trigger count?
             Self::Timer(_) => Err(Error::InvalidOperation),
             // TODO maybe allow reading FDs from poll channels as if they were regular streams?
@@ -417,8 +441,8 @@ impl Write for File {
             Self::Block(file) => file.write(buf),
             Self::Char(file) => file.write(buf),
             Self::AnonymousPipe(pipe) => pipe.write(buf),
-            Self::PtySlave(pt, _) => pt.write(buf),
-            Self::PtyMaster(pt, _) => pt.write(buf),
+            Self::PtySlave(half) => half.write(buf),
+            Self::PtyMaster(half) => half.write(buf),
             Self::Timer(timer) => timer.write(buf),
             // TODO maybe allow adding FDs to poll channels this way
             Self::Poll(_) => Err(Error::InvalidOperation),
@@ -479,8 +503,8 @@ impl fmt::Debug for File {
             Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
             Self::Channel(_) => f.debug_struct("Channel").finish_non_exhaustive(),
             Self::SharedMemory(_) => f.debug_struct("SharedMemory").finish_non_exhaustive(),
-            Self::PtySlave(_, _) => f.debug_struct("PtySlave").finish_non_exhaustive(),
-            Self::PtyMaster(_, _) => f.debug_struct("PtyMaster").finish_non_exhaustive(),
+            Self::PtySlave(_) => f.debug_struct("PtySlave").finish_non_exhaustive(),
+            Self::PtyMaster(_) => f.debug_struct("PtyMaster").finish_non_exhaustive(),
             Self::PacketSocket(sock) => f
                 .debug_struct("PacketSocket")
                 .field("local", &sock.local_address())
@@ -500,6 +524,30 @@ impl fmt::Debug for File {
     }
 }
 
+impl<T: TerminalHalf> TerminalHalfWrapper<T> {
+    pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
+        if self.blocking.load(Ordering::Acquire) {
+            block!(self.half.read(buf).await)?
+        } else {
+            self.half.read_nonblocking(buf)
+        }
+    }
+
+    pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
+        self.half.write(buf)
+    }
+}
+
+impl<T: TerminalHalf> Clone for TerminalHalfWrapper<T> {
+    fn clone(&self) -> Self {
+        Self {
+            half: self.half.clone(),
+            node: self.node.clone(),
+            blocking: AtomicBool::new(self.blocking.load(Ordering::Acquire)),
+        }
+    }
+}
+
 impl FileSet {
     /// Creates an empty [FileSet]
     pub fn new() -> Self {
diff --git a/kernel/libk/src/vfs/pty.rs b/kernel/libk/src/vfs/pty.rs
index 9a684382..b7703fd7 100644
--- a/kernel/libk/src/vfs/pty.rs
+++ b/kernel/libk/src/vfs/pty.rs
@@ -6,14 +6,18 @@ use core::{
     task::{Context, Poll},
 };
 
-use alloc::sync::Arc;
+use alloc::{boxed::Box, sync::Arc};
+use async_trait::async_trait;
 use libk_util::{ring::LossyRingQueue, sync::spin_rwlock::IrqSafeRwLock};
 use yggdrasil_abi::{
     error::Error,
     io::{DeviceRequest, TerminalOptions, TerminalSize},
 };
 
-use super::terminal::{self, Terminal, TerminalInput, TerminalOutput};
+use super::{
+    file::TerminalHalf,
+    terminal::{self, Terminal, TerminalInput, TerminalOutput},
+};
 
 const CAPACITY: usize = 8192;
 
@@ -66,17 +70,22 @@ impl PtyOutput {
         }
     }
 
-    pub fn read_blocking(&self, buffer: &mut [u8]) -> Result<usize, Error> {
+    pub async fn read(&self, buffer: &mut [u8]) -> Result<usize, Error> {
         if self.shutdown.load(Ordering::Acquire) {
             return Ok(0);
         }
 
-        if let Some(mut lock) = self.ring.try_read_lock() {
-            let count = terminal::read_all(&mut lock, buffer, None);
-            Ok(count)
-        } else {
-            todo!()
+        let mut lock = self.ring.read_lock().await;
+        Ok(terminal::read_all(&mut lock, buffer, None))
+    }
+
+    pub fn read_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, Error> {
+        if self.shutdown.load(Ordering::Acquire) {
+            return Ok(0);
         }
+
+        let mut lock = self.ring.try_read_lock().ok_or(Error::WouldBlock)?;
+        Ok(terminal::read_all(&mut lock, buffer, None))
     }
 }
 
@@ -88,49 +97,52 @@ pub struct PseudoTerminalSlave(Arc<Terminal<PtyOutput>>);
 #[derive(Clone)]
 pub struct PseudoTerminalMaster(Arc<Terminal<PtyOutput>>);
 
-impl PseudoTerminalSlave {
-    /// Reads from the master-to-slave half of the PTY
-    pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
-        self.0.read_from_input(buf)
+#[async_trait]
+impl TerminalHalf for PseudoTerminalSlave {
+    async fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
+        self.0.read_from_input(buf).await
     }
 
-    /// Writes to the slave-to-master half of the PTY
-    pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
+    fn read_nonblocking(&self, buf: &mut [u8]) -> Result<usize, Error> {
+        self.0.read_from_input_nonblocking(buf)
+    }
+
+    fn write(&self, buf: &[u8]) -> Result<usize, Error> {
         self.0.write_to_output(buf)
     }
 
-    /// Polls PTY read readiness
-    pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
+    fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
         self.0.input().poll_read(cx).map(Ok)
     }
 
-    /// Performs a device-specific request to the PTY
-    pub fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error> {
+    fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error> {
         self.0.handle_device_request(req)
     }
 }
 
-impl PseudoTerminalMaster {
+#[async_trait]
+impl TerminalHalf for PseudoTerminalMaster {
     /// Reads from the slave-to-master half of the PTY
-    pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
-        self.0.output().read_blocking(buf)
+    async fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
+        self.0.output().read(buf).await
     }
 
-    /// Writes to the master-to-slave half of the PTY
-    pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
+    fn read_nonblocking(&self, buf: &mut [u8]) -> Result<usize, Error> {
+        self.0.output().read_nonblocking(buf)
+    }
+
+    fn write(&self, buf: &[u8]) -> Result<usize, Error> {
         for &byte in buf {
             self.0.write_to_input(byte);
         }
         Ok(buf.len())
     }
 
-    /// Polls PTY read readiness
-    pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
+    fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
         self.0.output().poll_read(cx).map(Ok)
     }
 
-    /// Performs a device-specific request to the PTY
-    pub fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error> {
+    fn device_request(&self, req: &mut DeviceRequest) -> Result<(), Error> {
         self.0.handle_device_request(req)
     }
 }
diff --git a/kernel/libk/src/vfs/terminal.rs b/kernel/libk/src/vfs/terminal.rs
index 607caac6..1c1de468 100644
--- a/kernel/libk/src/vfs/terminal.rs
+++ b/kernel/libk/src/vfs/terminal.rs
@@ -4,6 +4,7 @@ use core::{
 };
 
 use alloc::boxed::Box;
+use async_trait::async_trait;
 use libk_util::{
     ring::{LossyRingQueue, RingBuffer},
     sync::{spin_rwlock::IrqSafeRwLock, IrqSafeSpinlock},
@@ -84,12 +85,22 @@ impl<O: TerminalOutput> Terminal<O> {
         *self.config.read()
     }
 
-    pub fn read_from_input(&self, buffer: &mut [u8]) -> Result<usize, Error> {
+    #[inline]
+    pub async fn read_from_input(&self, buffer: &mut [u8]) -> Result<usize, Error> {
         let eof = {
             let config = self.config.read();
             config.is_canonical().then_some(config.chars.eof)
         };
-        self.input.read_blocking(buffer, eof)
+        self.input.read(buffer, eof).await
+    }
+
+    #[inline]
+    pub fn read_from_input_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, Error> {
+        let eof = {
+            let config = self.config.read();
+            config.is_canonical().then_some(config.chars.eof)
+        };
+        self.input.read_nonblocking(buffer, eof)
     }
 
     pub fn putc_to_output(&self, byte: u8) -> Result<(), Error> {
@@ -205,12 +216,9 @@ impl TerminalInput {
         Ok(read_all(&mut lock, buffer, eof))
     }
 
-    pub fn read_blocking(&self, buffer: &mut [u8], eof: Option<u8>) -> Result<usize, Error> {
-        if let Some(mut lock) = self.ready_ring.try_read_lock() {
-            Ok(read_all(&mut lock, buffer, eof))
-        } else {
-            block!(self.read(buffer, eof).await)?
-        }
+    pub fn read_nonblocking(&self, buffer: &mut [u8], eof: Option<u8>) -> Result<usize, Error> {
+        let mut lock = self.ready_ring.try_read_lock().ok_or(Error::WouldBlock)?;
+        Ok(read_all(&mut lock, buffer, eof))
     }
 
     pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<()> {
@@ -261,13 +269,22 @@ impl InputBuffer {
     }
 }
 
+#[async_trait]
 impl<O: TerminalOutput> CharDevice for Terminal<O> {
-    fn write(&'static self, buf: &[u8]) -> Result<usize, Error> {
+    async fn write(&'static self, buf: &[u8]) -> Result<usize, Error> {
         self.write_to_output(buf)
     }
 
-    fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
-        self.read_from_input(buf)
+    fn write_nonblocking(&'static self, buf: &[u8]) -> Result<usize, Error> {
+        self.write_to_output(buf)
+    }
+
+    async fn read(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
+        self.read_from_input(buf).await
+    }
+
+    fn read_nonblocking(&'static self, buf: &mut [u8]) -> Result<usize, Error> {
+        self.read_from_input_nonblocking(buf)
     }
 
     fn is_readable(&self) -> bool {