pty: make pty buffer blocking
This commit is contained in:
parent
99644d335d
commit
8c4bdcbe64
@ -29,6 +29,12 @@ pub struct LossyRingQueue<T: Copy> {
|
||||
read_notify: QueueWaker,
|
||||
}
|
||||
|
||||
pub struct BlockingRingQueue<T: Copy> {
|
||||
ring: IrqSafeSpinlock<RingBuffer<T>>,
|
||||
write_notify: QueueWaker,
|
||||
read_notify: QueueWaker,
|
||||
}
|
||||
|
||||
impl<T: Copy> RingBuffer<T> {
|
||||
/// Constructs an empty [RingBuffer].
|
||||
///
|
||||
@ -231,3 +237,86 @@ impl<T: Copy> LossyRingQueue<T> {
|
||||
self.read_notify.wake_all();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Copy> BlockingRingQueue<T> {
|
||||
pub const fn with_capacity(capacity: usize) -> Self {
|
||||
Self {
|
||||
ring: IrqSafeSpinlock::new(RingBuffer::with_capacity(capacity)),
|
||||
write_notify: QueueWaker::new(),
|
||||
read_notify: QueueWaker::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_with_capacity(capacity: usize) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
ring: IrqSafeSpinlock::new(RingBuffer::try_with_capacity(capacity)?),
|
||||
write_notify: QueueWaker::new(),
|
||||
read_notify: QueueWaker::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn write(&self, data: T) {
|
||||
poll_fn(|cx| {
|
||||
if self.try_write(data) {
|
||||
self.notify_readers();
|
||||
self.write_notify.remove(cx.waker());
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
self.write_notify.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn write_all(&self, data: &[T]) {
|
||||
for &ch in data {
|
||||
self.write(ch).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_write(&self, data: T) -> bool {
|
||||
let mut lock = self.ring.lock();
|
||||
if !lock.is_writable() {
|
||||
return false;
|
||||
}
|
||||
lock.write(data);
|
||||
true
|
||||
}
|
||||
|
||||
pub async fn read_lock(&self) -> IrqSafeSpinlockGuard<RingBuffer<T>> {
|
||||
poll_fn(|cx| self.poll_read_lock(cx)).await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn poll_read_lock(
|
||||
&self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<IrqSafeSpinlockGuard<RingBuffer<T>>> {
|
||||
if let Some(lock) = self.try_read_lock() {
|
||||
self.read_notify.remove(cx.waker());
|
||||
Poll::Ready(lock)
|
||||
} else {
|
||||
self.read_notify.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn try_read_lock(&self) -> Option<IrqSafeSpinlockGuard<RingBuffer<T>>> {
|
||||
let lock = self.ring.lock();
|
||||
lock.is_readable().then_some(lock)
|
||||
}
|
||||
|
||||
pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
self.poll_read_lock(cx).map(|_| ())
|
||||
}
|
||||
|
||||
pub fn notify_readers(&self) {
|
||||
self.read_notify.wake_all();
|
||||
}
|
||||
|
||||
pub fn notify_writers(&self) {
|
||||
self.write_notify.wake_all();
|
||||
}
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ use core::{
|
||||
|
||||
use alloc::{boxed::Box, sync::Arc};
|
||||
use async_trait::async_trait;
|
||||
use libk_util::{ring::LossyRingQueue, sync::spin_rwlock::IrqSafeRwLock};
|
||||
use libk_util::{ring::BlockingRingQueue, sync::spin_rwlock::IrqSafeRwLock};
|
||||
use yggdrasil_abi::{
|
||||
error::Error,
|
||||
io::{TerminalOptions, TerminalSize},
|
||||
@ -22,24 +22,23 @@ use super::{
|
||||
const CAPACITY: usize = 32768;
|
||||
|
||||
struct PtyOutput {
|
||||
ring: LossyRingQueue<u8>,
|
||||
ring: BlockingRingQueue<u8>,
|
||||
shutdown: AtomicBool,
|
||||
size: IrqSafeRwLock<TerminalSize>,
|
||||
}
|
||||
|
||||
impl TerminalOutput for PtyOutput {
|
||||
fn write(&self, byte: u8) -> Result<(), Error> {
|
||||
self.ring.write(byte);
|
||||
Ok(())
|
||||
block!(self.ring.write(byte).await)
|
||||
}
|
||||
|
||||
fn write_multiple(&self, bytes: &[u8]) -> Result<usize, Error> {
|
||||
self.ring.write_multiple(bytes);
|
||||
block!(self.ring.write_all(bytes).await)?;
|
||||
Ok(bytes.len())
|
||||
}
|
||||
|
||||
fn notify_readers(&self) {
|
||||
self.ring.notify_all();
|
||||
self.ring.notify_readers();
|
||||
}
|
||||
|
||||
fn size(&self) -> TerminalSize {
|
||||
@ -56,7 +55,7 @@ impl TerminalOutput for PtyOutput {
|
||||
impl PtyOutput {
|
||||
pub fn with_capacity(size: TerminalSize, capacity: usize) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
ring: LossyRingQueue::try_with_capacity(capacity)?,
|
||||
ring: BlockingRingQueue::try_with_capacity(capacity)?,
|
||||
shutdown: AtomicBool::new(false),
|
||||
size: IrqSafeRwLock::new(size),
|
||||
})
|
||||
@ -76,7 +75,9 @@ impl PtyOutput {
|
||||
}
|
||||
|
||||
let mut lock = self.ring.read_lock().await;
|
||||
Ok(terminal::read_all(&mut lock, buffer, None))
|
||||
let len = terminal::read_all(&mut lock, buffer, None);
|
||||
self.ring.notify_writers();
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
pub fn read_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, Error> {
|
||||
@ -85,7 +86,9 @@ impl PtyOutput {
|
||||
}
|
||||
|
||||
let mut lock = self.ring.try_read_lock().ok_or(Error::WouldBlock)?;
|
||||
Ok(terminal::read_all(&mut lock, buffer, None))
|
||||
let len = terminal::read_all(&mut lock, buffer, None);
|
||||
self.ring.notify_writers();
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,7 +161,8 @@ impl Drop for PseudoTerminalSlave {
|
||||
fn drop(&mut self) {
|
||||
let output = self.0.output();
|
||||
output.shutdown.store(true, Ordering::Release);
|
||||
output.ring.notify_all();
|
||||
output.ring.notify_writers();
|
||||
output.ring.notify_readers();
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user