feat: Error Handling, Auto Reconnection

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-07-08 01:20:42 +03:00
parent 67efd29c24
commit 17977d4ee2
5 changed files with 247 additions and 91 deletions

View file

@ -1,10 +1,10 @@
use std::process::Output; use std::{process::Output, sync::Arc, time::Duration};
use futures_util::{ use futures_util::{
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
SinkExt, StreamExt, SinkExt, StreamExt,
}; };
use tokio::process::Command; use tokio::{process::Command, sync::Mutex};
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream}; use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream};
use crate::{Config, Payload, Report}; use crate::{Config, Payload, Report};
@ -14,21 +14,28 @@ type WebSocketSender =
type WebSocketReceiver = type WebSocketReceiver =
SplitStream<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>; SplitStream<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>;
pub async fn start(config: Config) { pub async fn start(config: Config, debug: bool) {
let (ws_sender, ws_receiver) = match connect(config).await { loop {
Some((ws_sender, ws_receiver)) => (ws_sender, ws_receiver), let (ws_sender, ws_receiver) = match connect(&config, debug).await {
None => return, Some((ws_sender, ws_receiver)) => (ws_sender, ws_receiver),
}; None => {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
serve((ws_sender, ws_receiver)).await; serve((ws_sender, ws_receiver), debug).await;
}
} }
pub async fn connect(config: Config) -> Option<(WebSocketSender, WebSocketReceiver)> { pub async fn connect(config: &Config, debug: bool) -> Option<(WebSocketSender, WebSocketReceiver)> {
let ws_connection = let ws_connection =
match connect_async(format!("ws://{}:{}", config.server_address, config.port)).await { match connect_async(format!("ws://{}:{}", config.server_address, config.port)).await {
Ok(ws_connection) => ws_connection, Ok(ws_connection) => ws_connection,
Err(err_val) => { Err(err_val) => {
eprintln!("Error: WebSocket Connection | {}", err_val); if debug {
eprintln!("Error: WebSocket Connection | {}", err_val);
}
return None; return None;
} }
}; };
@ -37,28 +44,34 @@ pub async fn connect(config: Config) -> Option<(WebSocketSender, WebSocketReceiv
Some((ws_sender, ws_receiver)) Some((ws_sender, ws_receiver))
} }
pub async fn serve((mut ws_sender, mut ws_receiver): (WebSocketSender, WebSocketReceiver)) -> ! { pub async fn serve(
loop { (ws_sender, mut ws_receiver): (WebSocketSender, WebSocketReceiver),
match receive(&mut ws_receiver).await { debug: bool,
Some(message) => { ) {
match serde_json::from_str(&message[..]) { let ws_sender = Arc::new(Mutex::new(ws_sender));
Ok(payload) => match execute(payload).await { while let Some(message) = receive(&mut ws_receiver, debug).await {
Some(output) => send(output, &mut ws_sender).await, match serde_json::from_str::<Payload>(&message[..]) {
None => todo!(), Ok(payload) => {
}, let ws_sender = ws_sender.clone();
Err(err_val) => { tokio::spawn(async move {
eprintln!("Error: Message to Payload | {}", err_val); let output = execute(payload.clone(), debug).await;
continue; send(output, payload, ws_sender.clone(), debug).await
} });
};
} }
None => continue, Err(err_val) => {
} if debug {
eprintln!("Error: Message to Payload | {}", err_val);
}
continue;
}
};
} }
} }
pub async fn execute(payload: Payload) -> Option<Output> { pub async fn execute(payload: Payload, debug: bool) -> Option<Output> {
println!("{:#?}", payload); if debug {
println!("{:#?}", payload);
}
match Command::new(payload.command) match Command::new(payload.command)
.args(payload.args) .args(payload.args)
.output() .output()
@ -66,41 +79,84 @@ pub async fn execute(payload: Payload) -> Option<Output> {
{ {
Ok(output) => Some(output), Ok(output) => Some(output),
Err(err_val) => { Err(err_val) => {
eprintln!("Error: Command Execution | {}", err_val); if debug {
eprintln!("Error: Command Execution | {}", err_val);
}
return None; return None;
} }
} }
} }
pub async fn receive(ws_receiver: &mut WebSocketReceiver) -> Option<String> { pub async fn receive(ws_receiver: &mut WebSocketReceiver, debug: bool) -> Option<String> {
match ws_receiver.next().await { match ws_receiver.next().await {
Some(message) => match message { Some(message) => match message {
Ok(message) => { Ok(message) => {
if let Message::Text(message) = message { if let Message::Text(message) = message {
Some(message) Some(message)
} else { } else {
eprintln!("Error: Message Type | {:#?}", message); if debug {
eprintln!("Error: Message Type | {:#?}", message);
}
None None
} }
} }
Err(err_val) => { Err(err_val) => {
eprintln!("Error: Message | {}", err_val); if debug {
eprintln!("Error: Message | {}", err_val);
}
None None
} }
}, },
None => { None => {
eprintln!("Error: WebSocket Receive"); if debug {
eprintln!("Error: WebSocket Receive");
}
None None
} }
} }
} }
pub async fn send(output: Output, ws_sender: &mut WebSocketSender) { pub async fn send(
let report = Report { output: Option<Output>,
status: output.status.to_string(), payload: Payload,
stdout: String::from_utf8(output.stdout).unwrap(), ws_sender: Arc<Mutex<WebSocketSender>>,
stderr: String::from_utf8(output.stderr).unwrap(), debug: bool,
) -> bool {
let report = match output {
Some(output) => Report {
payload,
status: output.status.to_string(),
stdout: String::from_utf8(output.stdout).unwrap(),
stderr: String::from_utf8(output.stderr).unwrap(),
},
None => Report {
payload,
status: "Nope".to_string(),
stdout: "Nope".to_string(),
stderr: "Nope".to_string(),
},
}; };
let report = serde_json::json!(report); let report = serde_json::json!(report);
let _ = ws_sender.send(report.to_string().into()).await; let result = ws_sender.lock().await.send(report.to_string().into()).await;
match result {
Ok(_) => {
let result = ws_sender.lock().await.flush().await;
match result {
Ok(_) => true,
Err(err_val) => {
if debug {
eprintln!("Error: WebSocket Flush | {}", err_val);
}
false
}
}
}
Err(err_val) => {
if debug {
eprintln!("Error: WebSocket Send | {}", err_val);
}
false
}
}
} }

View file

@ -11,21 +11,27 @@ pub enum Runner {
Client, Client,
} }
#[derive(Debug)] pub enum RunnerMode {
State(Runner, bool),
}
#[derive(Debug, Clone)]
pub struct Config { pub struct Config {
pub server_address: IpAddr, pub server_address: IpAddr,
pub port: u16, pub port: u16,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Payload { pub struct Payload {
pub sudo: bool, pub sudo: bool,
pub user: String,
pub command: String, pub command: String,
pub args: Vec<String>, pub args: Vec<String>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct Report { pub struct Report {
pub payload: Payload,
pub status: String, pub status: String,
pub stdout: String, pub stdout: String,
pub stderr: String, pub stderr: String,

View file

@ -1,6 +1,6 @@
use rust_remote::{ use rust_remote::{
client, server,
utils::{read_config, take_args}, utils::{read_config, take_args},
Runner, RunnerMode,
}; };
#[tokio::main] #[tokio::main]
@ -16,9 +16,19 @@ async fn main() {
}; };
match take_args() { match take_args() {
Some(runner) => match runner { Some(runner_mode) => match runner_mode {
rust_remote::Runner::Server => server::start(config).await, RunnerMode::State(Runner::Server, false) => {
rust_remote::Runner::Client => client::start(config).await, rust_remote::server::start(config, false).await
}
RunnerMode::State(Runner::Server, true) => {
rust_remote::server::start(config, true).await
}
RunnerMode::State(Runner::Client, false) => {
rust_remote::client::start(config, false).await
}
RunnerMode::State(Runner::Client, true) => {
rust_remote::client::start(config, true).await
}
}, },
None => { None => {
eprintln!("Error: Take Args"); eprintln!("Error: Take Args");

View file

@ -1,53 +1,95 @@
use std::io::stdin; use std::{io::stdin, sync::Arc};
use futures_util::{ use futures_util::{
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
SinkExt, StreamExt, SinkExt, StreamExt,
}; };
use tokio::net::{TcpListener, TcpStream}; use tokio::{
net::{TcpListener, TcpStream},
sync::Mutex,
};
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
use crate::{Config, Payload}; use crate::{Config, Payload};
type WebSocketSender = SplitSink<WebSocketStream<TcpStream>, Message>; type WebSocketSender = SplitSink<WebSocketStream<TcpStream>, Message>;
type WebSocketReceiver = SplitStream<WebSocketStream<TcpStream>>; type WebSocketReceiver = SplitStream<WebSocketStream<TcpStream>>;
pub async fn start(config: Config) { pub async fn start(config: Config, debug: bool) {
let listener = let listener = match setup(config, debug).await {
match TcpListener::bind(format!("{}:{}", config.server_address, config.port)).await { Some(listener) => listener,
Ok(listener) => listener, None => return,
Err(err_val) => { };
eprintln!("Error: Listener | {}", err_val);
return;
}
};
loop { loop {
if let Ok(connection) = listener.accept().await { match establish_connection(&listener, debug).await {
let ws_connection = match accept_async(connection.0).await { Some((ws_sender, ws_receiver)) => {
Ok(ws_connection) => ws_connection, let ws_sender = Arc::new(Mutex::new(ws_sender));
Err(err_val) => { let ws_receiver = Arc::new(Mutex::new(ws_receiver));
eprintln!("Error: WebSocket Upgrade | {}", err_val); loop {
continue; let ws_sender = ws_sender.clone();
} let ws_receiver = ws_receiver.clone();
}; match payload_from_input(debug).await {
Some(payload) => {
let (mut ws_sender, mut ws_receiver) = ws_connection.split(); if !send(payload, ws_sender, debug).await {
loop { break;
match payload_from_input().await { }
Some(payload) => { tokio::spawn(async move {
send(payload, &mut ws_sender).await; let report = receive(ws_receiver, debug).await;
let report = receive(&mut ws_receiver).await; println!("{:#?}", report);
println!("{:#?}", report); });
}
None => continue,
} }
None => continue,
} }
} }
None => return,
}
}
}
pub async fn setup(config: Config, debug: bool) -> Option<TcpListener> {
match TcpListener::bind(format!("{}:{}", config.server_address, config.port)).await {
Ok(listener) => Some(listener),
Err(err_val) => {
if debug {
eprintln!("Error: Listener | {}", err_val);
}
None
} }
} }
} }
pub async fn payload_from_input() -> Option<Payload> { pub async fn establish_connection(
match get_input() { listener: &TcpListener,
debug: bool,
) -> Option<(WebSocketSender, WebSocketReceiver)> {
match listener.accept().await {
Ok(connection) => match accept_async(connection.0).await {
Ok(ws_connection) => Some(ws_connection.split()),
Err(err_val) => {
if debug {
eprintln!("Error: WebSocket Upgrade | {}", err_val);
}
None
}
},
Err(err_val) => {
if debug {
eprintln!("Error: Listener Accept | {}", err_val);
}
None
}
}
}
pub async fn payload_from_input(debug: bool) -> Option<Payload> {
println!("User");
// let user = match get_input() {
// Some(input) => input,
// None => return None,
// };
let user = "tahinli".to_string();
println!("Command");
match get_input(debug) {
Some(input) => { Some(input) => {
let mut args: Vec<String> = input.split_ascii_whitespace().map(String::from).collect(); let mut args: Vec<String> = input.split_ascii_whitespace().map(String::from).collect();
if args.is_empty() { if args.is_empty() {
@ -64,6 +106,7 @@ pub async fn payload_from_input() -> Option<Payload> {
} }
Some(Payload { Some(Payload {
sudo, sudo,
user,
command, command,
args, args,
}) })
@ -73,41 +116,73 @@ pub async fn payload_from_input() -> Option<Payload> {
} }
} }
pub fn get_input() -> Option<String> { pub fn get_input(debug: bool) -> Option<String> {
let mut payload_input: String = String::new(); let mut payload_input: String = String::new();
match stdin().read_line(&mut payload_input) { match stdin().read_line(&mut payload_input) {
Ok(_) => Some(payload_input.trim().to_string()), Ok(_) => Some(payload_input.trim().to_string()),
Err(err_val) => { Err(err_val) => {
eprintln!("Error: Read Input | {}", err_val); if debug {
eprintln!("Error: Read Input | {}", err_val);
}
None None
} }
} }
} }
pub async fn receive(ws_receiver: &mut WebSocketReceiver) -> Option<String> { pub async fn receive(ws_receiver: Arc<Mutex<WebSocketReceiver>>, debug: bool) -> Option<String> {
match ws_receiver.next().await { match ws_receiver.lock().await.next().await {
Some(message) => match message { Some(message) => match message {
Ok(message) => { Ok(message) => {
if let Message::Text(message) = message { if let Message::Text(message) = message {
Some(message) Some(message)
} else { } else {
eprintln!("Error: Message Type | {:#?}", message); if debug {
eprintln!("Error: Message Type | {:#?}", message);
}
None None
} }
} }
Err(err_val) => { Err(err_val) => {
eprintln!("Error: Message | {}", err_val); if debug {
eprintln!("Error: Message | {}", err_val);
}
None None
} }
}, },
None => { None => {
eprintln!("Error: WebSocket Receive"); if debug {
eprintln!("Error: WebSocket Receive");
}
None None
} }
} }
} }
pub async fn send(payload: Payload, ws_sender: &mut WebSocketSender) { pub async fn send(payload: Payload, ws_sender: Arc<Mutex<WebSocketSender>>, debug: bool) -> bool {
let payload = serde_json::json!(payload); let payload = serde_json::json!(payload);
let _ = ws_sender.send(payload.to_string().into()).await; let result = ws_sender
.lock()
.await
.send(payload.to_string().into())
.await;
match result {
Ok(_) => {
let result = ws_sender.lock().await.flush().await;
match result {
Ok(_) => true,
Err(err_val) => {
if debug {
eprintln!("Error: WebSocket Flush | {}", err_val);
}
false
}
}
}
Err(err_val) => {
if debug {
eprintln!("Error: WebSocket Send | {}", err_val);
}
false
}
}
} }

View file

@ -1,15 +1,24 @@
use std::{env, fs::File, io::Read}; use std::{env, fs::File, io::Read};
use crate::{Config, Runner}; use crate::{Config, Runner, RunnerMode};
pub fn take_args() -> Option<Runner> { pub fn take_args() -> Option<RunnerMode> {
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
if args.len() > 1 { if args.len() > 1 {
match &args[1][..] { let runner = match &args[1][..] {
"--server" => Some(Runner::Server), "--server" => Runner::Server,
"--client" => Some(Runner::Client), "--client" => Runner::Client,
_ => None, _ => return None,
} };
let debug = if args.len() > 2 {
match &args[2][..] {
"--debug" => true,
_ => return None,
}
} else {
false
};
Some(RunnerMode::State(runner, debug))
} else { } else {
None None
} }