ipc: unix impl for serde-ipc

This commit is contained in:
Mark Poliakov 2024-08-10 22:29:02 +03:00
parent 99f4482533
commit 7c38b84c39
8 changed files with 543 additions and 150 deletions

113
userspace/Cargo.lock generated
View File

@ -287,6 +287,16 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "error-chain" name = "error-chain"
version = "0.12.4" version = "0.12.4"
@ -305,6 +315,12 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "fastrand"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
[[package]] [[package]]
name = "flexbuffers" name = "flexbuffers"
version = "2.0.0" version = "2.0.0"
@ -460,6 +476,12 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "linux-raw-sys"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.11" version = "0.4.11"
@ -757,6 +779,19 @@ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
] ]
[[package]]
name = "rustix"
version = "0.38.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
dependencies = [
"bitflags 2.5.0",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.17" version = "1.0.17"
@ -784,6 +819,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"flexbuffers", "flexbuffers",
"serde", "serde",
"tempfile",
"thiserror", "thiserror",
] ]
@ -932,6 +968,19 @@ dependencies = [
"yggdrasil-rt", "yggdrasil-rt",
] ]
[[package]]
name = "tempfile"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64"
dependencies = [
"cfg-if",
"fastrand",
"once_cell",
"rustix",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "term" name = "term"
version = "0.1.0" version = "0.1.0"
@ -1131,7 +1180,16 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [ dependencies = [
"windows-targets 0.52.4", "windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
] ]
[[package]] [[package]]
@ -1151,17 +1209,18 @@ dependencies = [
[[package]] [[package]]
name = "windows-targets" name = "windows-targets"
version = "0.52.4" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [ dependencies = [
"windows_aarch64_gnullvm 0.52.4", "windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.4", "windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.4", "windows_i686_gnu 0.52.6",
"windows_i686_msvc 0.52.4", "windows_i686_gnullvm",
"windows_x86_64_gnu 0.52.4", "windows_i686_msvc 0.52.6",
"windows_x86_64_gnullvm 0.52.4", "windows_x86_64_gnu 0.52.6",
"windows_x86_64_msvc 0.52.4", "windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6",
] ]
[[package]] [[package]]
@ -1172,9 +1231,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.52.4" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
@ -1184,9 +1243,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.52.4" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
@ -1196,9 +1255,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.52.4" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
@ -1208,9 +1273,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.52.4" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
@ -1220,9 +1285,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.52.4" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
@ -1232,9 +1297,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.52.4" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
@ -1244,9 +1309,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.52.4" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]] [[package]]
name = "winnow" name = "winnow"

View File

@ -3,11 +3,11 @@
// TODO rewrite and split this into meaningful components // TODO rewrite and split this into meaningful components
use std::{ use std::{
collections::BTreeMap, collections::{BTreeMap, HashMap},
os::{ os::{
fd::{AsRawFd, RawFd}, fd::{AsRawFd, RawFd},
yggdrasil::io::{ yggdrasil::io::{
mapping::FileMapping, message_channel::ChannelPublisherId, poll::PollChannel, mapping::FileMapping, poll::PollChannel,
shared_memory::SharedMemory, shared_memory::SharedMemory,
}, },
}, },
@ -21,7 +21,7 @@ use libcolors::{
event::{Event, KeyModifiers, WindowEvent, WindowInfo}, event::{Event, KeyModifiers, WindowEvent, WindowInfo},
message::{ClientMessage, ServerMessage}, message::{ClientMessage, ServerMessage},
}; };
use serde_ipc::{Receiver, Sender}; use serde_ipc::{PeerAddr, Receiver, Sender};
use yggdrasil_abi::io::{KeyboardKey, KeyboardKeyEvent}; use yggdrasil_abi::io::{KeyboardKey, KeyboardKeyEvent};
pub mod display; pub mod display;
@ -30,7 +30,7 @@ pub mod input;
pub struct Window<'a> { pub struct Window<'a> {
window_id: u32, window_id: u32,
client_id: ChannelPublisherId, client_id: PeerAddr,
surface_mapping: FileMapping<'a>, surface_mapping: FileMapping<'a>,
surface_data: &'a [u32], surface_data: &'a [u32],
@ -62,6 +62,9 @@ pub struct Server<'a, 'd> {
input_state: InputState, input_state: InputState,
last_client_id: u32,
client_map: HashMap<u32, PeerAddr>,
// Window management // Window management
windows: BTreeMap<u32, Window<'a>>, windows: BTreeMap<u32, Window<'a>>,
rows: Vec<Row>, rows: Vec<Row>,
@ -145,11 +148,11 @@ impl<'a, 'd> Server<'a, 'd> {
let mut display = Display::open()?; let mut display = Display::open()?;
let input = KeyboardInput::open()?; let input = KeyboardInput::open()?;
let (sender, receiver) = serde_ipc::channel(libcolors::CHANNEL_NAME)?; let (sender, receiver) = serde_ipc::listen(libcolors::CHANNEL_NAME)?;
let sender = ServerSender(sender); let sender = ServerSender(sender);
poll.add(input.as_poll_fd())?; poll.add(input.as_poll_fd())?;
poll.add(receiver.as_poll_fd())?; poll.add(receiver.as_raw_fd())?;
let background = 0xFFCCCCCC; let background = 0xFFCCCCCC;
display.fill(background); display.fill(background);
@ -167,6 +170,9 @@ impl<'a, 'd> Server<'a, 'd> {
padding: 4, padding: 4,
background, background,
last_client_id: 0,
client_map: HashMap::new(),
windows: BTreeMap::new(), windows: BTreeMap::new(),
rows: vec![], rows: vec![],
last_window_id: 1, last_window_id: 1,
@ -176,7 +182,7 @@ impl<'a, 'd> Server<'a, 'd> {
fn create_window( fn create_window(
&mut self, &mut self,
client_id: ChannelPublisherId, client_id: &PeerAddr,
) -> Result<(WindowInfo, RawFd), Error> { ) -> Result<(WindowInfo, RawFd), Error> {
if self.rows.is_empty() { if self.rows.is_empty() {
self.rows.push(Row::new( self.rows.push(Row::new(
@ -211,7 +217,7 @@ impl<'a, 'd> Server<'a, 'd> {
let window = Window { let window = Window {
window_id, window_id,
client_id, client_id: client_id.clone(),
surface_mapping, surface_mapping,
surface_data, surface_data,
}; };
@ -341,7 +347,7 @@ impl<'a, 'd> Server<'a, 'd> {
self.sender self.sender
.send_event( .send_event(
Event::WindowEvent(window.window_id, WindowEvent::KeyInput(input)), Event::WindowEvent(window.window_id, WindowEvent::KeyInput(input)),
window.client_id, &window.client_id,
) )
.ok(); .ok();
} else { } else {
@ -385,7 +391,7 @@ impl<'a, 'd> Server<'a, 'd> {
self.sender self.sender
.send_event( .send_event(
Event::WindowEvent(old_window.window_id, WindowEvent::FocusChanged(false)), Event::WindowEvent(old_window.window_id, WindowEvent::FocusChanged(false)),
old_window.client_id, &old_window.client_id,
) )
.ok(); .ok();
} }
@ -406,7 +412,7 @@ impl<'a, 'd> Server<'a, 'd> {
self.sender self.sender
.send_event( .send_event(
Event::WindowEvent(window.window_id, WindowEvent::FocusChanged(true)), Event::WindowEvent(window.window_id, WindowEvent::FocusChanged(true)),
window.client_id, &window.client_id,
) )
.ok(); .ok();
} }
@ -461,7 +467,7 @@ impl<'a, 'd> Server<'a, 'd> {
height: frame.h, height: frame.h,
}, },
), ),
window.client_id, &window.client_id,
) )
.ok(); .ok();
} }
@ -470,27 +476,30 @@ impl<'a, 'd> Server<'a, 'd> {
fn handle_client_message( fn handle_client_message(
&mut self, &mut self,
client_id: ChannelPublisherId, client_id: PeerAddr,
message: ClientMessage, message: ClientMessage,
) -> Result<(), Error> { ) -> Result<(), Error> {
match message { match message {
ClientMessage::ClientHello => { ClientMessage::ClientHello => {
debug_trace!("{:?}: ClientHello", client_id); debug_trace!("{:?}: ClientHello", client_id);
// Echo the ID back // Echo the ID back
self.last_client_id += 1;
let id = self.last_client_id;
self.client_map.insert(id, client_id.clone());
self.sender self.sender
.send_event(Event::ServerHello(client_id.into()), client_id) .send_event(Event::ServerHello(id), &client_id)
} }
ClientMessage::CreateWindow => { ClientMessage::CreateWindow => {
debug_trace!("{:?}: CreateWindow", client_id); debug_trace!("{:?}: CreateWindow", client_id);
let (info, shm_fd) = self.create_window(client_id)?; let (info, shm_fd) = self.create_window(&client_id)?;
let window_id = info.window_id; let window_id = info.window_id;
self.sender self.sender
.send_event(Event::NewWindowInfo(info), client_id)?; .send_event(Event::NewWindowInfo(info), &client_id)?;
self.sender.send_fd(shm_fd, client_id)?; self.sender.send_fd(&shm_fd, &client_id)?;
self.sender.send_event( self.sender.send_event(
Event::WindowEvent(window_id, WindowEvent::RedrawRequested), Event::WindowEvent(window_id, WindowEvent::RedrawRequested),
client_id, &client_id,
)?; )?;
Ok(()) Ok(())
@ -539,7 +548,7 @@ impl<'a, 'd> Server<'a, 'd> {
let event = self.input.read_event()?; let event = self.input.read_event()?;
self.handle_keyboard_event(event)?; self.handle_keyboard_event(event)?;
} }
Some((fd, Ok(_))) if fd == self.receiver.as_poll_fd() => { Some((fd, Ok(_))) if fd == self.receiver.as_raw_fd() => {
let (client_id, message) = self.receiver.receive_message()?; let (client_id, message) = self.receiver.receive_message()?;
self.handle_client_message(client_id, message)?; self.handle_client_message(client_id, message)?;
} }
@ -566,14 +575,14 @@ impl<'a, 'd> Server<'a, 'd> {
} }
impl ServerSender { impl ServerSender {
pub fn send_event(&mut self, event: Event, client_id: ChannelPublisherId) -> Result<(), Error> { pub fn send_event(&self, event: Event, client_id: &PeerAddr) -> Result<(), Error> {
self.0 self.0
.send_to(&ServerMessage::Event(event), client_id) .send_to(&ServerMessage::Event(event), client_id)
.map_err(Error::from) .map_err(Error::from)
} }
pub fn send_fd(&mut self, fd: RawFd, client_id: ChannelPublisherId) -> Result<(), Error> { pub fn send_fd<F: AsRawFd>(&self, file: &F, client_id: &PeerAddr) -> Result<(), Error> {
self.0.send_file(fd, client_id).map_err(Error::from) self.0.send_file_to(file, client_id).map_err(Error::from)
} }
} }

View File

@ -1,13 +1,13 @@
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
os::{ os::{
fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, fd::{AsRawFd, OwnedFd, RawFd},
yggdrasil::io::{message_channel::ChannelPublisherId, poll::PollChannel}, yggdrasil::io::{poll::PollChannel},
}, },
time::Duration, time::Duration,
}; };
use serde_ipc::{Message, Receiver, Sender}; use serde_ipc::{Either, Receiver, Sender};
use crate::{ use crate::{
error::Error, error::Error,
@ -25,7 +25,7 @@ pub struct Connection {
impl Connection { impl Connection {
pub fn new() -> Result<Self, Error> { pub fn new() -> Result<Self, Error> {
let (sender, receiver) = serde_ipc::channel(crate::CHANNEL_NAME)?; let (sender, receiver) = serde_ipc::connect(crate::CHANNEL_NAME)?;
let timeout = Duration::from_secs(1); let timeout = Duration::from_secs(1);
let mut poll = PollChannel::new()?; let mut poll = PollChannel::new()?;
let event_queue = VecDeque::new(); let event_queue = VecDeque::new();
@ -51,17 +51,16 @@ impl Connection {
return Err(Error::CommunicationTimeout); return Err(Error::CommunicationTimeout);
}; };
// TODO ignore non-server messages let (_, msg) = self.receiver.receive()?;
let (_, msg) = self.receiver.receive_raw()?;
// TODO ignore non-server messages
match msg { match msg {
Message::File(fd) => { Either::Left(ServerMessage::Event(event)) => {
let file = unsafe { OwnedFd::from_raw_fd(fd) };
break Ok(file);
}
Message::Data(ServerMessage::Event(event)) => {
self.event_queue.push_back(event); self.event_queue.push_back(event);
} }
Either::Right(fd) => {
break Ok(fd);
}
} }
} }
} }
@ -77,15 +76,13 @@ impl Connection {
// Unless we're doing a request, the server should not send any FDs, so just drop // Unless we're doing a request, the server should not send any FDs, so just drop
// anything that's not a message // anything that's not a message
let (_, Message::Data(ServerMessage::Event(event))) = self.receiver.receive_raw()? let (_, Either::Left(ServerMessage::Event(event))) = self.receiver.receive()? else {
else {
continue; continue;
}; };
match predicate(event) { match predicate(event) {
Ok(val) => break Ok(val), Ok(val) => break Ok(val),
Err(ev) => { Err(ev) => {
// Predicate rejected the event
self.event_queue.push_back(ev); self.event_queue.push_back(ev);
} }
} }
@ -106,8 +103,7 @@ impl Connection {
None => continue, None => continue,
}; };
let (_, Message::Data(ServerMessage::Event(event))) = self.receiver.receive_raw()? let (_, Either::Left(ServerMessage::Event(event))) = self.receiver.receive()? else {
else {
continue; continue;
}; };
@ -116,14 +112,12 @@ impl Connection {
} }
pub fn send(&mut self, msg: ClientMessage) -> Result<(), Error> { pub fn send(&mut self, msg: ClientMessage) -> Result<(), Error> {
self.sender self.sender.send_to_host(&msg).map_err(Error::from)
.send_to(&msg, ChannelPublisherId::ZERO)
.map_err(Error::from)
} }
pub fn connect(&mut self) -> Result<u32, Error> { pub fn connect(&mut self) -> Result<u32, Error> {
self.sender self.sender
.send_to(&ClientMessage::ClientHello, ChannelPublisherId::ZERO)?; .send_to_host(&ClientMessage::ClientHello)?;
self.receive_map(|ev| { self.receive_map(|ev| {
if let Event::ServerHello(id) = ev { if let Event::ServerHello(id) = ev {

View File

@ -7,4 +7,5 @@ authors = ["Mark Poliakov <mark@alnyan.me>"]
[dependencies] [dependencies]
flexbuffers = "2.0.0" flexbuffers = "2.0.0"
serde = { version = "1.0.193", features = ["derive"] } serde = { version = "1.0.193", features = ["derive"] }
tempfile = "3.12.0"
thiserror = "1.0.56" thiserror = "1.0.56"

View File

@ -1,18 +1,20 @@
#![feature(yggdrasil_os, rustc_private)] #![cfg_attr(target_os = "yggdrasil", feature(yggdrasil_os, rustc_private))]
#![cfg_attr(unix, feature(unix_socket_ancillary_data))]
use std::{ use std::{
io, io,
marker::PhantomData, marker::PhantomData,
os::{ os::fd::{AsRawFd, OwnedFd, RawFd},
fd::{AsRawFd, RawFd},
yggdrasil::io::message_channel::{
ChannelPublisherId, MessageChannel, MessageChannelReceiver, MessageChannelSender,
MessageDestination, MessageReceiver, MessageSender, ReceivedMessageMetadata,
},
},
}; };
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use sys::{IpcReceiver, RawReceiver, RawSender};
mod sys;
pub use sys::{Either, PeerAddr};
use crate::sys::IpcSender;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum Error { pub enum Error {
@ -21,100 +23,123 @@ pub enum Error {
#[error("Deserialization error: {0}")] #[error("Deserialization error: {0}")]
DeserializeError(flexbuffers::DeserializationError), DeserializeError(flexbuffers::DeserializationError),
#[error("I/O error: {0}")] #[error("I/O error: {0}")]
IoError(io::Error), IoError(#[from] io::Error),
} }
pub struct Sender<M: Serialize> { pub struct Sender<M: Serialize> {
inner: MessageChannelSender, inner: RawSender,
// inner: MessageChannelSender,
_pd: PhantomData<M>, _pd: PhantomData<M>,
} }
pub struct Receiver<M: DeserializeOwned> { pub struct Receiver<M: DeserializeOwned> {
inner: MessageChannelReceiver, inner: RawReceiver,
// inner: MessageChannelReceiver,
buffer: [u8; 1024], buffer: [u8; 1024],
_pd: PhantomData<M>, _pd: PhantomData<M>,
} }
pub enum Message<T> { fn wrap<T: Serialize, U: DeserializeOwned>(
Data(T), sender: RawSender,
File(RawFd), receiver: RawReceiver,
} ) -> (Sender<T>, Receiver<U>) {
(
fn raw_send_message_to<T: Serialize, S: MessageSender>(
sender: &S,
msg: &T,
id: ChannelPublisherId,
) -> Result<(), Error> {
let msg = flexbuffers::to_vec(msg)?;
sender
.send_message(&msg, MessageDestination::Specific(id.into()))
.map_err(Error::from)
}
fn raw_send_file_to<F: AsRawFd, S: MessageSender>(
sender: &S,
file: &F,
id: ChannelPublisherId,
) -> Result<(), Error> {
sender
.send_raw_fd(file.as_raw_fd(), MessageDestination::Specific(id.into()))
.map_err(Error::from)
}
pub fn channel<T: Serialize, U: DeserializeOwned>(
name: &str,
) -> Result<(Sender<T>, Receiver<U>), Error> {
let raw = MessageChannel::open(name, true)?;
let (raw_sender, raw_receiver) = raw.split();
Ok((
Sender { Sender {
inner: raw_sender, inner: sender,
_pd: PhantomData, _pd: PhantomData,
}, },
Receiver { Receiver {
inner: raw_receiver, inner: receiver,
buffer: [0; 1024], buffer: [0; 1024],
_pd: PhantomData, _pd: PhantomData,
}, },
)) )
}
#[cfg(target_os = "yggdrasil")]
pub fn listen<T: Serialize, U: DeserializeOwned>(
name: &str,
) -> Result<(Sender<T>, Receiver<U>), Error> {
let (sender, receiver) = sys::create_channel(name)?;
Ok(wrap(sender, receiver))
}
#[cfg(target_os = "yggdrasil")]
pub fn connect<T: Serialize, U: DeserializeOwned>(
name: &str,
) -> Result<(Sender<T>, Receiver<U>), Error> {
let (sender, receiver) = sys::create_channel(name)?;
Ok(wrap(sender, receiver))
}
#[cfg(any(unix, rust_analyzer))]
pub fn listen<T: Serialize, U: DeserializeOwned, P: AsRef<std::path::Path>>(
path: P,
) -> Result<(Sender<T>, Receiver<U>), Error> {
let (sender, receiver) = sys::listen(path)?;
Ok(wrap(sender, receiver))
}
#[cfg(any(unix, rust_analyzer))]
pub fn connect<T: Serialize, U: DeserializeOwned, P: AsRef<std::path::Path>>(
path: P,
) -> Result<(Sender<T>, Receiver<U>), Error> {
let (sender, receiver) = sys::connect(path)?;
Ok(wrap(sender, receiver))
} }
impl<T: Serialize> Sender<T> { impl<T: Serialize> Sender<T> {
pub fn send_to(&mut self, msg: &T, id: ChannelPublisherId) -> Result<(), Error> { pub fn send_to(&self, message: &T, peer: &PeerAddr) -> Result<(), Error> {
raw_send_message_to(&self.inner, msg, id) let message = flexbuffers::to_vec(message).map_err(Error::SerializeError)?;
self.inner
.send_message_to(&message, peer)
.map_err(Error::IoError)
} }
pub fn send_file(&mut self, fd: RawFd, id: ChannelPublisherId) -> Result<(), Error> { #[inline]
raw_send_file_to(&self.inner, &fd, id) pub fn send_to_host(&self, message: &T) -> Result<(), Error> {
self.send_to(message, self.inner.host_addr())
}
pub fn send_file_to<F: AsRawFd>(&self, file: &F, peer: &PeerAddr) -> Result<(), Error> {
self.inner.send_file_to(file, peer).map_err(Error::IoError)
} }
} }
impl<T: DeserializeOwned> Receiver<T> { impl<T: Serialize> AsRawFd for Sender<T> {
pub fn receive_message(&mut self) -> Result<(ChannelPublisherId, T), Error> { fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
impl<U: DeserializeOwned> Receiver<U> {
pub fn receive(&mut self) -> Result<(PeerAddr, Either<U, OwnedFd>), Error> {
let (peer, message) = self.inner.receive(&mut self.buffer)?;
let message = message
.try_map_left(|len| flexbuffers::from_slice(&self.buffer[..len]))
.map_err(Error::DeserializeError)?;
Ok((peer, message))
}
pub fn receive_message(&mut self) -> Result<(PeerAddr, U), Error> {
loop { loop {
let (id, message) = self.receive_raw()?; let (peer, message) = self.receive()?;
if let Message::Data(data) = message { if let Either::Left(message) = message {
break Ok((id, data)); break Ok((peer, message));
} }
} }
} }
pub fn receive_raw(&mut self) -> Result<(ChannelPublisherId, Message<T>), Error> { pub fn receive_file(&mut self) -> Result<(PeerAddr, OwnedFd), Error> {
let (id, metadata) = self.inner.receive_raw(&mut self.buffer)?; loop {
let (peer, message) = self.receive()?;
match metadata { if let Either::Right(fd) = message {
ReceivedMessageMetadata::Data(len) => { break Ok((peer, fd));
let msg = flexbuffers::from_slice(&self.buffer[..len])?;
Ok((id, Message::Data(msg)))
} }
ReceivedMessageMetadata::File(fd) => Ok((id, Message::File(fd))),
} }
} }
pub fn as_poll_fd(&self) -> RawFd {
self.as_raw_fd()
}
} }
impl<T: DeserializeOwned> AsRawFd for Receiver<T> { impl<T: DeserializeOwned> AsRawFd for Receiver<T> {
@ -122,21 +147,3 @@ impl<T: DeserializeOwned> AsRawFd for Receiver<T> {
self.inner.as_raw_fd() self.inner.as_raw_fd()
} }
} }
impl From<flexbuffers::SerializationError> for Error {
fn from(value: flexbuffers::SerializationError) -> Self {
Self::SerializeError(value)
}
}
impl From<flexbuffers::DeserializationError> for Error {
fn from(value: flexbuffers::DeserializationError) -> Self {
Self::DeserializeError(value)
}
}
impl From<io::Error> for Error {
fn from(value: io::Error) -> Self {
Self::IoError(value)
}
}

View File

@ -0,0 +1,36 @@
use std::{io, os::fd::{AsRawFd, OwnedFd}};
#[cfg(target_os = "yggdrasil")]
pub mod yggdrasil;
#[cfg(target_os = "yggdrasil")]
pub use yggdrasil::*;
#[cfg(any(unix, rust_analyzer))]
pub mod unix;
#[cfg(unix)]
pub use unix::*;
pub enum Either<T, U> {
Left(T),
Right(U)
}
pub trait IpcSender: AsRawFd {
fn send_message_to(&self, message: &[u8], peer: &PeerAddr) -> Result<(), io::Error>;
fn send_file_to<F: AsRawFd>(&self, file: &F, peer: &PeerAddr) -> Result<(), io::Error>;
fn host_addr(&self) -> &PeerAddr;
}
pub trait IpcReceiver: AsRawFd {
fn receive(&self, buffer: &mut [u8]) -> Result<(PeerAddr, Either<usize, OwnedFd>), io::Error>;
}
impl<T, U> Either<T, U> {
pub fn try_map_left<E, R, F: FnOnce(T) -> Result<R, E>>(self, map: F) -> Result<Either<R, U>, E> {
match self {
Self::Left(val) => map(val).map(Either::Left),
Self::Right(val) => Ok(Either::Right(val))
}
}
}

View File

@ -0,0 +1,210 @@
use std::{
io::{self, IoSlice, IoSliceMut},
os::{
fd::{AsRawFd, FromRawFd, OwnedFd, RawFd},
unix::net::{AncillaryData, SocketAddr, SocketAncillary, UnixDatagram},
},
path::Path,
sync::Arc,
};
use tempfile::{tempdir, NamedTempFile, TempDir, TempPath};
use crate::Either;
use super::{IpcReceiver, IpcSender};
#[derive(Debug, Clone)]
pub struct PeerAddr(SocketAddr);
struct Inner {
socket: UnixDatagram,
listener_addr: PeerAddr,
client_addr: Option<TempDir>,
}
pub struct RawSender {
inner: Arc<Inner>,
}
pub struct RawReceiver {
inner: Arc<Inner>,
}
impl AsRawFd for Inner {
fn as_raw_fd(&self) -> RawFd {
self.socket.as_raw_fd()
}
}
impl IpcSender for RawSender {
fn send_message_to(&self, message: &[u8], peer: &PeerAddr) -> Result<(), io::Error> {
self.inner.socket.send_to_addr(message, &peer.0).map(|_| ())
}
fn send_file_to<F: AsRawFd>(&self, file: &F, peer: &PeerAddr) -> Result<(), io::Error> {
let peer = peer.0.as_pathname().expect("Peer address is not a path");
let mut ancillary_buffer = [0; 128];
let mut ancillary = SocketAncillary::new(&mut ancillary_buffer);
ancillary.add_fds(&[file.as_raw_fd()]);
self.inner.socket.send_vectored_with_ancillary_to(&[], &mut ancillary, peer).map(|_| ())
}
fn host_addr(&self) -> &PeerAddr {
&self.inner.listener_addr
}
}
impl AsRawFd for RawReceiver {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
impl IpcReceiver for RawReceiver {
fn receive(&self, buffer: &mut [u8]) -> Result<(PeerAddr, Either<usize, OwnedFd>), io::Error> {
let mut ancillary_buffer = [0; 128];
let mut ancillary = SocketAncillary::new(&mut ancillary_buffer);
let (len, truncated, addr) = self
.inner
.socket
.recv_vectored_with_ancillary_from(&mut [IoSliceMut::new(buffer)], &mut ancillary)?;
let peer = PeerAddr(addr);
for message in ancillary.messages() {
if let Ok(AncillaryData::ScmRights(mut rights)) = message {
if let Some(fd) = rights.next() {
return Ok((peer, Either::Right(unsafe { OwnedFd::from_raw_fd(fd) })));
}
}
}
assert_ne!(len, 0);
Ok((peer, Either::Left(len)))
}
}
impl AsRawFd for RawSender {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
pub fn listen<P: AsRef<Path>>(path: P) -> Result<(RawSender, RawReceiver), io::Error> {
let socket = UnixDatagram::bind(&path)?;
let listener_addr = PeerAddr(socket.local_addr()?);
let inner = Arc::new(Inner {
client_addr: None,
socket,
listener_addr,
});
let sender = RawSender {
inner: inner.clone(),
};
let receiver = RawReceiver { inner };
Ok((sender, receiver))
}
pub fn connect<P: AsRef<Path>>(path: P) -> Result<(RawSender, RawReceiver), io::Error> {
let temp_dir = tempdir()?;
let client_addr = temp_dir.path().join("socket");
let socket = UnixDatagram::bind(&client_addr)?;
socket.connect(&path)?;
let listener_addr = PeerAddr(socket.peer_addr()?);
let inner = Arc::new(Inner {
client_addr: Some(temp_dir),
socket,
listener_addr,
});
let sender = RawSender {
inner: inner.clone(),
};
let receiver = RawReceiver { inner };
Ok((sender, receiver))
}
#[cfg(test)]
mod tests {
use std::{fs::File, io::{Read, Write}, os::{fd::{AsRawFd, FromRawFd}, unix::net::SocketAddr}};
use tempfile::{tempdir, NamedTempFile};
use crate::{sys::{IpcReceiver, IpcSender}, Either};
use super::{connect, listen, PeerAddr};
#[test]
fn peer_addr() {
let temp_dir = tempdir().unwrap();
let listen_path = temp_dir.path().join("listener");
let (lsender, _) = listen(&listen_path).unwrap();
let (csender, _) = connect(listen_path).unwrap();
let PeerAddr(l) = lsender.host_addr();
let PeerAddr(c) = csender.host_addr();
let lp = l.as_pathname().unwrap();
let cp = c.as_pathname().unwrap();
assert_eq!(lp, cp);
}
#[test]
fn receive_message() {
let temp_dir = tempdir().unwrap();
let listen_path = temp_dir.path().join("listener");
let (_, lreceiver) = listen(&listen_path).unwrap();
let (csender, _) = connect(listen_path).unwrap();
let msg = b"Hello";
csender.send_message_to(msg, csender.host_addr()).unwrap();
let mut buffer = [0; 512];
let (_, message) = lreceiver.receive(&mut buffer).unwrap();
let Either::Left(len) = message else {
panic!();
};
assert_eq!(len, msg.len());
assert_eq!(&buffer[..len], msg);
}
#[test]
fn receive_file() {
let text = b"Hello";
let temp_dir = tempdir().unwrap();
let temp_file = temp_dir.path().join("testfile");
{
let mut file = File::create(&temp_file).unwrap();
file.write_all(text).unwrap();
}
let listen_path = temp_dir.path().join("listener");
let (_, lreceiver) = listen(&listen_path).unwrap();
let (csender, _) = connect(listen_path).unwrap();
{
let file = File::open(&temp_file).unwrap();
csender.send_file_to(&file, csender.host_addr()).unwrap();
}
{
let mut buffer = [0; 512];
let (_, message) = lreceiver.receive(&mut buffer).unwrap();
let Either::Right(fd) = message else {
panic!();
};
let mut file = unsafe { File::from_raw_fd(fd.as_raw_fd()) };
let len = file.read(&mut buffer).unwrap();
assert_eq!(len, text.len());
assert_eq!(&buffer[..len], text);
}
}
}

View File

@ -0,0 +1,71 @@
use std::{
io,
os::{
fd::{AsRawFd, FromRawFd, OwnedFd, RawFd},
yggdrasil::io::message_channel::{
ChannelPublisherId, MessageChannel, MessageChannelReceiver, MessageChannelSender,
MessageDestination, MessageReceiver, MessageSender, ReceivedMessageMetadata,
},
},
};
use super::{Either, IpcReceiver, IpcSender};
#[derive(Debug, Clone)]
pub struct PeerAddr(ChannelPublisherId);
pub struct RawSender(MessageChannelSender);
pub struct RawReceiver(MessageChannelReceiver);
const HOST_ADDR: PeerAddr = PeerAddr(ChannelPublisherId::ZERO);
impl IpcSender for RawSender {
fn send_file_to<F: AsRawFd>(&self, file: &F, peer: &PeerAddr) -> Result<(), io::Error> {
self.0.send_raw_fd(
file.as_raw_fd(),
MessageDestination::Specific(peer.0.into()),
)
}
fn send_message_to(&self, message: &[u8], peer: &PeerAddr) -> Result<(), io::Error> {
self.0
.send_message(message, MessageDestination::Specific(peer.0.into()))
}
fn host_addr(&self) -> &PeerAddr {
// TODO
&HOST_ADDR
}
}
impl AsRawFd for RawSender {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
impl IpcReceiver for RawReceiver {
fn receive(&self, buffer: &mut [u8]) -> Result<(PeerAddr, Either<usize, OwnedFd>), io::Error> {
let (addr, metadata) = self.0.receive_raw(buffer)?;
let peer = PeerAddr(addr);
match metadata {
ReceivedMessageMetadata::File(fd) => {
Ok((peer, Either::Right(unsafe { OwnedFd::from_raw_fd(fd) })))
}
ReceivedMessageMetadata::Data(len) => Ok((peer, Either::Left(len))),
}
}
}
impl AsRawFd for RawReceiver {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
pub fn create_channel(name: &str) -> Result<(RawSender, RawReceiver), io::Error> {
let channel = MessageChannel::open(name, true)?;
let (sender, receiver) = channel.split();
Ok((RawSender(sender), RawReceiver(receiver)))
}