yggdrasil/src/util/ring.rs

118 lines
2.9 KiB
Rust

use core::{
pin::Pin,
task::{Context, Poll},
};
use abi::error::Error;
use alloc::sync::Arc;
use futures_util::Future;
use kernel_util::sync::IrqSafeSpinlock;
use crate::task::runtime::QueueWaker;
/// Fixed-size ring buffer backed by an inline array of `N` slots.
///
/// The buffer is treated as empty when `rd == wr`. Both cursors advance
/// modulo `N`, and writes are not checked against the read cursor.
pub struct RingBuffer<T, const N: usize> {
/// Index of the next slot to be read.
rd: usize,
/// Index of the next slot to be written.
wr: usize,
/// Backing storage; every slot always holds an initialised `T`.
data: [T; N],
}
/// Ring buffer that can be awaited on for incoming items.
///
/// Wraps a [`RingBuffer`] in a reference-counted `IrqSafeSpinlock` together
/// with a `QueueWaker` that pending readers register on.
pub struct AsyncRing<T, const N: usize> {
/// The shared ring; the futures returned by `read` hold their own clone of this `Arc`.
inner: Arc<IrqSafeSpinlock<RingBuffer<T, N>>>,
/// Wakers of tasks waiting in `read`; `try_write` wakes one of them per item.
read_waker: Arc<QueueWaker>,
}
impl<T: Copy, const N: usize> RingBuffer<T, N> {
    /// Creates an empty ring buffer with every slot initialised to `value`.
    pub const fn new(value: T) -> Self {
        Self {
            rd: 0,
            wr: 0,
            data: [value; N],
        }
    }

    /// Returns `true` if at least one unread item is stored.
    #[inline]
    const fn is_readable(&self) -> bool {
        self.is_readable_at(self.rd)
    }

    /// Returns `true` if a read cursor positioned at slot `at` has not yet
    /// caught up with the write cursor.
    ///
    /// `at` must be a valid slot index (`at < N`). For such indices the cursor
    /// has data exactly when it differs from `wr`.
    const fn is_readable_at(&self, at: usize) -> bool {
        at != self.wr
    }

    /// Removes and returns the item at the read cursor.
    ///
    /// # Safety
    ///
    /// No emptiness check is performed. The caller must make sure the buffer
    /// is readable first; otherwise a stale slot is returned and the read
    /// cursor moves past the write cursor.
    #[inline]
    unsafe fn read_unchecked(&mut self) -> T {
        let res = self.data[self.rd];
        self.rd = (self.rd + 1) % N;
        res
    }

    /// Copies unread items into `buffer` without consuming them.
    ///
    /// Copying starts `pos` items past the read cursor and stops when either
    /// `buffer` is full or the write cursor is reached. The unread region may
    /// wrap around the end of the backing array.
    ///
    /// Returns the number of items copied, which is `0` when `pos` is at or
    /// beyond the number of unread items.
    ///
    /// # Safety
    ///
    /// This function performs no memory-unsafe operation itself; it is kept
    /// `unsafe` to preserve the existing interface. The read cursor is left
    /// unchanged.
    pub unsafe fn read_all_static(&mut self, pos: usize, buffer: &mut [T]) -> usize {
        // Number of unread items between `rd` and `wr`, accounting for wraparound.
        let available = (self.wr + N - self.rd) % N;
        if pos >= available {
            // The requested offset lies outside the unread region.
            return 0;
        }

        let count = buffer.len().min(available - pos);
        let start = (self.rd + pos) % N;
        for (offset, slot) in buffer[..count].iter_mut().enumerate() {
            // Wrap the index so a region crossing the array end is read correctly.
            *slot = self.data[(start + offset) % N];
        }
        count
    }

    /// Stores `ch` at the write cursor and advances it.
    ///
    /// # Safety
    ///
    /// No fullness check is performed. If the write cursor catches up with
    /// the read cursor, the buffer appears empty again and the unread items
    /// are lost.
    #[inline]
    pub unsafe fn write_unchecked(&mut self, ch: T) {
        self.data[self.wr] = ch;
        self.wr = (self.wr + 1) % N;
    }
}
impl<T: Copy, const N: usize> AsyncRing<T, N> {
    /// Builds an empty asynchronous ring whose slots are pre-filled with `value`.
    pub fn new(value: T) -> Self {
        let ring = RingBuffer::new(value);
        let inner = Arc::new(IrqSafeSpinlock::new(ring));
        let read_waker = Arc::new(QueueWaker::new());
        Self { inner, read_waker }
    }

    /// Pushes `item` into the ring and wakes one waiting reader.
    ///
    /// The write is unchecked, so this currently always returns `Ok(())`.
    pub fn try_write(&self, item: T) -> Result<(), Error> {
        {
            // Keep the critical section as short as the write itself; the
            // guard is released before any reader is woken.
            let mut ring = self.inner.lock();
            // SAFETY: `write_unchecked` only stores into and advances within
            // the fixed backing array.
            unsafe { ring.write_unchecked(item) };
        }
        self.read_waker.wake_one();
        Ok(())
    }

    /// Returns a future that resolves to the next item written to the ring.
    pub fn read(&self) -> impl Future<Output = T> {
        /// Future that owns its own handles to the ring and the waker queue.
        struct ReadFuture<T: Copy, const N: usize> {
            inner: Arc<IrqSafeSpinlock<RingBuffer<T, N>>>,
            read_waker: Arc<QueueWaker>,
        }

        impl<T: Copy, const N: usize> Future for ReadFuture<T, N> {
            type Output = T;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                let waker = cx.waker();
                // Register before inspecting the ring so a write that lands in
                // between still wakes this task.
                self.read_waker.register(waker);

                let mut ring = self.inner.lock();
                if !ring.is_readable() {
                    return Poll::Pending;
                }

                self.read_waker.remove(waker);
                // SAFETY: the ring was just checked to be readable while the
                // lock is held.
                let item = unsafe { ring.read_unchecked() };
                Poll::Ready(item)
            }
        }

        ReadFuture {
            inner: Arc::clone(&self.inner),
            read_waker: Arc::clone(&self.read_waker),
        }
    }
}