From 7c38b84c39c6a6443b419b33d326d4803a6eec69 Mon Sep 17 00:00:00 2001 From: Mark Poliakov Date: Sat, 10 Aug 2024 22:29:02 +0300 Subject: [PATCH] ipc: unix impl for serde-ipc --- userspace/Cargo.lock | 113 ++++++++-- userspace/colors/src/main.rs | 53 +++-- .../libcolors/src/application/connection.rs | 34 ++- userspace/lib/serde-ipc/Cargo.toml | 1 + userspace/lib/serde-ipc/src/lib.rs | 175 ++++++++------- userspace/lib/serde-ipc/src/sys/mod.rs | 36 +++ userspace/lib/serde-ipc/src/sys/unix.rs | 210 ++++++++++++++++++ userspace/lib/serde-ipc/src/sys/yggdrasil.rs | 71 ++++++ 8 files changed, 543 insertions(+), 150 deletions(-) create mode 100644 userspace/lib/serde-ipc/src/sys/mod.rs create mode 100644 userspace/lib/serde-ipc/src/sys/unix.rs create mode 100644 userspace/lib/serde-ipc/src/sys/yggdrasil.rs diff --git a/userspace/Cargo.lock b/userspace/Cargo.lock index ebb81791..ce07a383 100644 --- a/userspace/Cargo.lock +++ b/userspace/Cargo.lock @@ -287,6 +287,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "error-chain" version = "0.12.4" @@ -305,6 +315,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "fastrand" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" + [[package]] name = "flexbuffers" version = "2.0.0" @@ -460,6 +476,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + [[package]] name = "lock_api" version = "0.4.11" @@ -757,6 +779,19 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "rustix" +version = "0.38.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +dependencies = [ + "bitflags 2.5.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "ryu" version = "1.0.17" @@ -784,6 +819,7 @@ version = "0.1.0" dependencies = [ "flexbuffers", "serde", + "tempfile", "thiserror", ] @@ -932,6 +968,19 @@ dependencies = [ "yggdrasil-rt", ] +[[package]] +name = "tempfile" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" +dependencies = [ + "cfg-if", + "fastrand", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + [[package]] name = "term" version = "0.1.0" @@ -1131,7 +1180,16 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", ] [[package]] @@ -1151,17 +1209,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -1172,9 +1231,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -1184,9 +1243,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -1196,9 +1255,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -1208,9 +1273,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -1220,9 +1285,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" @@ -1232,9 +1297,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -1244,9 +1309,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.4" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" diff --git a/userspace/colors/src/main.rs b/userspace/colors/src/main.rs index 5c399da7..c95ba809 100644 --- a/userspace/colors/src/main.rs +++ b/userspace/colors/src/main.rs @@ -3,11 +3,11 @@ // TODO rewrite and split this into meaningful components use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, os::{ fd::{AsRawFd, RawFd}, yggdrasil::io::{ - mapping::FileMapping, message_channel::ChannelPublisherId, poll::PollChannel, + mapping::FileMapping, poll::PollChannel, shared_memory::SharedMemory, }, }, @@ -21,7 +21,7 @@ use libcolors::{ event::{Event, KeyModifiers, WindowEvent, WindowInfo}, message::{ClientMessage, ServerMessage}, }; -use serde_ipc::{Receiver, Sender}; +use serde_ipc::{PeerAddr, Receiver, Sender}; use yggdrasil_abi::io::{KeyboardKey, KeyboardKeyEvent}; pub mod display; @@ -30,7 +30,7 @@ pub mod input; pub struct Window<'a> { window_id: u32, - client_id: ChannelPublisherId, + client_id: PeerAddr, surface_mapping: FileMapping<'a>, surface_data: &'a [u32], @@ -62,6 +62,9 @@ pub struct Server<'a, 'd> { input_state: InputState, + last_client_id: u32, + client_map: HashMap, + // Window management windows: BTreeMap>, rows: Vec, @@ -145,11 +148,11 @@ impl<'a, 'd> Server<'a, 'd> { let mut display = Display::open()?; let input = KeyboardInput::open()?; - let (sender, receiver) = serde_ipc::channel(libcolors::CHANNEL_NAME)?; + let (sender, receiver) = serde_ipc::listen(libcolors::CHANNEL_NAME)?; let sender = ServerSender(sender); poll.add(input.as_poll_fd())?; - poll.add(receiver.as_poll_fd())?; + poll.add(receiver.as_raw_fd())?; let background = 0xFFCCCCCC; display.fill(background); @@ -167,6 +170,9 @@ impl<'a, 'd> Server<'a, 'd> { padding: 4, background, + last_client_id: 0, + client_map: HashMap::new(), + windows: BTreeMap::new(), rows: vec![], last_window_id: 1, @@ -176,7 +182,7 @@ impl<'a, 'd> Server<'a, 'd> { fn create_window( &mut self, - client_id: ChannelPublisherId, + client_id: &PeerAddr, ) -> Result<(WindowInfo, RawFd), Error> { if self.rows.is_empty() { self.rows.push(Row::new( @@ -211,7 +217,7 @@ impl<'a, 'd> Server<'a, 'd> { let window = Window { window_id, - client_id, + client_id: client_id.clone(), surface_mapping, surface_data, }; @@ -341,7 +347,7 @@ impl<'a, 'd> Server<'a, 'd> { self.sender .send_event( Event::WindowEvent(window.window_id, WindowEvent::KeyInput(input)), - window.client_id, + &window.client_id, ) .ok(); } else { @@ -385,7 +391,7 @@ impl<'a, 'd> Server<'a, 'd> { self.sender .send_event( Event::WindowEvent(old_window.window_id, WindowEvent::FocusChanged(false)), - old_window.client_id, + &old_window.client_id, ) .ok(); } @@ -406,7 +412,7 @@ impl<'a, 'd> Server<'a, 'd> { self.sender .send_event( Event::WindowEvent(window.window_id, WindowEvent::FocusChanged(true)), - window.client_id, + &window.client_id, ) .ok(); } @@ -461,7 +467,7 @@ impl<'a, 'd> Server<'a, 'd> { height: frame.h, }, ), - window.client_id, + &window.client_id, ) .ok(); } @@ -470,27 +476,30 @@ impl<'a, 'd> Server<'a, 'd> { fn handle_client_message( &mut self, - client_id: ChannelPublisherId, + client_id: PeerAddr, message: ClientMessage, ) -> Result<(), Error> { match message { ClientMessage::ClientHello => { debug_trace!("{:?}: ClientHello", client_id); // Echo the ID back + self.last_client_id += 1; + let id = self.last_client_id; + self.client_map.insert(id, client_id.clone()); self.sender - .send_event(Event::ServerHello(client_id.into()), client_id) + .send_event(Event::ServerHello(id), &client_id) } ClientMessage::CreateWindow => { debug_trace!("{:?}: CreateWindow", client_id); - let (info, shm_fd) = self.create_window(client_id)?; + let (info, shm_fd) = self.create_window(&client_id)?; let window_id = info.window_id; self.sender - .send_event(Event::NewWindowInfo(info), client_id)?; - self.sender.send_fd(shm_fd, client_id)?; + .send_event(Event::NewWindowInfo(info), &client_id)?; + self.sender.send_fd(&shm_fd, &client_id)?; self.sender.send_event( Event::WindowEvent(window_id, WindowEvent::RedrawRequested), - client_id, + &client_id, )?; Ok(()) @@ -539,7 +548,7 @@ impl<'a, 'd> Server<'a, 'd> { let event = self.input.read_event()?; self.handle_keyboard_event(event)?; } - Some((fd, Ok(_))) if fd == self.receiver.as_poll_fd() => { + Some((fd, Ok(_))) if fd == self.receiver.as_raw_fd() => { let (client_id, message) = self.receiver.receive_message()?; self.handle_client_message(client_id, message)?; } @@ -566,14 +575,14 @@ impl<'a, 'd> Server<'a, 'd> { } impl ServerSender { - pub fn send_event(&mut self, event: Event, client_id: ChannelPublisherId) -> Result<(), Error> { + pub fn send_event(&self, event: Event, client_id: &PeerAddr) -> Result<(), Error> { self.0 .send_to(&ServerMessage::Event(event), client_id) .map_err(Error::from) } - pub fn send_fd(&mut self, fd: RawFd, client_id: ChannelPublisherId) -> Result<(), Error> { - self.0.send_file(fd, client_id).map_err(Error::from) + pub fn send_fd(&self, file: &F, client_id: &PeerAddr) -> Result<(), Error> { + self.0.send_file_to(file, client_id).map_err(Error::from) } } diff --git a/userspace/lib/libcolors/src/application/connection.rs b/userspace/lib/libcolors/src/application/connection.rs index 6fe1a537..68b26a27 100644 --- a/userspace/lib/libcolors/src/application/connection.rs +++ b/userspace/lib/libcolors/src/application/connection.rs @@ -1,13 +1,13 @@ use std::{ collections::VecDeque, os::{ - fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, - yggdrasil::io::{message_channel::ChannelPublisherId, poll::PollChannel}, + fd::{AsRawFd, OwnedFd, RawFd}, + yggdrasil::io::{poll::PollChannel}, }, time::Duration, }; -use serde_ipc::{Message, Receiver, Sender}; +use serde_ipc::{Either, Receiver, Sender}; use crate::{ error::Error, @@ -25,7 +25,7 @@ pub struct Connection { impl Connection { pub fn new() -> Result { - let (sender, receiver) = serde_ipc::channel(crate::CHANNEL_NAME)?; + let (sender, receiver) = serde_ipc::connect(crate::CHANNEL_NAME)?; let timeout = Duration::from_secs(1); let mut poll = PollChannel::new()?; let event_queue = VecDeque::new(); @@ -51,17 +51,16 @@ impl Connection { return Err(Error::CommunicationTimeout); }; - // TODO ignore non-server messages - let (_, msg) = self.receiver.receive_raw()?; + let (_, msg) = self.receiver.receive()?; + // TODO ignore non-server messages match msg { - Message::File(fd) => { - let file = unsafe { OwnedFd::from_raw_fd(fd) }; - break Ok(file); - } - Message::Data(ServerMessage::Event(event)) => { + Either::Left(ServerMessage::Event(event)) => { self.event_queue.push_back(event); } + Either::Right(fd) => { + break Ok(fd); + } } } } @@ -77,15 +76,13 @@ impl Connection { // Unless we're doing a request, the server should not send any FDs, so just drop // anything that's not a message - let (_, Message::Data(ServerMessage::Event(event))) = self.receiver.receive_raw()? - else { + let (_, Either::Left(ServerMessage::Event(event))) = self.receiver.receive()? else { continue; }; match predicate(event) { Ok(val) => break Ok(val), Err(ev) => { - // Predicate rejected the event self.event_queue.push_back(ev); } } @@ -106,8 +103,7 @@ impl Connection { None => continue, }; - let (_, Message::Data(ServerMessage::Event(event))) = self.receiver.receive_raw()? - else { + let (_, Either::Left(ServerMessage::Event(event))) = self.receiver.receive()? else { continue; }; @@ -116,14 +112,12 @@ impl Connection { } pub fn send(&mut self, msg: ClientMessage) -> Result<(), Error> { - self.sender - .send_to(&msg, ChannelPublisherId::ZERO) - .map_err(Error::from) + self.sender.send_to_host(&msg).map_err(Error::from) } pub fn connect(&mut self) -> Result { self.sender - .send_to(&ClientMessage::ClientHello, ChannelPublisherId::ZERO)?; + .send_to_host(&ClientMessage::ClientHello)?; self.receive_map(|ev| { if let Event::ServerHello(id) = ev { diff --git a/userspace/lib/serde-ipc/Cargo.toml b/userspace/lib/serde-ipc/Cargo.toml index b8d17afd..64587118 100644 --- a/userspace/lib/serde-ipc/Cargo.toml +++ b/userspace/lib/serde-ipc/Cargo.toml @@ -7,4 +7,5 @@ authors = ["Mark Poliakov "] [dependencies] flexbuffers = "2.0.0" serde = { version = "1.0.193", features = ["derive"] } +tempfile = "3.12.0" thiserror = "1.0.56" diff --git a/userspace/lib/serde-ipc/src/lib.rs b/userspace/lib/serde-ipc/src/lib.rs index 4ae3eede..bd5c38cc 100644 --- a/userspace/lib/serde-ipc/src/lib.rs +++ b/userspace/lib/serde-ipc/src/lib.rs @@ -1,18 +1,20 @@ -#![feature(yggdrasil_os, rustc_private)] +#![cfg_attr(target_os = "yggdrasil", feature(yggdrasil_os, rustc_private))] +#![cfg_attr(unix, feature(unix_socket_ancillary_data))] use std::{ io, marker::PhantomData, - os::{ - fd::{AsRawFd, RawFd}, - yggdrasil::io::message_channel::{ - ChannelPublisherId, MessageChannel, MessageChannelReceiver, MessageChannelSender, - MessageDestination, MessageReceiver, MessageSender, ReceivedMessageMetadata, - }, - }, + os::fd::{AsRawFd, OwnedFd, RawFd}, }; use serde::{de::DeserializeOwned, Serialize}; +use sys::{IpcReceiver, RawReceiver, RawSender}; + +mod sys; + +pub use sys::{Either, PeerAddr}; + +use crate::sys::IpcSender; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -21,100 +23,123 @@ pub enum Error { #[error("Deserialization error: {0}")] DeserializeError(flexbuffers::DeserializationError), #[error("I/O error: {0}")] - IoError(io::Error), + IoError(#[from] io::Error), } pub struct Sender { - inner: MessageChannelSender, + inner: RawSender, + // inner: MessageChannelSender, _pd: PhantomData, } pub struct Receiver { - inner: MessageChannelReceiver, + inner: RawReceiver, + // inner: MessageChannelReceiver, buffer: [u8; 1024], _pd: PhantomData, } -pub enum Message { - Data(T), - File(RawFd), -} - -fn raw_send_message_to( - sender: &S, - msg: &T, - id: ChannelPublisherId, -) -> Result<(), Error> { - let msg = flexbuffers::to_vec(msg)?; - sender - .send_message(&msg, MessageDestination::Specific(id.into())) - .map_err(Error::from) -} - -fn raw_send_file_to( - sender: &S, - file: &F, - id: ChannelPublisherId, -) -> Result<(), Error> { - sender - .send_raw_fd(file.as_raw_fd(), MessageDestination::Specific(id.into())) - .map_err(Error::from) -} - -pub fn channel( - name: &str, -) -> Result<(Sender, Receiver), Error> { - let raw = MessageChannel::open(name, true)?; - let (raw_sender, raw_receiver) = raw.split(); - Ok(( +fn wrap( + sender: RawSender, + receiver: RawReceiver, +) -> (Sender, Receiver) { + ( Sender { - inner: raw_sender, + inner: sender, _pd: PhantomData, }, Receiver { - inner: raw_receiver, + inner: receiver, buffer: [0; 1024], _pd: PhantomData, }, - )) + ) +} + +#[cfg(target_os = "yggdrasil")] +pub fn listen( + name: &str, +) -> Result<(Sender, Receiver), Error> { + let (sender, receiver) = sys::create_channel(name)?; + Ok(wrap(sender, receiver)) +} + +#[cfg(target_os = "yggdrasil")] +pub fn connect( + name: &str, +) -> Result<(Sender, Receiver), Error> { + let (sender, receiver) = sys::create_channel(name)?; + Ok(wrap(sender, receiver)) +} + +#[cfg(any(unix, rust_analyzer))] +pub fn listen>( + path: P, +) -> Result<(Sender, Receiver), Error> { + let (sender, receiver) = sys::listen(path)?; + Ok(wrap(sender, receiver)) +} + +#[cfg(any(unix, rust_analyzer))] +pub fn connect>( + path: P, +) -> Result<(Sender, Receiver), Error> { + let (sender, receiver) = sys::connect(path)?; + Ok(wrap(sender, receiver)) } impl Sender { - pub fn send_to(&mut self, msg: &T, id: ChannelPublisherId) -> Result<(), Error> { - raw_send_message_to(&self.inner, msg, id) + pub fn send_to(&self, message: &T, peer: &PeerAddr) -> Result<(), Error> { + let message = flexbuffers::to_vec(message).map_err(Error::SerializeError)?; + self.inner + .send_message_to(&message, peer) + .map_err(Error::IoError) } - pub fn send_file(&mut self, fd: RawFd, id: ChannelPublisherId) -> Result<(), Error> { - raw_send_file_to(&self.inner, &fd, id) + #[inline] + pub fn send_to_host(&self, message: &T) -> Result<(), Error> { + self.send_to(message, self.inner.host_addr()) + } + + pub fn send_file_to(&self, file: &F, peer: &PeerAddr) -> Result<(), Error> { + self.inner.send_file_to(file, peer).map_err(Error::IoError) } } -impl Receiver { - pub fn receive_message(&mut self) -> Result<(ChannelPublisherId, T), Error> { +impl AsRawFd for Sender { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl Receiver { + pub fn receive(&mut self) -> Result<(PeerAddr, Either), Error> { + let (peer, message) = self.inner.receive(&mut self.buffer)?; + let message = message + .try_map_left(|len| flexbuffers::from_slice(&self.buffer[..len])) + .map_err(Error::DeserializeError)?; + Ok((peer, message)) + } + + pub fn receive_message(&mut self) -> Result<(PeerAddr, U), Error> { loop { - let (id, message) = self.receive_raw()?; + let (peer, message) = self.receive()?; - if let Message::Data(data) = message { - break Ok((id, data)); + if let Either::Left(message) = message { + break Ok((peer, message)); } } } - pub fn receive_raw(&mut self) -> Result<(ChannelPublisherId, Message), Error> { - let (id, metadata) = self.inner.receive_raw(&mut self.buffer)?; + pub fn receive_file(&mut self) -> Result<(PeerAddr, OwnedFd), Error> { + loop { + let (peer, message) = self.receive()?; - match metadata { - ReceivedMessageMetadata::Data(len) => { - let msg = flexbuffers::from_slice(&self.buffer[..len])?; - Ok((id, Message::Data(msg))) + if let Either::Right(fd) = message { + break Ok((peer, fd)); } - ReceivedMessageMetadata::File(fd) => Ok((id, Message::File(fd))), } } - - pub fn as_poll_fd(&self) -> RawFd { - self.as_raw_fd() - } } impl AsRawFd for Receiver { @@ -122,21 +147,3 @@ impl AsRawFd for Receiver { self.inner.as_raw_fd() } } - -impl From for Error { - fn from(value: flexbuffers::SerializationError) -> Self { - Self::SerializeError(value) - } -} - -impl From for Error { - fn from(value: flexbuffers::DeserializationError) -> Self { - Self::DeserializeError(value) - } -} - -impl From for Error { - fn from(value: io::Error) -> Self { - Self::IoError(value) - } -} diff --git a/userspace/lib/serde-ipc/src/sys/mod.rs b/userspace/lib/serde-ipc/src/sys/mod.rs new file mode 100644 index 00000000..1928130d --- /dev/null +++ b/userspace/lib/serde-ipc/src/sys/mod.rs @@ -0,0 +1,36 @@ +use std::{io, os::fd::{AsRawFd, OwnedFd}}; + +#[cfg(target_os = "yggdrasil")] +pub mod yggdrasil; +#[cfg(target_os = "yggdrasil")] +pub use yggdrasil::*; + +#[cfg(any(unix, rust_analyzer))] +pub mod unix; +#[cfg(unix)] +pub use unix::*; + +pub enum Either { + Left(T), + Right(U) +} + +pub trait IpcSender: AsRawFd { + fn send_message_to(&self, message: &[u8], peer: &PeerAddr) -> Result<(), io::Error>; + fn send_file_to(&self, file: &F, peer: &PeerAddr) -> Result<(), io::Error>; + + fn host_addr(&self) -> &PeerAddr; +} + +pub trait IpcReceiver: AsRawFd { + fn receive(&self, buffer: &mut [u8]) -> Result<(PeerAddr, Either), io::Error>; +} + +impl Either { + pub fn try_map_left Result>(self, map: F) -> Result, E> { + match self { + Self::Left(val) => map(val).map(Either::Left), + Self::Right(val) => Ok(Either::Right(val)) + } + } +} diff --git a/userspace/lib/serde-ipc/src/sys/unix.rs b/userspace/lib/serde-ipc/src/sys/unix.rs new file mode 100644 index 00000000..0276fd37 --- /dev/null +++ b/userspace/lib/serde-ipc/src/sys/unix.rs @@ -0,0 +1,210 @@ +use std::{ + io::{self, IoSlice, IoSliceMut}, + os::{ + fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, + unix::net::{AncillaryData, SocketAddr, SocketAncillary, UnixDatagram}, + }, + path::Path, + sync::Arc, +}; + +use tempfile::{tempdir, NamedTempFile, TempDir, TempPath}; + +use crate::Either; + +use super::{IpcReceiver, IpcSender}; + +#[derive(Debug, Clone)] +pub struct PeerAddr(SocketAddr); + +struct Inner { + socket: UnixDatagram, + listener_addr: PeerAddr, + client_addr: Option, +} + +pub struct RawSender { + inner: Arc, +} + +pub struct RawReceiver { + inner: Arc, +} + +impl AsRawFd for Inner { + fn as_raw_fd(&self) -> RawFd { + self.socket.as_raw_fd() + } +} + +impl IpcSender for RawSender { + fn send_message_to(&self, message: &[u8], peer: &PeerAddr) -> Result<(), io::Error> { + self.inner.socket.send_to_addr(message, &peer.0).map(|_| ()) + } + + fn send_file_to(&self, file: &F, peer: &PeerAddr) -> Result<(), io::Error> { + let peer = peer.0.as_pathname().expect("Peer address is not a path"); + let mut ancillary_buffer = [0; 128]; + let mut ancillary = SocketAncillary::new(&mut ancillary_buffer); + + ancillary.add_fds(&[file.as_raw_fd()]); + + self.inner.socket.send_vectored_with_ancillary_to(&[], &mut ancillary, peer).map(|_| ()) + } + + fn host_addr(&self) -> &PeerAddr { + &self.inner.listener_addr + } +} + +impl AsRawFd for RawReceiver { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IpcReceiver for RawReceiver { + fn receive(&self, buffer: &mut [u8]) -> Result<(PeerAddr, Either), io::Error> { + let mut ancillary_buffer = [0; 128]; + let mut ancillary = SocketAncillary::new(&mut ancillary_buffer); + + let (len, truncated, addr) = self + .inner + .socket + .recv_vectored_with_ancillary_from(&mut [IoSliceMut::new(buffer)], &mut ancillary)?; + let peer = PeerAddr(addr); + + for message in ancillary.messages() { + if let Ok(AncillaryData::ScmRights(mut rights)) = message { + if let Some(fd) = rights.next() { + return Ok((peer, Either::Right(unsafe { OwnedFd::from_raw_fd(fd) }))); + } + } + } + + assert_ne!(len, 0); + Ok((peer, Either::Left(len))) + } +} + +impl AsRawFd for RawSender { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +pub fn listen>(path: P) -> Result<(RawSender, RawReceiver), io::Error> { + let socket = UnixDatagram::bind(&path)?; + let listener_addr = PeerAddr(socket.local_addr()?); + let inner = Arc::new(Inner { + client_addr: None, + socket, + listener_addr, + }); + + let sender = RawSender { + inner: inner.clone(), + }; + let receiver = RawReceiver { inner }; + Ok((sender, receiver)) +} + +pub fn connect>(path: P) -> Result<(RawSender, RawReceiver), io::Error> { + let temp_dir = tempdir()?; + let client_addr = temp_dir.path().join("socket"); + let socket = UnixDatagram::bind(&client_addr)?; + socket.connect(&path)?; + let listener_addr = PeerAddr(socket.peer_addr()?); + + let inner = Arc::new(Inner { + client_addr: Some(temp_dir), + socket, + listener_addr, + }); + let sender = RawSender { + inner: inner.clone(), + }; + let receiver = RawReceiver { inner }; + Ok((sender, receiver)) +} + +#[cfg(test)] +mod tests { + use std::{fs::File, io::{Read, Write}, os::{fd::{AsRawFd, FromRawFd}, unix::net::SocketAddr}}; + + use tempfile::{tempdir, NamedTempFile}; + + use crate::{sys::{IpcReceiver, IpcSender}, Either}; + + use super::{connect, listen, PeerAddr}; + + #[test] + fn peer_addr() { + let temp_dir = tempdir().unwrap(); + let listen_path = temp_dir.path().join("listener"); + let (lsender, _) = listen(&listen_path).unwrap(); + let (csender, _) = connect(listen_path).unwrap(); + + let PeerAddr(l) = lsender.host_addr(); + let PeerAddr(c) = csender.host_addr(); + + let lp = l.as_pathname().unwrap(); + let cp = c.as_pathname().unwrap(); + + assert_eq!(lp, cp); + } + + #[test] + fn receive_message() { + let temp_dir = tempdir().unwrap(); + let listen_path = temp_dir.path().join("listener"); + let (_, lreceiver) = listen(&listen_path).unwrap(); + let (csender, _) = connect(listen_path).unwrap(); + + let msg = b"Hello"; + csender.send_message_to(msg, csender.host_addr()).unwrap(); + let mut buffer = [0; 512]; + let (_, message) = lreceiver.receive(&mut buffer).unwrap(); + let Either::Left(len) = message else { + panic!(); + }; + + assert_eq!(len, msg.len()); + assert_eq!(&buffer[..len], msg); + } + + #[test] + fn receive_file() { + let text = b"Hello"; + + let temp_dir = tempdir().unwrap(); + let temp_file = temp_dir.path().join("testfile"); + + { + let mut file = File::create(&temp_file).unwrap(); + file.write_all(text).unwrap(); + } + + let listen_path = temp_dir.path().join("listener"); + let (_, lreceiver) = listen(&listen_path).unwrap(); + let (csender, _) = connect(listen_path).unwrap(); + + { + let file = File::open(&temp_file).unwrap(); + csender.send_file_to(&file, csender.host_addr()).unwrap(); + } + + { + let mut buffer = [0; 512]; + let (_, message) = lreceiver.receive(&mut buffer).unwrap(); + let Either::Right(fd) = message else { + panic!(); + }; + + let mut file = unsafe { File::from_raw_fd(fd.as_raw_fd()) }; + let len = file.read(&mut buffer).unwrap(); + assert_eq!(len, text.len()); + assert_eq!(&buffer[..len], text); + } + } +} diff --git a/userspace/lib/serde-ipc/src/sys/yggdrasil.rs b/userspace/lib/serde-ipc/src/sys/yggdrasil.rs new file mode 100644 index 00000000..dd1057a0 --- /dev/null +++ b/userspace/lib/serde-ipc/src/sys/yggdrasil.rs @@ -0,0 +1,71 @@ +use std::{ + io, + os::{ + fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, + yggdrasil::io::message_channel::{ + ChannelPublisherId, MessageChannel, MessageChannelReceiver, MessageChannelSender, + MessageDestination, MessageReceiver, MessageSender, ReceivedMessageMetadata, + }, + }, +}; + +use super::{Either, IpcReceiver, IpcSender}; + +#[derive(Debug, Clone)] +pub struct PeerAddr(ChannelPublisherId); + +pub struct RawSender(MessageChannelSender); +pub struct RawReceiver(MessageChannelReceiver); + +const HOST_ADDR: PeerAddr = PeerAddr(ChannelPublisherId::ZERO); + +impl IpcSender for RawSender { + fn send_file_to(&self, file: &F, peer: &PeerAddr) -> Result<(), io::Error> { + self.0.send_raw_fd( + file.as_raw_fd(), + MessageDestination::Specific(peer.0.into()), + ) + } + + fn send_message_to(&self, message: &[u8], peer: &PeerAddr) -> Result<(), io::Error> { + self.0 + .send_message(message, MessageDestination::Specific(peer.0.into())) + } + + fn host_addr(&self) -> &PeerAddr { + // TODO + &HOST_ADDR + } +} + +impl AsRawFd for RawSender { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +impl IpcReceiver for RawReceiver { + fn receive(&self, buffer: &mut [u8]) -> Result<(PeerAddr, Either), io::Error> { + let (addr, metadata) = self.0.receive_raw(buffer)?; + let peer = PeerAddr(addr); + + match metadata { + ReceivedMessageMetadata::File(fd) => { + Ok((peer, Either::Right(unsafe { OwnedFd::from_raw_fd(fd) }))) + } + ReceivedMessageMetadata::Data(len) => Ok((peer, Either::Left(len))), + } + } +} + +impl AsRawFd for RawReceiver { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +pub fn create_channel(name: &str) -> Result<(RawSender, RawReceiver), io::Error> { + let channel = MessageChannel::open(name, true)?; + let (sender, receiver) = channel.split(); + Ok((RawSender(sender), RawReceiver(receiver))) +}