diff --git a/Cargo.lock b/Cargo.lock index 9254dda6..a5c73872 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2533,13 +2533,13 @@ dependencies = [ name = "ironrdp-dvc-pipe-proxy" version = "0.1.0" dependencies = [ + "async-trait", "ironrdp-core", "ironrdp-dvc", "ironrdp-pdu", "ironrdp-svc", + "tokio", "tracing", - "widestring", - "windows 0.61.3", ] [[package]] diff --git a/crates/ironrdp-dvc-pipe-proxy/Cargo.toml b/crates/ironrdp-dvc-pipe-proxy/Cargo.toml index 628e8d2d..924114ab 100644 --- a/crates/ironrdp-dvc-pipe-proxy/Cargo.toml +++ b/crates/ironrdp-dvc-pipe-proxy/Cargo.toml @@ -22,17 +22,8 @@ ironrdp-dvc = { path = "../ironrdp-dvc", version = "0.3" } ironrdp-svc = { path = "../ironrdp-svc", version = "0.4" } # public (SvcMessage type) tracing = { version = "0.1", features = ["log"] } - -[target.'cfg(windows)'.dependencies] -widestring = "1" -windows = { version = "0.61", features = [ - "Win32_Foundation", - "Win32_Security", - "Win32_System_Threading", - "Win32_Storage_FileSystem", - "Win32_System_Pipes", - "Win32_System_IO", -] } +tokio = { version = "1", features = ["net", "rt", "sync", "macros", "io-util"]} +async-trait = "0.1" [lints] workspace = true diff --git a/crates/ironrdp-dvc-pipe-proxy/src/error.rs b/crates/ironrdp-dvc-pipe-proxy/src/error.rs new file mode 100644 index 00000000..9014d010 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/error.rs @@ -0,0 +1,23 @@ +#[derive(Debug)] +pub(crate) enum DvcPipeProxyError { + Io(std::io::Error), + EncodeDvcMessage(ironrdp_core::EncodeError), +} + +impl core::fmt::Display for DvcPipeProxyError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + DvcPipeProxyError::Io(_) => write!(f, "IO error"), + DvcPipeProxyError::EncodeDvcMessage(_) => write!(f, "DVC message encoding error"), + } + } +} + +impl core::error::Error for DvcPipeProxyError { + fn source(&self) -> Option<&(dyn core::error::Error + 'static)> { + match self { + DvcPipeProxyError::Io(err) => Some(err), + DvcPipeProxyError::EncodeDvcMessage(src) => Some(src), + } + } +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/lib.rs b/crates/ironrdp-dvc-pipe-proxy/src/lib.rs index ac61cfc8..34049399 100644 --- a/crates/ironrdp-dvc-pipe-proxy/src/lib.rs +++ b/crates/ironrdp-dvc-pipe-proxy/src/lib.rs @@ -4,8 +4,11 @@ #[macro_use] extern crate tracing; -#[cfg(target_os = "windows")] -mod windows; - +mod error; +mod message; +mod os_pipe; mod platform; -pub use self::platform::DvcNamedPipeProxy; +mod proxy; +mod worker; + +pub use self::proxy::DvcNamedPipeProxy; diff --git a/crates/ironrdp-dvc-pipe-proxy/src/message.rs b/crates/ironrdp-dvc-pipe-proxy/src/message.rs new file mode 100644 index 00000000..3dbb330c --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/message.rs @@ -0,0 +1,22 @@ +use ironrdp_core::{ensure_size, Encode, EncodeResult}; +use ironrdp_dvc::DvcEncode; + +pub(crate) struct RawDataDvcMessage(pub Vec); + +impl Encode for RawDataDvcMessage { + fn encode(&self, dst: &mut ironrdp_core::WriteCursor<'_>) -> EncodeResult<()> { + ensure_size!(in: dst, size: self.size()); + dst.write_slice(&self.0); + Ok(()) + } + + fn name(&self) -> &'static str { + "RawDataDvcMessage" + } + + fn size(&self) -> usize { + self.0.len() + } +} + +impl DvcEncode for RawDataDvcMessage {} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/os_pipe.rs b/crates/ironrdp-dvc-pipe-proxy/src/os_pipe.rs new file mode 100644 index 00000000..e11d8300 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/os_pipe.rs @@ -0,0 +1,19 @@ +use async_trait::async_trait; + +use crate::error::DvcPipeProxyError; + +#[async_trait] +pub(crate) trait OsPipe: Send + Sync { + /// Creates a new OS pipe and waits for the connection. + async fn connect(pipe_name: &str) -> Result + where + Self: Sized; + + /// Reads data from the pipe and returns the number of bytes read. + /// + /// Returned future should be stateless and can be polled multiple times. + async fn read(&mut self, buffer: &mut [u8]) -> Result; + + /// Writes data to the pipe and returns the number of bytes written. + async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError>; +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/mod.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/mod.rs index 5cec8ee4..346a06db 100644 --- a/crates/ironrdp-dvc-pipe-proxy/src/platform/mod.rs +++ b/crates/ironrdp-dvc-pipe-proxy/src/platform/mod.rs @@ -1,9 +1,5 @@ #[cfg(target_os = "windows")] -mod windows; -#[cfg(target_os = "windows")] -pub use self::windows::DvcNamedPipeProxy; +pub(crate) mod windows; #[cfg(not(target_os = "windows"))] -mod unix; -#[cfg(not(target_os = "windows"))] -pub use self::unix::DvcNamedPipeProxy; +pub(crate) mod unix; diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/unix.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/unix.rs index e0a221e8..4950618f 100644 --- a/crates/ironrdp-dvc-pipe-proxy/src/platform/unix.rs +++ b/crates/ironrdp-dvc-pipe-proxy/src/platform/unix.rs @@ -1,48 +1,66 @@ -use ironrdp_core::impl_as_any; -use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor}; -use ironrdp_pdu::{pdu_other_err, PduResult}; -use ironrdp_svc::SvcMessage; +use async_trait::async_trait; +use tokio::fs; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; -/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server. -pub struct DvcNamedPipeProxy { - channel_name: String, +use crate::error::DvcPipeProxyError; +use crate::os_pipe::OsPipe; + +/// Unix-specific implementation of the OS pipe trait. +pub(crate) struct UnixPipe { + socket: tokio::net::UnixStream, } -impl DvcNamedPipeProxy { - /// Creates a new DVC named pipe proxy. - /// `dvc_write_callback` is called when the proxy receives a DVC message from the - /// named pipe server and the SVC message is ready to be sent to the DVC channel in the main - /// IronRDP active session loop. - pub fn new(channel_name: &str, _named_pipe_name: &str, _dvc_write_callback: F) -> Self - where - F: Fn(u32, Vec) -> PduResult<()> + Send + 'static, - { - error!("DvcNamedPipeProxy is not implemented on Unix-like systems, using a stub implementation"); +#[async_trait] +impl OsPipe for UnixPipe { + async fn connect(pipe_name: &str) -> Result { + // Domain socket file could already exist from a previous run. + match fs::metadata(&pipe_name).await { + Ok(metadata) => { + use std::os::unix::fs::FileTypeExt as _; - Self { - channel_name: channel_name.to_owned(), + info!( + %pipe_name, + "DVC pipe already exists, removing stale file." + ); + + // Just to be sure, check if it's indeed a socket - + // throw an error if calling code accidentally passed a regular file. + if !metadata.file_type().is_socket() { + return Err(DvcPipeProxyError::Io(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Path {pipe_name} is not a socket"), + ))); + } + + fs::remove_file(pipe_name).await.map_err(DvcPipeProxyError::Io)?; + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + trace!( + %pipe_name, + "DVC pipe does not exist, creating it." + ); + } + Err(e) => { + return Err(DvcPipeProxyError::Io(e)); + } } + + let listener = tokio::net::UnixListener::bind(pipe_name).map_err(DvcPipeProxyError::Io)?; + + let (socket, _) = listener.accept().await.map_err(DvcPipeProxyError::Io)?; + + Ok(Self { socket }) + } + + async fn read(&mut self, buffer: &mut [u8]) -> Result { + self.socket.read(buffer).await.map_err(DvcPipeProxyError::Io) + } + + async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError> { + self.socket + .write_all(buffer) + .await + .map_err(DvcPipeProxyError::Io) + .map(|_| ()) } } - -impl_as_any!(DvcNamedPipeProxy); - -impl DvcProcessor for DvcNamedPipeProxy { - fn channel_name(&self) -> &str { - &self.channel_name - } - - fn start(&mut self, _channel_id: u32) -> PduResult> { - Err(pdu_other_err!( - "DvcNamedPipeProxy is not implemented on Unix-like systems" - )) - } - - fn process(&mut self, _channel_id: u32, _payload: &[u8]) -> PduResult> { - Err(pdu_other_err!( - "DvcNamedPipeProxy is not implemented on Unix-like systems" - )) - } -} - -impl DvcClientProcessor for DvcNamedPipeProxy {} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows.rs new file mode 100644 index 00000000..4c2b9c27 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows.rs @@ -0,0 +1,47 @@ +use async_trait::async_trait; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; +use tokio::net::windows::named_pipe; + +use crate::error::DvcPipeProxyError; +use crate::os_pipe::OsPipe; + +const PIPE_BUFFER_SIZE: u32 = 64 * 1024; + +/// Unix-specific implementation of the OS pipe trait. +pub(crate) struct WindowsPipe { + pipe_server: named_pipe::NamedPipeServer, +} + +#[async_trait] +impl OsPipe for WindowsPipe { + async fn connect(pipe_name: &str) -> Result { + let pipe_name = format!("\\\\.\\pipe\\{pipe_name}"); + + let pipe_server = named_pipe::ServerOptions::new() + .first_pipe_instance(true) + .access_inbound(true) + .access_outbound(true) + .max_instances(2) + .in_buffer_size(PIPE_BUFFER_SIZE) + .out_buffer_size(PIPE_BUFFER_SIZE) + .pipe_mode(named_pipe::PipeMode::Message) + .create(pipe_name) + .map_err(DvcPipeProxyError::Io)?; + + pipe_server.connect().await.map_err(DvcPipeProxyError::Io)?; + + Ok(Self { pipe_server }) + } + + async fn read(&mut self, buffer: &mut [u8]) -> Result { + self.pipe_server.read(buffer).await.map_err(DvcPipeProxyError::Io) + } + + async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError> { + self.pipe_server + .write_all(buffer) + .await + .map_err(DvcPipeProxyError::Io) + .map(|_| ()) + } +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/error.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/error.rs deleted file mode 100644 index b0e7ccd1..00000000 --- a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/error.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::windows::WindowsError; - -#[derive(Debug)] -pub(crate) enum DvcPipeProxyError { - Windows(WindowsError), - MpscIo, - DvcIncompleteWrite, - EncodeDvcMessage, -} - -impl From for DvcPipeProxyError { - fn from(err: WindowsError) -> Self { - DvcPipeProxyError::Windows(err) - } -} - -impl core::fmt::Display for DvcPipeProxyError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - DvcPipeProxyError::Windows(err) => err.fmt(f), - DvcPipeProxyError::MpscIo => write!(f, "MPSC IO error"), - DvcPipeProxyError::DvcIncompleteWrite => write!(f, "DVC incomplete write"), - DvcPipeProxyError::EncodeDvcMessage => write!(f, "DVC message encoding error"), - } - } -} - -impl core::error::Error for DvcPipeProxyError { - fn source(&self) -> Option<&(dyn core::error::Error + 'static)> { - match self { - DvcPipeProxyError::Windows(err) => Some(err), - DvcPipeProxyError::MpscIo => None, - DvcPipeProxyError::DvcIncompleteWrite => None, - DvcPipeProxyError::EncodeDvcMessage => None, - } - } -} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/mod.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/mod.rs deleted file mode 100644 index cebceede..00000000 --- a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/mod.rs +++ /dev/null @@ -1,149 +0,0 @@ -mod error; -mod worker; - -use std::sync::mpsc; - -use ironrdp_core::impl_as_any; -use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor}; -use ironrdp_pdu::{pdu_other_err, PduResult}; -use ironrdp_svc::SvcMessage; - -use crate::platform::windows::error::DvcPipeProxyError; -use crate::platform::windows::worker::{worker_thread_func, OnWriteDvcMessage, WorkerCtx}; -use crate::windows::{Event, MessagePipeServer, Semaphore}; - -const IO_MPSC_CHANNEL_SIZE: usize = 100; - -struct WorkerControlCtx { - to_pipe_tx: mpsc::SyncSender>, - to_pipe_semaphore: Semaphore, - abort_event: Event, -} - -/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server. -pub struct DvcNamedPipeProxy { - channel_name: String, - named_pipe_name: String, - dvc_write_callback: Option, - worker_control_ctx: Option, -} - -impl DvcNamedPipeProxy { - /// Creates a new DVC named pipe proxy. - /// `dvc_write_callback` is called when the proxy receives a DVC message from the - /// named pipe server and the SVC message is ready to be sent to the DVC channel in the main - /// IronRDP active session loop. - pub fn new(channel_name: &str, named_pipe_name: &str, dvc_write_callback: F) -> Self - where - F: Fn(u32, Vec) -> PduResult<()> + Send + 'static, - { - let named_pipe_name = format!("\\\\.\\pipe\\{named_pipe_name}"); - - Self { - channel_name: channel_name.to_owned(), - named_pipe_name, - dvc_write_callback: Some(Box::new(dvc_write_callback)), - worker_control_ctx: None, - } - } -} - -impl_as_any!(DvcNamedPipeProxy); - -impl Drop for DvcNamedPipeProxy { - fn drop(&mut self) { - if let Some(ctx) = &self.worker_control_ctx { - // Signal the worker thread to abort. - ctx.abort_event.set().ok(); - } - } -} - -impl DvcNamedPipeProxy { - fn start_impl(&mut self, channel_id: u32) -> Result<(), DvcPipeProxyError> { - // PIPE -> DVC channel - handled via callback passed to the constructor. - // DVC -> PIPE channel - handled via mpsc internally in the worker thread. - let (to_pipe_tx, to_pipe_rx) = mpsc::sync_channel(IO_MPSC_CHANNEL_SIZE); - - let semaphore_max_count = IO_MPSC_CHANNEL_SIZE - .try_into() - .expect("Channel size is too large for underlying WinAPI semaphore"); - - let to_pipe_semaphore = Semaphore::new_unnamed(0, semaphore_max_count)?; - - let abort_event = Event::new_unnamed()?; - - let worker_control_ctx = WorkerControlCtx { - to_pipe_tx, - to_pipe_semaphore: to_pipe_semaphore.clone(), - abort_event: abort_event.clone(), - }; - - let pipe = MessagePipeServer::new(&self.named_pipe_name)?; - - let dvc_write_callback = self - .dvc_write_callback - .take() - .expect("DVC write callback already taken"); - - let worker_ctx = WorkerCtx { - pipe, - to_pipe_rx, - to_pipe_semaphore, - abort_event, - dvc_write_callback, - pipe_name: self.named_pipe_name.clone(), - channel_name: self.channel_name.clone(), - channel_id, - }; - - let pipe_name = self.named_pipe_name.clone(); - let channel_name = self.channel_name.clone(); - - self.worker_control_ctx = Some(worker_control_ctx); - - std::thread::spawn(move || { - if let Err(error) = worker_thread_func(worker_ctx) { - error!(%error, %pipe_name, %channel_name, "DVC pipe proxy worker thread failed"); - } - }); - - Ok(()) - } -} - -impl DvcProcessor for DvcNamedPipeProxy { - fn channel_name(&self) -> &str { - &self.channel_name - } - - fn start(&mut self, channel_id: u32) -> PduResult> { - self.start_impl(channel_id) - .map_err(|e| pdu_other_err!("dvc named pipe proxy failed", source: e))?; - - Ok(Vec::new()) - } - - fn process(&mut self, _channel_id: u32, payload: &[u8]) -> PduResult> { - // Send the payload to the worker thread via the mpsc channel. - let ctx = match &self.worker_control_ctx { - Some(ctx) => ctx, - None => { - return Err(pdu_other_err!("DVC pipe proxy not started")); - } - }; - - ctx.to_pipe_tx - .send(payload.to_vec()) - .map_err(|_| pdu_other_err!("DVC pipe proxy send failed"))?; - - // Signal WinAPI-based worker IO loop. - ctx.to_pipe_semaphore - .release(1) - .map_err(|_| pdu_other_err!("DVC pipe proxy semaphore release failed"))?; - - Ok(Vec::new()) - } -} - -impl DvcClientProcessor for DvcNamedPipeProxy {} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/worker.rs b/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/worker.rs deleted file mode 100644 index 5edab7cf..00000000 --- a/crates/ironrdp-dvc-pipe-proxy/src/platform/windows/worker.rs +++ /dev/null @@ -1,172 +0,0 @@ -use std::sync::mpsc; - -use ironrdp_core::{ensure_size, Encode, EncodeResult}; -use ironrdp_dvc::{encode_dvc_messages, DvcEncode}; -use ironrdp_pdu::PduResult; -use ironrdp_svc::{ChannelFlags, SvcMessage}; - -use crate::platform::windows::error::DvcPipeProxyError; -use crate::windows::{wait_any, wait_any_with_timeout, Event, MessagePipeServer, Semaphore, WindowsError}; - -const PIPE_CONNECT_TIMEOUT_SECS: u32 = 10_000; // 10 seconds -const PIPE_WRITE_TIMEOUT_SECS: u32 = 3_000; // 3 seconds -const MESSAGE_BUFFER_SIZE: usize = 64 * 1024; // 64 KiB - -pub(crate) type OnWriteDvcMessage = Box) -> PduResult<()> + Send>; - -pub(crate) struct WorkerCtx { - pub pipe: MessagePipeServer, - pub to_pipe_rx: mpsc::Receiver>, - pub to_pipe_semaphore: Semaphore, - pub abort_event: Event, - pub dvc_write_callback: OnWriteDvcMessage, - pub pipe_name: String, - pub channel_name: String, - pub channel_id: u32, -} - -pub(crate) fn worker_thread_func(worker_ctx: WorkerCtx) -> Result<(), DvcPipeProxyError> { - let WorkerCtx { - mut pipe, - to_pipe_rx, - to_pipe_semaphore, - abort_event, - dvc_write_callback, - pipe_name, - channel_name, - channel_id, - } = worker_ctx; - - info!(%channel_name, %pipe_name, "Connecting DVC pipe proxy"); - - { - let mut connect_ctx = pipe.prepare_connect_overlapped()?; - - if !connect_ctx.overlapped_connect()? { - const EVENT_ID_ABORT: usize = 0; - let events = [abort_event.borrow(), connect_ctx.borrow_event()]; - let wait_result = match wait_any_with_timeout(&events, PIPE_CONNECT_TIMEOUT_SECS) { - Ok(idx) => idx, - Err(WindowsError::WaitForMultipleObjectsTimeout) => { - warn!(%channel_name, %pipe_name, "DVC pipe proxy connection timed out"); - return Ok(()); - } - Err(err) => { - return Err(DvcPipeProxyError::Windows(err)); - } - }; - - if wait_result == EVENT_ID_ABORT { - info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted"); - return Ok(()); - } - - connect_ctx.get_result()?; - } - } - - info!(%channel_name, %pipe_name, "DVC pipe proxy connected"); - - let mut read_ctx = pipe.prepare_read_overlapped(MESSAGE_BUFFER_SIZE)?; - - const EVENT_ID_ABORT: usize = 0; - const EVENT_ID_READ: usize = 1; - const EVENT_ID_WRITE_MPSC: usize = 2; - - read_ctx.overlapped_read()?; - - info!(%channel_name, %pipe_name, "DVC pipe proxy IO loop started"); - - loop { - let events = [ - abort_event.borrow(), - read_ctx.borrow_event(), - to_pipe_semaphore.borrow(), - ]; - let wait_result = wait_any(&events)?; - - if wait_result == EVENT_ID_ABORT { - info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted"); - return Ok(()); - } - - // Read end of pipe is ready, forward received data to DVC. - if wait_result == EVENT_ID_READ { - let read_result = read_ctx.get_result()?.to_vec(); - - trace!(%channel_name, %pipe_name, "DVC proxy read {} bytes from pipe", read_result.len()); - - if !read_result.is_empty() { - let messages = encode_dvc_messages( - channel_id, - vec![Box::new(RawDataDvcMessage(read_result))], - ChannelFlags::empty(), - ) - .map_err(|_| DvcPipeProxyError::EncodeDvcMessage)?; - - if let Err(err) = dvc_write_callback(0, messages) { - error!(%err, %channel_name, %pipe_name, "DVC pipe proxy write callback failed"); - } - } - - // Queue the read operation again. - read_ctx.overlapped_read()?; - continue; - } - - // DVC data received, forward it to the pipe. - if wait_result == EVENT_ID_WRITE_MPSC { - let payload = to_pipe_rx.recv().map_err(|_| DvcPipeProxyError::MpscIo)?; - - let payload_len = payload.len(); - - if payload_len == 0 { - warn!(%channel_name, %pipe_name, "Rejected empty DVC data (not sent to pipe)"); - continue; - } - - trace!(%channel_name, %pipe_name, "DVC proxy write {} bytes to pipe,", payload_len); - - let mut overlapped_write = pipe.prepare_write_overlapped(payload)?; - - overlapped_write.overlapped_write()?; - - let events = [abort_event.borrow(), overlapped_write.borrow_event()]; - let wait_result = wait_any_with_timeout(&events, PIPE_WRITE_TIMEOUT_SECS)?; - - if wait_result == EVENT_ID_ABORT { - info!(%channel_name, %pipe_name, "DVC pipe proxy write aborted"); - return Ok(()); - } - - let bytes_written = overlapped_write.get_result()?; - - if bytes_written as usize != payload_len { - // Message-based pipe write failed. - return Err(DvcPipeProxyError::DvcIncompleteWrite); - } - - continue; - } - } -} - -struct RawDataDvcMessage(Vec); - -impl Encode for RawDataDvcMessage { - fn encode(&self, dst: &mut ironrdp_core::WriteCursor<'_>) -> EncodeResult<()> { - ensure_size!(in: dst, size: self.size()); - dst.write_slice(&self.0); - Ok(()) - } - - fn name(&self) -> &'static str { - "RawDataDvcMessage" - } - - fn size(&self) -> usize { - self.0.len() - } -} - -impl DvcEncode for RawDataDvcMessage {} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/proxy.rs b/crates/ironrdp-dvc-pipe-proxy/src/proxy.rs new file mode 100644 index 00000000..07bb2720 --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/proxy.rs @@ -0,0 +1,115 @@ +use std::sync::Arc; + +use ironrdp_core::impl_as_any; +use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor}; +use ironrdp_pdu::{pdu_other_err, PduResult}; +use ironrdp_svc::SvcMessage; + +use crate::worker::{run_worker, OnWriteDvcMessage, WorkerCtx}; + +const IO_MPSC_CHANNEL_SIZE: usize = 100; + +struct WorkerControlCtx { + to_pipe_tx: tokio::sync::mpsc::Sender>, + abort_event: Arc, +} + +/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server. +pub struct DvcNamedPipeProxy { + channel_name: String, + named_pipe_name: String, + dvc_write_callback: Option, + worker: Option, +} + +impl DvcNamedPipeProxy { + /// Creates a new DVC named pipe proxy. + /// `dvc_write_callback` is called when the proxy receives a DVC message from the + /// named pipe server and the SVC message is ready to be sent to the DVC channel in the main + /// IronRDP active session loop. + pub fn new(channel_name: &str, named_pipe_name: &str, dvc_write_callback: F) -> Self + where + F: Fn(u32, Vec) -> PduResult<()> + Send + 'static, + { + Self { + channel_name: channel_name.to_owned(), + named_pipe_name: named_pipe_name.to_owned(), + dvc_write_callback: Some(Box::new(dvc_write_callback)), + worker: None, + } + } +} + +impl_as_any!(DvcNamedPipeProxy); + +impl DvcProcessor for DvcNamedPipeProxy { + fn channel_name(&self) -> &str { + &self.channel_name + } + + fn start(&mut self, channel_id: u32) -> PduResult> { + info!(%self.channel_name, %self.named_pipe_name, "Starting DVC named pipe proxy"); + + let on_write_dvc = self + .dvc_write_callback + .take() + .expect("DvcProcessor::start called multiple times"); + + let (to_pipe_tx, to_pipe_rx) = tokio::sync::mpsc::channel(IO_MPSC_CHANNEL_SIZE); + + let abort_event = Arc::new(tokio::sync::Notify::new()); + + let ctx = WorkerCtx { + on_write_dvc, + to_pipe_rx, + abort_event: Arc::clone(&abort_event), + pipe_name: self.named_pipe_name.clone(), + channel_name: self.channel_name.clone(), + channel_id, + }; + + self.worker = Some(WorkerControlCtx { + to_pipe_tx, + abort_event, + }); + + #[cfg(not(target_os = "windows"))] + run_worker::(ctx); + + #[cfg(target_os = "windows")] + run_worker::(ctx); + + Ok(vec![]) + } + + fn process(&mut self, _channel_id: u32, payload: &[u8]) -> PduResult> { + if let Some(worker) = &self.worker { + if let Err(error) = worker.to_pipe_tx.try_send(payload.to_vec()) { + match error { + tokio::sync::mpsc::error::TrySendError::Full(_) => { + return Err(pdu_other_err!("DVC pipe proxy channel is full")); + } + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + return Err(pdu_other_err!("DVC pipe proxy channel is closed")); + } + } + } + } else { + debug!("Attempt to process DVC packet on non-initialized DVC pipe proxy."); + } + + Ok(vec![]) + } +} + +impl DvcClientProcessor for DvcNamedPipeProxy {} + +impl Drop for DvcNamedPipeProxy { + fn drop(&mut self) { + if let Some(ctx) = &self.worker { + // Signal the worker thread to abort. + ctx.abort_event.notify_one(); + } + self.worker = None; + } +} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/error.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/error.rs deleted file mode 100644 index b0dacb2c..00000000 --- a/crates/ironrdp-dvc-pipe-proxy/src/windows/error.rs +++ /dev/null @@ -1,58 +0,0 @@ -#[derive(Debug)] -pub(crate) enum WindowsError { - CreateNamedPipe(windows::core::Error), - CreateEvent(windows::core::Error), - SetEvent(windows::core::Error), - ReleaseSemaphore(windows::core::Error), - InvalidSemaphoreParams(&'static str), - WaitForMultipleObjectsFailed(windows::core::Error), - WaitForMultipleObjectsTimeout, - WaitForMultipleObjectsAbandoned(u32), - OverlappedConnect(windows::core::Error), - OverlappedRead(windows::core::Error), - OverlappedWrite(windows::core::Error), - CreateSemaphore(windows::core::Error), - InvalidPipeName(String), -} - -impl core::fmt::Display for WindowsError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - WindowsError::CreateNamedPipe(_) => write!(f, "failed to create named pipe"), - WindowsError::CreateEvent(_) => write!(f, "failed to create event object"), - WindowsError::SetEvent(_) => write!(f, "failed to set event to signaled state"), - WindowsError::InvalidSemaphoreParams(cause) => write!(f, "invalid semaphore parameters: {cause}"), - WindowsError::ReleaseSemaphore(_) => write!(f, "failed to release semaphore"), - WindowsError::WaitForMultipleObjectsFailed(_) => write!(f, "failed to wait for multiple objects"), - WindowsError::WaitForMultipleObjectsTimeout => write!(f, "timed out waiting for multiple objects"), - WindowsError::WaitForMultipleObjectsAbandoned(idx) => { - write!(f, "wait for multiple objects failed, handle #{idx} was abandoned") - } - WindowsError::OverlappedConnect(_) => write!(f, "overlapped connect failed"), - WindowsError::OverlappedRead(_) => write!(f, "overlapped read failed"), - WindowsError::OverlappedWrite(_) => write!(f, "overlapped write failed"), - WindowsError::CreateSemaphore(_) => write!(f, "failed to create semaphore object"), - WindowsError::InvalidPipeName(cause) => write!(f, "invalid pipe name: `{cause}`"), - } - } -} - -impl core::error::Error for WindowsError { - fn source(&self) -> Option<&(dyn core::error::Error + 'static)> { - match self { - WindowsError::CreateNamedPipe(err) - | WindowsError::SetEvent(err) - | WindowsError::ReleaseSemaphore(err) - | WindowsError::WaitForMultipleObjectsFailed(err) - | WindowsError::OverlappedConnect(err) - | WindowsError::OverlappedRead(err) - | WindowsError::OverlappedWrite(err) - | WindowsError::CreateSemaphore(err) => Some(err), - WindowsError::CreateEvent(err) => Some(err), - WindowsError::InvalidSemaphoreParams(_) - | WindowsError::WaitForMultipleObjectsTimeout - | WindowsError::InvalidPipeName(_) => None, - WindowsError::WaitForMultipleObjectsAbandoned(_) => None, - } - } -} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/event.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/event.rs deleted file mode 100644 index d0421df5..00000000 --- a/crates/ironrdp-dvc-pipe-proxy/src/windows/event.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::sync::Arc; - -use windows::core::Owned; -use windows::Win32::Foundation::HANDLE; -use windows::Win32::System::Threading::{CreateEventW, SetEvent}; - -use crate::windows::{BorrowedHandle, WindowsError}; - -/// RAII wrapper for WinAPI event handle. -#[derive(Debug, Clone)] -pub(crate) struct Event { - handle: Arc>, -} - -// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via -// Event wrapper API itself by restricting handle usage: -// - set() method which calls SetEvent inside (which is thread-safe). -// - borrow() method which returns a BorrowedHandle for waiting on the event. -// - Handle lifetime is ensured by Arc, so it is always valid when used. -unsafe impl Send for Event {} - -impl Event { - pub(crate) fn new_unnamed() -> Result { - // SAFETY: FFI call with no outstanding preconditions. - let handle = unsafe { CreateEventW(None, false, false, None).map_err(WindowsError::CreateEvent)? }; - - // SAFETY: Handle is valid and we are the owner of the handle. - let handle = unsafe { Owned::new(handle) }; - - // CreateEventW returns a valid handle on success. - Ok(Self { - // See `unsafe impl Send` comment. - #[expect(clippy::arc_with_non_send_sync)] - handle: Arc::new(handle), - }) - } - - pub(crate) fn set(&self) -> Result<(), WindowsError> { - // SAFETY: The handle is valid and we are the owner of the handle. - unsafe { - SetEvent(self.raw()).map_err(WindowsError::SetEvent)?; - } - Ok(()) - } - - pub(super) fn raw(&self) -> HANDLE { - **self.handle - } - - pub(crate) fn borrow(&self) -> BorrowedHandle<'_> { - BorrowedHandle(&self.handle) - } -} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/mod.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/mod.rs deleted file mode 100644 index 306296a6..00000000 --- a/crates/ironrdp-dvc-pipe-proxy/src/windows/mod.rs +++ /dev/null @@ -1,101 +0,0 @@ -//! WinAPI wrappers for the DVC pipe proxy IO loop logic. -//! -//! Some of the wrappers are based on `win-api-wrappers` code (simplified/reduced functionality). - -use windows::Win32::Foundation::{ - ERROR_IO_PENDING, HANDLE, WAIT_ABANDONED_0, WAIT_EVENT, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT, -}; -use windows::Win32::System::Threading::{WaitForMultipleObjects, INFINITE}; - -mod error; -pub(crate) use self::error::WindowsError; - -mod event; -pub(crate) use self::event::Event; - -mod pipe; -pub(crate) use self::pipe::MessagePipeServer; - -mod semaphore; -pub(crate) use self::semaphore::Semaphore; - -/// Thin wrapper around borrowed `windows` crate `HANDLE` reference. -/// This is used to ensure handle lifetime when passing it to FFI functions -/// (see `wait_any_with_timeout` for example). -#[repr(transparent)] -pub(crate) struct BorrowedHandle<'a>(&'a HANDLE); - -/// Safe wrapper around `WaitForMultipleObjects`. -pub(crate) fn wait_any_with_timeout(handles: &[BorrowedHandle<'_>], timeout: u32) -> Result { - let handles = cast_handles(handles); - - // SAFETY: - // - BorrowedHandle alongside with rust type system ensures that the HANDLEs are valid for - // the duration of the call. - // - All handles in this module have SYNCHRONIZE access rights. - // - cast_handles ensures no handle duplicates. - let result = unsafe { WaitForMultipleObjects(handles, false, timeout) }; - - match result { - WAIT_FAILED => Err(WindowsError::WaitForMultipleObjectsFailed( - windows::core::Error::from_win32(), - )), - WAIT_TIMEOUT => Err(WindowsError::WaitForMultipleObjectsTimeout), - WAIT_EVENT(idx) if idx >= WAIT_ABANDONED_0.0 => { - let idx = idx - WAIT_ABANDONED_0.0; - Err(WindowsError::WaitForMultipleObjectsAbandoned(idx)) - } - WAIT_EVENT(id) => Ok((id - WAIT_OBJECT_0.0) as usize), - } -} - -/// Safe `WaitForMultipleObjects` wrapper with infinite timeout. -pub(crate) fn wait_any(handles: &[BorrowedHandle<'_>]) -> Result { - // Standard generic syntax is used instead if `impl` because of the following lint: - // > warning: lifetime parameter `'a` only used once - // - // Fixing this lint (use of '_ lifetime) produces compiler error. - wait_any_with_timeout(handles, INFINITE) -} - -fn cast_handles<'a>(handles: &'a [BorrowedHandle<'a>]) -> &'a [HANDLE] { - // Very basic sanity checks to ensure that the handles are valid - // and there are no duplicates. - // This is only done in debug builds to avoid performance overhead in release builds, while - // still catching undefined behavior early in development. - #[cfg(debug_assertions)] - { - // Ensure that there are no duplicate handles without hash. - for (i, handle) in handles.iter().enumerate() { - for other_handle in &handles[i + 1..] { - if handle.0 == other_handle.0 { - panic!("Duplicate handle found in wait_any_with_timeout"); - } - } - } - } - - for handle in handles { - // Ensure that the handle is valid. - if handle.0.is_invalid() { - panic!("Invalid handle in wait_any_with_timeout"); - } - } - - // SAFETY: - // - BorrowedHandle is #[repr(transparent)] over *const c_void, and so is HANDLE, - // so the layout is the same. - // - We ensure the lifetime is preserved. - unsafe { core::slice::from_raw_parts(handles.as_ptr() as *const HANDLE, handles.len()) } -} - -/// Maps ERROR_IO_PENDING to Ok(()) and returns other errors as is. -fn ensure_overlapped_io_result(result: windows::core::Result<()>) -> Result, WindowsError> { - if let Err(error) = &result { - if error.code() == ERROR_IO_PENDING.to_hresult() { - return Ok(Ok(())); - } - } - - Ok(result) -} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/pipe.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/pipe.rs deleted file mode 100644 index 6a7e69b4..00000000 --- a/crates/ironrdp-dvc-pipe-proxy/src/windows/pipe.rs +++ /dev/null @@ -1,291 +0,0 @@ -use core::ops::DerefMut as _; -use core::pin::Pin; - -use windows::core::{Owned, PCWSTR}; -use windows::Win32::Foundation::{ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, HANDLE}; -use windows::Win32::Storage::FileSystem::{ - ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, -}; -use windows::Win32::System::Pipes::{ - ConnectNamedPipe, CreateNamedPipeW, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_WAIT, -}; -use windows::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; - -use crate::windows::{ensure_overlapped_io_result, BorrowedHandle, Event, WindowsError}; - -const PIPE_INSTANCES: u32 = 2; -const PIPE_BUFFER_SIZE: u32 = 64 * 1024; // 64KB -const DEFAULT_PIPE_TIMEOUT: u32 = 10_000; // 10 seconds - -/// RAII wrapper for WinAPI named pipe server. -#[derive(Debug)] -pub(crate) struct MessagePipeServer { - handle: Owned, - connected: bool, -} - -/// SAFETY: It is safe to send pipe HANDLE between threads. -unsafe impl Send for MessagePipeServer {} - -impl MessagePipeServer { - /// Creates a new named pipe server. - pub(crate) fn new(name: &str) -> Result { - // Create a named pipe with the specified name. - let lpname = - widestring::U16CString::from_str(name).map_err(|_| WindowsError::InvalidPipeName(name.to_owned()))?; - - // SAFETY: lpname is a valid pointer to a null-terminated wide string. - let handle = unsafe { - CreateNamedPipeW( - PCWSTR(lpname.as_ptr()), - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE, - PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, - PIPE_INSTANCES, - PIPE_BUFFER_SIZE, - PIPE_BUFFER_SIZE, - DEFAULT_PIPE_TIMEOUT, - None, - ) - }; - - // `windows` crate API inconsistency: CreateNamedPipeW returns invalid handle on error - // instead of Result::Err. - if handle.is_invalid() { - return Err(WindowsError::CreateNamedPipe(windows::core::Error::from_win32())); - } - - // SAFETY: Handle is valid and we are the owner of the handle. - let handle = unsafe { Owned::new(handle) }; - - Ok(Self { - handle, - connected: false, - }) - } - - fn raw(&self) -> HANDLE { - *self.handle - } - - /// Initializes context for overlapped connect operation. - pub(crate) fn prepare_connect_overlapped(&mut self) -> Result, WindowsError> { - OverlappedPipeConnectCtx::new(self) - } - - /// Initializes context for overlapped read operation. - pub(crate) fn prepare_read_overlapped( - &self, - buffer_size: usize, - ) -> Result, WindowsError> { - OverlappedPipeReadCtx::new(self, buffer_size) - } - - /// Initializes context for overlapped write operation. - pub(crate) fn prepare_write_overlapped(&self, data: Vec) -> Result, WindowsError> { - OverlappedWriteCtx::new(self, data) - } -} - -pub(crate) struct OverlappedPipeConnectCtx<'a> { - pipe: &'a mut MessagePipeServer, - overlapped: Pin>, - event: Event, -} - -impl<'a> OverlappedPipeConnectCtx<'a> { - fn new(pipe: &'a mut MessagePipeServer) -> Result { - let event = Event::new_unnamed()?; - - let overlapped = Box::pin(OVERLAPPED { - hEvent: event.raw(), - ..Default::default() - }); - - Ok(Self { - pipe, - overlapped, - event, - }) - } - - /// Connects to the named pipe server. - /// Returns true if pipe is already connected prior to this call and no additional - /// overlapped io is needed. If false is returned, the caller should call `get_result()` to - /// after returned event handle is signaled to complete the connection. - pub(crate) fn overlapped_connect(&mut self) -> Result { - // SAFETY: The handle is valid and we are the owner of the handle. - let result = unsafe { ConnectNamedPipe(self.pipe.raw(), Some(self.overlapped.deref_mut() as *mut OVERLAPPED)) }; - - match result { - Ok(()) => { - self.pipe.connected = true; - Ok(true) - } - Err(error) => { - if error.code() == ERROR_PIPE_CONNECTED.to_hresult() { - // The pipe is already connected. - self.pipe.connected = true; - Ok(true) - } else if error.code() == ERROR_IO_PENDING.to_hresult() { - // Overlapped I/O is pending. - Ok(false) - } else { - // Connection failed. - Err(WindowsError::OverlappedConnect(error)) - } - } - } - } - - pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> { - self.event.borrow() - } - - pub(crate) fn get_result(&mut self) -> Result<(), WindowsError> { - let mut bytes_read = 0u32; - - // SAFETY: The handle is valid and we are the owner of the handle. - unsafe { - GetOverlappedResult( - self.pipe.raw(), - self.overlapped.deref_mut() as *mut OVERLAPPED, - &mut bytes_read as *mut u32, - false, - ) - .map_err(WindowsError::OverlappedConnect)? - }; - - self.pipe.connected = true; - - Ok(()) - } -} - -pub(crate) struct OverlappedPipeReadCtx<'a> { - pipe: &'a MessagePipeServer, - buffer: Vec, - overlapped: Pin>, - event: Event, -} - -impl<'a> OverlappedPipeReadCtx<'a> { - fn new(pipe: &'a MessagePipeServer, buffer_size: usize) -> Result { - let event = Event::new_unnamed()?; - - let overlapped = Box::pin(OVERLAPPED { - hEvent: event.raw(), - ..Default::default() - }); - - Ok(Self { - pipe, - buffer: vec![0; buffer_size], - overlapped, - event, - }) - } - - pub(crate) fn overlapped_read(&mut self) -> Result<(), WindowsError> { - // SAFETY: self.pipe.raw() returns a valid handle. The read buffer pointer returned - // by self.buffer.as_mut_slice() is valid and remains alive for the entire duration - // of the overlapped I/O operation. The OVERLAPPED structure is pinned and not moved - // in memory, ensuring its address remains stable until the operation completes. - let result = unsafe { - ReadFile( - self.pipe.raw(), - Some(self.buffer.as_mut_slice()), - None, - Some(self.overlapped.deref_mut() as *mut OVERLAPPED), - ) - }; - - ensure_overlapped_io_result(result)?.map_err(WindowsError::OverlappedRead) - } - - pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> { - self.event.borrow() - } - - pub(crate) fn get_result(&mut self) -> Result<&[u8], WindowsError> { - let mut bytes_read = 0u32; - - // SAFETY: The handle is valid and we are the owner of the handle. - unsafe { - GetOverlappedResult( - self.pipe.raw(), - self.overlapped.deref_mut() as *mut OVERLAPPED, - &mut bytes_read as *mut u32, - false, - ) - .map_err(WindowsError::OverlappedRead)? - }; - - Ok(&self.buffer[..bytes_read as usize]) - } -} - -pub(crate) struct OverlappedWriteCtx<'a> { - pipe: &'a MessagePipeServer, - data: Vec, - overlapped: Pin>, - event: Event, -} - -impl<'a> OverlappedWriteCtx<'a> { - fn new(pipe: &'a MessagePipeServer, data: Vec) -> Result { - let event = Event::new_unnamed()?; - - let mut overlapped = Box::pin(OVERLAPPED { - hEvent: event.raw(), - ..Default::default() - }); - - // Set write mode to append - overlapped.Anonymous.Anonymous.Offset = 0xFFFFFFFF; - overlapped.Anonymous.Anonymous.OffsetHigh = 0xFFFFFFFF; - - Ok(Self { - pipe, - data, - overlapped, - event, - }) - } - - pub(crate) fn overlapped_write(&mut self) -> Result<(), WindowsError> { - // SAFETY: self.pipe.raw() returns a valid handle. The write buffer pointer (&self.data) is valid - // and remains alive for the entire duration of the overlapped I/O operation. The OVERLAPPED - // structure is pinned and not moved in memory, ensuring its address remains stable until the - // operation completes. - let result = unsafe { - WriteFile( - self.pipe.raw(), - Some(&self.data), - None, - Some(self.overlapped.deref_mut() as *mut OVERLAPPED), - ) - }; - - ensure_overlapped_io_result(result)?.map_err(WindowsError::OverlappedWrite) - } - - pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> { - self.event.borrow() - } - - pub(crate) fn get_result(&mut self) -> Result { - let mut bytes_written = 0u32; - // SAFETY: The pipe handle is valid and we are the owner of the handle. - unsafe { - GetOverlappedResult( - self.pipe.raw(), - self.overlapped.deref_mut() as *const OVERLAPPED, - &mut bytes_written as *mut u32, - true, - ) - .map_err(WindowsError::OverlappedWrite)?; - }; - - Ok(bytes_written) - } -} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/windows/semaphore.rs b/crates/ironrdp-dvc-pipe-proxy/src/windows/semaphore.rs deleted file mode 100644 index 2b8d6fdd..00000000 --- a/crates/ironrdp-dvc-pipe-proxy/src/windows/semaphore.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::sync::Arc; - -use windows::core::Owned; -use windows::Win32::Foundation::HANDLE; -use windows::Win32::System::Threading::{CreateSemaphoreW, ReleaseSemaphore}; - -use crate::windows::{BorrowedHandle, WindowsError}; - -/// RAII wrapper for WinAPI semaphore handle. -#[derive(Debug, Clone)] -pub(crate) struct Semaphore { - handle: Arc>, -} - -// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via -// Semaphore wrapper API itself by restricting handle usage: -// - release() method which calls ReleaseSemaphore inside (which is thread-safe). -// - borrow() method which returns a BorrowedHandle for waiting on the semaphore. -// - Handle lifetime is ensured by Arc, so it is always valid when used. -unsafe impl Send for Semaphore {} - -impl Semaphore { - /// Creates a new unnamed semaphore with the specified initial and maximum counts. - pub(crate) fn new_unnamed(initial_count: u32, maximum_count: u32) -> Result { - if maximum_count == 0 { - return Err(WindowsError::InvalidSemaphoreParams( - "maximum_count must be greater than 0", - )); - } - - if initial_count > maximum_count { - return Err(WindowsError::InvalidSemaphoreParams( - "initial_count must be less than or equal to maximum_count", - )); - } - - let initial_count = i32::try_from(initial_count) - .map_err(|_| WindowsError::InvalidSemaphoreParams("initial_count should be positive"))?; - - let maximum_count = i32::try_from(maximum_count) - .map_err(|_| WindowsError::InvalidSemaphoreParams("maximum_count should be positive"))?; - - // SAFETY: All parameters are checked for validity above: - // - initial_count is always <= maximum_count. - // - maximum_count is always > 0. - // - all values are positive. - let handle = unsafe { - CreateSemaphoreW(None, initial_count, maximum_count, None).map_err(WindowsError::CreateSemaphore)? - }; - - // SAFETY: Handle is valid and we are the owner of the handle. - let handle = unsafe { Owned::new(handle) }; - - // CreateSemaphoreW returns a valid handle on success. - Ok(Self { - // See `unsafe impl Send` comment. - // TODO(@CBenoit): Verify this comment. - #[expect(clippy::arc_with_non_send_sync)] - handle: Arc::new(handle), - }) - } - - fn raw(&self) -> HANDLE { - **self.handle - } - - pub(crate) fn borrow(&self) -> BorrowedHandle<'_> { - BorrowedHandle(&self.handle) - } - - pub(crate) fn release(&self, release_count: u16) -> Result { - let release_count = i32::from(release_count); - - if release_count == 0 { - // semaphore release count must be greater than 0 - return Err(WindowsError::InvalidSemaphoreParams( - "release_count must be greater than 0", - )); - } - - let mut previous_count = 0; - // SAFETY: All parameters are checked for validity above: - // - release_count > 0. - // - lpPreviousCount points to valid stack memory. - // - handle is valid and owned by this struct. - unsafe { - ReleaseSemaphore(self.raw(), release_count, Some(&mut previous_count)) - .map_err(WindowsError::ReleaseSemaphore)?; - } - Ok(previous_count.try_into().expect("semaphore count is negative")) - } -} diff --git a/crates/ironrdp-dvc-pipe-proxy/src/worker.rs b/crates/ironrdp-dvc-pipe-proxy/src/worker.rs new file mode 100644 index 00000000..4cfbcc7b --- /dev/null +++ b/crates/ironrdp-dvc-pipe-proxy/src/worker.rs @@ -0,0 +1,157 @@ +use std::sync::Arc; + +use ironrdp_dvc::encode_dvc_messages; +use ironrdp_pdu::PduResult; +use ironrdp_svc::{ChannelFlags, SvcMessage}; +use tokio::sync::{mpsc, Notify}; + +use crate::error::DvcPipeProxyError; +use crate::message::RawDataDvcMessage; +use crate::os_pipe::OsPipe; + +const IO_BUFFER_SIZE: usize = 1024 * 64; // 64K + +pub(crate) type OnWriteDvcMessage = Box) -> PduResult<()> + Send>; + +pub(crate) struct WorkerCtx { + pub(crate) on_write_dvc: OnWriteDvcMessage, + pub(crate) to_pipe_rx: mpsc::Receiver>, + pub(crate) abort_event: Arc, + pub(crate) pipe_name: String, + pub(crate) channel_name: String, + pub(crate) channel_id: u32, +} + +pub(crate) fn run_worker(ctx: WorkerCtx) { + let _ = std::thread::spawn(move || { + let channel_name = ctx.channel_name.clone(); + let pipe_name = ctx.pipe_name.clone(); + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(DvcPipeProxyError::Io); + + let runtime = match runtime { + Ok(runtime) => runtime, + Err(error) => { + error!( + %channel_name, + %pipe_name, + ?error, + "DVC pipe proxy worker thread initialization failed." + ); + return; + } + }; + + if let Err(error) = runtime.block_on(worker::

(ctx)) { + error!( + %channel_name, + %pipe_name, + ?error, + "DVC pipe proxy worker thread has failed." + ); + } + }); +} + +enum NextWorkerState { + Abort, + Reconnect, +} + +async fn process_client(ctx: &mut WorkerCtx) -> Result { + let pipe_name = &ctx.pipe_name; + let channel_name = &ctx.channel_name; + + let mut pipe = tokio::select! { + pipe = P::connect(pipe_name) => { + info!(%channel_name, %pipe_name,"DVC proxy worker thread has started."); + pipe? + } + _ = ctx.abort_event.notified() => { + info!(%channel_name, %pipe_name, "DVC proxy worker thread has been aborted."); + return Ok(NextWorkerState::Abort); + } + }; + + let mut from_pipe_buffer = [0u8; IO_BUFFER_SIZE]; + + loop { + let abort = ctx.abort_event.notified(); + let read_pipe = pipe.read(&mut from_pipe_buffer); + let read_dvc = ctx.to_pipe_rx.recv(); + + tokio::select! { + () = abort => { + info!(%channel_name, %pipe_name, "Received abort signal for DVC proxy worker thread."); + return Ok(NextWorkerState::Abort); + } + read_bytes_result = read_pipe => { + let read_bytes = read_bytes_result?; + + if read_bytes == 0 { + info!(%channel_name, %pipe_name, "DVC proxy pipe returned EOF"); + + // If client unexpectedly closed the connection, we should + // still be able to reconnect to same session. + return Ok(NextWorkerState::Reconnect); + } + + let messages = encode_dvc_messages( + ctx.channel_id, + vec![Box::new(RawDataDvcMessage(from_pipe_buffer[..read_bytes].to_vec()))], + ChannelFlags::empty(), + ) + .map_err(DvcPipeProxyError::EncodeDvcMessage)?; + + if let Err(error) = (ctx.on_write_dvc)(0, messages) { + error!(%channel_name, %pipe_name, ?error, "DVC pipe proxy write callback failed"); + } + } + dvc_input = read_dvc => { + let data = match dvc_input { + Some(data) => data, + None => { + info!(%channel_name, %pipe_name, "DVC mpsc channel returned EOF."); + // Server DVC has been closed, there is no point in + // trying to reconnect. + return Ok(NextWorkerState::Abort); + } + }; + + if let Err(error) = pipe.write_all(&data).await + { + error!(%channel_name, %pipe_name, ?error, "Failed to write to DVC pipe"); + continue; + } + } + }; + } +} + +async fn worker(mut ctx: WorkerCtx) -> Result<(), DvcPipeProxyError> { + loop { + match process_client::

(&mut ctx).await? { + NextWorkerState::Abort => { + info!( + channel_name = %ctx.channel_name, + pipe_name = %ctx.pipe_name, + "Aborting DVC proxy worker thread." + ); + break; + } + NextWorkerState::Reconnect => { + info!( + channel_name = %ctx.channel_name, + pipe_name = %ctx.pipe_name, + "Reconnecting to DVC pipe..." + ); + continue; + } + }; + } + + Ok(()) +}