From 8247bcee214b26ffe5882a93a85f3e0dce136247 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ahmet=20Kaan=20G=C3=9CM=C3=9C=C5=9E?= <96421894+Tahinli@users.noreply.github.com> Date: Tue, 23 Jul 2024 04:11:10 +0300 Subject: [PATCH] feat: :sparkles: decode on the go --- front/src/main.rs | 3 +- front/src/streaming.rs | 17 +-- streamer/Cargo.toml | 13 +- streamer/src/gui.rs | 225 +++++++++++++++++++-------------- streamer/src/gui_components.rs | 13 +- streamer/src/gui_utils.rs | 16 ++- streamer/src/main.rs | 42 ++++-- streamer/src/playing.rs | 189 +++++++++++++++------------ streamer/src/streaming.rs | 16 ++- streamer/src/utils.rs | 11 +- 10 files changed, 312 insertions(+), 233 deletions(-) diff --git a/front/src/main.rs b/front/src/main.rs index d7389b7..4a51bf2 100644 --- a/front/src/main.rs +++ b/front/src/main.rs @@ -1,5 +1,5 @@ use dioxus::prelude::*; -use front::components::{coin_status_renderer, listen_renderer, server_status_renderer}; +use front::components::listen_renderer; fn main() { println!("Hello, world!"); @@ -8,7 +8,6 @@ fn main() { } fn app() -> Element { - let server_address = "https://tahinli.com.tr:2323".to_string(); rsx! { page_base {} listen_renderer {} diff --git a/front/src/streaming.rs b/front/src/streaming.rs index fb408b3..17f4383 100644 --- a/front/src/streaming.rs +++ b/front/src/streaming.rs @@ -7,7 +7,6 @@ use futures_util::StreamExt; use ringbuf::{HeapRb, Producer, SharedRb}; use std::{io::Write, mem::MaybeUninit, sync::Arc}; - use crate::{listening::listen_podcast, BUFFER_LENGTH}; pub async fn start_listening( @@ -17,13 +16,9 @@ pub async fn start_listening( if is_listening() { log::info!("Trying Sir"); let connect_addr = "ws://192.168.1.2:2424"; - + let ws_stream: tokio_tungstenite_wasm::WebSocketStream; - match tokio_tungstenite_wasm::connect( - connect_addr, - ) - .await - { + match tokio_tungstenite_wasm::connect(connect_addr).await { Ok(ws_stream_connected) => ws_stream = ws_stream_connected, Err(_) => { is_listening.set(false); @@ -67,10 +62,10 @@ pub async fn sound_stream( log::error!("Error: Decompression | {}", err_val); } let uncompressed_data = match decompression_writer.into_inner() { - Ok(healty_packet) => healty_packet, - Err(unhealty_packet) => { - log::warn!("Warning: Unhealty Packet | {}", unhealty_packet.len()); - unhealty_packet + Ok(healthy_packet) => healthy_packet, + Err(unhealthy_packet) => { + log::warn!("Warning: Unhealthy Packet | {}", unhealthy_packet.len()); + unhealthy_packet } }; log::info!("{}", uncompressed_data.len()); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 64137e7..7c5d090 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -3,13 +3,24 @@ name = "streamer" version = "0.1.0" edition = "2021" +[profile.release] +strip = "symbols" +opt-level = 3 +overflow-checks = true +lto = true +codegen-units = 1 +panic = "abort" + +[lints.rust] +unsafe_code = "forbid" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] brotli = "5.0.0" cpal = "0.15.3" futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] } -iced = { git = "https://github.com/iced-rs/iced", features = ["tokio"] } +iced = { git = "https://github.com/iced-rs/iced", features = ["tokio"], rev = "dcdf1307006883f50083c186ca7b8656bfa60873"} ringbuf = "0.3.3" rubato = "0.15.0" rustls-pemfile = "2.1.2" diff --git a/streamer/src/gui.rs b/streamer/src/gui.rs index cd68004..b2e4b1d 100644 --- a/streamer/src/gui.rs +++ b/streamer/src/gui.rs @@ -1,14 +1,18 @@ -use std::{cmp::max, fs::File, path::Path, sync::Arc}; +use std::{ + cmp::max, + fs::File, + path::Path, + process::exit, + sync::{Arc, Mutex}, +}; use iced::{ alignment, widget::{column, container, row, scrollable, slider, text::LineHeight, Container, Rule}, - window, Color, Command, Length, Subscription, -}; -use tokio::sync::{ - broadcast::{channel, Receiver, Sender}, - Mutex, + window::{self}, + Color, Length, Subscription, Task, }; +use tokio::sync::broadcast::{channel, Receiver, Sender}; use crate::{ gui_components::{button_with_centered_text, text_centered}, @@ -135,6 +139,52 @@ impl Default for Streamer { } impl Streamer { + pub fn new_with_load() -> (Self, Task) { + ( + Self { + config: None, + data_channel: DataChannel { + microphone_stream_sender: channel(BUFFER_LENGTH).0, + audio_stream_sender: channel(BUFFER_LENGTH).0, + }, + communication_channel: CommunicationChannel { + base_to_streaming_sender: channel(1).0, + streaming_to_base_sender: channel(1).0, + streaming_to_base_is_finished: channel(1).0, + base_to_recording_sender: channel(1).0, + recording_to_base_sender: channel(1).0, + base_to_playing_sender: channel(1).0, + playing_to_base_sender: channel(1).0, + }, + audio_miscellaneous: AudioMiscellaneous { + file: None, + selected_file_name: String::new(), + playing_file_name: String::new(), + files: None, + decoded_to_playing_sender: None, + }, + gui_status: GUIStatus { + are_we_connect: Condition::Passive, + are_we_record: Condition::Passive, + are_we_play_audio: Condition::Passive, + are_we_paused_audio: Condition::Passive, + microphone_volume: ChangeableValue { + value: Arc::new(1.0.into()), + }, + audio_volume: ChangeableValue { + value: Arc::new(1.0.into()), + }, + }, + }, + Task::perform( + async move { + let config = get_config(); + Event::LoadConfig(config) + }, + Message::Event, + ), + ) + } fn new() -> Self { Self { config: None, @@ -172,10 +222,10 @@ impl Streamer { }, } } - pub fn update(&mut self, message: Message) -> Command { + pub fn update(&mut self, message: Message) -> Task { match message { Message::Event(event) => match event { - Event::None => Command::none(), + Event::None => Task::none(), Event::Connect => { println!("Connect"); self.gui_status.are_we_connect = Condition::Loading; @@ -206,7 +256,7 @@ impl Streamer { .streaming_to_base_sender .subscribe(); - let connect_command = Command::perform( + let connect_command = Task::perform( async move { gui_utils::connect( microphone_stream_receiver, @@ -223,7 +273,7 @@ impl Streamer { Message::State, ); - let is_streaming_finished_command = Command::perform( + let is_streaming_finished_command = Task::perform( async move { gui_utils::is_streaming_finished( streaming_to_base_receiver_is_streaming_finished, @@ -235,7 +285,7 @@ impl Streamer { ); let commands = vec![connect_command, is_streaming_finished_command]; - Command::batch(commands) + Task::batch(commands) } Event::Disconnect => { println!("Disconnect"); @@ -248,7 +298,7 @@ impl Streamer { let base_to_streaming_sender = self.communication_channel.base_to_streaming_sender.clone(); - Command::perform( + Task::perform( async move { gui_utils::disconnect( streaming_to_base_receiver, @@ -272,7 +322,7 @@ impl Streamer { .base_to_recording_sender .subscribe(); - Command::perform( + Task::perform( async move { gui_utils::start_recording( microphone_stream_sender, @@ -293,7 +343,7 @@ impl Streamer { .subscribe(); let base_to_recording_sender = self.communication_channel.base_to_recording_sender.clone(); - Command::perform( + Task::perform( async move { gui_utils::stop_recording( recording_to_base_receiver, @@ -319,7 +369,7 @@ impl Streamer { eprintln!("Error: Open File | {}", err_val); self.audio_miscellaneous.file = None; self.gui_status.are_we_play_audio = Condition::Passive; - return Command::none(); + return Task::none(); } } self.audio_miscellaneous.decoded_to_playing_sender = Some( @@ -379,7 +429,7 @@ impl Streamer { let audio_volume = self.gui_status.audio_volume.value.clone(); - let playing_command = Command::perform( + let playing_command = Task::perform( async move { gui_utils::start_playing( audio_stream_sender, @@ -393,7 +443,7 @@ impl Streamer { }, Message::State, ); - let is_finished_command = Command::perform( + let is_finished_command = Task::perform( async move { gui_utils::is_playing_finished( playing_to_base_receiver_is_audio_finished, @@ -406,7 +456,7 @@ impl Streamer { Message::State, ); let commands = vec![playing_command, is_finished_command]; - Command::batch(commands) + Task::batch(commands) } Event::StopAudio => { println!("Stop Audio"); @@ -419,7 +469,7 @@ impl Streamer { let base_to_playing_sender = self.communication_channel.base_to_playing_sender.clone(); - Command::perform( + Task::perform( async move { gui_utils::stop_playing( playing_to_base_receiver, @@ -441,7 +491,7 @@ impl Streamer { let base_to_playing_sender = self.communication_channel.base_to_playing_sender.clone(); - Command::perform( + Task::perform( async move { gui_utils::pause_playing( playing_to_base_receiver, @@ -463,7 +513,7 @@ impl Streamer { let base_to_playing_sender = self.communication_channel.base_to_playing_sender.clone(); - Command::perform( + Task::perform( async move { gui_utils::continue_playing( playing_to_base_receiver, @@ -486,117 +536,110 @@ impl Streamer { self.audio_miscellaneous.file = None; } } - Command::none() + Task::none() } Event::ChangeMicrophoneVolume(value) => { - *self.gui_status.microphone_volume.value.blocking_lock() = value; + // let microphone_volume = self.gui_status.microphone_volume.value.clone(); + //*self.gui_status.microphone_volume.value.blocking_lock() = value; let microphone_volume = self.gui_status.microphone_volume.value.clone(); - Command::perform( + Task::perform( async move { change_microphone_volume(value, microphone_volume).await }, Message::State, ) } Event::ChangeAudioVolume(value) => { - *self.gui_status.audio_volume.value.blocking_lock() = value; + // *self.gui_status.audio_volume.value.blocking_lock() = value; let audio_volume = self.gui_status.audio_volume.value.clone(); - Command::perform( + Task::perform( async move { change_audio_volume(value, audio_volume).await }, Message::State, ) } Event::LoadConfig(config) => { self.config = Some(config); - Command::none() + Task::none() } Event::ListFiles(files) => { self.audio_miscellaneous.files = files; - Command::none() + Task::none() } Event::IcedEvent(iced_event) => match iced_event { - iced::Event::Keyboard(_) => Command::perform( + iced::Event::Keyboard(_) => Task::perform( async move { let files = gui_utils::list_files(Path::new(AUDIOS_PATH)).await; Event::ListFiles(files) }, Message::Event, ), - iced::Event::Mouse(_) => Command::perform( + iced::Event::Mouse(_) => Task::perform( async move { let files = gui_utils::list_files(Path::new(AUDIOS_PATH)).await; Event::ListFiles(files) }, Message::Event, ), - iced::Event::Window(id, window_event) => { - if let window::Event::CloseRequested = window_event { - self.exit(id) - } else { - Command::perform( - async move { - let files = gui_utils::list_files(Path::new(AUDIOS_PATH)).await; - Event::ListFiles(files) - }, - Message::Event, - ) - } - } - iced::Event::Touch(_) => Command::perform( + iced::Event::Touch(_) => Task::perform( async move { let files = gui_utils::list_files(Path::new(AUDIOS_PATH)).await; Event::ListFiles(files) }, Message::Event, ), - iced::Event::PlatformSpecific(_) => Command::perform( - async move { - let files = gui_utils::list_files(Path::new(AUDIOS_PATH)).await; - Event::ListFiles(files) + iced::Event::Window(windows_event) => Task::perform( + { + if let window::Event::CloseRequested = windows_event { + self.exit(); + } + async move { + let files = gui_utils::list_files(Path::new(AUDIOS_PATH)).await; + Event::ListFiles(files) + } }, Message::Event, ), }, - Event::CloseWindow(id) => window::close(id), + Event::CloseWindow(_) => self.exit(), }, Message::State(state) => match state { - State::None => Command::none(), + State::None => Task::none(), State::Connected => { self.gui_status.are_we_connect = Condition::Active; - Command::none() + Task::none() } State::Disconnected => { self.gui_status.are_we_connect = Condition::Passive; - Command::none() + Task::none() } State::Recording => { self.gui_status.are_we_record = Condition::Active; - Command::none() + Task::none() } State::StopRecording => { self.gui_status.are_we_record = Condition::Passive; - Command::none() + Task::none() } State::PlayingAudio => { self.audio_miscellaneous.playing_file_name = self.audio_miscellaneous.selected_file_name.clone(); self.gui_status.are_we_play_audio = Condition::Active; self.gui_status.are_we_paused_audio = Condition::Passive; - Command::none() + Task::none() } State::StopAudio => { self.audio_miscellaneous.playing_file_name = String::new(); self.gui_status.are_we_play_audio = Condition::Passive; - Command::none() + Task::none() } State::PausedAudio => { self.gui_status.are_we_paused_audio = Condition::Active; - Command::none() + Task::none() } State::ContinuedAudio => { self.gui_status.are_we_paused_audio = Condition::Passive; - Command::none() + Task::none() } - State::MicrophoneVolumeChanged => Command::none(), - State::AudioVolumeChanged => Command::none(), + State::MicrophoneVolumeChanged => Task::none(), + State::AudioVolumeChanged => Task::none(), }, } } @@ -700,14 +743,14 @@ impl Streamer { let microphone_volume_slider = slider( 0.0..=1.0, - *self.gui_status.microphone_volume.value.blocking_lock(), + *self.gui_status.microphone_volume.value.lock().unwrap(), |value| Message::Event(Event::ChangeMicrophoneVolume(value)), ) .step(0.01); let audio_volume_slider = slider( 0.0..=1.0, - *self.gui_status.audio_volume.value.blocking_lock(), + *self.gui_status.audio_volume.value.lock().unwrap(), |value| Message::Event(Event::ChangeAudioVolume(value)), ) .step(0.01); @@ -717,7 +760,7 @@ impl Streamer { None => 0, }; - let longest_name_for_scrollable = match self.audio_miscellaneous.files.as_ref() { + let longest_audio_name = match self.audio_miscellaneous.files.as_ref() { Some(audio_files) => { let mut longest = 0; for audio_file in audio_files { @@ -733,7 +776,7 @@ impl Streamer { let mut audio_scrollable_content = column![] .spacing(1) .height(audio_file_size_for_scrollable) - .width(longest_name_for_scrollable); + .width(longest_audio_name); let audio_selected = text_centered(format!( "Selected: {}", self.audio_miscellaneous.selected_file_name.clone() @@ -750,10 +793,10 @@ impl Streamer { audio_scrollable_content.push(button.height(AUDIO_SCROLLABLE_BUTTON_SIZE)); } } - let audios_scrollable = scrollable(audio_scrollable_content) - .height(200) - .width(longest_name_for_scrollable); - let audio_info_content = column![audio_selected, audio_playing,].height(60); + let audios_scrollable = scrollable(audio_scrollable_content).height(200).width(350); + let audio_info_content = column![audio_selected, audio_playing,] + .height(100) + .width(longest_audio_name); let header_content = row![header].width(350).height(50); let text_content = row![ connection_text, @@ -817,17 +860,17 @@ impl Streamer { .map(Event::IcedEvent) .map(Message::Event) } - pub fn load_config() -> Command { - Command::perform( + pub fn load_config() -> Task { + Task::perform( async move { - let config = get_config().await; + let config = get_config(); Event::LoadConfig(config) }, Message::Event, ) } - pub fn list_files() -> Command { - Command::perform( + pub fn list_files() -> Task { + Task::perform( async move { let files = gui_utils::list_files(Path::new(AUDIOS_PATH)).await; Event::ListFiles(files) @@ -843,27 +886,22 @@ impl Streamer { playing_to_base_receiver: Receiver, base_to_playing_sender: Sender, features_in_need: Features, - window_id: window::Id, - ) -> Command { - Command::perform( - async move { - if features_in_need.stream { - gui_utils::disconnect(streaming_to_base_receiver, base_to_streaming_sender) - .await; - } - if features_in_need.record { - gui_utils::stop_recording(recording_to_base_receiver, base_to_recording_sender) - .await; - } - if features_in_need.play_audio { - gui_utils::stop_playing(playing_to_base_receiver, base_to_playing_sender).await; - } - Event::CloseWindow(window_id) - }, - Message::Event, - ) + ) -> Task { + tokio::spawn(async move { + if features_in_need.stream { + gui_utils::disconnect(streaming_to_base_receiver, base_to_streaming_sender).await; + } + if features_in_need.record { + gui_utils::stop_recording(recording_to_base_receiver, base_to_recording_sender) + .await; + } + if features_in_need.play_audio { + gui_utils::stop_playing(playing_to_base_receiver, base_to_playing_sender).await; + } + }); + exit(1); } - fn exit(&self, window_id: window::Id) -> Command { + fn exit(&self) -> Task { let mut features_in_need = Features { stream: false, record: false, @@ -905,7 +943,6 @@ impl Streamer { playing_to_base_receiver, base_to_playing_sender, features_in_need, - window_id, ) } } diff --git a/streamer/src/gui_components.rs b/streamer/src/gui_components.rs index 6863c5c..5ae03ff 100644 --- a/streamer/src/gui_components.rs +++ b/streamer/src/gui_components.rs @@ -1,5 +1,4 @@ use iced::{ - alignment, widget::{button, text, Button, Text}, Length, }; @@ -7,18 +6,14 @@ use iced::{ use crate::gui::Message; pub fn button_with_centered_text>(txt: T) -> Button<'static, Message> { - button( - text(txt.into()) - .width(Length::Fill) - .horizontal_alignment(alignment::Horizontal::Center), - ) - .height(Length::Fill) - .width(Length::Fill) + button(text(txt.into()).width(Length::Fill).center()) + .height(Length::Fill) + .width(Length::Fill) } pub fn text_centered>(txt: T) -> Text<'static> { text(txt.into()) .width(Length::Fill) .height(Length::Fill) - .horizontal_alignment(alignment::Horizontal::Center) + .center() } diff --git a/streamer/src/gui_utils.rs b/streamer/src/gui_utils.rs index f135bbf..8006b79 100644 --- a/streamer/src/gui_utils.rs +++ b/streamer/src/gui_utils.rs @@ -1,10 +1,12 @@ -use std::{fs::File, path::Path, sync::Arc, time::Duration}; - -use tokio::sync::{ - broadcast::{Receiver, Sender}, - Mutex, +use std::{ + fs::File, + path::Path, + sync::{Arc, Mutex}, + time::Duration, }; +use tokio::sync::broadcast::{Receiver, Sender}; + use crate::{ gui::{Player, State}, playing, recording, streaming, Config, @@ -348,7 +350,7 @@ pub async fn change_microphone_volume( desired_value: f32, microphone_stream_volume: Arc>, ) -> State { - *microphone_stream_volume.lock().await = desired_value; + *microphone_stream_volume.lock().unwrap() = desired_value; State::MicrophoneVolumeChanged } @@ -356,7 +358,7 @@ pub async fn change_audio_volume( desired_value: f32, audio_stream_volume: Arc>, ) -> State { - *audio_stream_volume.lock().await = desired_value; + *audio_stream_volume.lock().unwrap() = desired_value; State::AudioVolumeChanged } diff --git a/streamer/src/main.rs b/streamer/src/main.rs index b0b7f02..6a96769 100644 --- a/streamer/src/main.rs +++ b/streamer/src/main.rs @@ -3,16 +3,34 @@ use streamer::gui::Streamer; #[tokio::main] async fn main() { println!("Hello, world!"); - tokio::task::block_in_place(|| { - iced::program("Streamer GUI", Streamer::update, Streamer::view) - .centered() - .window_size((350.0, 650.0)) - .load(Streamer::load_config) - .load(Streamer::list_files) - .antialiasing(true) - .subscription(Streamer::subscription) - .exit_on_close_request(false) - .run() - .unwrap() - }); + + iced::application("Streamer GUI", Streamer::update, Streamer::view) + .centered() + .window_size((350.0, 650.0)) + .antialiasing(true) + .subscription(Streamer::subscription) + .exit_on_close_request(false) + .run_with(|| Streamer::new_with_load()) + .unwrap() + + // tokio::task::spawn_blocking(|| { + // iced::application("Streamer GUI", Streamer::update, Streamer::view) + // .centered() + // .window_size((350.0, 650.0)) + // .antialiasing(true) + // .subscription(Streamer::subscription) + // .exit_on_close_request(false) + // .run_with(|| Streamer::new_with_load()) + // .unwrap() + // }); + // tokio::task::block_in_place(|| { + // iced::application("Streamer GUI", Streamer::update, Streamer::view) + // .centered() + // .window_size((350.0, 650.0)) + // .antialiasing(true) + // .subscription(Streamer::subscription) + // .exit_on_close_request(false) + // .run_with(|| Streamer::new_with_load()) + // .unwrap() + // }); } diff --git a/streamer/src/playing.rs b/streamer/src/playing.rs index 6e0b56c..b22416e 100644 --- a/streamer/src/playing.rs +++ b/streamer/src/playing.rs @@ -1,4 +1,8 @@ -use std::{fs::File, sync::Arc}; +use std::{ + fs::File, + sync::{Arc, Mutex}, + time::Duration, +}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use rubato::{ @@ -6,17 +10,14 @@ use rubato::{ }; use symphonia::core::{ audio::{AudioBufferRef, Signal}, - codecs::{DecoderOptions, CODEC_TYPE_NULL}, - formats::FormatOptions, + codecs::{Decoder, DecoderOptions, CODEC_TYPE_NULL}, + formats::{FormatOptions, FormatReader}, io::MediaSourceStream, meta::MetadataOptions, probe::Hint, }; use tokio::{ - sync::{ - broadcast::{Receiver, Sender}, - Mutex, - }, + sync::broadcast::{Receiver, Sender}, task, }; @@ -37,30 +38,20 @@ pub async fn play( let output_device_sample_rate = output_device_config.sample_rate.0; - let (mut audio_resampled_left, mut audio_resampled_right) = - match decode_audio(output_device_sample_rate, file) { - Some((left, right)) => (left, right), - None => { - let_the_base_know(playing_to_base_sender, Player::Stop).await; - return; - } - }; - let mut decoded_to_playing_receiver = decoded_to_playing_sender.subscribe(); - for _ in 0..audio_resampled_left.clone().len() { - decoded_to_playing_sender - .send(audio_resampled_left.pop().unwrap() as f32) - .unwrap(); - decoded_to_playing_sender - .send(audio_resampled_right.pop().unwrap() as f32) - .unwrap(); + let audio_process_task = tokio::spawn(process_audio( + output_device_sample_rate, + file, + decoded_to_playing_sender, + )); + while decoded_to_playing_receiver.is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; } - let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { for sample in data { if decoded_to_playing_receiver.len() > 0 { let single = match decoded_to_playing_receiver.blocking_recv() { - Ok(single) => single * *audio_volume.blocking_lock(), + Ok(single) => single * *audio_volume.lock().unwrap(), Err(_) => 0.0, }; if audio_stream_sender.receiver_count() > 0 { @@ -101,9 +92,19 @@ pub async fn play( //todo when pause error, do software level stop Err(_) => todo!(), }, - Player::Stop => break, + Player::Stop => { + if !audio_process_task.is_finished() { + audio_process_task.abort(); + } + break; + } }, - Err(_) => break, + Err(_) => { + if !audio_process_task.is_finished() { + audio_process_task.abort(); + } + break; + } } }); drop(output_stream); @@ -116,9 +117,56 @@ fn err_fn(err: cpal::StreamError) { async fn let_the_base_know(playing_to_base_sender: Sender, action: Player) { let _ = playing_to_base_sender.send(action); } -fn decode_audio(output_device_sample_rate: u32, file: File) -> Option<(Vec, Vec)> { + +fn decode_audio( + format: &mut Box, + track_id: u32, + decoder: &mut Box, +) -> Option<(Vec, Vec)> { let mut audio_decoded_left = vec![]; let mut audio_decoded_right = vec![]; + let packet = match format.next_packet() { + Ok(packet) => packet, + Err(_) => return None, + }; + + while !format.metadata().is_latest() { + format.metadata().pop(); + } + + if packet.track_id() != track_id { + return None; + } + + if let Ok(decoded) = decoder.decode(&packet) { + if let AudioBufferRef::F32(buf) = decoded { + for (left, right) in buf.chan(0).iter().zip(buf.chan(1).iter()) { + audio_decoded_left.push(*left as f64); + audio_decoded_right.push(*right as f64); + } + } + } + Some((audio_decoded_left, audio_decoded_right)) +} + +fn resample_audio( + audio_decoded_left: Vec, + audio_decoded_right: Vec, + resampler: &mut SincFixedIn, +) -> (Vec, Vec) { + let audio_decoded_channels_combined = vec![audio_decoded_left, audio_decoded_right]; + let audio_resampled = resampler + .process(&audio_decoded_channels_combined, None) + .unwrap(); + + (audio_resampled[0].clone(), audio_resampled[1].clone()) +} + +async fn process_audio( + output_device_sample_rate: u32, + file: File, + decoded_to_playing_sender: tokio::sync::broadcast::Sender, +) { let media_source_stream = MediaSourceStream::new(Box::new(file), Default::default()); let hint = Hint::new(); @@ -135,7 +183,7 @@ fn decode_audio(output_device_sample_rate: u32, file: File) -> Option<(Vec, match probed { Ok(probed_safe) => probed = Ok(probed_safe), - Err(_) => return None, + Err(_) => return, } let mut format = probed.unwrap().format; @@ -147,7 +195,6 @@ fn decode_audio(output_device_sample_rate: u32, file: File) -> Option<(Vec, .unwrap(); let audio_sample_rate = track.codec_params.sample_rate.unwrap(); - DecoderOptions::default(); let decoder_options = DecoderOptions::default(); let mut decoder = symphonia::default::get_codecs() @@ -156,74 +203,50 @@ fn decode_audio(output_device_sample_rate: u32, file: File) -> Option<(Vec, let track_id = track.id; - loop { - let packet = match format.next_packet() { - Ok(packet) => packet, - Err(_) => { - break; - } - }; - - while !format.metadata().is_latest() { - format.metadata().pop(); - } - - if packet.track_id() != track_id { - continue; - } - - match decoder.decode(&packet) { - Ok(decoded) => match decoded { - AudioBufferRef::F32(buf) => { - for (left, right) in buf.chan(0).iter().zip(buf.chan(1).iter()) { - audio_decoded_left.push(*left as f64); - audio_decoded_right.push(*right as f64); - } - } - _ => {} - }, - Err(_) => { - //eprintln!("Error: Sample Decode | {}", err_val); - println!("End ?"); - } - } - } - let params = SincInterpolationParameters { sinc_len: 256, f_cutoff: 0.95, interpolation: SincInterpolationType::Linear, - oversampling_factor: 256, + oversampling_factor: 128, window: WindowFunction::BlackmanHarris2, }; + + let chunk_size = match decode_audio(&mut format, track_id, &mut decoder) { + Some((audio_decoded_left_channel, _)) => audio_decoded_left_channel.len(), + None => return, + }; + let mut resampler = SincFixedIn::::new( output_device_sample_rate as f64 / audio_sample_rate as f64, 2.0, params, - audio_decoded_left.len(), + chunk_size, 2, ) .unwrap(); - let audio_decoded_channes_combined = - vec![audio_decoded_left.clone(), audio_decoded_right.clone()]; - let audio_resampled = resampler - .process(&audio_decoded_channes_combined, None) - .unwrap(); + loop { + let (mut audio_decoded_left, mut audio_decoded_right) = (vec![], vec![]); - let mut audio_resampled_left = vec![]; - let mut audio_resampled_right = vec![]; + match decode_audio(&mut format, track_id, &mut decoder) { + Some((audio_decoded_left_channel, audio_decoded_right_channel)) => { + for (single_left, single_right) in audio_decoded_left_channel + .iter() + .zip(&audio_decoded_right_channel) + { + audio_decoded_left.push(*single_left); + audio_decoded_right.push(*single_right); + } + } + None => break, + }; - for sample in &audio_resampled[0] { - audio_resampled_left.push(*sample); + let (audio_resampled_left, audio_resampled_right) = + resample_audio(audio_decoded_left, audio_decoded_right, &mut resampler); + + for (single_left, single_right) in audio_resampled_left.iter().zip(&audio_resampled_right) { + let _ = decoded_to_playing_sender.send(*single_left as f32); + let _ = decoded_to_playing_sender.send(*single_right as f32); + } } - - for sample in &audio_resampled[1] { - audio_resampled_right.push(*sample); - } - - audio_resampled_left.reverse(); - audio_resampled_right.reverse(); - - Some((audio_resampled_left, audio_resampled_right)) } diff --git a/streamer/src/streaming.rs b/streamer/src/streaming.rs index fa3234e..4bf1505 100644 --- a/streamer/src/streaming.rs +++ b/streamer/src/streaming.rs @@ -1,13 +1,15 @@ -use std::{cmp::min, io::Write, sync::Arc, time::Duration}; +use std::{ + cmp::min, + io::Write, + sync::{Arc, Mutex}, + time::Duration, +}; use brotli::CompressorWriter; use futures_util::SinkExt; use ringbuf::HeapRb; use tokio::{ - sync::{ - broadcast::{channel, Receiver, Sender}, - Mutex, - }, + sync::broadcast::{channel, Receiver, Sender}, task::JoinHandle, }; use tokio_tungstenite::tungstenite::Message; @@ -157,8 +159,8 @@ async fn mixer( } let mut flow = vec![]; - let microphone_volume = *microphone_stream_volume.lock().await; - let audio_volume = *audio_stream_volume.lock().await; + let microphone_volume = *microphone_stream_volume.lock().unwrap(); + let audio_volume = *audio_stream_volume.lock().unwrap(); for element in microphone_stream { if element < 0.01 || element > -0.01 { diff --git a/streamer/src/utils.rs b/streamer/src/utils.rs index 6323894..0f3ffa5 100644 --- a/streamer/src/utils.rs +++ b/streamer/src/utils.rs @@ -1,14 +1,11 @@ -use tokio::{fs::File, io::AsyncReadExt}; +use std::{fs::File, io::Read}; use crate::Config; -pub async fn get_config() -> Config { - let mut config_file = File::open("configs/streamer_configs.txt").await.unwrap(); +pub fn get_config() -> Config { + let mut config_file = File::open("configs/streamer_configs.txt").unwrap(); let mut configs_unparsed = String::new(); - config_file - .read_to_string(&mut configs_unparsed) - .await - .unwrap(); + config_file.read_to_string(&mut configs_unparsed).unwrap(); let configs_parsed: Vec<&str> = configs_unparsed.split_terminator("\n").collect(); let mut configs_cleaned: Vec<&str> = vec![];