feat(dvc): make dvc named pipe proxy cross-platform (#896)
Some checks are pending
CI / Check typos (push) Waiting to run
CI / Success (push) Blocked by required conditions
CI / Check formatting (push) Waiting to run
CI / Checks [linux] (push) Blocked by required conditions
CI / Checks [macos] (push) Blocked by required conditions
CI / Checks [windows] (push) Blocked by required conditions
CI / Fuzzing (push) Blocked by required conditions
CI / Web Client (push) Blocked by required conditions
CI / FFI (push) Blocked by required conditions
Coverage / Coverage Report (push) Waiting to run
Release crates / Open release PR (push) Waiting to run
Release crates / Release crates (push) Waiting to run

### Changes
- Make dvc named pipe proxy cross-platform (Unix implementation via
`tokio::net::unix::UnixStream`)
- Removed unsafe code for Windows implementation, switched to
`tokio::net::windows::named_pipe`

### Testing
This feature can be used in the [same
way](https://github.com/Devolutions/IronRDP/pull/791) as on Windows,
however instead of GUI test app there is new basic
[CLI](https://github.com/Devolutions/now-proto/pull/31) app
This commit is contained in:
Vladyslav Nikonov 2025-08-04 14:56:02 +00:00 committed by GitHub
parent 4df5dd8762
commit 166b76010c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 455 additions and 1017 deletions

4
Cargo.lock generated
View file

@ -2533,13 +2533,13 @@ dependencies = [
name = "ironrdp-dvc-pipe-proxy"
version = "0.1.0"
dependencies = [
"async-trait",
"ironrdp-core",
"ironrdp-dvc",
"ironrdp-pdu",
"ironrdp-svc",
"tokio",
"tracing",
"widestring",
"windows 0.61.3",
]
[[package]]

View file

@ -22,17 +22,8 @@ ironrdp-dvc = { path = "../ironrdp-dvc", version = "0.3" }
ironrdp-svc = { path = "../ironrdp-svc", version = "0.4" } # public (SvcMessage type)
tracing = { version = "0.1", features = ["log"] }
[target.'cfg(windows)'.dependencies]
widestring = "1"
windows = { version = "0.61", features = [
"Win32_Foundation",
"Win32_Security",
"Win32_System_Threading",
"Win32_Storage_FileSystem",
"Win32_System_Pipes",
"Win32_System_IO",
] }
tokio = { version = "1", features = ["net", "rt", "sync", "macros", "io-util"]}
async-trait = "0.1"
[lints]
workspace = true

View file

@ -0,0 +1,23 @@
#[derive(Debug)]
pub(crate) enum DvcPipeProxyError {
Io(std::io::Error),
EncodeDvcMessage(ironrdp_core::EncodeError),
}
impl core::fmt::Display for DvcPipeProxyError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
DvcPipeProxyError::Io(_) => write!(f, "IO error"),
DvcPipeProxyError::EncodeDvcMessage(_) => write!(f, "DVC message encoding error"),
}
}
}
impl core::error::Error for DvcPipeProxyError {
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
match self {
DvcPipeProxyError::Io(err) => Some(err),
DvcPipeProxyError::EncodeDvcMessage(src) => Some(src),
}
}
}

View file

@ -4,8 +4,11 @@
#[macro_use]
extern crate tracing;
#[cfg(target_os = "windows")]
mod windows;
mod error;
mod message;
mod os_pipe;
mod platform;
pub use self::platform::DvcNamedPipeProxy;
mod proxy;
mod worker;
pub use self::proxy::DvcNamedPipeProxy;

View file

@ -0,0 +1,22 @@
use ironrdp_core::{ensure_size, Encode, EncodeResult};
use ironrdp_dvc::DvcEncode;
pub(crate) struct RawDataDvcMessage(pub Vec<u8>);
impl Encode for RawDataDvcMessage {
fn encode(&self, dst: &mut ironrdp_core::WriteCursor<'_>) -> EncodeResult<()> {
ensure_size!(in: dst, size: self.size());
dst.write_slice(&self.0);
Ok(())
}
fn name(&self) -> &'static str {
"RawDataDvcMessage"
}
fn size(&self) -> usize {
self.0.len()
}
}
impl DvcEncode for RawDataDvcMessage {}

View file

@ -0,0 +1,19 @@
use async_trait::async_trait;
use crate::error::DvcPipeProxyError;
#[async_trait]
pub(crate) trait OsPipe: Send + Sync {
/// Creates a new OS pipe and waits for the connection.
async fn connect(pipe_name: &str) -> Result<Self, DvcPipeProxyError>
where
Self: Sized;
/// Reads data from the pipe and returns the number of bytes read.
///
/// Returned future should be stateless and can be polled multiple times.
async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, DvcPipeProxyError>;
/// Writes data to the pipe and returns the number of bytes written.
async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError>;
}

View file

@ -1,9 +1,5 @@
#[cfg(target_os = "windows")]
mod windows;
#[cfg(target_os = "windows")]
pub use self::windows::DvcNamedPipeProxy;
pub(crate) mod windows;
#[cfg(not(target_os = "windows"))]
mod unix;
#[cfg(not(target_os = "windows"))]
pub use self::unix::DvcNamedPipeProxy;
pub(crate) mod unix;

View file

@ -1,48 +1,66 @@
use ironrdp_core::impl_as_any;
use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor};
use ironrdp_pdu::{pdu_other_err, PduResult};
use ironrdp_svc::SvcMessage;
use async_trait::async_trait;
use tokio::fs;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server.
pub struct DvcNamedPipeProxy {
channel_name: String,
use crate::error::DvcPipeProxyError;
use crate::os_pipe::OsPipe;
/// Unix-specific implementation of the OS pipe trait.
pub(crate) struct UnixPipe {
socket: tokio::net::UnixStream,
}
impl DvcNamedPipeProxy {
/// Creates a new DVC named pipe proxy.
/// `dvc_write_callback` is called when the proxy receives a DVC message from the
/// named pipe server and the SVC message is ready to be sent to the DVC channel in the main
/// IronRDP active session loop.
pub fn new<F>(channel_name: &str, _named_pipe_name: &str, _dvc_write_callback: F) -> Self
where
F: Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send + 'static,
{
error!("DvcNamedPipeProxy is not implemented on Unix-like systems, using a stub implementation");
#[async_trait]
impl OsPipe for UnixPipe {
async fn connect(pipe_name: &str) -> Result<Self, DvcPipeProxyError> {
// Domain socket file could already exist from a previous run.
match fs::metadata(&pipe_name).await {
Ok(metadata) => {
use std::os::unix::fs::FileTypeExt as _;
Self {
channel_name: channel_name.to_owned(),
info!(
%pipe_name,
"DVC pipe already exists, removing stale file."
);
// Just to be sure, check if it's indeed a socket -
// throw an error if calling code accidentally passed a regular file.
if !metadata.file_type().is_socket() {
return Err(DvcPipeProxyError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Path {pipe_name} is not a socket"),
)));
}
fs::remove_file(pipe_name).await.map_err(DvcPipeProxyError::Io)?;
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
trace!(
%pipe_name,
"DVC pipe does not exist, creating it."
);
}
Err(e) => {
return Err(DvcPipeProxyError::Io(e));
}
}
let listener = tokio::net::UnixListener::bind(pipe_name).map_err(DvcPipeProxyError::Io)?;
let (socket, _) = listener.accept().await.map_err(DvcPipeProxyError::Io)?;
Ok(Self { socket })
}
async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, DvcPipeProxyError> {
self.socket.read(buffer).await.map_err(DvcPipeProxyError::Io)
}
async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError> {
self.socket
.write_all(buffer)
.await
.map_err(DvcPipeProxyError::Io)
.map(|_| ())
}
}
impl_as_any!(DvcNamedPipeProxy);
impl DvcProcessor for DvcNamedPipeProxy {
fn channel_name(&self) -> &str {
&self.channel_name
}
fn start(&mut self, _channel_id: u32) -> PduResult<Vec<DvcMessage>> {
Err(pdu_other_err!(
"DvcNamedPipeProxy is not implemented on Unix-like systems"
))
}
fn process(&mut self, _channel_id: u32, _payload: &[u8]) -> PduResult<Vec<DvcMessage>> {
Err(pdu_other_err!(
"DvcNamedPipeProxy is not implemented on Unix-like systems"
))
}
}
impl DvcClientProcessor for DvcNamedPipeProxy {}

View file

@ -0,0 +1,47 @@
use async_trait::async_trait;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
use tokio::net::windows::named_pipe;
use crate::error::DvcPipeProxyError;
use crate::os_pipe::OsPipe;
const PIPE_BUFFER_SIZE: u32 = 64 * 1024;
/// Unix-specific implementation of the OS pipe trait.
pub(crate) struct WindowsPipe {
pipe_server: named_pipe::NamedPipeServer,
}
#[async_trait]
impl OsPipe for WindowsPipe {
async fn connect(pipe_name: &str) -> Result<Self, DvcPipeProxyError> {
let pipe_name = format!("\\\\.\\pipe\\{pipe_name}");
let pipe_server = named_pipe::ServerOptions::new()
.first_pipe_instance(true)
.access_inbound(true)
.access_outbound(true)
.max_instances(2)
.in_buffer_size(PIPE_BUFFER_SIZE)
.out_buffer_size(PIPE_BUFFER_SIZE)
.pipe_mode(named_pipe::PipeMode::Message)
.create(pipe_name)
.map_err(DvcPipeProxyError::Io)?;
pipe_server.connect().await.map_err(DvcPipeProxyError::Io)?;
Ok(Self { pipe_server })
}
async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, DvcPipeProxyError> {
self.pipe_server.read(buffer).await.map_err(DvcPipeProxyError::Io)
}
async fn write_all(&mut self, buffer: &[u8]) -> Result<(), DvcPipeProxyError> {
self.pipe_server
.write_all(buffer)
.await
.map_err(DvcPipeProxyError::Io)
.map(|_| ())
}
}

View file

@ -1,37 +0,0 @@
use crate::windows::WindowsError;
#[derive(Debug)]
pub(crate) enum DvcPipeProxyError {
Windows(WindowsError),
MpscIo,
DvcIncompleteWrite,
EncodeDvcMessage,
}
impl From<WindowsError> for DvcPipeProxyError {
fn from(err: WindowsError) -> Self {
DvcPipeProxyError::Windows(err)
}
}
impl core::fmt::Display for DvcPipeProxyError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
DvcPipeProxyError::Windows(err) => err.fmt(f),
DvcPipeProxyError::MpscIo => write!(f, "MPSC IO error"),
DvcPipeProxyError::DvcIncompleteWrite => write!(f, "DVC incomplete write"),
DvcPipeProxyError::EncodeDvcMessage => write!(f, "DVC message encoding error"),
}
}
}
impl core::error::Error for DvcPipeProxyError {
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
match self {
DvcPipeProxyError::Windows(err) => Some(err),
DvcPipeProxyError::MpscIo => None,
DvcPipeProxyError::DvcIncompleteWrite => None,
DvcPipeProxyError::EncodeDvcMessage => None,
}
}
}

View file

@ -1,149 +0,0 @@
mod error;
mod worker;
use std::sync::mpsc;
use ironrdp_core::impl_as_any;
use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor};
use ironrdp_pdu::{pdu_other_err, PduResult};
use ironrdp_svc::SvcMessage;
use crate::platform::windows::error::DvcPipeProxyError;
use crate::platform::windows::worker::{worker_thread_func, OnWriteDvcMessage, WorkerCtx};
use crate::windows::{Event, MessagePipeServer, Semaphore};
const IO_MPSC_CHANNEL_SIZE: usize = 100;
struct WorkerControlCtx {
to_pipe_tx: mpsc::SyncSender<Vec<u8>>,
to_pipe_semaphore: Semaphore,
abort_event: Event,
}
/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server.
pub struct DvcNamedPipeProxy {
channel_name: String,
named_pipe_name: String,
dvc_write_callback: Option<OnWriteDvcMessage>,
worker_control_ctx: Option<WorkerControlCtx>,
}
impl DvcNamedPipeProxy {
/// Creates a new DVC named pipe proxy.
/// `dvc_write_callback` is called when the proxy receives a DVC message from the
/// named pipe server and the SVC message is ready to be sent to the DVC channel in the main
/// IronRDP active session loop.
pub fn new<F>(channel_name: &str, named_pipe_name: &str, dvc_write_callback: F) -> Self
where
F: Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send + 'static,
{
let named_pipe_name = format!("\\\\.\\pipe\\{named_pipe_name}");
Self {
channel_name: channel_name.to_owned(),
named_pipe_name,
dvc_write_callback: Some(Box::new(dvc_write_callback)),
worker_control_ctx: None,
}
}
}
impl_as_any!(DvcNamedPipeProxy);
impl Drop for DvcNamedPipeProxy {
fn drop(&mut self) {
if let Some(ctx) = &self.worker_control_ctx {
// Signal the worker thread to abort.
ctx.abort_event.set().ok();
}
}
}
impl DvcNamedPipeProxy {
fn start_impl(&mut self, channel_id: u32) -> Result<(), DvcPipeProxyError> {
// PIPE -> DVC channel - handled via callback passed to the constructor.
// DVC -> PIPE channel - handled via mpsc internally in the worker thread.
let (to_pipe_tx, to_pipe_rx) = mpsc::sync_channel(IO_MPSC_CHANNEL_SIZE);
let semaphore_max_count = IO_MPSC_CHANNEL_SIZE
.try_into()
.expect("Channel size is too large for underlying WinAPI semaphore");
let to_pipe_semaphore = Semaphore::new_unnamed(0, semaphore_max_count)?;
let abort_event = Event::new_unnamed()?;
let worker_control_ctx = WorkerControlCtx {
to_pipe_tx,
to_pipe_semaphore: to_pipe_semaphore.clone(),
abort_event: abort_event.clone(),
};
let pipe = MessagePipeServer::new(&self.named_pipe_name)?;
let dvc_write_callback = self
.dvc_write_callback
.take()
.expect("DVC write callback already taken");
let worker_ctx = WorkerCtx {
pipe,
to_pipe_rx,
to_pipe_semaphore,
abort_event,
dvc_write_callback,
pipe_name: self.named_pipe_name.clone(),
channel_name: self.channel_name.clone(),
channel_id,
};
let pipe_name = self.named_pipe_name.clone();
let channel_name = self.channel_name.clone();
self.worker_control_ctx = Some(worker_control_ctx);
std::thread::spawn(move || {
if let Err(error) = worker_thread_func(worker_ctx) {
error!(%error, %pipe_name, %channel_name, "DVC pipe proxy worker thread failed");
}
});
Ok(())
}
}
impl DvcProcessor for DvcNamedPipeProxy {
fn channel_name(&self) -> &str {
&self.channel_name
}
fn start(&mut self, channel_id: u32) -> PduResult<Vec<DvcMessage>> {
self.start_impl(channel_id)
.map_err(|e| pdu_other_err!("dvc named pipe proxy failed", source: e))?;
Ok(Vec::new())
}
fn process(&mut self, _channel_id: u32, payload: &[u8]) -> PduResult<Vec<DvcMessage>> {
// Send the payload to the worker thread via the mpsc channel.
let ctx = match &self.worker_control_ctx {
Some(ctx) => ctx,
None => {
return Err(pdu_other_err!("DVC pipe proxy not started"));
}
};
ctx.to_pipe_tx
.send(payload.to_vec())
.map_err(|_| pdu_other_err!("DVC pipe proxy send failed"))?;
// Signal WinAPI-based worker IO loop.
ctx.to_pipe_semaphore
.release(1)
.map_err(|_| pdu_other_err!("DVC pipe proxy semaphore release failed"))?;
Ok(Vec::new())
}
}
impl DvcClientProcessor for DvcNamedPipeProxy {}

View file

@ -1,172 +0,0 @@
use std::sync::mpsc;
use ironrdp_core::{ensure_size, Encode, EncodeResult};
use ironrdp_dvc::{encode_dvc_messages, DvcEncode};
use ironrdp_pdu::PduResult;
use ironrdp_svc::{ChannelFlags, SvcMessage};
use crate::platform::windows::error::DvcPipeProxyError;
use crate::windows::{wait_any, wait_any_with_timeout, Event, MessagePipeServer, Semaphore, WindowsError};
const PIPE_CONNECT_TIMEOUT_SECS: u32 = 10_000; // 10 seconds
const PIPE_WRITE_TIMEOUT_SECS: u32 = 3_000; // 3 seconds
const MESSAGE_BUFFER_SIZE: usize = 64 * 1024; // 64 KiB
pub(crate) type OnWriteDvcMessage = Box<dyn Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send>;
pub(crate) struct WorkerCtx {
pub pipe: MessagePipeServer,
pub to_pipe_rx: mpsc::Receiver<Vec<u8>>,
pub to_pipe_semaphore: Semaphore,
pub abort_event: Event,
pub dvc_write_callback: OnWriteDvcMessage,
pub pipe_name: String,
pub channel_name: String,
pub channel_id: u32,
}
pub(crate) fn worker_thread_func(worker_ctx: WorkerCtx) -> Result<(), DvcPipeProxyError> {
let WorkerCtx {
mut pipe,
to_pipe_rx,
to_pipe_semaphore,
abort_event,
dvc_write_callback,
pipe_name,
channel_name,
channel_id,
} = worker_ctx;
info!(%channel_name, %pipe_name, "Connecting DVC pipe proxy");
{
let mut connect_ctx = pipe.prepare_connect_overlapped()?;
if !connect_ctx.overlapped_connect()? {
const EVENT_ID_ABORT: usize = 0;
let events = [abort_event.borrow(), connect_ctx.borrow_event()];
let wait_result = match wait_any_with_timeout(&events, PIPE_CONNECT_TIMEOUT_SECS) {
Ok(idx) => idx,
Err(WindowsError::WaitForMultipleObjectsTimeout) => {
warn!(%channel_name, %pipe_name, "DVC pipe proxy connection timed out");
return Ok(());
}
Err(err) => {
return Err(DvcPipeProxyError::Windows(err));
}
};
if wait_result == EVENT_ID_ABORT {
info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted");
return Ok(());
}
connect_ctx.get_result()?;
}
}
info!(%channel_name, %pipe_name, "DVC pipe proxy connected");
let mut read_ctx = pipe.prepare_read_overlapped(MESSAGE_BUFFER_SIZE)?;
const EVENT_ID_ABORT: usize = 0;
const EVENT_ID_READ: usize = 1;
const EVENT_ID_WRITE_MPSC: usize = 2;
read_ctx.overlapped_read()?;
info!(%channel_name, %pipe_name, "DVC pipe proxy IO loop started");
loop {
let events = [
abort_event.borrow(),
read_ctx.borrow_event(),
to_pipe_semaphore.borrow(),
];
let wait_result = wait_any(&events)?;
if wait_result == EVENT_ID_ABORT {
info!(%channel_name, %pipe_name, "DVC pipe proxy connection has been aborted");
return Ok(());
}
// Read end of pipe is ready, forward received data to DVC.
if wait_result == EVENT_ID_READ {
let read_result = read_ctx.get_result()?.to_vec();
trace!(%channel_name, %pipe_name, "DVC proxy read {} bytes from pipe", read_result.len());
if !read_result.is_empty() {
let messages = encode_dvc_messages(
channel_id,
vec![Box::new(RawDataDvcMessage(read_result))],
ChannelFlags::empty(),
)
.map_err(|_| DvcPipeProxyError::EncodeDvcMessage)?;
if let Err(err) = dvc_write_callback(0, messages) {
error!(%err, %channel_name, %pipe_name, "DVC pipe proxy write callback failed");
}
}
// Queue the read operation again.
read_ctx.overlapped_read()?;
continue;
}
// DVC data received, forward it to the pipe.
if wait_result == EVENT_ID_WRITE_MPSC {
let payload = to_pipe_rx.recv().map_err(|_| DvcPipeProxyError::MpscIo)?;
let payload_len = payload.len();
if payload_len == 0 {
warn!(%channel_name, %pipe_name, "Rejected empty DVC data (not sent to pipe)");
continue;
}
trace!(%channel_name, %pipe_name, "DVC proxy write {} bytes to pipe,", payload_len);
let mut overlapped_write = pipe.prepare_write_overlapped(payload)?;
overlapped_write.overlapped_write()?;
let events = [abort_event.borrow(), overlapped_write.borrow_event()];
let wait_result = wait_any_with_timeout(&events, PIPE_WRITE_TIMEOUT_SECS)?;
if wait_result == EVENT_ID_ABORT {
info!(%channel_name, %pipe_name, "DVC pipe proxy write aborted");
return Ok(());
}
let bytes_written = overlapped_write.get_result()?;
if bytes_written as usize != payload_len {
// Message-based pipe write failed.
return Err(DvcPipeProxyError::DvcIncompleteWrite);
}
continue;
}
}
}
struct RawDataDvcMessage(Vec<u8>);
impl Encode for RawDataDvcMessage {
fn encode(&self, dst: &mut ironrdp_core::WriteCursor<'_>) -> EncodeResult<()> {
ensure_size!(in: dst, size: self.size());
dst.write_slice(&self.0);
Ok(())
}
fn name(&self) -> &'static str {
"RawDataDvcMessage"
}
fn size(&self) -> usize {
self.0.len()
}
}
impl DvcEncode for RawDataDvcMessage {}

View file

@ -0,0 +1,115 @@
use std::sync::Arc;
use ironrdp_core::impl_as_any;
use ironrdp_dvc::{DvcClientProcessor, DvcMessage, DvcProcessor};
use ironrdp_pdu::{pdu_other_err, PduResult};
use ironrdp_svc::SvcMessage;
use crate::worker::{run_worker, OnWriteDvcMessage, WorkerCtx};
const IO_MPSC_CHANNEL_SIZE: usize = 100;
struct WorkerControlCtx {
to_pipe_tx: tokio::sync::mpsc::Sender<Vec<u8>>,
abort_event: Arc<tokio::sync::Notify>,
}
/// A proxy DVC pipe client that forwards DVC messages to/from a named pipe server.
pub struct DvcNamedPipeProxy {
channel_name: String,
named_pipe_name: String,
dvc_write_callback: Option<OnWriteDvcMessage>,
worker: Option<WorkerControlCtx>,
}
impl DvcNamedPipeProxy {
/// Creates a new DVC named pipe proxy.
/// `dvc_write_callback` is called when the proxy receives a DVC message from the
/// named pipe server and the SVC message is ready to be sent to the DVC channel in the main
/// IronRDP active session loop.
pub fn new<F>(channel_name: &str, named_pipe_name: &str, dvc_write_callback: F) -> Self
where
F: Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send + 'static,
{
Self {
channel_name: channel_name.to_owned(),
named_pipe_name: named_pipe_name.to_owned(),
dvc_write_callback: Some(Box::new(dvc_write_callback)),
worker: None,
}
}
}
impl_as_any!(DvcNamedPipeProxy);
impl DvcProcessor for DvcNamedPipeProxy {
fn channel_name(&self) -> &str {
&self.channel_name
}
fn start(&mut self, channel_id: u32) -> PduResult<Vec<DvcMessage>> {
info!(%self.channel_name, %self.named_pipe_name, "Starting DVC named pipe proxy");
let on_write_dvc = self
.dvc_write_callback
.take()
.expect("DvcProcessor::start called multiple times");
let (to_pipe_tx, to_pipe_rx) = tokio::sync::mpsc::channel(IO_MPSC_CHANNEL_SIZE);
let abort_event = Arc::new(tokio::sync::Notify::new());
let ctx = WorkerCtx {
on_write_dvc,
to_pipe_rx,
abort_event: Arc::clone(&abort_event),
pipe_name: self.named_pipe_name.clone(),
channel_name: self.channel_name.clone(),
channel_id,
};
self.worker = Some(WorkerControlCtx {
to_pipe_tx,
abort_event,
});
#[cfg(not(target_os = "windows"))]
run_worker::<crate::platform::unix::UnixPipe>(ctx);
#[cfg(target_os = "windows")]
run_worker::<crate::platform::windows::WindowsPipe>(ctx);
Ok(vec![])
}
fn process(&mut self, _channel_id: u32, payload: &[u8]) -> PduResult<Vec<DvcMessage>> {
if let Some(worker) = &self.worker {
if let Err(error) = worker.to_pipe_tx.try_send(payload.to_vec()) {
match error {
tokio::sync::mpsc::error::TrySendError::Full(_) => {
return Err(pdu_other_err!("DVC pipe proxy channel is full"));
}
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
return Err(pdu_other_err!("DVC pipe proxy channel is closed"));
}
}
}
} else {
debug!("Attempt to process DVC packet on non-initialized DVC pipe proxy.");
}
Ok(vec![])
}
}
impl DvcClientProcessor for DvcNamedPipeProxy {}
impl Drop for DvcNamedPipeProxy {
fn drop(&mut self) {
if let Some(ctx) = &self.worker {
// Signal the worker thread to abort.
ctx.abort_event.notify_one();
}
self.worker = None;
}
}

View file

@ -1,58 +0,0 @@
#[derive(Debug)]
pub(crate) enum WindowsError {
CreateNamedPipe(windows::core::Error),
CreateEvent(windows::core::Error),
SetEvent(windows::core::Error),
ReleaseSemaphore(windows::core::Error),
InvalidSemaphoreParams(&'static str),
WaitForMultipleObjectsFailed(windows::core::Error),
WaitForMultipleObjectsTimeout,
WaitForMultipleObjectsAbandoned(u32),
OverlappedConnect(windows::core::Error),
OverlappedRead(windows::core::Error),
OverlappedWrite(windows::core::Error),
CreateSemaphore(windows::core::Error),
InvalidPipeName(String),
}
impl core::fmt::Display for WindowsError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
WindowsError::CreateNamedPipe(_) => write!(f, "failed to create named pipe"),
WindowsError::CreateEvent(_) => write!(f, "failed to create event object"),
WindowsError::SetEvent(_) => write!(f, "failed to set event to signaled state"),
WindowsError::InvalidSemaphoreParams(cause) => write!(f, "invalid semaphore parameters: {cause}"),
WindowsError::ReleaseSemaphore(_) => write!(f, "failed to release semaphore"),
WindowsError::WaitForMultipleObjectsFailed(_) => write!(f, "failed to wait for multiple objects"),
WindowsError::WaitForMultipleObjectsTimeout => write!(f, "timed out waiting for multiple objects"),
WindowsError::WaitForMultipleObjectsAbandoned(idx) => {
write!(f, "wait for multiple objects failed, handle #{idx} was abandoned")
}
WindowsError::OverlappedConnect(_) => write!(f, "overlapped connect failed"),
WindowsError::OverlappedRead(_) => write!(f, "overlapped read failed"),
WindowsError::OverlappedWrite(_) => write!(f, "overlapped write failed"),
WindowsError::CreateSemaphore(_) => write!(f, "failed to create semaphore object"),
WindowsError::InvalidPipeName(cause) => write!(f, "invalid pipe name: `{cause}`"),
}
}
}
impl core::error::Error for WindowsError {
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
match self {
WindowsError::CreateNamedPipe(err)
| WindowsError::SetEvent(err)
| WindowsError::ReleaseSemaphore(err)
| WindowsError::WaitForMultipleObjectsFailed(err)
| WindowsError::OverlappedConnect(err)
| WindowsError::OverlappedRead(err)
| WindowsError::OverlappedWrite(err)
| WindowsError::CreateSemaphore(err) => Some(err),
WindowsError::CreateEvent(err) => Some(err),
WindowsError::InvalidSemaphoreParams(_)
| WindowsError::WaitForMultipleObjectsTimeout
| WindowsError::InvalidPipeName(_) => None,
WindowsError::WaitForMultipleObjectsAbandoned(_) => None,
}
}
}

View file

@ -1,53 +0,0 @@
use std::sync::Arc;
use windows::core::Owned;
use windows::Win32::Foundation::HANDLE;
use windows::Win32::System::Threading::{CreateEventW, SetEvent};
use crate::windows::{BorrowedHandle, WindowsError};
/// RAII wrapper for WinAPI event handle.
#[derive(Debug, Clone)]
pub(crate) struct Event {
handle: Arc<Owned<HANDLE>>,
}
// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via
// Event wrapper API itself by restricting handle usage:
// - set() method which calls SetEvent inside (which is thread-safe).
// - borrow() method which returns a BorrowedHandle for waiting on the event.
// - Handle lifetime is ensured by Arc, so it is always valid when used.
unsafe impl Send for Event {}
impl Event {
pub(crate) fn new_unnamed() -> Result<Self, WindowsError> {
// SAFETY: FFI call with no outstanding preconditions.
let handle = unsafe { CreateEventW(None, false, false, None).map_err(WindowsError::CreateEvent)? };
// SAFETY: Handle is valid and we are the owner of the handle.
let handle = unsafe { Owned::new(handle) };
// CreateEventW returns a valid handle on success.
Ok(Self {
// See `unsafe impl Send` comment.
#[expect(clippy::arc_with_non_send_sync)]
handle: Arc::new(handle),
})
}
pub(crate) fn set(&self) -> Result<(), WindowsError> {
// SAFETY: The handle is valid and we are the owner of the handle.
unsafe {
SetEvent(self.raw()).map_err(WindowsError::SetEvent)?;
}
Ok(())
}
pub(super) fn raw(&self) -> HANDLE {
**self.handle
}
pub(crate) fn borrow(&self) -> BorrowedHandle<'_> {
BorrowedHandle(&self.handle)
}
}

View file

@ -1,101 +0,0 @@
//! WinAPI wrappers for the DVC pipe proxy IO loop logic.
//!
//! Some of the wrappers are based on `win-api-wrappers` code (simplified/reduced functionality).
use windows::Win32::Foundation::{
ERROR_IO_PENDING, HANDLE, WAIT_ABANDONED_0, WAIT_EVENT, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT,
};
use windows::Win32::System::Threading::{WaitForMultipleObjects, INFINITE};
mod error;
pub(crate) use self::error::WindowsError;
mod event;
pub(crate) use self::event::Event;
mod pipe;
pub(crate) use self::pipe::MessagePipeServer;
mod semaphore;
pub(crate) use self::semaphore::Semaphore;
/// Thin wrapper around borrowed `windows` crate `HANDLE` reference.
/// This is used to ensure handle lifetime when passing it to FFI functions
/// (see `wait_any_with_timeout` for example).
#[repr(transparent)]
pub(crate) struct BorrowedHandle<'a>(&'a HANDLE);
/// Safe wrapper around `WaitForMultipleObjects`.
pub(crate) fn wait_any_with_timeout(handles: &[BorrowedHandle<'_>], timeout: u32) -> Result<usize, WindowsError> {
let handles = cast_handles(handles);
// SAFETY:
// - BorrowedHandle alongside with rust type system ensures that the HANDLEs are valid for
// the duration of the call.
// - All handles in this module have SYNCHRONIZE access rights.
// - cast_handles ensures no handle duplicates.
let result = unsafe { WaitForMultipleObjects(handles, false, timeout) };
match result {
WAIT_FAILED => Err(WindowsError::WaitForMultipleObjectsFailed(
windows::core::Error::from_win32(),
)),
WAIT_TIMEOUT => Err(WindowsError::WaitForMultipleObjectsTimeout),
WAIT_EVENT(idx) if idx >= WAIT_ABANDONED_0.0 => {
let idx = idx - WAIT_ABANDONED_0.0;
Err(WindowsError::WaitForMultipleObjectsAbandoned(idx))
}
WAIT_EVENT(id) => Ok((id - WAIT_OBJECT_0.0) as usize),
}
}
/// Safe `WaitForMultipleObjects` wrapper with infinite timeout.
pub(crate) fn wait_any(handles: &[BorrowedHandle<'_>]) -> Result<usize, WindowsError> {
// Standard generic syntax is used instead if `impl` because of the following lint:
// > warning: lifetime parameter `'a` only used once
//
// Fixing this lint (use of '_ lifetime) produces compiler error.
wait_any_with_timeout(handles, INFINITE)
}
fn cast_handles<'a>(handles: &'a [BorrowedHandle<'a>]) -> &'a [HANDLE] {
// Very basic sanity checks to ensure that the handles are valid
// and there are no duplicates.
// This is only done in debug builds to avoid performance overhead in release builds, while
// still catching undefined behavior early in development.
#[cfg(debug_assertions)]
{
// Ensure that there are no duplicate handles without hash.
for (i, handle) in handles.iter().enumerate() {
for other_handle in &handles[i + 1..] {
if handle.0 == other_handle.0 {
panic!("Duplicate handle found in wait_any_with_timeout");
}
}
}
}
for handle in handles {
// Ensure that the handle is valid.
if handle.0.is_invalid() {
panic!("Invalid handle in wait_any_with_timeout");
}
}
// SAFETY:
// - BorrowedHandle is #[repr(transparent)] over *const c_void, and so is HANDLE,
// so the layout is the same.
// - We ensure the lifetime is preserved.
unsafe { core::slice::from_raw_parts(handles.as_ptr() as *const HANDLE, handles.len()) }
}
/// Maps ERROR_IO_PENDING to Ok(()) and returns other errors as is.
fn ensure_overlapped_io_result(result: windows::core::Result<()>) -> Result<windows::core::Result<()>, WindowsError> {
if let Err(error) = &result {
if error.code() == ERROR_IO_PENDING.to_hresult() {
return Ok(Ok(()));
}
}
Ok(result)
}

View file

@ -1,291 +0,0 @@
use core::ops::DerefMut as _;
use core::pin::Pin;
use windows::core::{Owned, PCWSTR};
use windows::Win32::Foundation::{ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, HANDLE};
use windows::Win32::Storage::FileSystem::{
ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
};
use windows::Win32::System::Pipes::{
ConnectNamedPipe, CreateNamedPipeW, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_WAIT,
};
use windows::Win32::System::IO::{GetOverlappedResult, OVERLAPPED};
use crate::windows::{ensure_overlapped_io_result, BorrowedHandle, Event, WindowsError};
const PIPE_INSTANCES: u32 = 2;
const PIPE_BUFFER_SIZE: u32 = 64 * 1024; // 64KB
const DEFAULT_PIPE_TIMEOUT: u32 = 10_000; // 10 seconds
/// RAII wrapper for WinAPI named pipe server.
#[derive(Debug)]
pub(crate) struct MessagePipeServer {
handle: Owned<HANDLE>,
connected: bool,
}
/// SAFETY: It is safe to send pipe HANDLE between threads.
unsafe impl Send for MessagePipeServer {}
impl MessagePipeServer {
/// Creates a new named pipe server.
pub(crate) fn new(name: &str) -> Result<Self, WindowsError> {
// Create a named pipe with the specified name.
let lpname =
widestring::U16CString::from_str(name).map_err(|_| WindowsError::InvalidPipeName(name.to_owned()))?;
// SAFETY: lpname is a valid pointer to a null-terminated wide string.
let handle = unsafe {
CreateNamedPipeW(
PCWSTR(lpname.as_ptr()),
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
PIPE_INSTANCES,
PIPE_BUFFER_SIZE,
PIPE_BUFFER_SIZE,
DEFAULT_PIPE_TIMEOUT,
None,
)
};
// `windows` crate API inconsistency: CreateNamedPipeW returns invalid handle on error
// instead of Result::Err.
if handle.is_invalid() {
return Err(WindowsError::CreateNamedPipe(windows::core::Error::from_win32()));
}
// SAFETY: Handle is valid and we are the owner of the handle.
let handle = unsafe { Owned::new(handle) };
Ok(Self {
handle,
connected: false,
})
}
fn raw(&self) -> HANDLE {
*self.handle
}
/// Initializes context for overlapped connect operation.
pub(crate) fn prepare_connect_overlapped(&mut self) -> Result<OverlappedPipeConnectCtx<'_>, WindowsError> {
OverlappedPipeConnectCtx::new(self)
}
/// Initializes context for overlapped read operation.
pub(crate) fn prepare_read_overlapped(
&self,
buffer_size: usize,
) -> Result<OverlappedPipeReadCtx<'_>, WindowsError> {
OverlappedPipeReadCtx::new(self, buffer_size)
}
/// Initializes context for overlapped write operation.
pub(crate) fn prepare_write_overlapped(&self, data: Vec<u8>) -> Result<OverlappedWriteCtx<'_>, WindowsError> {
OverlappedWriteCtx::new(self, data)
}
}
pub(crate) struct OverlappedPipeConnectCtx<'a> {
pipe: &'a mut MessagePipeServer,
overlapped: Pin<Box<OVERLAPPED>>,
event: Event,
}
impl<'a> OverlappedPipeConnectCtx<'a> {
fn new(pipe: &'a mut MessagePipeServer) -> Result<Self, WindowsError> {
let event = Event::new_unnamed()?;
let overlapped = Box::pin(OVERLAPPED {
hEvent: event.raw(),
..Default::default()
});
Ok(Self {
pipe,
overlapped,
event,
})
}
/// Connects to the named pipe server.
/// Returns true if pipe is already connected prior to this call and no additional
/// overlapped io is needed. If false is returned, the caller should call `get_result()` to
/// after returned event handle is signaled to complete the connection.
pub(crate) fn overlapped_connect(&mut self) -> Result<bool, WindowsError> {
// SAFETY: The handle is valid and we are the owner of the handle.
let result = unsafe { ConnectNamedPipe(self.pipe.raw(), Some(self.overlapped.deref_mut() as *mut OVERLAPPED)) };
match result {
Ok(()) => {
self.pipe.connected = true;
Ok(true)
}
Err(error) => {
if error.code() == ERROR_PIPE_CONNECTED.to_hresult() {
// The pipe is already connected.
self.pipe.connected = true;
Ok(true)
} else if error.code() == ERROR_IO_PENDING.to_hresult() {
// Overlapped I/O is pending.
Ok(false)
} else {
// Connection failed.
Err(WindowsError::OverlappedConnect(error))
}
}
}
}
pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> {
self.event.borrow()
}
pub(crate) fn get_result(&mut self) -> Result<(), WindowsError> {
let mut bytes_read = 0u32;
// SAFETY: The handle is valid and we are the owner of the handle.
unsafe {
GetOverlappedResult(
self.pipe.raw(),
self.overlapped.deref_mut() as *mut OVERLAPPED,
&mut bytes_read as *mut u32,
false,
)
.map_err(WindowsError::OverlappedConnect)?
};
self.pipe.connected = true;
Ok(())
}
}
pub(crate) struct OverlappedPipeReadCtx<'a> {
pipe: &'a MessagePipeServer,
buffer: Vec<u8>,
overlapped: Pin<Box<OVERLAPPED>>,
event: Event,
}
impl<'a> OverlappedPipeReadCtx<'a> {
fn new(pipe: &'a MessagePipeServer, buffer_size: usize) -> Result<Self, WindowsError> {
let event = Event::new_unnamed()?;
let overlapped = Box::pin(OVERLAPPED {
hEvent: event.raw(),
..Default::default()
});
Ok(Self {
pipe,
buffer: vec![0; buffer_size],
overlapped,
event,
})
}
pub(crate) fn overlapped_read(&mut self) -> Result<(), WindowsError> {
// SAFETY: self.pipe.raw() returns a valid handle. The read buffer pointer returned
// by self.buffer.as_mut_slice() is valid and remains alive for the entire duration
// of the overlapped I/O operation. The OVERLAPPED structure is pinned and not moved
// in memory, ensuring its address remains stable until the operation completes.
let result = unsafe {
ReadFile(
self.pipe.raw(),
Some(self.buffer.as_mut_slice()),
None,
Some(self.overlapped.deref_mut() as *mut OVERLAPPED),
)
};
ensure_overlapped_io_result(result)?.map_err(WindowsError::OverlappedRead)
}
pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> {
self.event.borrow()
}
pub(crate) fn get_result(&mut self) -> Result<&[u8], WindowsError> {
let mut bytes_read = 0u32;
// SAFETY: The handle is valid and we are the owner of the handle.
unsafe {
GetOverlappedResult(
self.pipe.raw(),
self.overlapped.deref_mut() as *mut OVERLAPPED,
&mut bytes_read as *mut u32,
false,
)
.map_err(WindowsError::OverlappedRead)?
};
Ok(&self.buffer[..bytes_read as usize])
}
}
pub(crate) struct OverlappedWriteCtx<'a> {
pipe: &'a MessagePipeServer,
data: Vec<u8>,
overlapped: Pin<Box<OVERLAPPED>>,
event: Event,
}
impl<'a> OverlappedWriteCtx<'a> {
fn new(pipe: &'a MessagePipeServer, data: Vec<u8>) -> Result<Self, WindowsError> {
let event = Event::new_unnamed()?;
let mut overlapped = Box::pin(OVERLAPPED {
hEvent: event.raw(),
..Default::default()
});
// Set write mode to append
overlapped.Anonymous.Anonymous.Offset = 0xFFFFFFFF;
overlapped.Anonymous.Anonymous.OffsetHigh = 0xFFFFFFFF;
Ok(Self {
pipe,
data,
overlapped,
event,
})
}
pub(crate) fn overlapped_write(&mut self) -> Result<(), WindowsError> {
// SAFETY: self.pipe.raw() returns a valid handle. The write buffer pointer (&self.data) is valid
// and remains alive for the entire duration of the overlapped I/O operation. The OVERLAPPED
// structure is pinned and not moved in memory, ensuring its address remains stable until the
// operation completes.
let result = unsafe {
WriteFile(
self.pipe.raw(),
Some(&self.data),
None,
Some(self.overlapped.deref_mut() as *mut OVERLAPPED),
)
};
ensure_overlapped_io_result(result)?.map_err(WindowsError::OverlappedWrite)
}
pub(crate) fn borrow_event(&'a self) -> BorrowedHandle<'a> {
self.event.borrow()
}
pub(crate) fn get_result(&mut self) -> Result<u32, WindowsError> {
let mut bytes_written = 0u32;
// SAFETY: The pipe handle is valid and we are the owner of the handle.
unsafe {
GetOverlappedResult(
self.pipe.raw(),
self.overlapped.deref_mut() as *const OVERLAPPED,
&mut bytes_written as *mut u32,
true,
)
.map_err(WindowsError::OverlappedWrite)?;
};
Ok(bytes_written)
}
}

View file

@ -1,92 +0,0 @@
use std::sync::Arc;
use windows::core::Owned;
use windows::Win32::Foundation::HANDLE;
use windows::Win32::System::Threading::{CreateSemaphoreW, ReleaseSemaphore};
use crate::windows::{BorrowedHandle, WindowsError};
/// RAII wrapper for WinAPI semaphore handle.
#[derive(Debug, Clone)]
pub(crate) struct Semaphore {
handle: Arc<Owned<HANDLE>>,
}
// SAFETY: We ensure that inner handle is indeed could be sent and shared between threads via
// Semaphore wrapper API itself by restricting handle usage:
// - release() method which calls ReleaseSemaphore inside (which is thread-safe).
// - borrow() method which returns a BorrowedHandle for waiting on the semaphore.
// - Handle lifetime is ensured by Arc, so it is always valid when used.
unsafe impl Send for Semaphore {}
impl Semaphore {
/// Creates a new unnamed semaphore with the specified initial and maximum counts.
pub(crate) fn new_unnamed(initial_count: u32, maximum_count: u32) -> Result<Self, WindowsError> {
if maximum_count == 0 {
return Err(WindowsError::InvalidSemaphoreParams(
"maximum_count must be greater than 0",
));
}
if initial_count > maximum_count {
return Err(WindowsError::InvalidSemaphoreParams(
"initial_count must be less than or equal to maximum_count",
));
}
let initial_count = i32::try_from(initial_count)
.map_err(|_| WindowsError::InvalidSemaphoreParams("initial_count should be positive"))?;
let maximum_count = i32::try_from(maximum_count)
.map_err(|_| WindowsError::InvalidSemaphoreParams("maximum_count should be positive"))?;
// SAFETY: All parameters are checked for validity above:
// - initial_count is always <= maximum_count.
// - maximum_count is always > 0.
// - all values are positive.
let handle = unsafe {
CreateSemaphoreW(None, initial_count, maximum_count, None).map_err(WindowsError::CreateSemaphore)?
};
// SAFETY: Handle is valid and we are the owner of the handle.
let handle = unsafe { Owned::new(handle) };
// CreateSemaphoreW returns a valid handle on success.
Ok(Self {
// See `unsafe impl Send` comment.
// TODO(@CBenoit): Verify this comment.
#[expect(clippy::arc_with_non_send_sync)]
handle: Arc::new(handle),
})
}
fn raw(&self) -> HANDLE {
**self.handle
}
pub(crate) fn borrow(&self) -> BorrowedHandle<'_> {
BorrowedHandle(&self.handle)
}
pub(crate) fn release(&self, release_count: u16) -> Result<u32, WindowsError> {
let release_count = i32::from(release_count);
if release_count == 0 {
// semaphore release count must be greater than 0
return Err(WindowsError::InvalidSemaphoreParams(
"release_count must be greater than 0",
));
}
let mut previous_count = 0;
// SAFETY: All parameters are checked for validity above:
// - release_count > 0.
// - lpPreviousCount points to valid stack memory.
// - handle is valid and owned by this struct.
unsafe {
ReleaseSemaphore(self.raw(), release_count, Some(&mut previous_count))
.map_err(WindowsError::ReleaseSemaphore)?;
}
Ok(previous_count.try_into().expect("semaphore count is negative"))
}
}

View file

@ -0,0 +1,157 @@
use std::sync::Arc;
use ironrdp_dvc::encode_dvc_messages;
use ironrdp_pdu::PduResult;
use ironrdp_svc::{ChannelFlags, SvcMessage};
use tokio::sync::{mpsc, Notify};
use crate::error::DvcPipeProxyError;
use crate::message::RawDataDvcMessage;
use crate::os_pipe::OsPipe;
const IO_BUFFER_SIZE: usize = 1024 * 64; // 64K
pub(crate) type OnWriteDvcMessage = Box<dyn Fn(u32, Vec<SvcMessage>) -> PduResult<()> + Send>;
pub(crate) struct WorkerCtx {
pub(crate) on_write_dvc: OnWriteDvcMessage,
pub(crate) to_pipe_rx: mpsc::Receiver<Vec<u8>>,
pub(crate) abort_event: Arc<Notify>,
pub(crate) pipe_name: String,
pub(crate) channel_name: String,
pub(crate) channel_id: u32,
}
pub(crate) fn run_worker<P: OsPipe>(ctx: WorkerCtx) {
let _ = std::thread::spawn(move || {
let channel_name = ctx.channel_name.clone();
let pipe_name = ctx.pipe_name.clone();
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(DvcPipeProxyError::Io);
let runtime = match runtime {
Ok(runtime) => runtime,
Err(error) => {
error!(
%channel_name,
%pipe_name,
?error,
"DVC pipe proxy worker thread initialization failed."
);
return;
}
};
if let Err(error) = runtime.block_on(worker::<P>(ctx)) {
error!(
%channel_name,
%pipe_name,
?error,
"DVC pipe proxy worker thread has failed."
);
}
});
}
enum NextWorkerState {
Abort,
Reconnect,
}
async fn process_client<P: OsPipe>(ctx: &mut WorkerCtx) -> Result<NextWorkerState, DvcPipeProxyError> {
let pipe_name = &ctx.pipe_name;
let channel_name = &ctx.channel_name;
let mut pipe = tokio::select! {
pipe = P::connect(pipe_name) => {
info!(%channel_name, %pipe_name,"DVC proxy worker thread has started.");
pipe?
}
_ = ctx.abort_event.notified() => {
info!(%channel_name, %pipe_name, "DVC proxy worker thread has been aborted.");
return Ok(NextWorkerState::Abort);
}
};
let mut from_pipe_buffer = [0u8; IO_BUFFER_SIZE];
loop {
let abort = ctx.abort_event.notified();
let read_pipe = pipe.read(&mut from_pipe_buffer);
let read_dvc = ctx.to_pipe_rx.recv();
tokio::select! {
() = abort => {
info!(%channel_name, %pipe_name, "Received abort signal for DVC proxy worker thread.");
return Ok(NextWorkerState::Abort);
}
read_bytes_result = read_pipe => {
let read_bytes = read_bytes_result?;
if read_bytes == 0 {
info!(%channel_name, %pipe_name, "DVC proxy pipe returned EOF");
// If client unexpectedly closed the connection, we should
// still be able to reconnect to same session.
return Ok(NextWorkerState::Reconnect);
}
let messages = encode_dvc_messages(
ctx.channel_id,
vec![Box::new(RawDataDvcMessage(from_pipe_buffer[..read_bytes].to_vec()))],
ChannelFlags::empty(),
)
.map_err(DvcPipeProxyError::EncodeDvcMessage)?;
if let Err(error) = (ctx.on_write_dvc)(0, messages) {
error!(%channel_name, %pipe_name, ?error, "DVC pipe proxy write callback failed");
}
}
dvc_input = read_dvc => {
let data = match dvc_input {
Some(data) => data,
None => {
info!(%channel_name, %pipe_name, "DVC mpsc channel returned EOF.");
// Server DVC has been closed, there is no point in
// trying to reconnect.
return Ok(NextWorkerState::Abort);
}
};
if let Err(error) = pipe.write_all(&data).await
{
error!(%channel_name, %pipe_name, ?error, "Failed to write to DVC pipe");
continue;
}
}
};
}
}
async fn worker<P: OsPipe>(mut ctx: WorkerCtx) -> Result<(), DvcPipeProxyError> {
loop {
match process_client::<P>(&mut ctx).await? {
NextWorkerState::Abort => {
info!(
channel_name = %ctx.channel_name,
pipe_name = %ctx.pipe_name,
"Aborting DVC proxy worker thread."
);
break;
}
NextWorkerState::Reconnect => {
info!(
channel_name = %ctx.channel_name,
pipe_name = %ctx.pipe_name,
"Reconnecting to DVC pipe..."
);
continue;
}
};
}
Ok(())
}