diff --git a/src/client.rs b/src/client.rs index 87eaa16..58fb579 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,10 @@ -use std::process::Output; +use std::{process::Output, sync::Arc, time::Duration}; use futures_util::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; -use tokio::process::Command; +use tokio::{process::Command, sync::Mutex}; use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream}; use crate::{Config, Payload, Report}; @@ -14,21 +14,28 @@ type WebSocketSender = type WebSocketReceiver = SplitStream>>; -pub async fn start(config: Config) { - let (ws_sender, ws_receiver) = match connect(config).await { - Some((ws_sender, ws_receiver)) => (ws_sender, ws_receiver), - None => return, - }; +pub async fn start(config: Config, debug: bool) { + loop { + let (ws_sender, ws_receiver) = match connect(&config, debug).await { + Some((ws_sender, ws_receiver)) => (ws_sender, ws_receiver), + None => { + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + }; - serve((ws_sender, ws_receiver)).await; + serve((ws_sender, ws_receiver), debug).await; + } } -pub async fn connect(config: Config) -> Option<(WebSocketSender, WebSocketReceiver)> { +pub async fn connect(config: &Config, debug: bool) -> Option<(WebSocketSender, WebSocketReceiver)> { let ws_connection = match connect_async(format!("ws://{}:{}", config.server_address, config.port)).await { Ok(ws_connection) => ws_connection, Err(err_val) => { - eprintln!("Error: WebSocket Connection | {}", err_val); + if debug { + eprintln!("Error: WebSocket Connection | {}", err_val); + } return None; } }; @@ -37,28 +44,34 @@ pub async fn connect(config: Config) -> Option<(WebSocketSender, WebSocketReceiv Some((ws_sender, ws_receiver)) } -pub async fn serve((mut ws_sender, mut ws_receiver): (WebSocketSender, WebSocketReceiver)) -> ! { - loop { - match receive(&mut ws_receiver).await { - Some(message) => { - match serde_json::from_str(&message[..]) { - Ok(payload) => match execute(payload).await { - Some(output) => send(output, &mut ws_sender).await, - None => todo!(), - }, - Err(err_val) => { - eprintln!("Error: Message to Payload | {}", err_val); - continue; - } - }; +pub async fn serve( + (ws_sender, mut ws_receiver): (WebSocketSender, WebSocketReceiver), + debug: bool, +) { + let ws_sender = Arc::new(Mutex::new(ws_sender)); + while let Some(message) = receive(&mut ws_receiver, debug).await { + match serde_json::from_str::(&message[..]) { + Ok(payload) => { + let ws_sender = ws_sender.clone(); + tokio::spawn(async move { + let output = execute(payload.clone(), debug).await; + send(output, payload, ws_sender.clone(), debug).await + }); } - None => continue, - } + Err(err_val) => { + if debug { + eprintln!("Error: Message to Payload | {}", err_val); + } + continue; + } + }; } } -pub async fn execute(payload: Payload) -> Option { - println!("{:#?}", payload); +pub async fn execute(payload: Payload, debug: bool) -> Option { + if debug { + println!("{:#?}", payload); + } match Command::new(payload.command) .args(payload.args) .output() @@ -66,41 +79,84 @@ pub async fn execute(payload: Payload) -> Option { { Ok(output) => Some(output), Err(err_val) => { - eprintln!("Error: Command Execution | {}", err_val); + if debug { + eprintln!("Error: Command Execution | {}", err_val); + } return None; } } } -pub async fn receive(ws_receiver: &mut WebSocketReceiver) -> Option { +pub async fn receive(ws_receiver: &mut WebSocketReceiver, debug: bool) -> Option { match ws_receiver.next().await { Some(message) => match message { Ok(message) => { if let Message::Text(message) = message { Some(message) } else { - eprintln!("Error: Message Type | {:#?}", message); + if debug { + eprintln!("Error: Message Type | {:#?}", message); + } None } } Err(err_val) => { - eprintln!("Error: Message | {}", err_val); + if debug { + eprintln!("Error: Message | {}", err_val); + } None } }, None => { - eprintln!("Error: WebSocket Receive"); + if debug { + eprintln!("Error: WebSocket Receive"); + } None } } } -pub async fn send(output: Output, ws_sender: &mut WebSocketSender) { - let report = Report { - status: output.status.to_string(), - stdout: String::from_utf8(output.stdout).unwrap(), - stderr: String::from_utf8(output.stderr).unwrap(), +pub async fn send( + output: Option, + payload: Payload, + ws_sender: Arc>, + debug: bool, +) -> bool { + let report = match output { + Some(output) => Report { + payload, + status: output.status.to_string(), + stdout: String::from_utf8(output.stdout).unwrap(), + stderr: String::from_utf8(output.stderr).unwrap(), + }, + None => Report { + payload, + status: "Nope".to_string(), + stdout: "Nope".to_string(), + stderr: "Nope".to_string(), + }, }; + let report = serde_json::json!(report); - let _ = ws_sender.send(report.to_string().into()).await; + let result = ws_sender.lock().await.send(report.to_string().into()).await; + match result { + Ok(_) => { + let result = ws_sender.lock().await.flush().await; + match result { + Ok(_) => true, + Err(err_val) => { + if debug { + eprintln!("Error: WebSocket Flush | {}", err_val); + } + false + } + } + } + Err(err_val) => { + if debug { + eprintln!("Error: WebSocket Send | {}", err_val); + } + false + } + } } diff --git a/src/lib.rs b/src/lib.rs index 8d291f1..03f03e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,21 +11,27 @@ pub enum Runner { Client, } -#[derive(Debug)] +pub enum RunnerMode { + State(Runner, bool), +} + +#[derive(Debug, Clone)] pub struct Config { pub server_address: IpAddr, pub port: u16, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct Payload { pub sudo: bool, + pub user: String, pub command: String, pub args: Vec, } #[derive(Debug, Serialize, Deserialize)] pub struct Report { + pub payload: Payload, pub status: String, pub stdout: String, pub stderr: String, diff --git a/src/main.rs b/src/main.rs index 203c563..2825486 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use rust_remote::{ - client, server, utils::{read_config, take_args}, + Runner, RunnerMode, }; #[tokio::main] @@ -16,9 +16,19 @@ async fn main() { }; match take_args() { - Some(runner) => match runner { - rust_remote::Runner::Server => server::start(config).await, - rust_remote::Runner::Client => client::start(config).await, + Some(runner_mode) => match runner_mode { + RunnerMode::State(Runner::Server, false) => { + rust_remote::server::start(config, false).await + } + RunnerMode::State(Runner::Server, true) => { + rust_remote::server::start(config, true).await + } + RunnerMode::State(Runner::Client, false) => { + rust_remote::client::start(config, false).await + } + RunnerMode::State(Runner::Client, true) => { + rust_remote::client::start(config, true).await + } }, None => { eprintln!("Error: Take Args"); diff --git a/src/server.rs b/src/server.rs index 8748d53..6b457fe 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,53 +1,95 @@ -use std::io::stdin; +use std::{io::stdin, sync::Arc}; use futures_util::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::Mutex, +}; use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; use crate::{Config, Payload}; type WebSocketSender = SplitSink, Message>; type WebSocketReceiver = SplitStream>; -pub async fn start(config: Config) { - let listener = - match TcpListener::bind(format!("{}:{}", config.server_address, config.port)).await { - Ok(listener) => listener, - Err(err_val) => { - eprintln!("Error: Listener | {}", err_val); - return; - } - }; +pub async fn start(config: Config, debug: bool) { + let listener = match setup(config, debug).await { + Some(listener) => listener, + None => return, + }; loop { - if let Ok(connection) = listener.accept().await { - let ws_connection = match accept_async(connection.0).await { - Ok(ws_connection) => ws_connection, - Err(err_val) => { - eprintln!("Error: WebSocket Upgrade | {}", err_val); - continue; - } - }; - - let (mut ws_sender, mut ws_receiver) = ws_connection.split(); - loop { - match payload_from_input().await { - Some(payload) => { - send(payload, &mut ws_sender).await; - let report = receive(&mut ws_receiver).await; - println!("{:#?}", report); + match establish_connection(&listener, debug).await { + Some((ws_sender, ws_receiver)) => { + let ws_sender = Arc::new(Mutex::new(ws_sender)); + let ws_receiver = Arc::new(Mutex::new(ws_receiver)); + loop { + let ws_sender = ws_sender.clone(); + let ws_receiver = ws_receiver.clone(); + match payload_from_input(debug).await { + Some(payload) => { + if !send(payload, ws_sender, debug).await { + break; + } + tokio::spawn(async move { + let report = receive(ws_receiver, debug).await; + println!("{:#?}", report); + }); + } + None => continue, } - None => continue, } } + None => return, + } + } +} +pub async fn setup(config: Config, debug: bool) -> Option { + match TcpListener::bind(format!("{}:{}", config.server_address, config.port)).await { + Ok(listener) => Some(listener), + Err(err_val) => { + if debug { + eprintln!("Error: Listener | {}", err_val); + } + None } } } -pub async fn payload_from_input() -> Option { - match get_input() { +pub async fn establish_connection( + listener: &TcpListener, + debug: bool, +) -> Option<(WebSocketSender, WebSocketReceiver)> { + match listener.accept().await { + Ok(connection) => match accept_async(connection.0).await { + Ok(ws_connection) => Some(ws_connection.split()), + Err(err_val) => { + if debug { + eprintln!("Error: WebSocket Upgrade | {}", err_val); + } + None + } + }, + Err(err_val) => { + if debug { + eprintln!("Error: Listener Accept | {}", err_val); + } + None + } + } +} + +pub async fn payload_from_input(debug: bool) -> Option { + println!("User"); + // let user = match get_input() { + // Some(input) => input, + // None => return None, + // }; + let user = "tahinli".to_string(); + println!("Command"); + match get_input(debug) { Some(input) => { let mut args: Vec = input.split_ascii_whitespace().map(String::from).collect(); if args.is_empty() { @@ -64,6 +106,7 @@ pub async fn payload_from_input() -> Option { } Some(Payload { sudo, + user, command, args, }) @@ -73,41 +116,73 @@ pub async fn payload_from_input() -> Option { } } -pub fn get_input() -> Option { +pub fn get_input(debug: bool) -> Option { let mut payload_input: String = String::new(); match stdin().read_line(&mut payload_input) { Ok(_) => Some(payload_input.trim().to_string()), Err(err_val) => { - eprintln!("Error: Read Input | {}", err_val); + if debug { + eprintln!("Error: Read Input | {}", err_val); + } None } } } -pub async fn receive(ws_receiver: &mut WebSocketReceiver) -> Option { - match ws_receiver.next().await { +pub async fn receive(ws_receiver: Arc>, debug: bool) -> Option { + match ws_receiver.lock().await.next().await { Some(message) => match message { Ok(message) => { if let Message::Text(message) = message { Some(message) } else { - eprintln!("Error: Message Type | {:#?}", message); + if debug { + eprintln!("Error: Message Type | {:#?}", message); + } None } } Err(err_val) => { - eprintln!("Error: Message | {}", err_val); + if debug { + eprintln!("Error: Message | {}", err_val); + } None } }, None => { - eprintln!("Error: WebSocket Receive"); + if debug { + eprintln!("Error: WebSocket Receive"); + } None } } } -pub async fn send(payload: Payload, ws_sender: &mut WebSocketSender) { +pub async fn send(payload: Payload, ws_sender: Arc>, debug: bool) -> bool { let payload = serde_json::json!(payload); - let _ = ws_sender.send(payload.to_string().into()).await; + let result = ws_sender + .lock() + .await + .send(payload.to_string().into()) + .await; + match result { + Ok(_) => { + let result = ws_sender.lock().await.flush().await; + match result { + Ok(_) => true, + Err(err_val) => { + if debug { + eprintln!("Error: WebSocket Flush | {}", err_val); + } + false + } + } + } + Err(err_val) => { + if debug { + eprintln!("Error: WebSocket Send | {}", err_val); + } + false + } + } } diff --git a/src/utils.rs b/src/utils.rs index 766ec2e..04653b3 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,15 +1,24 @@ use std::{env, fs::File, io::Read}; -use crate::{Config, Runner}; +use crate::{Config, Runner, RunnerMode}; -pub fn take_args() -> Option { +pub fn take_args() -> Option { let args: Vec = env::args().collect(); if args.len() > 1 { - match &args[1][..] { - "--server" => Some(Runner::Server), - "--client" => Some(Runner::Client), - _ => None, - } + let runner = match &args[1][..] { + "--server" => Runner::Server, + "--client" => Runner::Client, + _ => return None, + }; + let debug = if args.len() > 2 { + match &args[2][..] { + "--debug" => true, + _ => return None, + } + } else { + false + }; + Some(RunnerMode::State(runner, debug)) } else { None }