feat(dvc): add DVC named pipe proxy support (#791)

This commit is contained in:
Vladyslav Nikonov 2025-06-17 13:16:41 +03:00 committed by GitHub
parent e5f92ae11c
commit 5482365655
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 1270 additions and 17 deletions

View file

@ -48,6 +48,7 @@ ironrdp-rdpsnd-native = { path = "../ironrdp-rdpsnd-native", version = "0.3" }
ironrdp-tls = { path = "../ironrdp-tls", version = "0.1" }
ironrdp-tokio = { path = "../ironrdp-tokio", version = "0.5", features = ["reqwest"] }
ironrdp-rdcleanpath.path = "../ironrdp-rdcleanpath"
ironrdp-dvc-pipe-proxy.path = "../ironrdp-dvc-pipe-proxy"
# Windowing and rendering
winit = { version = "0.30", features = ["rwh_06"] }

View file

@ -21,6 +21,13 @@ pub struct Config {
pub connector: connector::Config,
pub clipboard_type: ClipboardType,
pub rdcleanpath: Option<RDCleanPathConfig>,
/// DVC channel <-> named pipe proxy configuration.
///
/// Each configured proxy enables IronRDP to connect to DVC channel and create a named pipe
/// server, which will be used for proxying DVC messages to/from user-defined DVC logic
/// implemented as named pipe clients (either in the same process or in a different process).
pub dvc_pipe_proxies: Vec<DvcProxyInfo>,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
@ -137,6 +144,33 @@ pub struct RDCleanPathConfig {
pub auth_token: String,
}
#[derive(Clone, Debug)]
pub struct DvcProxyInfo {
pub channel_name: String,
pub pipe_name: String,
}
impl FromStr for DvcProxyInfo {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut parts = s.split('=');
let channel_name = parts
.next()
.ok_or_else(|| anyhow::anyhow!("missing DVC channel name"))?
.to_owned();
let pipe_name = parts
.next()
.ok_or_else(|| anyhow::anyhow!("missing DVC proxy pipe name"))?
.to_owned();
Ok(Self {
channel_name,
pipe_name,
})
}
}
/// Devolutions IronRDP client
#[derive(Parser, Debug)]
#[clap(author = "Devolutions", about = "Devolutions-IronRDP client")]
@ -238,6 +272,14 @@ struct Args {
/// The bitmap codecs to use (remotefx:on, ...)
#[clap(long, value_parser, num_args = 1.., value_delimiter = ',')]
codecs: Vec<String>,
/// Add DVC channel named pipe proxy.
/// the format is <name>=<pipe>
/// e.g. `ChannelName=PipeName` where `ChannelName` is the name of the channel,
/// and `PipeName` is the name of the named pipe to connect to (without OS-specific prefix),
/// e.g. PipeName will automatically be prefixed with `\\.\pipe\` on Windows.
#[clap(long, value_parser)]
dvc_proxy: Vec<DvcProxyInfo>,
}
impl Config {
@ -357,6 +399,7 @@ impl Config {
connector,
clipboard_type,
rdcleanpath,
dvc_pipe_proxies: args.dvc_proxy,
})
}
}

View file

@ -6,7 +6,7 @@ extern crate tracing;
use anyhow::Context as _;
use ironrdp_client::app::App;
use ironrdp_client::config::{ClipboardType, Config};
use ironrdp_client::rdp::{RdpClient, RdpInputEvent, RdpOutputEvent};
use ironrdp_client::rdp::{DvcPipeProxyFactory, RdpClient, RdpInputEvent, RdpOutputEvent};
use tokio::runtime;
use winit::event_loop::EventLoop;
@ -50,7 +50,7 @@ fn main() -> anyhow::Result<()> {
use ironrdp_client::clipboard::ClientClipboardMessageProxy;
use ironrdp_cliprdr_native::WinClipboard;
let cliprdr = WinClipboard::new(ClientClipboardMessageProxy::new(input_event_sender))?;
let cliprdr = WinClipboard::new(ClientClipboardMessageProxy::new(input_event_sender.clone()))?;
let factory = cliprdr.backend_factory();
_win_clipboard = cliprdr;
@ -59,11 +59,14 @@ fn main() -> anyhow::Result<()> {
_ => None,
};
let dvc_pipe_proxy_factory = DvcPipeProxyFactory::new(input_event_sender);
let client = RdpClient {
config,
event_loop_proxy,
input_event_receiver,
cliprdr_factory,
dvc_pipe_proxy_factory,
};
debug!("Start RDP thread");

View file

@ -8,8 +8,10 @@ use ironrdp::displaycontrol::pdu::MonitorLayoutEntry;
use ironrdp::graphics::image_processing::PixelFormat;
use ironrdp::graphics::pointer::DecodedPointer;
use ironrdp::pdu::input::fast_path::FastPathInputEvent;
use ironrdp::pdu::{pdu_other_err, PduResult};
use ironrdp::session::image::DecodedImage;
use ironrdp::session::{fast_path, ActiveStage, ActiveStageOutput, GracefulDisconnectReason, SessionResult};
use ironrdp::svc::SvcMessage;
use ironrdp::{cliprdr, connector, rdpdr, rdpsnd, session};
use ironrdp_core::WriteBuf;
use ironrdp_rdpsnd_native::cpal;
@ -23,6 +25,7 @@ use tokio::sync::mpsc;
use winit::event_loop::EventLoopProxy;
use crate::config::{Config, RDCleanPathConfig};
use ironrdp_dvc_pipe_proxy::DvcNamedPipeProxy;
#[derive(Debug)]
pub enum RdpOutputEvent {
@ -47,6 +50,10 @@ pub enum RdpInputEvent {
FastPath(SmallVec<[FastPathInputEvent; 2]>),
Close,
Clipboard(ClipboardMessage),
SendDvcMessages {
channel_id: u32,
messages: Vec<SvcMessage>,
},
}
impl RdpInputEvent {
@ -55,18 +62,50 @@ impl RdpInputEvent {
}
}
pub struct DvcPipeProxyFactory {
rdp_input_sender: mpsc::UnboundedSender<RdpInputEvent>,
}
impl DvcPipeProxyFactory {
pub fn new(rdp_input_sender: mpsc::UnboundedSender<RdpInputEvent>) -> Self {
Self { rdp_input_sender }
}
pub fn create(&self, channel_name: String, pipe_name: String) -> DvcNamedPipeProxy {
let rdp_input_sender = self.rdp_input_sender.clone();
DvcNamedPipeProxy::new(&channel_name, &pipe_name, move |channel_id, messages| {
rdp_input_sender
.send(RdpInputEvent::SendDvcMessages { channel_id, messages })
.map_err(|_error| pdu_other_err!("send DVC messages to the event loop",))?;
Ok(())
})
}
}
pub type WriteDvcMessageFn = Box<dyn Fn(u32, SvcMessage) -> PduResult<()> + Send + 'static>;
pub struct RdpClient {
pub config: Config,
pub event_loop_proxy: EventLoopProxy<RdpOutputEvent>,
pub input_event_receiver: mpsc::UnboundedReceiver<RdpInputEvent>,
pub cliprdr_factory: Option<Box<dyn CliprdrBackendFactory + Send>>,
pub dvc_pipe_proxy_factory: DvcPipeProxyFactory,
}
impl RdpClient {
pub async fn run(mut self) {
loop {
let (connection_result, framed) = if let Some(rdcleanpath) = self.config.rdcleanpath.as_ref() {
match connect_ws(&self.config, rdcleanpath, self.cliprdr_factory.as_deref()).await {
match connect_ws(
&self.config,
rdcleanpath,
self.cliprdr_factory.as_deref(),
&self.dvc_pipe_proxy_factory,
)
.await
{
Ok(result) => result,
Err(e) => {
let _ = self.event_loop_proxy.send_event(RdpOutputEvent::ConnectionFailure(e));
@ -74,7 +113,13 @@ impl RdpClient {
}
}
} else {
match connect(&self.config, self.cliprdr_factory.as_deref()).await {
match connect(
&self.config,
self.cliprdr_factory.as_deref(),
&self.dvc_pipe_proxy_factory,
)
.await
{
Ok(result) => result,
Err(e) => {
let _ = self.event_loop_proxy.send_event(RdpOutputEvent::ConnectionFailure(e));
@ -122,6 +167,7 @@ type UpgradedFramed = ironrdp_tokio::TokioFramed<Box<dyn AsyncReadWrite + Unpin
async fn connect(
config: &Config,
cliprdr_factory: Option<&(dyn CliprdrBackendFactory + Send)>,
dvc_pipe_proxy_factory: &DvcPipeProxyFactory,
) -> ConnectorResult<(ConnectionResult, UpgradedFramed)> {
let dest = format!("{}:{}", config.destination.name(), config.destination.port());
@ -135,10 +181,21 @@ async fn connect(
let mut framed = ironrdp_tokio::TokioFramed::new(socket);
let mut drdynvc =
ironrdp::dvc::DrdynvcClient::new().with_dynamic_channel(DisplayControlClient::new(|_| Ok(Vec::new())));
// Instantiate all DVC proxies
for proxy in config.dvc_pipe_proxies.iter() {
let channel_name = proxy.channel_name.clone();
let pipe_name = proxy.pipe_name.clone();
trace!(%channel_name, %pipe_name, "Creating DVC proxy");
drdynvc = drdynvc.with_dynamic_channel(dvc_pipe_proxy_factory.create(channel_name, pipe_name));
}
let mut connector = connector::ClientConnector::new(config.connector.clone(), client_addr)
.with_static_channel(
ironrdp::dvc::DrdynvcClient::new().with_dynamic_channel(DisplayControlClient::new(|_| Ok(Vec::new()))),
)
.with_static_channel(drdynvc)
.with_static_channel(rdpsnd::client::Rdpsnd::new(Box::new(cpal::RdpsndBackend::new())))
.with_static_channel(rdpdr::Rdpdr::new(Box::new(NoopRdpdrBackend {}), "IronRDP".to_owned()).with_smartcard(0));
@ -186,6 +243,7 @@ async fn connect_ws(
config: &Config,
rdcleanpath: &RDCleanPathConfig,
cliprdr_factory: Option<&(dyn CliprdrBackendFactory + Send)>,
dvc_pipe_proxy_factory: &DvcPipeProxyFactory,
) -> ConnectorResult<(ConnectionResult, UpgradedFramed)> {
let hostname = rdcleanpath
.url
@ -214,10 +272,21 @@ async fn connect_ws(
let mut framed = ironrdp_tokio::TokioFramed::new(ws);
let mut drdynvc =
ironrdp::dvc::DrdynvcClient::new().with_dynamic_channel(DisplayControlClient::new(|_| Ok(Vec::new())));
// Instantiate all DVC proxies
for proxy in config.dvc_pipe_proxies.iter() {
let channel_name = proxy.channel_name.clone();
let pipe_name = proxy.pipe_name.clone();
trace!(%channel_name, %pipe_name, "Creating DVC proxy");
drdynvc = drdynvc.with_dynamic_channel(dvc_pipe_proxy_factory.create(channel_name, pipe_name));
}
let mut connector = connector::ClientConnector::new(config.connector.clone(), client_addr)
.with_static_channel(
ironrdp::dvc::DrdynvcClient::new().with_dynamic_channel(DisplayControlClient::new(|_| Ok(Vec::new()))),
)
.with_static_channel(drdynvc)
.with_static_channel(rdpsnd::client::Rdpsnd::new(Box::new(cpal::RdpsndBackend::new())))
.with_static_channel(rdpdr::Rdpdr::new(Box::new(NoopRdpdrBackend {}), "IronRDP".to_owned()).with_smartcard(0));
@ -468,6 +537,12 @@ async fn active_session(
Vec::new()
}
}
RdpInputEvent::SendDvcMessages { channel_id, messages } => {
trace!(channel_id, ?messages, "Send DVC messages");
let frame = active_stage.encode_dvc_messages(messages)?;
vec![ActiveStageOutput::ResponseFrame(frame)]
}
}
}
};

View file

@ -0,0 +1,6 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

View file

@ -0,0 +1,40 @@
[package]
name = "ironrdp-dvc-pipe-proxy"
version = "0.1.0"
readme = "README.md"
description = "DVC named pipe proxy for IronRDP"
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
authors.workspace = true
keywords.workspace = true
categories.workspace = true
[lib]
doctest = false
test = false
[dependencies]
ironrdp-core.path = "../ironrdp-core"
ironrdp-dvc.path = "../ironrdp-dvc"
ironrdp-pdu.path = "../ironrdp-pdu"
ironrdp-svc.path = "../ironrdp-svc"
tracing = { version = "0.1", features = ["log"] }
[target.'cfg(windows)'.dependencies]
widestring = "1"
windows = { version = "0.61", features = [
"Win32_Foundation",
"Win32_Security",
"Win32_System_Threading",
"Win32_Storage_FileSystem",
"Win32_System_Pipes",
"Win32_System_IO",
] }
[lints]
workspace = true

View file

@ -0,0 +1 @@
../../LICENSE-APACHE

View file

@ -0,0 +1 @@
../../LICENSE-MIT

View file

@ -0,0 +1,15 @@
# IronRDP DVC pipe proxy
This crate provides a Device Virtual Channel (DVC) handler for IronRDP, enabling proxying of RDP DVC
traffic over a named pipe.
It was originally designed to simplify custom DVC integration within Devolutions Remote Desktop
Manager (RDM). By implementing a thin pipe proxy for target RDP clients (such as IronRDP, FreeRDP,
mstsc, etc.), the main client logic can be centralized and reused across all supported clients via a
named pipe.
This approach allows you to implement your DVC logic in one place, making it easier to support
multiple RDP clients without duplicating code.
Additionally, this crate can be used for other scenarios, such as testing your own custom DVC
channel client, without needing to patch or rebuild IronRDP itself.

View file

@ -0,0 +1,12 @@
#![doc = include_str!("../README.md")]
#![doc(html_logo_url = "https://cdnweb.devolutions.net/images/projects/devolutions/logos/devolutions-icon-shadow.svg")]
#[macro_use]
extern crate tracing;
#[cfg(target_os = "windows")]
mod windows;
mod platform;
pub use platform::DvcNamedPipeProxy;

View file

@ -0,0 +1,11 @@
#[cfg(target_os = "windows")]
mod windows;
#[cfg(not(target_os = "windows"))]
mod unix;
#[cfg(target_os = "windows")]
pub use windows::DvcNamedPipeProxy;
#[cfg(not(target_os = "windows"))]
pub use unix::DvcNamedPipeProxy;

View file

@ -0,0 +1,48 @@
use ironrdp_core::impl_as_any;
use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor};
use ironrdp_pdu::{pdu_other_err, PduResult};
use ironrdp_svc::SvcMessage;
/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server.
pub struct DvcNamedPipeProxy {
channel_name: String,
}
impl DvcNamedPipeProxy {
/// Creates a new DVC named pipe proxy.
/// `dvc_write_callback` is called when the proxy receives a DVC message from the
/// named pipe server and the SVC message is ready to be sent to the DVC channel in the main
/// IronRDP active session loop.
pub fn new<F>(channel_name: &str, _named_pipe_name: &str, _dvc_write_callback: F) -> Self
where
F: Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send + 'static,
{
error!("DvcNamedPipeProxy is not implemented on Unix-like systems, using a stub implementation");
Self {
channel_name: channel_name.to_owned(),
}
}
}
impl_as_any!(DvcNamedPipeProxy);
impl DvcProcessor for DvcNamedPipeProxy {
fn channel_name(&self) -> &str {
&self.channel_name
}
fn start(&mut self, _channel_id: u32) -> PduResult<Vec<DvcMessage>> {
Err(pdu_other_err!(
"DvcNamedPipeProxy is not implemented on Unix-like systems"
))
}
fn process(&mut self, _channel_id: u32, _payload: &[u8]) -> PduResult<Vec<DvcMessage>> {
Err(pdu_other_err!(
"DvcNamedPipeProxy is not implemented on Unix-like systems"
))
}
}
impl DvcClientProcessor for DvcNamedPipeProxy {}

View file

@ -0,0 +1,37 @@
use crate::windows::WindowsError;
#[derive(Debug)]
pub(crate) enum DvcPipeProxyError {
Windows(WindowsError),
MpscIo,
DvcIncompleteWrite,
EncodeDvcMessage,
}
impl From<WindowsError> for DvcPipeProxyError {
fn from(err: WindowsError) -> Self {
DvcPipeProxyError::Windows(err)
}
}
impl core::fmt::Display for DvcPipeProxyError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
DvcPipeProxyError::Windows(err) => err.fmt(f),
DvcPipeProxyError::MpscIo => write!(f, "MPSC IO error"),
DvcPipeProxyError::DvcIncompleteWrite => write!(f, "DVC incomplete write"),
DvcPipeProxyError::EncodeDvcMessage => write!(f, "DVC message encoding error"),
}
}
}
impl core::error::Error for DvcPipeProxyError {
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
match self {
DvcPipeProxyError::Windows(err) => Some(err),
DvcPipeProxyError::MpscIo => None,
DvcPipeProxyError::DvcIncompleteWrite => None,
DvcPipeProxyError::EncodeDvcMessage => None,
}
}
}

View file

@ -0,0 +1,149 @@
mod error;
mod worker;
use std::sync::mpsc;
use ironrdp_core::impl_as_any;
use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor};
use ironrdp_pdu::{pdu_other_err, PduResult};
use ironrdp_svc::SvcMessage;
use crate::platform::windows::error::DvcPipeProxyError;
use crate::platform::windows::worker::{worker_thread_func, OnWriteDvcMessage, WorkerCtx};
use crate::windows::{Event, MessagePipeServer, Semaphore};
const IO_MPSC_CHANNEL_SIZE: usize = 100;
struct WorkerControlCtx {
to_pipe_tx: mpsc::SyncSender<Vec<u8>>,
to_pipe_semaphore: Semaphore,
abort_event: Event,
}
/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server.
pub struct DvcNamedPipeProxy {
channel_name: String,
named_pipe_name: String,
dvc_write_callback: Option<OnWriteDvcMessage>,
worker_control_ctx: Option<WorkerControlCtx>,
}
impl DvcNamedPipeProxy {
/// Creates a new DVC named pipe proxy.
/// `dvc_write_callback` is called when the proxy receives a DVC message from the
/// named pipe server and the SVC message is ready to be sent to the DVC channel in the main
/// IronRDP active session loop.
pub fn new<F>(channel_name: &str, named_pipe_name: &str, dvc_write_callback: F) -> Self
where
F: Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send + 'static,
{
let named_pipe_name = format!("\\\\.\\pipe\\{named_pipe_name}");
Self {
channel_name: channel_name.to_owned(),
named_pipe_name,
dvc_write_callback: Some(Box::new(dvc_write_callback)),
worker_control_ctx: None,
}
}
}
impl_as_any!(DvcNamedPipeProxy);
impl Drop for DvcNamedPipeProxy {
fn drop(&mut self) {
if let Some(ctx) = &self.worker_control_ctx {
// Signal the worker thread to abort.
ctx.abort_event.set().ok();
}
}
}
impl DvcNamedPipeProxy {
fn start_impl(&mut self, channel_id: u32) -> Result<(), DvcPipeProxyError> {
// PIPE -> DVC channel - handled via callback passed to the constructor.
// DVC -> PIPE channel - handled via mpsc internally in the worker thread.
let (to_pipe_tx, to_pipe_rx) = mpsc::sync_channel(IO_MPSC_CHANNEL_SIZE);
let semaphore_max_count = IO_MPSC_CHANNEL_SIZE
.try_into()
.expect("Channel size is too large for underlying WinAPI semaphore");
let to_pipe_semaphore = Semaphore::new_unnamed(0, semaphore_max_count)?;
let abort_event = Event::new_unnamed()?;
let worker_control_ctx = WorkerControlCtx {
to_pipe_tx,
to_pipe_semaphore: to_pipe_semaphore.clone(),
abort_event: abort_event.clone(),
};
let pipe = MessagePipeServer::new(&self.named_pipe_name)?;
let dvc_write_callback = self
.dvc_write_callback
.take()
.expect("DVC write callback already taken");
let worker_ctx = WorkerCtx {
pipe,
to_pipe_rx,
to_pipe_semaphore,
abort_event,
dvc_write_callback,
pipe_name: self.named_pipe_name.clone(),
channel_name: self.channel_name.clone(),
channel_id,
};
let pipe_name = self.named_pipe_name.clone();
let channel_name = self.channel_name.clone();
self.worker_control_ctx = Some(worker_control_ctx);
std::thread::spawn(move || {
if let Err(error) = worker_thread_func(worker_ctx) {
error!(%error, %pipe_name, %channel_name, "DVC pipe proxy worker thread failed");
}
});
Ok(())
}
}
impl DvcProcessor for DvcNamedPipeProxy {
fn channel_name(&self) -> &str {
&self.channel_name
}
fn start(&mut self, channel_id: u32) -> PduResult<Vec<DvcMessage>> {
self.start_impl(channel_id)
.map_err(|e| pdu_other_err!("dvc named pipe proxy failed", source: e))?;
Ok(Vec::new())
}
fn process(&mut self, _channel_id: u32, payload: &[u8]) -> PduResult<Vec<DvcMessage>> {
// Send the payload to the worker thread via the mpsc channel.
let ctx = match &self.worker_control_ctx {
Some(ctx) => ctx,
None => {
return Err(pdu_other_err!("DVC pipe proxy not started"));
}
};
ctx.to_pipe_tx
.send(payload.to_vec())
.map_err(|_| pdu_other_err!("DVC pipe proxy send failed"))?;
// Signal WinAPI-based worker IO loop.
ctx.to_pipe_semaphore
.release(1)
.map_err(|_| pdu_other_err!("DVC pipe proxy semaphore release failed"))?;
Ok(Vec::new())
}
}
impl DvcClientProcessor for DvcNamedPipeProxy {}

View file

@ -0,0 +1,172 @@
use std::sync::mpsc;
use ironrdp_core::{ensure_size, Encode, EncodeResult};
use ironrdp_dvc::{encode_dvc_messages, DvcEncode};
use ironrdp_pdu::PduResult;
use ironrdp_svc::{ChannelFlags, SvcMessage};
use crate::platform::windows::error::DvcPipeProxyError;
use crate::windows::{wait_any, wait_any_with_timeout, Event, MessagePipeServer, Semaphore, WindowsError};
const PIPE_CONNECT_TIMEOUT_SECS: u32 = 10_000; // 10 seconds
const PIPE_WRITE_TIMEOUT_SECS: u32 = 3_000; // 3 seconds
const MESSAGE_BUFFER_SIZE: usize = 64 * 1024; // 64 KiB
pub(crate) type OnWriteDvcMessage = Box<dyn Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send>;
pub(crate) struct WorkerCtx {
pub pipe: MessagePipeServer,
pub to_pipe_rx: mpsc::Receiver<Vec<u8>>,
pub to_pipe_semaphore: Semaphore,
pub abort_event: Event,
pub dvc_write_callback: OnWriteDvcMessage,
pub pipe_name: String,
pub channel_name: String,
pub channel_id: u32,
}
pub(crate) fn worker_thread_func(worker_ctx: WorkerCtx) -> Result<(), DvcPipeProxyError> {
let WorkerCtx {
mut pipe,
to_pipe_rx,
to_pipe_semaphore,
abort_event,
dvc_write_callback,
pipe_name,
channel_name,
channel_id,
} = worker_ctx;
info!(%channel_name, %pipe_name, "Connecting DVC pipe proxy");
{
let mut connect_ctx = pipe.prepare_connect_overlapped()?;
if !connect_ctx.overlapped_connect()? {
const EVENT_ID_ABORT: usize = 0;
let events = [abort_event.borrow(), connect_ctx.borrow_event()];
let wait_result = match wait_any_with_timeout(&events, PIPE_CONNECT_TIMEOUT_SECS) {
Ok(idx) => idx,
Err(WindowsError::WaitForMultipleObjectsTimeout) => {
warn!(%channel_name, %pipe_name, "DVC pipe proxy connection timed out");
return Ok(());
}
Err(err) => {
return Err(DvcPipeProxyError::Windows(err));
}
};
if wait_result == EVENT_ID_ABORT {
info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted");
return Ok(());
}
connect_ctx.get_result()?;
}
}
info!(%channel_name, %pipe_name, "DVC pipe proxy connected");
let mut read_ctx = pipe.prepare_read_overlapped(MESSAGE_BUFFER_SIZE)?;
const EVENT_ID_ABORT: usize = 0;
const EVENT_ID_READ: usize = 1;
const EVENT_ID_WRITE_MPSC: usize = 2;
read_ctx.overlapped_read()?;
info!(%channel_name, %pipe_name, "DVC pipe proxy IO loop started");
loop {
let events = [
abort_event.borrow(),
read_ctx.borrow_event(),
to_pipe_semaphore.borrow(),
];
let wait_result = wait_any(&events)?;
if wait_result == EVENT_ID_ABORT {
info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted");
return Ok(());
}
// Read end of pipe is ready, forward received data to DVC.
if wait_result == EVENT_ID_READ {
let read_result = read_ctx.get_result()?.to_vec();
trace!(%channel_name, %pipe_name, "DVC proxy read {} bytes from pipe", read_result.len());
if !read_result.is_empty() {
let messages = encode_dvc_messages(
channel_id,
vec![Box::new(RawDataDvcMessage(read_result))],
ChannelFlags::empty(),
)
.map_err(|_| DvcPipeProxyError::EncodeDvcMessage)?;
if let Err(err) = dvc_write_callback(0, messages) {
error!(%err, %channel_name, %pipe_name, "DVC pipe proxy write callback failed");
}
}
// Queue the read operation again.
read_ctx.overlapped_read()?;
continue;
}
// DVC data received, forward it to the pipe.
if wait_result == EVENT_ID_WRITE_MPSC {
let payload = to_pipe_rx.recv().map_err(|_| DvcPipeProxyError::MpscIo)?;
let payload_len = payload.len();
if payload_len == 0 {
warn!(%channel_name, %pipe_name, "Rejected empty DVC data (not sent to pipe)");
continue;
}
trace!(%channel_name, %pipe_name, "DVC proxy write {} bytes to pipe,", payload_len);
let mut overlapped_write = pipe.prepare_write_overlapped(payload)?;
overlapped_write.overlapped_write()?;
let events = [abort_event.borrow(), overlapped_write.borrow_event()];
let wait_result = wait_any_with_timeout(&events, PIPE_WRITE_TIMEOUT_SECS)?;
if wait_result == EVENT_ID_ABORT {
info!(%channel_name, %pipe_name, "DVC pipe proxy write aborted");
return Ok(());
}
let bytes_written = overlapped_write.get_result()?;
if bytes_written as usize != payload_len {
// Message-based pipe write failed.
return Err(DvcPipeProxyError::DvcIncompleteWrite);
}
continue;
}
}
}
struct RawDataDvcMessage(Vec<u8>);
impl Encode for RawDataDvcMessage {
fn encode(&self, dst: &mut ironrdp_core::WriteCursor<'_>) -> EncodeResult<()> {
ensure_size!(in: dst, size: self.size());
dst.write_slice(&self.0);
Ok(())
}
fn name(&self) -> &'static str {
"RawDataDvcMessage"
}
fn size(&self) -> usize {
self.0.len()
}
}
impl DvcEncode for RawDataDvcMessage {}

View file

@ -0,0 +1,58 @@
#[derive(Debug)]
pub(crate) enum WindowsError {
CreateNamedPipe(windows::core::Error),
CreateEvent(windows::core::Error),
SetEvent(windows::core::Error),
ReleaseSemaphore(windows::core::Error),
InvalidSemaphoreParams(&'static str),
WaitForMultipleObjectsFailed(windows::core::Error),
WaitForMultipleObjectsTimeout,
WaitForMultipleObjectsAbandoned(u32),
OverlappedConnect(windows::core::Error),
OverlappedRead(windows::core::Error),
OverlappedWrite(windows::core::Error),
CreateSemaphore(windows::core::Error),
InvalidPipeName(String),
}
impl core::fmt::Display for WindowsError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
WindowsError::CreateNamedPipe(_) => write!(f, "failed to create named pipe"),
WindowsError::CreateEvent(_) => write!(f, "failed to create event object"),
WindowsError::SetEvent(_) => write!(f, "failed to set event to signaled state"),
WindowsError::InvalidSemaphoreParams(cause) => write!(f, "invalid semaphore parameters: {}", cause),
WindowsError::ReleaseSemaphore(_) => write!(f, "failed to release semaphore"),
WindowsError::WaitForMultipleObjectsFailed(_) => write!(f, "failed to wait for multiple objects"),
WindowsError::WaitForMultipleObjectsTimeout => write!(f, "timed out waiting for multiple objects"),
WindowsError::WaitForMultipleObjectsAbandoned(idx) => {
write!(f, "wait for multiple objects failed, handle #{idx} was abandoned")
}
WindowsError::OverlappedConnect(_) => write!(f, "overlapped connect failed"),
WindowsError::OverlappedRead(_) => write!(f, "overlapped read failed"),
WindowsError::OverlappedWrite(_) => write!(f, "overlapped write failed"),
WindowsError::CreateSemaphore(_) => write!(f, "failed to create semaphore object"),
WindowsError::InvalidPipeName(cause) => write!(f, "invalid pipe name: `{}`", cause),
}
}
}
impl core::error::Error for WindowsError {
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
match self {
WindowsError::CreateNamedPipe(err)
| WindowsError::SetEvent(err)
| WindowsError::ReleaseSemaphore(err)
| WindowsError::WaitForMultipleObjectsFailed(err)
| WindowsError::OverlappedConnect(err)
| WindowsError::OverlappedRead(err)
| WindowsError::OverlappedWrite(err)
| WindowsError::CreateSemaphore(err) => Some(err),
WindowsError::CreateEvent(err) => Some(err),
WindowsError::InvalidSemaphoreParams(_)
| WindowsError::WaitForMultipleObjectsTimeout
| WindowsError::InvalidPipeName(_) => None,
WindowsError::WaitForMultipleObjectsAbandoned(_) => None,
}
}
}

View file

@ -0,0 +1,53 @@
use std::sync::Arc;
use windows::core::Owned;
use windows::Win32::Foundation::HANDLE;
use windows::Win32::System::Threading::{CreateEventW, SetEvent};
use crate::windows::{BorrowedHandle, WindowsError};
/// RAII wrapper for WinAPI event handle.
#[derive(Debug, Clone)]
pub(crate) struct Event {
handle: Arc<Owned<HANDLE>>,
}
// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via
// Event wrapper API itself by restricting handle usage:
// - set() method which calls SetEvent inside (which is thread-safe).
// - borrow() method which returns a BorrowedHandle for waiting on the event.
// - Handle lifetime is ensured by Arc, so it is always valid when used.
unsafe impl Send for Event {}
impl Event {
pub(crate) fn new_unnamed() -> Result<Self, WindowsError> {
// SAFETY: FFI call with no outstanding preconditions.
let handle = unsafe { CreateEventW(None, false, false, None).map_err(WindowsError::CreateEvent)? };
// SAFETY: Handle is valid and we are the owner of the handle.
let handle = unsafe { Owned::new(handle) };
// CreateEventW returns a valid handle on success.
Ok(Self {
// See `unsafe impl Send` comment.
#[allow(clippy::arc_with_non_send_sync)]
handle: Arc::new(handle),
})
}
pub(crate) fn set(&self) -> Result<(), WindowsError> {
// SAFETY: The handle is valid and we are the owner of the handle.
unsafe {
SetEvent(self.raw()).map_err(WindowsError::SetEvent)?;
}
Ok(())
}
pub(super) fn raw(&self) -> HANDLE {
**self.handle
}
pub(crate) fn borrow(&self) -> BorrowedHandle<'_> {
BorrowedHandle(&self.handle)
}
}

View file

@ -0,0 +1,99 @@
//! WinAPI wrappers for the DVC pipe proxy IO loop logic.
//!
//! Some of the wrappers are based on `win-api-wrappers` code (simplified/reduced functionality).
mod error;
mod event;
mod pipe;
mod semaphore;
pub(crate) use error::WindowsError;
pub(crate) use event::Event;
pub(crate) use pipe::MessagePipeServer;
pub(crate) use semaphore::Semaphore;
use windows::Win32::Foundation::{
ERROR_IO_PENDING, HANDLE, WAIT_ABANDONED_0, WAIT_EVENT, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT,
};
use windows::Win32::System::Threading::{WaitForMultipleObjects, INFINITE};
/// Thin wrapper around borrowed `windows` crate `HANDLE` reference.
/// This is used to ensure handle lifetime when passing it to FFI functions
/// (see `wait_any_with_timeout` for example).
#[repr(transparent)]
pub(crate) struct BorrowedHandle<'a>(&'a HANDLE);
/// Safe wrapper around `WaitForMultipleObjects`.
pub(crate) fn wait_any_with_timeout(handles: &[BorrowedHandle<'_>], timeout: u32) -> Result<usize, WindowsError> {
let handles = cast_handles(handles);
// SAFETY:
// - BorrowedHandle alongside with rust type system ensures that the HANDLEs are valid for
// the duration of the call.
// - All handles in this module have SYNCHRONIZE access rights.
// - cast_handles ensures no handle duplicates.
let result = unsafe { WaitForMultipleObjects(handles, false, timeout) };
match result {
WAIT_FAILED => Err(WindowsError::WaitForMultipleObjectsFailed(
windows::core::Error::from_win32(),
)),
WAIT_TIMEOUT => Err(WindowsError::WaitForMultipleObjectsTimeout),
WAIT_EVENT(idx) if idx >= WAIT_ABANDONED_0.0 => {
let idx = idx - WAIT_ABANDONED_0.0;
Err(WindowsError::WaitForMultipleObjectsAbandoned(idx))
}
WAIT_EVENT(id) => Ok((id - WAIT_OBJECT_0.0) as usize),
}
}
/// Safe `WaitForMultipleObjects` wrapper with infinite timeout.
pub(crate) fn wait_any(handles: &[BorrowedHandle<'_>]) -> Result<usize, WindowsError> {
// Standard generic syntax is used instead if `impl` because of the following lint:
// > warning: lifetime parameter `'a` only used once
//
// Fixing this lint (use of '_ lifetime) produces compiler error.
wait_any_with_timeout(handles, INFINITE)
}
fn cast_handles<'a>(handles: &'a [BorrowedHandle<'a>]) -> &'a [HANDLE] {
// Very basic sanity checks to ensure that the handles are valid
// and there are no duplicates.
// This is only done in debug builds to avoid performance overhead in release builds, while
// still catching undefined behavior early in development.
#[cfg(debug_assertions)]
{
// Ensure that there are no duplicate handles without hash.
for (i, handle) in handles.iter().enumerate() {
for other_handle in &handles[i + 1..] {
if handle.0 == other_handle.0 {
panic!("Duplicate handle found in wait_any_with_timeout");
}
}
}
}
for handle in handles {
// Ensure that the handle is valid.
if handle.0.is_invalid() {
panic!("Invalid handle in wait_any_with_timeout");
}
}
// SAFETY:
// - BorrowedHandle is #[repr(transparent)] over *const c_void, and so is HANDLE,
// so the layout is the same.
// - We ensure the lifetime is preserved.
unsafe { core::slice::from_raw_parts(handles.as_ptr() as *const HANDLE, handles.len()) }
}
/// Maps ERROR_IO_PENDING to Ok(()) and returns other errors as is.
fn ensure_overlapped_io_result(result: windows::core::Result<()>) -> Result<windows::core::Result<()>, WindowsError> {
if let Err(error) = &result {
if error.code() == ERROR_IO_PENDING.to_hresult() {
return Ok(Ok(()));
}
}
Ok(result)
}

View file

@ -0,0 +1,291 @@
use core::ops::DerefMut;
use core::pin::Pin;
use windows::core::{Owned, PCWSTR};
use windows::Win32::Foundation::{ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, HANDLE};
use windows::Win32::Storage::FileSystem::{
ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
};
use windows::Win32::System::Pipes::{
ConnectNamedPipe, CreateNamedPipeW, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_WAIT,
};
use windows::Win32::System::IO::{GetOverlappedResult, OVERLAPPED};
use crate::windows::{ensure_overlapped_io_result, BorrowedHandle, Event, WindowsError};
const PIPE_INSTANCES: u32 = 2;
const PIPE_BUFFER_SIZE: u32 = 64 * 1024; // 64KB
const DEFAULT_PIPE_TIMEOUT: u32 = 10_000; // 10 seconds
/// RAII wrapper for WinAPI named pipe server.
#[derive(Debug)]
pub(crate) struct MessagePipeServer {
handle: Owned<HANDLE>,
connected: bool,
}
/// SAFETY: It is safe to send pipe HANDLE between threads.
unsafe impl Send for MessagePipeServer {}
impl MessagePipeServer {
/// Creates a new named pipe server.
pub(crate) fn new(name: &str) -> Result<Self, WindowsError> {
// Create a named pipe with the specified name.
let lpname =
widestring::U16CString::from_str(name).map_err(|_| WindowsError::InvalidPipeName(name.to_owned()))?;
// SAFETY: lpname is a valid pointer to a null-terminated wide string.
let handle = unsafe {
CreateNamedPipeW(
PCWSTR(lpname.as_ptr()),
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
PIPE_INSTANCES,
PIPE_BUFFER_SIZE,
PIPE_BUFFER_SIZE,
DEFAULT_PIPE_TIMEOUT,
None,
)
};
// `windows` crate API inconsistency: CreateNamedPipeW returns invalid handle on error
// instead of Result::Err.
if handle.is_invalid() {
return Err(WindowsError::CreateNamedPipe(windows::core::Error::from_win32()));
}
// SAFETY: Handle is valid and we are the owner of the handle.
let handle = unsafe { Owned::new(handle) };
Ok(Self {
handle,
connected: false,
})
}
fn raw(&self) -> HANDLE {
*self.handle
}
/// Initializes context for overlapped connect operation.
pub(crate) fn prepare_connect_overlapped(&mut self) -> Result<OverlappedPipeConnectCtx<'_>, WindowsError> {
OverlappedPipeConnectCtx::new(self)
}
/// Initializes context for overlapped read operation.
pub(crate) fn prepare_read_overlapped(
&self,
buffer_size: usize,
) -> Result<OverlappedPipeReadCtx<'_>, WindowsError> {
OverlappedPipeReadCtx::new(self, buffer_size)
}
/// Initializes context for overlapped write operation.
pub(crate) fn prepare_write_overlapped(&self, data: Vec<u8>) -> Result<OverlappedWriteCtx<'_>, WindowsError> {
OverlappedWriteCtx::new(self, data)
}
}
pub(crate) struct OverlappedPipeConnectCtx<'a> {
pipe: &'a mut MessagePipeServer,
overlapped: Pin<Box<OVERLAPPED>>,
event: Event,
}
impl<'a> OverlappedPipeConnectCtx<'a> {
fn new(pipe: &'a mut MessagePipeServer) -> Result<Self, WindowsError> {
let event = Event::new_unnamed()?;
let overlapped = Box::pin(OVERLAPPED {
hEvent: event.raw(),
..Default::default()
});
Ok(Self {
pipe,
overlapped,
event,
})
}
/// Connects to the named pipe server.
/// Returns true if pipe is already connected prior to this call and no additional
/// overlapped io is needed. If false is returned, the caller should call `get_result()` to
/// after returned event handle is signaled to complete the connection.
pub(crate) fn overlapped_connect(&mut self) -> Result<bool, WindowsError> {
// SAFETY: The handle is valid and we are the owner of the handle.
let result = unsafe { ConnectNamedPipe(self.pipe.raw(), Some(self.overlapped.deref_mut() as *mut _)) };
match result {
Ok(()) => {
self.pipe.connected = true;
Ok(true)
}
Err(error) => {
if error.code() == ERROR_PIPE_CONNECTED.to_hresult() {
// The pipe is already connected.
self.pipe.connected = true;
Ok(true)
} else if error.code() == ERROR_IO_PENDING.to_hresult() {
// Overlapped I/O is pending.
Ok(false)
} else {
// Connection failed.
Err(WindowsError::OverlappedConnect(error))
}
}
}
}
pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> {
self.event.borrow()
}
pub(crate) fn get_result(&mut self) -> Result<(), WindowsError> {
let mut bytes_read = 0u32;
// SAFETY: The handle is valid and we are the owner of the handle.
unsafe {
GetOverlappedResult(
self.pipe.raw(),
self.overlapped.deref_mut() as *mut _,
&mut bytes_read as *mut u32,
false,
)
.map_err(WindowsError::OverlappedConnect)?
};
self.pipe.connected = true;
Ok(())
}
}
pub(crate) struct OverlappedPipeReadCtx<'a> {
pipe: &'a MessagePipeServer,
buffer: Vec<u8>,
overlapped: Pin<Box<OVERLAPPED>>,
event: Event,
}
impl<'a> OverlappedPipeReadCtx<'a> {
fn new(pipe: &'a MessagePipeServer, buffer_size: usize) -> Result<Self, WindowsError> {
let event = Event::new_unnamed()?;
let overlapped = Box::pin(OVERLAPPED {
hEvent: event.raw(),
..Default::default()
});
Ok(Self {
pipe,
buffer: vec![0; buffer_size],
overlapped,
event,
})
}
pub(crate) fn overlapped_read(&mut self) -> Result<(), WindowsError> {
// SAFETY: self.pipe.raw() returns a valid handle. The read buffer pointer returned
// by self.buffer.as_mut_slice() is valid and remains alive for the entire duration
// of the overlapped I/O operation. The OVERLAPPED structure is pinned and not moved
// in memory, ensuring its address remains stable until the operation completes.
let result = unsafe {
ReadFile(
self.pipe.raw(),
Some(self.buffer.as_mut_slice()),
None,
Some(self.overlapped.deref_mut() as *mut _),
)
};
ensure_overlapped_io_result(result)?.map_err(WindowsError::OverlappedRead)
}
pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> {
self.event.borrow()
}
pub(crate) fn get_result(&mut self) -> Result<&[u8], WindowsError> {
let mut bytes_read = 0u32;
// SAFETY: The handle is valid and we are the owner of the handle.
unsafe {
GetOverlappedResult(
self.pipe.raw(),
self.overlapped.deref_mut() as *mut _,
&mut bytes_read as *mut u32,
false,
)
.map_err(WindowsError::OverlappedRead)?
};
Ok(&self.buffer[..bytes_read as usize])
}
}
pub(crate) struct OverlappedWriteCtx<'a> {
pipe: &'a MessagePipeServer,
data: Vec<u8>,
overlapped: Pin<Box<OVERLAPPED>>,
event: Event,
}
impl<'a> OverlappedWriteCtx<'a> {
fn new(pipe: &'a MessagePipeServer, data: Vec<u8>) -> Result<Self, WindowsError> {
let event = Event::new_unnamed()?;
let mut overlapped = Box::pin(OVERLAPPED {
hEvent: event.raw(),
..Default::default()
});
// Set write mode to append
overlapped.Anonymous.Anonymous.Offset = 0xFFFFFFFF;
overlapped.Anonymous.Anonymous.OffsetHigh = 0xFFFFFFFF;
Ok(Self {
pipe,
data,
overlapped,
event,
})
}
pub(crate) fn overlapped_write(&mut self) -> Result<(), WindowsError> {
// SAFETY: self.pipe.raw() returns a valid handle. The write buffer pointer (&self.data) is valid
// and remains alive for the entire duration of the overlapped I/O operation. The OVERLAPPED
// structure is pinned and not moved in memory, ensuring its address remains stable until the
// operation completes.
let result = unsafe {
WriteFile(
self.pipe.raw(),
Some(&self.data),
None,
Some(self.overlapped.deref_mut() as *mut _),
)
};
ensure_overlapped_io_result(result)?.map_err(WindowsError::OverlappedWrite)
}
pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> {
self.event.borrow()
}
pub(crate) fn get_result(&mut self) -> Result<u32, WindowsError> {
let mut bytes_written = 0u32;
// SAFETY: The pipe handle is valid and we are the owner of the handle.
unsafe {
GetOverlappedResult(
self.pipe.raw(),
self.overlapped.deref_mut() as *const _,
&mut bytes_written as *mut u32,
true,
)
.map_err(WindowsError::OverlappedWrite)?;
};
Ok(bytes_written)
}
}

View file

@ -0,0 +1,92 @@
use std::sync::Arc;
use windows::core::Owned;
use windows::Win32::Foundation::HANDLE;
use windows::Win32::System::Threading::{CreateSemaphoreW, ReleaseSemaphore};
use crate::windows::{BorrowedHandle, WindowsError};
/// RAII wrapper for WinAPI semaphore handle.
#[derive(Debug, Clone)]
pub(crate) struct Semaphore {
handle: Arc<Owned<HANDLE>>,
}
// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via
// Semaphore wrapper API itself by restricting handle usage:
// - release() method which calls ReleaseSemaphore inside (which is thread-safe).
// - borrow() method which returns a BorrowedHandle for waiting on the semaphore.
// - Handle lifetime is ensured by Arc, so it is always valid when used.
unsafe impl Send for Semaphore {}
impl Semaphore {
/// Creates a new unnamed semaphore with the specified initial and maximum counts.
pub(crate) fn new_unnamed(initial_count: u32, maximum_count: u32) -> Result<Self, WindowsError> {
if maximum_count == 0 {
return Err(WindowsError::InvalidSemaphoreParams(
"maximum_count must be greater than 0",
));
}
if initial_count > maximum_count {
return Err(WindowsError::InvalidSemaphoreParams(
"initial_count must be less than or equal to maximum_count",
));
}
let initial_count = i32::try_from(initial_count)
.map_err(|_| WindowsError::InvalidSemaphoreParams("initial_count should be positive"))?;
let maximum_count = i32::try_from(maximum_count)
.map_err(|_| WindowsError::InvalidSemaphoreParams("maximum_count should be positive"))?;
// SAFETY: All parameters are checked for validity above:
// - initial_count is always <= maximum_count.
// - maximum_count is always > 0.
// - all values are positive.
let handle = unsafe {
CreateSemaphoreW(None, initial_count, maximum_count, None).map_err(WindowsError::CreateSemaphore)?
};
// SAFETY: Handle is valid and we are the owner of the handle.
let handle = unsafe { Owned::new(handle) };
// CreateSemaphoreW returns a valid handle on success.
Ok(Self {
// See `unsafe impl Send` comment.
// TODO(@CBenoit): Verify this comment.
#[allow(clippy::arc_with_non_send_sync)]
handle: Arc::new(handle),
})
}
fn raw(&self) -> HANDLE {
**self.handle
}
pub(crate) fn borrow(&self) -> BorrowedHandle<'_> {
BorrowedHandle(&self.handle)
}
pub(crate) fn release(&self, release_count: u16) -> Result<u32, WindowsError> {
let release_count = i32::from(release_count);
if release_count == 0 {
// semaphore release count must be greater than 0
return Err(WindowsError::InvalidSemaphoreParams(
"release_count must be greater than 0",
));
}
let mut previous_count = 0;
// SAFETY: All parameters are checked for validity above:
// - release_count > 0.
// - lpPreviousCount points to valid stack memory.
// - handle is valid and owned by this struct.
unsafe {
ReleaseSemaphore(self.raw(), release_count, Some(&mut previous_count))
.map_err(WindowsError::ReleaseSemaphore)?;
}
Ok(previous_count.try_into().expect("semaphore count is negative"))
}
}

View file

@ -68,6 +68,10 @@ impl DrdynvcClient {
self.dynamic_channels.get_by_type_id(TypeId::of::<T>())
}
pub fn get_dvc_by_channel_id(&self, channel_id: u32) -> Option<&DynamicVirtualChannel> {
self.dynamic_channels.get_by_channel_id(channel_id)
}
fn create_capabilities_response(&mut self) -> SvcMessage {
let caps_response = DrdynvcClientPdu::Capabilities(CapabilitiesResponsePdu::new(CapsVersion::V1));
debug!("Send DVC Capabilities Response PDU: {caps_response:?}");
@ -141,7 +145,7 @@ impl SvcProcessor for DrdynvcClient {
}
DrdynvcServerPdu::Close(close_request) => {
debug!("Got DVC Close Request PDU: {close_request:?}");
self.dynamic_channels.remove_by_channel_id(&close_request.channel_id);
self.dynamic_channels.remove_by_channel_id(close_request.channel_id);
let close_response = DrdynvcClientPdu::Close(ClosePdu::new(close_request.channel_id));
@ -153,7 +157,7 @@ impl SvcProcessor for DrdynvcClient {
let messages = self
.dynamic_channels
.get_by_channel_id_mut(&channel_id)
.get_by_channel_id_mut(channel_id)
.ok_or_else(|| pdu_other_err!("access to non existing DVC channel"))?
.process(data)?;

View file

@ -202,14 +202,20 @@ impl DynamicChannelSet {
self.channels.get_mut(name)
}
fn get_by_channel_id_mut(&mut self, id: &DynamicChannelId) -> Option<&mut DynamicVirtualChannel> {
fn get_by_channel_id(&self, id: DynamicChannelId) -> Option<&DynamicVirtualChannel> {
self.channel_id_to_name
.get(id)
.get(&id)
.and_then(|name| self.channels.get(name))
}
fn get_by_channel_id_mut(&mut self, id: DynamicChannelId) -> Option<&mut DynamicVirtualChannel> {
self.channel_id_to_name
.get(&id)
.and_then(|name| self.channels.get_mut(name))
}
fn remove_by_channel_id(&mut self, id: &DynamicChannelId) -> Option<DynamicChannelId> {
if let Some(name) = self.channel_id_to_name.remove(id) {
fn remove_by_channel_id(&mut self, id: DynamicChannelId) -> Option<DynamicChannelId> {
if let Some(name) = self.channel_id_to_name.remove(&id) {
return self.name_to_channel_id.remove(&name);
// Channels are retained in the `self.channels` and `self.type_id_to_name` map to allow potential
// dynamic re-addition by the server.

View file

@ -10,7 +10,7 @@ use ironrdp_pdu::geometry::InclusiveRectangle;
use ironrdp_pdu::input::fast_path::{FastPathInput, FastPathInputEvent};
use ironrdp_pdu::rdp::headers::ShareDataPdu;
use ironrdp_pdu::{mcs, Action};
use ironrdp_svc::{SvcProcessor, SvcProcessorMessages};
use ironrdp_svc::{SvcMessage, SvcProcessor, SvcProcessorMessages};
use crate::fast_path::UpdateKind;
use crate::image::DecodedImage;
@ -187,6 +187,10 @@ impl ActiveStage {
self.x224_processor.get_dvc::<T>()
}
pub fn get_dvc_by_channel_id(&mut self, channel_id: u32) -> Option<&DynamicVirtualChannel> {
self.x224_processor.get_dvc_by_channel_id(channel_id)
}
/// Completes user's SVC request with data, required to sent it over the network and returns
/// a buffer with encoded data.
pub fn process_svc_processor_messages<C: SvcProcessor + 'static>(
@ -245,6 +249,10 @@ impl ActiveStage {
None
}
pub fn encode_dvc_messages(&mut self, messages: Vec<SvcMessage>) -> SessionResult<Vec<u8>> {
self.process_svc_processor_messages(SvcProcessorMessages::<DrdynvcClient>::new(messages))
}
}
#[derive(Debug)]

View file

@ -88,6 +88,11 @@ impl Processor {
self.get_svc_processor::<DrdynvcClient>()?.get_dvc_by_type_id::<T>()
}
pub fn get_dvc_by_channel_id(&self, channel_id: u32) -> Option<&DynamicVirtualChannel> {
self.get_svc_processor::<DrdynvcClient>()?
.get_dvc_by_channel_id(channel_id)
}
/// Processes a received PDU. Returns a vector of [`ProcessorOutput`] that must be processed
/// in the returned order.
pub fn process(&mut self, frame: &[u8]) -> SessionResult<Vec<ProcessorOutput>> {

View file

@ -83,6 +83,15 @@ pub struct SvcMessage {
flags: ChannelFlags,
}
impl fmt::Debug for SvcMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SvcMessage")
.field("pdu", &self.pdu.name())
.field("flags", &self.flags)
.finish()
}
}
impl SvcMessage {
/// Adds additional SVC header flags to the message.
#[must_use]