feat: microphone and audio stream at the same time

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-05-06 04:09:00 +03:00
parent 61f92412fd
commit 8b220d403b
3 changed files with 92 additions and 35 deletions

View file

@ -146,8 +146,9 @@ impl Streamer {
println!("Connect");
self.gui_status.are_we_connect = Condition::Loading;
let sound_stream_receiver =
let microphone_stream_receiver =
self.data_channel.microphone_stream_sender.subscribe();
let audio_stream_receiver = self.data_channel.audio_stream_sender.subscribe();
let streamer_config = self.config.clone().unwrap();
let streaming_to_base_sender =
self.communication_channel.streaming_to_base_sender.clone();
@ -159,7 +160,8 @@ impl Streamer {
Command::perform(
async move {
gui_utils::connect(
sound_stream_receiver,
microphone_stream_receiver,
audio_stream_receiver,
streamer_config,
streaming_to_base_sender,
base_to_streaming_receiver,
@ -257,8 +259,7 @@ impl Streamer {
.0,
);
///////Don't Forget it's for testing
let audio_stream_sender = self.data_channel.microphone_stream_sender.clone();
let audio_stream_sender = self.data_channel.audio_stream_sender.clone();
let playing_to_base_sender =
self.communication_channel.playing_to_base_sender.clone();
let base_to_playing_receiver = self

View file

@ -8,14 +8,16 @@ use crate::{
};
pub async fn connect(
sound_stream_receiver: Receiver<f32>,
microphone_stream_receiver: Receiver<f32>,
audio_stream_receiver: Receiver<f32>,
streamer_config: Config,
streaming_to_base_sender: Sender<bool>,
base_to_streaming_receiver: Receiver<bool>,
) -> State {
let mut streaming_to_base_receiver = streaming_to_base_sender.subscribe();
tokio::spawn(streaming::connect(
sound_stream_receiver,
microphone_stream_receiver,
audio_stream_receiver,
streamer_config,
base_to_streaming_receiver,
streaming_to_base_sender.clone(),

View file

@ -13,7 +13,8 @@ use crate::{Config, BUFFER_LENGTH};
const MAX_TOLERATED_MESSAGE_COUNT: usize = 10;
pub async fn connect(
sound_stream_receiver: Receiver<f32>,
microphone_stream_receiver: Receiver<f32>,
audio_stream_receiver: Receiver<f32>,
streamer_config: Config,
mut base_to_streaming: Receiver<bool>,
streaming_to_base: Sender<bool>,
@ -54,9 +55,16 @@ pub async fn connect(
}
let (message_producer, message_consumer) = channel(BUFFER_LENGTH);
println!("Connected to: {}", connect_addr);
let (flow_sender, flow_receiver) = channel(BUFFER_LENGTH);
let mixer_task = tokio::spawn(mixer(
microphone_stream_receiver,
audio_stream_receiver,
flow_sender,
streamer_config.latency,
));
let message_organizer_task = tokio::spawn(message_organizer(
message_producer,
sound_stream_receiver,
flow_receiver,
streamer_config.quality,
streamer_config.latency,
));
@ -65,27 +73,88 @@ pub async fn connect(
tokio::spawn(status_checker(
message_organizer_task,
stream_task,
mixer_task,
base_to_streaming,
streaming_to_base,
));
}
}
async fn mixer(
mut microphone_stream_receiver: Receiver<f32>,
mut audio_stream_receiver: Receiver<f32>,
flow_sender: Sender<f32>,
latency: u16,
) {
loop {
let mut microphone_stream = vec![];
let mut audio_stream = vec![];
let mut microphone_stream_iteration = microphone_stream_receiver.len();
while microphone_stream_iteration > 0 {
microphone_stream_iteration -= 1;
match microphone_stream_receiver.recv().await {
Ok(microphone_datum) => {
microphone_stream.push(microphone_datum);
}
Err(err_val) => {
eprintln!(
"Error: Communication | Microphone Stream | Recv | {}",
err_val
);
}
}
}
let mut audio_stream_iteration = audio_stream_receiver.len();
while audio_stream_iteration > 0 {
audio_stream_iteration -= 1;
match audio_stream_receiver.recv().await {
Ok(audio_datum) => {
audio_stream.push(audio_datum);
}
Err(err_val) => {
eprintln!("Error: Communication | Audio Stream | Recv | {}", err_val);
}
}
}
let mut flow = vec![];
for element in microphone_stream {
flow.push(element * 0.5);
}
for (i, element) in audio_stream.iter().enumerate() {
if flow.len() > i && flow.len() != 0 {
flow[i] = flow[i] + element * 0.5;
} else {
flow.push(element * 0.5);
}
}
for element in flow {
match flow_sender.send(element) {
Ok(_) => {}
Err(err_val) => {
eprintln!("Error: Communication | Flow | Send | {}", err_val);
}
}
}
tokio::time::sleep(Duration::from_millis(latency.into())).await;
}
}
async fn message_organizer(
message_producer: Sender<Message>,
mut receiver: Receiver<f32>,
mut flow_receiver: Receiver<f32>,
quality: u8,
latency: u16,
) {
loop {
let mut messages: Vec<u8> = Vec::new();
let mut iteration = receiver.len();
let mut iteration = flow_receiver.len();
while iteration > 0 {
iteration -= 1;
match receiver.recv().await {
match flow_receiver.recv().await {
Ok(single_data) => {
let ring = HeapRb::<u8>::new(BUFFER_LENGTH);
let (mut producer, mut receiver) = ring.split();
let (mut producer, mut flow_receiver) = ring.split();
let mut charred: Vec<char> = single_data.to_string().chars().collect();
if charred[0] == '0' {
charred.insert(0, '+');
@ -105,8 +174,8 @@ async fn message_organizer(
for element in single_data_packet {
producer.push(element).unwrap();
}
while !receiver.is_empty() {
messages.push(receiver.pop().unwrap());
while !flow_receiver.is_empty() {
messages.push(flow_receiver.pop().unwrap());
}
}
Err(_) => {}
@ -118,17 +187,10 @@ async fn message_organizer(
eprintln!("Error: Compression | {}", err_val);
}
let compressed_messages = compression_writer.into_inner();
// println!("Compressed Len {}", compressed_messages.len());
// println!("UNCompressed Len {}", messages.len());
match message_producer.send(compressed_messages.into()) {
Ok(_) => {}
Err(_) => {}
}
// println!(
// "Message Counter = {} | Receiver Count = {}",
// message_producer.len(),
// message_producer.receiver_count()
// );
}
tokio::time::sleep(Duration::from_millis(latency.into())).await;
}
@ -140,27 +202,15 @@ async fn stream<T: futures_util::Sink<Message> + std::marker::Unpin>(
) {
while let Ok(message) = message_consumer.recv().await {
if message_consumer.len() > MAX_TOLERATED_MESSAGE_COUNT {
// println!(
// "{} Forced to Disconnect | Reason -> Slow Consumer",
// format!("{}:{}", listener.ip, listener.port)
// );
break;
}
match ws_stream.send(message).await {
Ok(_) => {
if let Err(_) = ws_stream.flush().await {
// println!(
// "{} is Disconnected",
// format!("{}:{}", listener.ip, listener.port)
// );
break;
}
}
Err(_) => {
// println!(
// "{} is Disconnected",
// format!("{}:{}", listener.ip, listener.port)
// );
break;
}
}
@ -170,6 +220,7 @@ async fn stream<T: futures_util::Sink<Message> + std::marker::Unpin>(
async fn status_checker(
message_organizer_task: JoinHandle<()>,
stream_task: JoinHandle<()>,
mixer_task: JoinHandle<()>,
mut base_to_streaming: Receiver<bool>,
streaming_to_base: Sender<bool>,
) {
@ -177,7 +228,10 @@ async fn status_checker(
tokio::time::sleep(Duration::from_secs(3)).await;
}
stream_task.abort();
mixer_task.abort();
message_organizer_task.abort();
let _ = streaming_to_base.send(true);
println!("Cleaning Done: Streamer Disconnected");
match streaming_to_base.send(true) {
Ok(_) => println!("Cleaning Done: Streamer Disconnected"),
Err(err_val) => eprintln!("Error: Cleaning | {}", err_val),
}
}