channel: send files over channels

This commit is contained in:
Mark Poliakov 2023-12-31 01:53:43 +02:00
parent 21ff2616f9
commit 293dcfea6a
8 changed files with 96 additions and 38 deletions

View File

@ -1,10 +1,17 @@
use core::task::{Context, Poll};
use alloc::{boxed::Box, format, sync::Arc};
use futures_util::{task::waker_ref, Future};
use futures_util::{
task::{waker_ref, ArcWake, WakerRef},
Future,
};
use yggdrasil_abi::error::Error;
use crate::thread::Thread;
use crate::{
api::__cpu_index,
cpu_index,
thread::{CurrentThread, Thread},
};
use super::{
task::{Task, Termination},
@ -48,6 +55,7 @@ pub fn run_to_completion<'a, T, F: Future<Output = T> + Send + 'a>(future: F) ->
loop {
let thread = Thread::current();
let waker = waker_ref(&thread);
let context = &mut Context::from_waker(&waker);

View File

@ -15,18 +15,29 @@ use kernel_util::{
block,
sync::{mutex::Mutex, IrqSafeSpinlock, LockMethod},
};
use yggdrasil_abi::{error::Error, io::MessageDestination};
use yggdrasil_abi::{
error::Error,
io::{MessageDestination, ReceivedMessageMetadata},
};
use crate::FileReadiness;
use crate::{FileReadiness, FileRef};
pub struct Channel {
last_id: AtomicU32,
subscriptions: Mutex<BTreeMap<u32, Arc<Subscription>>>,
}
/// Describes message payload
pub enum MessagePayload {
/// Payload contains a file
File(FileRef),
/// Payload contains byte data
Data(Box<[u8]>),
}
pub struct Message {
source: u32,
data: Box<[u8]>,
pub source: u32,
pub payload: MessagePayload,
}
pub struct Subscription {
@ -54,27 +65,22 @@ impl ChannelDescriptor {
Self { tx, rx, id }
}
pub fn receive_message(&self, buf: &mut [u8]) -> Result<(u32, usize), Error> {
pub fn receive_message(&self) -> Result<Arc<Message>, Error> {
let Some(rx) = self.rx.as_ref() else {
return Err(Error::InvalidOperation);
};
let message = rx.receive_message_inner()?;
let len = message.data.len();
if buf.len() < len {
return Err(Error::MissingData);
rx.receive_message_inner()
}
buf[..len].copy_from_slice(&message.data);
Ok((message.source, len))
}
pub fn send_message(&self, msg: &[u8], dst: MessageDestination) -> Result<(), Error> {
pub fn send_message(
&self,
payload: MessagePayload,
dst: MessageDestination,
) -> Result<(), Error> {
let message = Arc::new(Message {
source: self.id,
data: Box::from(msg),
payload,
});
let lock = self.tx.subscriptions.lock()?;

View File

@ -37,7 +37,7 @@ mod pipe;
mod regular;
/// Per-file optional instance data created when a regular file is opened
pub type InstanceData = Box<dyn Any + Send + Sync>;
pub type InstanceData = Arc<dyn Any + Send + Sync>;
/// Describes the starting position of the directory
pub enum DirectoryOpenPosition {
@ -159,6 +159,14 @@ impl File {
})))
}
/// Clones an open file for sending it to another process
pub fn send(&self) -> Result<Arc<Self>, Error> {
match self {
Self::Regular(file) => Ok(Arc::new(Self::Regular(file.clone()))),
_ => Err(Error::InvalidOperation),
}
}
/// Reads entries from the directory
pub fn read_dir(&self, entries: &mut [MaybeUninit<DirectoryEntry>]) -> Result<usize, Error> {
match self {

View File

@ -4,6 +4,7 @@ use yggdrasil_abi::{error::Error, io::SeekFrom};
use super::InstanceData;
use crate::node::NodeRef;
#[derive(Clone)]
pub struct RegularFile {
pub(super) node: NodeRef,
pub(super) read: bool,

View File

@ -19,6 +19,7 @@ pub(crate) mod path;
pub(crate) mod poll;
pub(crate) mod traits;
pub use channel::MessagePayload;
pub use device::CharDevice;
pub use file::{DirectoryOpenPosition, File, FileRef, FileSet, InstanceData};
pub use ioctx::{Action, IoContext};

View File

@ -141,7 +141,7 @@ where
return Err(Error::ReadOnly);
}
let t = (self.read)()?;
Ok((0, Some(Box::new(t.as_instance_data()))))
Ok((0, Some(Arc::new(t.as_instance_data()))))
}
fn read(
@ -240,10 +240,10 @@ where
if opts.contains(OpenOptions::READ | OpenOptions::WRITE) {
Err(Error::InvalidOperation)
} else if opts.contains(OpenOptions::WRITE) {
Ok((0, Some(Box::new(FnNodeData::write()))))
Ok((0, Some(Arc::new(FnNodeData::write()))))
} else if opts.contains(OpenOptions::READ) {
let t = (self.read)()?;
Ok((0, Some(Box::new(FnNodeData::read(t)))))
Ok((0, Some(Arc::new(FnNodeData::read(t)))))
} else {
Err(Error::InvalidOperation)
}

View File

@ -35,7 +35,10 @@ use kernel_util::{
sync::IrqGuard,
};
use crate::{mem::phys::PhysicalMemoryRegion, task::sched::CpuQueue};
use crate::{
mem::phys::PhysicalMemoryRegion,
task::{sched::CpuQueue, Cpu},
};
cfg_if! {
if #[cfg(target_arch = "aarch64")] {
@ -278,8 +281,7 @@ fn __release_irq_guard(mask: bool) {
#[no_mangle]
fn __cpu_index() -> usize {
todo!()
// Cpu::local_id() as _
Cpu::local().id() as _
}
#[no_mangle]

View File

@ -5,15 +5,15 @@ use abi::{
error::Error,
io::{
DeviceRequest, DirectoryEntry, FileAttr, FileMode, MessageDestination, OpenOptions,
PollControl, RawFd, SeekFrom,
PollControl, RawFd, ReceivedMessageMetadata, SeekFrom, SentMessage,
},
mem::MappingSource,
process::{ExitCode, MutexOperation, Signal, SpawnOption, SpawnOptions, ThreadSpawnOptions},
syscall::SyscallFunction,
};
use alloc::sync::Arc;
use alloc::{boxed::Box, sync::Arc};
use kernel_util::{block, mem::table::EntryLevelExt, runtime, sync::IrqSafeSpinlockGuard};
use vfs::{File, IoContext, NodeRef, Read, Seek, Write};
use vfs::{File, IoContext, MessagePayload, NodeRef, Read, Seek, Write};
use yggdrasil_abi::{error::SyscallResult, io::MountOptions};
use crate::{
@ -372,31 +372,63 @@ fn syscall_handler(func: SyscallFunction, args: &[u64]) -> Result<usize, Error>
}
SyscallFunction::SendMessage => {
let fd = RawFd::from(args[0] as u32);
let buf = arg_buffer_ref(args[1] as usize, args[2] as usize)?;
let destination = MessageDestination::from(args[3]);
let message = arg_user_ref::<SentMessage>(args[1] as usize)?;
let destination = MessageDestination::from(args[2]);
run_with_io(process, |io| {
let file = io.files.file(fd)?;
let channel = file.as_message_channel()?;
channel.send_message(buf, destination)?;
match message {
&SentMessage::File(fd) => {
let sent_file = io.files.file(fd)?;
channel
.send_message(MessagePayload::File(sent_file.clone()), destination)?;
}
&SentMessage::Data(data) => {
channel.send_message(MessagePayload::Data(Box::from(data)), destination)?;
}
}
Ok(0)
})
}
SyscallFunction::ReceiveMessage => {
let fd = RawFd::from(args[0] as u32);
let buf = arg_buffer_mut(args[1] as usize, args[2] as usize)?;
let from = arg_user_mut::<MaybeUninit<u32>>(args[3] as usize)?;
let metadata = arg_user_mut::<MaybeUninit<ReceivedMessageMetadata>>(args[1] as usize)?;
let buf = arg_buffer_mut(args[2] as usize, args[3] as usize)?;
let from = arg_user_mut::<MaybeUninit<u32>>(args[4] as usize)?;
run_with_io(process, |io| {
run_with_io(process, |mut io| {
let file = io.files.file(fd)?;
let channel = file.as_message_channel()?;
let (id, len) = channel.receive_message(buf)?;
from.write(id);
let message = channel.receive_message()?;
from.write(message.source);
match &message.payload {
MessagePayload::Data(data) => {
// TODO allow truncated messages?
let len = data.len();
if buf.len() < len {
return Err(Error::MissingData);
}
metadata.write(ReceivedMessageMetadata::Data(len));
buf[..len].copy_from_slice(&data);
Ok(len)
}
MessagePayload::File(file) => {
let fd = io.files.place_file(file.clone(), true)?;
metadata.write(ReceivedMessageMetadata::File(fd));
Ok(0)
}
}
})
}
// Process management