From 8c4bdcbe6418cf61c2dda847f469bd19cf2a526b Mon Sep 17 00:00:00 2001 From: Mark Poliakov Date: Fri, 28 Feb 2025 12:40:14 +0200 Subject: [PATCH] pty: make pty buffer blocking --- kernel/libk/libk-util/src/ring.rs | 89 +++++++++++++++++++++++++++++++ kernel/libk/src/vfs/pty.rs | 24 +++++---- 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/kernel/libk/libk-util/src/ring.rs b/kernel/libk/libk-util/src/ring.rs index 9bb02700..98425a30 100644 --- a/kernel/libk/libk-util/src/ring.rs +++ b/kernel/libk/libk-util/src/ring.rs @@ -29,6 +29,12 @@ pub struct LossyRingQueue { read_notify: QueueWaker, } +pub struct BlockingRingQueue { + ring: IrqSafeSpinlock>, + write_notify: QueueWaker, + read_notify: QueueWaker, +} + impl RingBuffer { /// Constructs an empty [RingBuffer]. /// @@ -231,3 +237,86 @@ impl LossyRingQueue { self.read_notify.wake_all(); } } + +impl BlockingRingQueue { + pub const fn with_capacity(capacity: usize) -> Self { + Self { + ring: IrqSafeSpinlock::new(RingBuffer::with_capacity(capacity)), + write_notify: QueueWaker::new(), + read_notify: QueueWaker::new(), + } + } + + pub fn try_with_capacity(capacity: usize) -> Result { + Ok(Self { + ring: IrqSafeSpinlock::new(RingBuffer::try_with_capacity(capacity)?), + write_notify: QueueWaker::new(), + read_notify: QueueWaker::new(), + }) + } + + pub async fn write(&self, data: T) { + poll_fn(|cx| { + if self.try_write(data) { + self.notify_readers(); + self.write_notify.remove(cx.waker()); + Poll::Ready(()) + } else { + self.write_notify.register(cx.waker()); + Poll::Pending + } + }) + .await + } + + pub async fn write_all(&self, data: &[T]) { + for &ch in data { + self.write(ch).await; + } + } + + pub fn try_write(&self, data: T) -> bool { + let mut lock = self.ring.lock(); + if !lock.is_writable() { + return false; + } + lock.write(data); + true + } + + pub async fn read_lock(&self) -> IrqSafeSpinlockGuard> { + poll_fn(|cx| self.poll_read_lock(cx)).await + } + + #[inline] + pub fn poll_read_lock( + &self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(lock) = self.try_read_lock() { + self.read_notify.remove(cx.waker()); + Poll::Ready(lock) + } else { + self.read_notify.register(cx.waker()); + Poll::Pending + } + } + + #[inline] + pub fn try_read_lock(&self) -> Option>> { + let lock = self.ring.lock(); + lock.is_readable().then_some(lock) + } + + pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<()> { + self.poll_read_lock(cx).map(|_| ()) + } + + pub fn notify_readers(&self) { + self.read_notify.wake_all(); + } + + pub fn notify_writers(&self) { + self.write_notify.wake_all(); + } +} diff --git a/kernel/libk/src/vfs/pty.rs b/kernel/libk/src/vfs/pty.rs index ae87a30c..40e7b320 100644 --- a/kernel/libk/src/vfs/pty.rs +++ b/kernel/libk/src/vfs/pty.rs @@ -8,7 +8,7 @@ use core::{ use alloc::{boxed::Box, sync::Arc}; use async_trait::async_trait; -use libk_util::{ring::LossyRingQueue, sync::spin_rwlock::IrqSafeRwLock}; +use libk_util::{ring::BlockingRingQueue, sync::spin_rwlock::IrqSafeRwLock}; use yggdrasil_abi::{ error::Error, io::{TerminalOptions, TerminalSize}, @@ -22,24 +22,23 @@ use super::{ const CAPACITY: usize = 32768; struct PtyOutput { - ring: LossyRingQueue, + ring: BlockingRingQueue, shutdown: AtomicBool, size: IrqSafeRwLock, } impl TerminalOutput for PtyOutput { fn write(&self, byte: u8) -> Result<(), Error> { - self.ring.write(byte); - Ok(()) + block!(self.ring.write(byte).await) } fn write_multiple(&self, bytes: &[u8]) -> Result { - self.ring.write_multiple(bytes); + block!(self.ring.write_all(bytes).await)?; Ok(bytes.len()) } fn notify_readers(&self) { - self.ring.notify_all(); + self.ring.notify_readers(); } fn size(&self) -> TerminalSize { @@ -56,7 +55,7 @@ impl TerminalOutput for PtyOutput { impl PtyOutput { pub fn with_capacity(size: TerminalSize, capacity: usize) -> Result { Ok(Self { - ring: LossyRingQueue::try_with_capacity(capacity)?, + ring: BlockingRingQueue::try_with_capacity(capacity)?, shutdown: AtomicBool::new(false), size: IrqSafeRwLock::new(size), }) @@ -76,7 +75,9 @@ impl PtyOutput { } let mut lock = self.ring.read_lock().await; - Ok(terminal::read_all(&mut lock, buffer, None)) + let len = terminal::read_all(&mut lock, buffer, None); + self.ring.notify_writers(); + Ok(len) } pub fn read_nonblocking(&self, buffer: &mut [u8]) -> Result { @@ -85,7 +86,9 @@ impl PtyOutput { } let mut lock = self.ring.try_read_lock().ok_or(Error::WouldBlock)?; - Ok(terminal::read_all(&mut lock, buffer, None)) + let len = terminal::read_all(&mut lock, buffer, None); + self.ring.notify_writers(); + Ok(len) } } @@ -158,7 +161,8 @@ impl Drop for PseudoTerminalSlave { fn drop(&mut self) { let output = self.0.output(); output.shutdown.store(true, Ordering::Release); - output.ring.notify_all(); + output.ring.notify_writers(); + output.ring.notify_readers(); } }