227 lines
5.7 KiB
Rust

use core::{
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll},
};
use alloc::{sync::Arc, vec, vec::Vec};
use futures_util::{task::AtomicWaker, Future};
use kernel_util::{block, sync::IrqSafeSpinlock};
use yggdrasil_abi::error::Error;
/// Fixed-size ring buffer of bytes that backs a `Pipe`.
struct PipeInner {
/// Backing storage; `PipeInner::new` allocates it as `capacity` zeroed bytes.
data: Vec<u8>,
/// Number of slots in `data`; the read and write indices wrap around modulo this value.
capacity: usize,
/// Index of the next byte to be read.
rd: usize,
/// Index of the slot the next written byte is stored in.
wr: usize,
}
/// A byte pipe: a lock-protected ring buffer plus the state used to park and wake the tasks
/// reading from and writing to it.
pub struct Pipe {
/// Ring buffer holding the bytes that were written but not yet read.
inner: IrqSafeSpinlock<PipeInner>,
/// Set by `Pipe::shutdown`; once set, writes fail and reads return `None` when no data is left.
shutdown: AtomicBool,
/// Waker of the task waiting for data to read; woken after a successful write and on shutdown.
read_notify: AtomicWaker,
/// Waker of the task waiting for room to write; woken after a successful read and on shutdown.
write_notify: AtomicWaker,
}
/// One end of a `Pipe`; both ends refer to the same pipe through an `Arc`.
///
/// Dropping either end shuts the shared pipe down.
pub enum PipeEnd {
/// Reading end: `PipeEnd::read` is allowed, `PipeEnd::write` fails with
/// `Error::InvalidOperation`.
Read(Arc<Pipe>),
/// Writing end: `PipeEnd::write` is allowed, `PipeEnd::read` fails with
/// `Error::InvalidOperation`.
Write(Arc<Pipe>),
}
impl PipeInner {
    /// Creates an empty ring buffer backed by `capacity` zero-initialized bytes.
    ///
    /// One slot is always kept free so that a full buffer can be told apart from an empty
    /// one, which means at most `capacity - 1` bytes can be queued at any time. A `capacity`
    /// of 0 or 1 therefore yields a buffer that never accepts data.
    pub fn new(capacity: usize) -> Self {
        Self {
            data: vec![0; capacity],
            capacity,
            rd: 0,
            wr: 0,
        }
    }

    /// Returns `true` if one more byte can be stored without the write index catching up
    /// with the read index.
    pub fn can_write(&self) -> bool {
        // A zero-capacity buffer has no slots at all. Checking this first also keeps the
        // modulo below from dividing by zero (which would panic).
        self.capacity != 0 && (self.wr + 1) % self.capacity != self.rd
    }

    /// Returns `true` if at least one unread byte is queued.
    pub fn can_read(&self) -> bool {
        self.rd != self.wr
    }

    /// Stores `val` at the write index and advances the index, wrapping at `capacity`.
    ///
    /// # Safety
    ///
    /// The caller must make sure [`Self::can_write`] returns `true`. Writing into a full
    /// buffer moves the write index onto the read index, which makes the buffer look empty
    /// and discards everything queued. With a zero capacity the call panics.
    pub unsafe fn write(&mut self, val: u8) {
        self.data[self.wr] = val;
        self.wr = (self.wr + 1) % self.capacity;
    }

    /// Returns the byte at the read index and advances the index, wrapping at `capacity`.
    ///
    /// # Safety
    ///
    /// The caller must make sure [`Self::can_read`] returns `true`. Reading from an empty
    /// buffer yields a stale byte and pushes the read index past the write index. With a
    /// zero capacity the call panics.
    pub unsafe fn read(&mut self) -> u8 {
        let val = self.data[self.rd];
        self.rd = (self.rd + 1) % self.capacity;
        val
    }

    /// Queues `val` if there is room; returns whether the byte was stored.
    fn try_write(&mut self, val: u8) -> bool {
        if !self.can_write() {
            return false;
        }
        // SAFETY: `can_write()` has just confirmed that a free slot exists.
        unsafe { self.write(val) };
        true
    }

    /// Removes and returns the oldest queued byte, or `None` if the buffer is empty.
    fn try_read(&mut self) -> Option<u8> {
        if !self.can_read() {
            return None;
        }
        // SAFETY: `can_read()` has just confirmed that an unread byte is queued.
        Some(unsafe { self.read() })
    }
}
impl Pipe {
    /// Creates an open pipe whose ring buffer is backed by `capacity` bytes.
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: IrqSafeSpinlock::new(PipeInner::new(capacity)),
            shutdown: AtomicBool::new(false),
            read_notify: AtomicWaker::new(),
            write_notify: AtomicWaker::new(),
        }
    }

    /// Marks the pipe as shut down and wakes the registered reader and writer so they can
    /// observe the flag.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
        self.read_notify.wake();
        self.write_notify.wake();
    }

    /// Returns a future that resolves once `val` has been queued in the pipe.
    ///
    /// Despite the name, nothing blocks here: the returned future stays pending while the
    /// buffer is full and is woken when a reader frees a slot or the pipe is shut down.
    ///
    /// # Errors
    ///
    /// Resolves to `Error::ReadOnly` if the pipe has been shut down.
    pub fn blocking_write(&self, val: u8) -> impl Future<Output = Result<(), Error>> + '_ {
        struct F<'a> {
            pipe: &'a Pipe,
            val: u8,
        }

        impl<'a> Future for F<'a> {
            type Output = Result<(), Error>;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                let mut lock = self.pipe.inner.lock();

                // Fast path: try to finish before registering the waker.
                if self.pipe.shutdown.load(Ordering::Acquire) {
                    // TODO BrokenPipe
                    return Poll::Ready(Err(Error::ReadOnly));
                } else if lock.try_write(self.val) {
                    self.pipe.read_notify.wake();
                    return Poll::Ready(Ok(()));
                }

                self.pipe.write_notify.register(cx.waker());

                // Re-check after registering: a shutdown that happened between the first
                // check and the registration would otherwise never wake this task.
                if self.pipe.shutdown.load(Ordering::Acquire) {
                    Poll::Ready(Err(Error::ReadOnly))
                } else if lock.try_write(self.val) {
                    self.pipe.read_notify.wake();
                    Poll::Ready(Ok(()))
                } else {
                    Poll::Pending
                }
            }
        }

        F { pipe: self, val }
    }

    /// Returns a future that resolves to the next byte from the pipe.
    ///
    /// Queued data is drained before the shutdown flag is honoured: the future resolves to
    /// `None` only when the buffer is empty and the pipe has been shut down. Despite the
    /// name, nothing blocks here; the future stays pending while the buffer is empty.
    pub fn blocking_read(&self) -> impl Future<Output = Option<u8>> + '_ {
        struct F<'a> {
            pipe: &'a Pipe,
        }

        impl<'a> Future for F<'a> {
            type Output = Option<u8>;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                let mut lock = self.pipe.inner.lock();

                // Fast path: try to finish before registering the waker.
                if let Some(val) = lock.try_read() {
                    self.pipe.write_notify.wake();
                    return Poll::Ready(Some(val));
                } else if self.pipe.shutdown.load(Ordering::Acquire) {
                    return Poll::Ready(None);
                }

                self.pipe.read_notify.register(cx.waker());

                // Re-check after registering, mirroring the fast path above.
                if let Some(val) = lock.try_read() {
                    // A slot was freed, so let the waiting writer know — exactly as the
                    // fast path does. Without this wake a writer parked on a full buffer
                    // would not learn about the free slot from this branch.
                    self.pipe.write_notify.wake();
                    Poll::Ready(Some(val))
                } else if self.pipe.shutdown.load(Ordering::Acquire) {
                    Poll::Ready(None)
                } else {
                    Poll::Pending
                }
            }
        }

        F { pipe: self }
    }
}
impl PipeEnd {
pub fn new_pair(capacity: usize) -> (PipeEnd, PipeEnd) {
let pipe = Arc::new(Pipe::new(capacity));
let read = PipeEnd::Read(pipe.clone());
let write = PipeEnd::Write(pipe);
(read, write)
}
pub fn read(&self, buf: &mut [u8]) -> Result<usize, Error> {
let PipeEnd::Read(read) = self else {
return Err(Error::InvalidOperation);
};
block! {
let mut pos = 0;
let mut rem = buf.len();
while rem != 0 {
if let Some(val) = read.blocking_read().await {
buf[pos] = val;
pos += 1;
rem -= 1;
} else {
break;
}
}
Ok(pos)
}?
}
pub fn write(&self, buf: &[u8]) -> Result<usize, Error> {
let PipeEnd::Write(write) = self else {
return Err(Error::InvalidOperation);
};
block! {
let mut pos = 0;
let mut rem = buf.len();
while rem != 0 {
write.blocking_write(buf[pos]).await?;
pos += 1;
rem -= 1;
}
Ok(pos)
}?
}
}
impl Drop for PipeEnd {
    /// Shuts the shared pipe down, regardless of which end is being dropped.
    fn drop(&mut self) {
        // Both variants carry the same payload, so a single or-pattern covers them.
        let (Self::Read(pipe) | Self::Write(pipe)) = self;
        pipe.shutdown();
    }
}