feat(repl): impl client server protocol

This commit is contained in:
Hanaasagi 2023-04-23 15:44:38 +09:00
parent d2d6f7341a
commit a2ee0f6c98
2 changed files with 218 additions and 73 deletions

View file

@ -19,6 +19,123 @@ use erg_compiler::Compiler;
pub type EvalError = CompileError;
pub type EvalErrors = CompileErrors;
/// The instructions for communication between the client and the server.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[repr(u8)]
enum Inst {
/// Send from server to client. Informs the client to print data.
Print = 0x01,
/// Send from client to server. Informs the REPL server that the executable .pyc file has been written out and is ready for evaluation.
Load = 0x02,
/// Send from server to client. Represents an exception.
Exception = 0x03,
/// Send from server to client. Tells the code generator to initialize due to an error.
Initialize = 0x04,
/// Informs that the connection is to be / should be terminated.
Exit = 0x05,
/// Informs that it is not a supported instruction.
Unknown = 0x00,
}
impl Inst {
fn has_data(&self) -> bool {
match self {
Self::Print => true,
Self::Load => false,
Self::Exception => true,
Self::Initialize => true,
Self::Exit => false,
Self::Unknown => false,
}
}
}
impl Into<Inst> for u8 {
fn into(self) -> Inst {
match self {
0x01 => Inst::Print,
0x02 => Inst::Load,
0x03 => Inst::Exception,
0x04 => Inst::Initialize,
0x05 => Inst::Exit,
_ => Inst::Unknown,
}
}
}
/// -------------------------------
/// | ins | size | data
/// -------------------------------
/// | 1 byte | 2 bytes | n bytes
/// -------------------------------
#[derive(Debug, Clone)]
struct Message {
inst: Inst,
size: usize,
data: Option<Vec<u8>>,
}
impl Message {
fn new(inst: Inst, data: Option<Vec<u8>>) -> Self {
let size = if let Some(d) = &data { d.len() } else { 0 };
Self { inst, size, data }
}
}
#[derive(Debug)]
struct MessageStream<T: Read + Write> {
stream: T,
read_buf: Vec<u8>,
write_buf: Vec<u8>,
}
impl<T: Read + Write> MessageStream<T> {
fn new(stream: T) -> Self {
Self {
stream,
read_buf: Vec::new(),
write_buf: Vec::new(),
}
}
fn send_msg(&mut self, msg: &Message) -> Result<(), std::io::Error> {
self.write_buf.clear();
self.write_buf.extend((msg.inst as u8).to_be_bytes());
self.write_buf.extend((msg.size).to_be_bytes());
self.write_buf
.extend_from_slice(&msg.data.clone().unwrap_or_default());
self.stream.write_all(&self.write_buf)?;
Ok(())
}
fn recv_msg(&mut self) -> Result<Message, std::io::Error> {
// read instruction, 1 byte
let mut inst_buf = [0; 1];
self.stream.read_exact(&mut inst_buf)?;
let inst: Inst = u8::from_be_bytes(inst_buf).into();
if !inst.has_data() {
return Ok(Message::new(inst, None));
}
// read size, 2 bytes
let mut size_buf = [0; 2];
self.stream.read_exact(&mut size_buf)?;
let data_size = u16::from_be_bytes(size_buf) as usize;
// read data
let mut data_buf = vec![0; data_size];
self.stream.read_exact(&mut data_buf)?;
Ok(Message::new(inst, Some(data_buf)))
}
}
fn find_available_port() -> u16 {
let socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
TcpListener::bind(socket)
@ -33,7 +150,7 @@ fn find_available_port() -> u16 {
#[derive(Debug)]
pub struct DummyVM {
compiler: Compiler,
stream: Option<TcpStream>,
stream: Option<MessageStream<TcpStream>>,
}
impl Default for DummyVM {
@ -82,7 +199,7 @@ impl Runnable for DummyVM {
stream
.set_read_timeout(Some(Duration::from_secs(cfg.py_server_timeout)))
.unwrap();
break Some(stream);
break Some(MessageStream::new(stream));
}
Err(_) => {
if !cfg.quiet_repl {
@ -104,15 +221,16 @@ impl Runnable for DummyVM {
fn finish(&mut self) {
if let Some(stream) = &mut self.stream {
if let Err(err) = stream.write_all("exit".as_bytes()) {
// send exit to server
if let Err(err) = stream.send_msg(&Message::new(Inst::Exit, None)) {
eprintln!("Write error: {err}");
process::exit(1);
}
let mut buf = [0; 1024];
match stream.read(&mut buf) {
Result::Ok(n) => {
let s = std::str::from_utf8(&buf[..n]).unwrap();
if s.contains("closed") && !self.cfg().quiet_repl {
// wait server exit
match stream.recv_msg() {
Result::Ok(msg) => {
if msg.inst == Inst::Exit && !self.cfg().quiet_repl {
println!("The REPL server is closed.");
}
}
@ -121,6 +239,7 @@ impl Runnable for DummyVM {
process::exit(1);
}
}
remove_file(self.cfg().dump_pyc_filename()).unwrap_or(());
}
}
@ -158,18 +277,68 @@ impl Runnable for DummyVM {
.map_err(|eart| eart.errors)?;
let (last, warns) = (arti.object, arti.warns);
let mut res = warns.to_string();
macro_rules! err_handle {
() => {
{
self.finish();
process::exit(1);
}
};
($hint:expr, $($args:expr),*) => {
{
self.finish();
eprintln!($hint, $($args)*);
process::exit(1);
}
};
}
// Tell the REPL server to execute the code
res += &match self.stream.as_mut().unwrap().write("load".as_bytes()) {
Result::Ok(_) => self.read()?,
Result::Err(err) => {
self.finish();
eprintln!("Sending error: {err}");
process::exit(1);
}
if let Err(err) = self
.stream
.as_mut()
.unwrap()
.send_msg(&Message::new(Inst::Load, None))
{
err_handle!("Sending error: {}", err);
};
// receive data from server
let data = match self.stream.as_mut().unwrap().recv_msg() {
Result::Ok(msg) => {
let s = match msg.inst {
Inst::Exception => {
return Err(EvalErrors::from(EvalError::system_exit()));
}
Inst::Initialize => {
self.compiler.initialize_generator();
String::from_utf8(msg.data.unwrap_or_default())
}
Inst::Print => String::from_utf8(msg.data.unwrap_or_default()),
Inst::Exit => err_handle!("Recving inst {:?} from server", msg.inst),
// `load` can only be sent from the client to the server
Inst::Load | Inst::Unknown => {
err_handle!("Recving unexpected inst {:?} from server", msg.inst)
}
};
if s.is_err() {
err_handle!("Failed to parse server response data, error: {:?}", s.err());
} else {
s.unwrap()
}
}
Result::Err(err) => err_handle!("Recving error: {}", err),
};
res.push_str(&data);
// If the result of an expression is None, it will not be displayed in the REPL.
if res.ends_with("None") {
res.truncate(res.len() - 5);
}
if self.cfg().show_type {
res.push_str(": ");
res.push_str(
@ -197,47 +366,4 @@ impl DummyVM {
pub fn eval(&mut self, src: String) -> Result<String, EvalErrors> {
Runnable::eval(self, src)
}
fn read(&mut self) -> Result<String, EvalErrors> {
// Server Data Format:
// -------------------
// | size | data
// -------------------
// | 2 bytes | n bytes
// -------------------
let mut size_buf = [0; 2];
match self.stream.as_mut().unwrap().read_exact(&mut size_buf) {
Result::Ok(()) => {}
Result::Err(err) => {
self.finish();
eprintln!("Read error: {err}");
process::exit(1);
}
}
let data_size = u16::from_be_bytes(size_buf) as usize;
let mut data_buf = vec![0; data_size];
match self.stream.as_mut().unwrap().read_exact(&mut data_buf) {
Result::Ok(()) => {}
Result::Err(err) => {
self.finish();
eprintln!("Read error: {err}");
process::exit(1);
}
}
let s = std::str::from_utf8(&data_buf)
.expect("failed to parse the response, maybe the output is too long");
if s.starts_with("[Exception] SystemExit") {
Err(EvalErrors::from(EvalError::system_exit()))
} else if let Some(remaing) = s.strip_prefix("[Initialize]") {
self.compiler.initialize_generator();
Ok(remaing.to_string())
} else {
Ok(s.to_string())
}
}
}

View file

@ -15,24 +15,44 @@ __server_socket.listen(1)
__already_loaded = False
__ctx = {'__importlib': __importlib}
def __encode(s):
s_bytes = s.encode()
s_len = len(s_bytes)
# two bytes for size, and n bytes for data
return s_len.to_bytes(2, 'big') + s_bytes
class INST:
# Informs that it is not a supported instruction.
UNKNOWN = 0x00
# Send from server to client. Informs the client to print data.
PRINT = 0x01
# Send from client to server. Informs the REPL server that the executable .pyc file has been written out and is ready for evaluation.
LOAD = 0x02
# Send from server to client. Represents an exception.
EXCEPTION = 0x03
# Send from server to client. Tells the code generator to initialize due to an error.
INITIALIZE = 0x04
# Informs that the connection is to be / should be terminated.
EXIT = 0x05
def __encode(instr, data=''):
data_bytes = data.encode()
data_len = len(data_bytes)
if data_len > 0:
# one byte for inst, two bytes for size(Optional), and n bytes for data(Optional)
return instr.to_bytes(1, 'big') + data_len.to_bytes(2, 'big') + data_bytes
return instr.to_bytes(1, 'big')
while True:
try:
__order = __client_socket.recv(1024).decode()
__data = __client_socket.recv(1024)
except ConnectionResetError: # when the server was crashed
break
if __order == 'quit' or __order == 'exit': # when the server was closed successfully
__client_socket.send(__encode('closed'))
__inst = int.from_bytes(__data[:1], 'big')
if __inst == INST.EXIT: # when the server was closed successfully
__client_socket.send(__encode(INST.EXIT))
break
elif __order == 'load':
elif __inst == INST.LOAD:
__sys.stdout = __io.StringIO()
__res = ''
__exc = ''
__resp_inst = INST.PRINT
__buf = []
try:
if __already_loaded:
@ -42,7 +62,7 @@ while True:
__res = str(exec('import __MODULE__', __ctx))
__already_loaded = True
except SystemExit:
__buf.append('[Exception] SystemExit')
__client_socket.send(__encode(INST.EXCEPTION, 'SystemExit'))
continue
except Exception as e:
try:
@ -51,16 +71,15 @@ while True:
excs = __traceback.format_exception_only(e.__class__, e)
__exc = ''.join(excs).rstrip()
__traceback.clear_frames(e.__traceback__)
__buf.append('[Initialize]')
__resp_inst = INST.INITIALIZE
__out = __sys.stdout.getvalue()[:-1]
# assert not(__exc and __res)
if __exc or __res:
if __out and __exc or __res:
__out += '\n'
__res = __out + __exc + __res
__buf.append(__res)
__client_socket.send(__encode(''.join(__buf)))
__client_socket.send(__encode(__resp_inst, ''.join(__buf)))
else:
__client_socket.send(__encode('unknown operation'))
__client_socket.send(__encode(INST.UNKNOWN))
__client_socket.close()
__server_socket.close()