mirror of
https://github.com/joshuadavidthomas/django-language-server.git
synced 2025-09-28 04:45:03 +00:00
switch from runner to ipc and long-running sidecar process (#21)
This commit is contained in:
parent
4c10afb602
commit
235bb4419d
23 changed files with 556 additions and 281 deletions
|
@ -1,5 +1,13 @@
|
|||
mod client;
|
||||
mod process;
|
||||
mod server;
|
||||
mod transport;
|
||||
|
||||
pub use client::Client;
|
||||
pub use process::PythonProcess;
|
||||
pub use server::Server;
|
||||
pub use transport::parse_json_response;
|
||||
pub use transport::parse_raw_response;
|
||||
pub use transport::JsonResponse;
|
||||
pub use transport::Transport;
|
||||
pub use transport::TransportError;
|
||||
|
|
81
crates/djls-ipc/src/process.rs
Normal file
81
crates/djls-ipc/src/process.rs
Normal file
|
@ -0,0 +1,81 @@
|
|||
use crate::transport::{Transport, TransportError, TransportProtocol};
|
||||
use std::process::{Child, Command, Stdio};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use tokio::time;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PythonProcess {
|
||||
transport: Arc<Mutex<Box<dyn TransportProtocol>>>,
|
||||
_child: Child,
|
||||
healthy: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl PythonProcess {
|
||||
pub fn new(
|
||||
module: &str,
|
||||
transport: Transport,
|
||||
health_check_interval: Option<Duration>,
|
||||
) -> Result<Self, TransportError> {
|
||||
let mut child = Command::new("python")
|
||||
.arg("-m")
|
||||
.arg(module)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
let stdin = child.stdin.take().unwrap();
|
||||
let stdout = child.stdout.take().unwrap();
|
||||
|
||||
let process = Self {
|
||||
transport: Arc::new(Mutex::new(transport.create(stdin, stdout)?)),
|
||||
_child: child,
|
||||
healthy: Arc::new(AtomicBool::new(true)),
|
||||
};
|
||||
|
||||
if let Some(interval) = health_check_interval {
|
||||
process.start_health_check_task(interval)?;
|
||||
}
|
||||
|
||||
Ok(process)
|
||||
}
|
||||
|
||||
fn start_health_check_task(&self, interval: Duration) -> Result<(), TransportError> {
|
||||
let healthy = self.healthy.clone();
|
||||
let transport = self.transport.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut interval = time::interval(interval);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
if let Ok(mut transport) = transport.lock() {
|
||||
match transport.health_check() {
|
||||
Ok(()) => {
|
||||
healthy.store(true, Ordering::SeqCst);
|
||||
}
|
||||
Err(_) => {
|
||||
healthy.store(false, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn is_healthy(&self) -> bool {
|
||||
self.healthy.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub fn send(
|
||||
&mut self,
|
||||
message: &str,
|
||||
args: Option<Vec<String>>,
|
||||
) -> Result<String, TransportError> {
|
||||
let mut transport = self.transport.lock().unwrap();
|
||||
transport.send(message, args)
|
||||
}
|
||||
}
|
214
crates/djls-ipc/src/transport.rs
Normal file
214
crates/djls-ipc/src/transport.rs
Normal file
|
@ -0,0 +1,214 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::fmt::Debug;
|
||||
use std::io::{BufRead, BufReader, BufWriter, Write};
|
||||
use std::process::{ChildStdin, ChildStdout};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum TransportError {
|
||||
#[error("IO error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("JSON error: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
#[error("Process error: {0}")]
|
||||
Process(String),
|
||||
}
|
||||
|
||||
pub enum Transport {
|
||||
Raw,
|
||||
Json,
|
||||
}
|
||||
|
||||
impl Transport {
|
||||
pub fn create(
|
||||
&self,
|
||||
mut stdin: ChildStdin,
|
||||
mut stdout: ChildStdout,
|
||||
) -> Result<Box<dyn TransportProtocol>, TransportError> {
|
||||
let transport_type = match self {
|
||||
Transport::Raw => "raw",
|
||||
Transport::Json => "json",
|
||||
};
|
||||
|
||||
writeln!(stdin, "{}", transport_type).map_err(TransportError::Io)?;
|
||||
stdin.flush().map_err(TransportError::Io)?;
|
||||
|
||||
let mut ready_line = String::new();
|
||||
BufReader::new(&mut stdout)
|
||||
.read_line(&mut ready_line)
|
||||
.map_err(TransportError::Io)?;
|
||||
if ready_line.trim() != "ready" {
|
||||
return Err(TransportError::Process(
|
||||
"Python process not ready".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
match self {
|
||||
Transport::Raw => Ok(Box::new(RawTransport::new(stdin, stdout)?)),
|
||||
Transport::Json => Ok(Box::new(JsonTransport::new(stdin, stdout)?)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TransportProtocol: Debug + Send {
|
||||
fn new(stdin: ChildStdin, stdout: ChildStdout) -> Result<Self, TransportError>
|
||||
where
|
||||
Self: Sized;
|
||||
fn health_check(&mut self) -> Result<(), TransportError>;
|
||||
fn clone_box(&self) -> Box<dyn TransportProtocol>;
|
||||
fn send_impl(
|
||||
&mut self,
|
||||
message: &str,
|
||||
args: Option<Vec<String>>,
|
||||
) -> Result<String, TransportError>;
|
||||
|
||||
fn send(&mut self, message: &str, args: Option<Vec<String>>) -> Result<String, TransportError> {
|
||||
self.health_check()?;
|
||||
self.send_impl(message, args)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Box<dyn TransportProtocol> {
|
||||
fn clone(&self) -> Self {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RawTransport {
|
||||
reader: Arc<Mutex<BufReader<ChildStdout>>>,
|
||||
writer: Arc<Mutex<BufWriter<ChildStdin>>>,
|
||||
}
|
||||
|
||||
impl TransportProtocol for RawTransport {
|
||||
fn new(stdin: ChildStdin, stdout: ChildStdout) -> Result<Self, TransportError> {
|
||||
Ok(Self {
|
||||
reader: Arc::new(Mutex::new(BufReader::new(stdout))),
|
||||
writer: Arc::new(Mutex::new(BufWriter::new(stdin))),
|
||||
})
|
||||
}
|
||||
|
||||
fn health_check(&mut self) -> Result<(), TransportError> {
|
||||
self.send_impl("health", None)
|
||||
.and_then(|response| match response.as_str() {
|
||||
"ok" => Ok(()),
|
||||
other => Err(TransportError::Process(format!(
|
||||
"Health check failed: {}",
|
||||
other
|
||||
))),
|
||||
})
|
||||
}
|
||||
|
||||
fn clone_box(&self) -> Box<dyn TransportProtocol> {
|
||||
Box::new(RawTransport {
|
||||
reader: self.reader.clone(),
|
||||
writer: self.writer.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn send_impl(
|
||||
&mut self,
|
||||
message: &str,
|
||||
args: Option<Vec<String>>,
|
||||
) -> Result<String, TransportError> {
|
||||
let mut writer = self.writer.lock().unwrap();
|
||||
|
||||
if let Some(args) = args {
|
||||
// Join command and args with spaces
|
||||
writeln!(writer, "{} {}", message, args.join(" ")).map_err(TransportError::Io)?;
|
||||
} else {
|
||||
writeln!(writer, "{}", message).map_err(TransportError::Io)?;
|
||||
}
|
||||
|
||||
writer.flush().map_err(TransportError::Io)?;
|
||||
|
||||
let mut reader = self.reader.lock().unwrap();
|
||||
let mut line = String::new();
|
||||
reader.read_line(&mut line).map_err(TransportError::Io)?;
|
||||
Ok(line.trim().to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct JsonCommand {
|
||||
command: String,
|
||||
args: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct JsonResponse {
|
||||
status: String,
|
||||
data: Option<Value>,
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
impl JsonResponse {
|
||||
pub fn data(&self) -> &Option<Value> {
|
||||
&self.data
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct JsonTransport {
|
||||
reader: Arc<Mutex<BufReader<ChildStdout>>>,
|
||||
writer: Arc<Mutex<BufWriter<ChildStdin>>>,
|
||||
}
|
||||
|
||||
impl TransportProtocol for JsonTransport {
|
||||
fn new(stdin: ChildStdin, stdout: ChildStdout) -> Result<Self, TransportError> {
|
||||
Ok(Self {
|
||||
reader: Arc::new(Mutex::new(BufReader::new(stdout))),
|
||||
writer: Arc::new(Mutex::new(BufWriter::new(stdin))),
|
||||
})
|
||||
}
|
||||
|
||||
fn health_check(&mut self) -> Result<(), TransportError> {
|
||||
self.send_impl("health", None).and_then(|response| {
|
||||
let json: JsonResponse = serde_json::from_str(&response)?;
|
||||
match json.status.as_str() {
|
||||
"ok" => Ok(()),
|
||||
_ => Err(TransportError::Process(
|
||||
json.error.unwrap_or_else(|| "Unknown error".to_string()),
|
||||
)),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn clone_box(&self) -> Box<dyn TransportProtocol> {
|
||||
Box::new(JsonTransport {
|
||||
reader: self.reader.clone(),
|
||||
writer: self.writer.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn send_impl(
|
||||
&mut self,
|
||||
message: &str,
|
||||
args: Option<Vec<String>>,
|
||||
) -> Result<String, TransportError> {
|
||||
let command = JsonCommand {
|
||||
command: message.to_string(),
|
||||
args,
|
||||
};
|
||||
|
||||
let mut writer = self.writer.lock().unwrap();
|
||||
serde_json::to_writer(&mut *writer, &command)?;
|
||||
writeln!(writer).map_err(TransportError::Io)?;
|
||||
writer.flush().map_err(TransportError::Io)?;
|
||||
|
||||
let mut reader = self.reader.lock().unwrap();
|
||||
let mut line = String::new();
|
||||
reader.read_line(&mut line).map_err(TransportError::Io)?;
|
||||
Ok(line.trim().to_string())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_raw_response(response: String) -> Result<String, TransportError> {
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub fn parse_json_response(response: String) -> Result<JsonResponse, TransportError> {
|
||||
serde_json::from_str(&response).map_err(TransportError::Json)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue