fix: 🐛 no detection for server based disconnection

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-05-17 02:51:50 +03:00
parent ce9d91bc9e
commit 94fc639c74
3 changed files with 111 additions and 6 deletions

View file

@ -91,6 +91,7 @@ struct DataChannel {
struct CommunicationChannel {
base_to_streaming_sender: Sender<bool>,
streaming_to_base_sender: Sender<bool>,
streaming_to_base_is_finished: Sender<bool>,
base_to_recording_sender: Sender<bool>,
recording_to_base_sender: Sender<bool>,
base_to_playing_sender: Sender<Player>,
@ -142,6 +143,7 @@ impl Streamer {
communication_channel: CommunicationChannel {
base_to_streaming_sender: channel(1).0,
streaming_to_base_sender: channel(1).0,
streaming_to_base_is_finished: channel(1).0,
base_to_recording_sender: channel(1).0,
recording_to_base_sender: channel(1).0,
base_to_playing_sender: channel(1).0,
@ -187,7 +189,21 @@ impl Streamer {
.subscribe();
let microphone_stream_volume = self.gui_status.microphone_volume.value.clone();
let audio_stream_volume = self.gui_status.audio_volume.value.clone();
Command::perform(
let streaming_to_base_sender_is_finished = self
.communication_channel
.streaming_to_base_is_finished
.clone();
let streaming_to_base_receiver_is_streaming_stopped = self
.communication_channel
.recording_to_base_sender
.subscribe();
let streaming_to_base_receiver_is_streaming_finished = self
.communication_channel
.streaming_to_base_is_finished
.subscribe();
let connect_command = Command::perform(
async move {
gui_utils::connect(
microphone_stream_receiver,
@ -195,13 +211,28 @@ impl Streamer {
streamer_config,
streaming_to_base_sender,
base_to_streaming_receiver,
streaming_to_base_sender_is_finished,
microphone_stream_volume,
audio_stream_volume,
)
.await
},
Message::State,
);
let is_streaming_finished_command = Command::perform(
async move {
gui_utils::is_streaming_finished(
streaming_to_base_receiver_is_streaming_finished,
streaming_to_base_receiver_is_streaming_stopped,
)
.await
},
Message::State,
);
let commands = vec![connect_command, is_streaming_finished_command];
Command::batch(commands)
}
Event::Disconnect => {
println!("Disconnect");

View file

@ -16,6 +16,7 @@ pub async fn connect(
streamer_config: Config,
streaming_to_base_sender: Sender<bool>,
base_to_streaming_receiver: Receiver<bool>,
streaming_to_base_sender_is_finished: Sender<bool>,
microphone_stream_volume: Arc<Mutex<f32>>,
audio_stream_volume: Arc<Mutex<f32>>,
) -> State {
@ -26,6 +27,7 @@ pub async fn connect(
streamer_config,
base_to_streaming_receiver,
streaming_to_base_sender.clone(),
streaming_to_base_sender_is_finished,
microphone_stream_volume,
audio_stream_volume,
));
@ -71,6 +73,46 @@ pub async fn disconnect(
}
}
pub async fn is_streaming_finished(
mut streaming_to_base_receiver_is_streaming_finished: Receiver<bool>,
mut streaming_to_base_receiver_is_streaming_stopped: Receiver<bool>,
) -> State {
tokio::select! {
is_streaming_finished = async move {
match streaming_to_base_receiver_is_streaming_finished.recv().await {
Ok(_) => State::Disconnected,
Err(err_val) => {
eprintln!(
"Error: Communication | Streaming to Base | Recv | Is Finished | {}",
err_val
);
State::Connected
},
}
} => is_streaming_finished,
is_streaming_stopped = async move {
match streaming_to_base_receiver_is_streaming_stopped.recv().await {
Ok(_) => {
while let Err(err_val) = streaming_to_base_receiver_is_streaming_stopped.recv().await {
eprintln!(
"Error: Communication | Streaming to Base | Recv | Is Stopped | {}",
err_val
);
}
State::Disconnected
},
Err(err_val) => {
eprintln!(
"Error: Communication | Streaming to Base | Recv | Is Stopped Never Started | {}",
err_val
);
State::Disconnected
},
}
} =>is_streaming_stopped,
}
}
pub async fn start_recording(
microphone_stream_sender: Sender<f32>,
recording_to_base_sender: Sender<bool>,

View file

@ -21,6 +21,7 @@ pub async fn connect(
streamer_config: Config,
mut base_to_streaming: Receiver<bool>,
streaming_to_base: Sender<bool>,
streaming_to_base_sender_is_finished: Sender<bool>,
microphone_stream_volume: Arc<Mutex<f32>>,
audio_stream_volume: Arc<Mutex<f32>>,
) {
@ -83,6 +84,7 @@ pub async fn connect(
mixer_task,
base_to_streaming,
streaming_to_base,
streaming_to_base_sender_is_finished,
));
}
}
@ -254,15 +256,45 @@ async fn status_checker(
mixer_task: JoinHandle<()>,
mut base_to_streaming: Receiver<bool>,
streaming_to_base: Sender<bool>,
streaming_to_base_sender_is_finished: Sender<bool>,
) {
while let Err(_) = base_to_streaming.try_recv() {
let mut problem = false;
loop {
if let Ok(_) = base_to_streaming.try_recv() {
println!("Time to Retrieve");
break;
}
if stream_task.is_finished() {
println!("Warning: Stream Task Finished");
problem = true;
break;
}
if mixer_task.is_finished() {
println!("Warning: Mixer Task Finished");
problem = true;
break;
}
if message_organizer_task.is_finished() {
println!("Warning: Message Organizer Task Finished");
problem = true;
break;
}
tokio::time::sleep(Duration::from_secs(3)).await;
}
stream_task.abort();
mixer_task.abort();
message_organizer_task.abort();
if problem {
println!("Problem");
match streaming_to_base_sender_is_finished.send(true) {
Ok(_) => println!("Cleaning Done: Streamer Disconnected"),
Err(err_val) => eprintln!("Error: Cleaning | Is Finished | {}", err_val),
}
} else {
println!("No Problem");
match streaming_to_base.send(true) {
Ok(_) => println!("Cleaning Done: Streamer Disconnected"),
Err(err_val) => eprintln!("Error: Cleaning | {}", err_val),
Err(err_val) => eprintln!("Error: Cleaning | Is Stopped | {}", err_val),
}
}
}