feat: play audio files through stream

This commit is contained in:
Ahmet Kaan GÜMÜŞ 2024-05-03 04:32:47 +03:00
parent b10cf99a13
commit c490cc752e
6 changed files with 366 additions and 120 deletions

View file

@ -73,7 +73,7 @@ pub async fn sound_stream(
unhealty_packet
}
};
log::info!("{}", uncompressed_data.len());
let data = String::from_utf8(uncompressed_data).unwrap();
let mut datum_parsed: Vec<char> = vec![];
let mut data_parsed: Vec<String> = vec![];

View file

@ -11,8 +11,10 @@ cpal = "0.15.3"
futures-util = { version = "0.3.30", features = ["futures-sink", "sink"] }
iced = { git = "https://github.com/iced-rs/iced", features = ["tokio"] }
ringbuf = "0.3.3"
rubato = "0.15.0"
rustls-pemfile = "2.1.2"
rustls-platform-verifier = "0.2.0"
symphonia = { version = "0.5.4", features = ["all"] }
tokio = { version = "1.36.0", features = ["full"] }
tokio-rustls = "0.25.0"
tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-webpki-roots"] }

View file

@ -1,5 +1,7 @@
use iced::{
alignment, widget::{column, container, row, text::LineHeight, Container, Rule}, window, Color, Command, Subscription
alignment,
widget::{column, container, row, text::LineHeight, Container, Rule},
window, Color, Command, Subscription,
};
use tokio::sync::broadcast::{channel, Receiver, Sender};
@ -49,16 +51,17 @@ pub enum Message {
}
#[derive(Debug)]
struct DataChannel {
sound_stream_sender: Sender<f32>,
microphone_stream_sender: Sender<f32>,
audio_stream_sender: Sender<f32>,
}
#[derive(Debug)]
struct CommunicationChannel {
base_to_streaming: Sender<bool>,
streaming_to_base: Sender<bool>,
base_to_recording: Sender<bool>,
recording_to_base: Sender<bool>,
base_to_playing_audio: Sender<bool>,
playing_audio_to_base: Sender<bool>,
base_to_streaming_sender: Sender<bool>,
streaming_to_base_sender: Sender<bool>,
base_to_recording_sender: Sender<bool>,
recording_to_base_sender: Sender<bool>,
base_to_playing_sender: Sender<bool>,
playing_to_base_sender: Sender<bool>,
}
#[derive(Debug, PartialEq)]
enum Condition {
@ -91,15 +94,16 @@ impl Streamer {
Self {
config: None,
data_channel: DataChannel {
sound_stream_sender: channel(BUFFER_LENGTH).0,
microphone_stream_sender: channel(BUFFER_LENGTH).0,
audio_stream_sender: channel(BUFFER_LENGTH).0,
},
communication_channel: CommunicationChannel {
base_to_streaming: channel(1).0,
streaming_to_base: channel(1).0,
base_to_recording: channel(1).0,
recording_to_base: channel(1).0,
base_to_playing_audio: channel(1).0,
playing_audio_to_base: channel(1).0,
base_to_streaming_sender: channel(1).0,
streaming_to_base_sender: channel(1).0,
base_to_recording_sender: channel(1).0,
recording_to_base_sender: channel(1).0,
base_to_playing_sender: channel(1).0,
playing_to_base_sender: channel(1).0,
},
gui_status: GUIStatus {
are_we_connect: Condition::Passive,
@ -116,19 +120,23 @@ impl Streamer {
println!("Connect");
self.gui_status.are_we_connect = Condition::Loading;
let sound_stream_receiver = self.data_channel.sound_stream_sender.subscribe();
let sound_stream_receiver =
self.data_channel.microphone_stream_sender.subscribe();
let streamer_config = self.config.clone().unwrap();
let streaming_to_base = self.communication_channel.streaming_to_base.clone();
let base_to_streaming =
self.communication_channel.base_to_streaming.subscribe();
let streaming_to_base_sender =
self.communication_channel.streaming_to_base_sender.clone();
let base_to_streaming_receiver = self
.communication_channel
.base_to_streaming_sender
.subscribe();
Command::perform(
async move {
gui_utils::connect(
sound_stream_receiver,
streamer_config,
streaming_to_base,
base_to_streaming,
streaming_to_base_sender,
base_to_streaming_receiver,
)
.await
},
@ -139,12 +147,21 @@ impl Streamer {
println!("Disconnect");
self.gui_status.are_we_connect = Condition::Loading;
let streaming_to_base =
self.communication_channel.streaming_to_base.subscribe();
let base_to_streaming = self.communication_channel.base_to_streaming.clone();
let streaming_to_base_receiver = self
.communication_channel
.streaming_to_base_sender
.subscribe();
let base_to_streaming_sender =
self.communication_channel.base_to_streaming_sender.clone();
Command::perform(
async move { gui_utils::disconnect(streaming_to_base, base_to_streaming).await },
async move {
gui_utils::disconnect(
streaming_to_base_receiver,
base_to_streaming_sender,
)
.await
},
Message::State,
)
}
@ -152,17 +169,21 @@ impl Streamer {
println!("Record");
self.gui_status.are_we_record = Condition::Loading;
let sound_stream_sender = self.data_channel.sound_stream_sender.clone();
let recording_to_base = self.communication_channel.recording_to_base.clone();
let base_to_recording =
self.communication_channel.base_to_recording.subscribe();
let microphone_stream_sender =
self.data_channel.microphone_stream_sender.clone();
let recording_to_base_sender =
self.communication_channel.recording_to_base_sender.clone();
let base_to_recording_receiver = self
.communication_channel
.base_to_recording_sender
.subscribe();
Command::perform(
async move {
gui_utils::start_recording(
sound_stream_sender,
recording_to_base,
base_to_recording,
microphone_stream_sender,
recording_to_base_sender,
base_to_recording_receiver,
)
.await
},
@ -172,12 +193,19 @@ impl Streamer {
Event::StopRecord => {
println!("Stop Record");
self.gui_status.are_we_record = Condition::Loading;
let recording_to_base =
self.communication_channel.recording_to_base.subscribe();
let base_to_recording = self.communication_channel.base_to_recording.clone();
let recording_to_base_receiver = self
.communication_channel
.recording_to_base_sender
.subscribe();
let base_to_recording_sender =
self.communication_channel.base_to_recording_sender.clone();
Command::perform(
async move {
gui_utils::stop_recording(recording_to_base, base_to_recording).await
gui_utils::stop_recording(
recording_to_base_receiver,
base_to_recording_sender,
)
.await
},
Message::State,
)
@ -185,17 +213,21 @@ impl Streamer {
Event::PlayAudio => {
println!("Play Audio");
self.gui_status.are_we_play_audio = Condition::Loading;
let playing_audio_to_base =
self.communication_channel.playing_audio_to_base.clone();
let base_to_playing_audio =
self.communication_channel.base_to_playing_audio.subscribe();
///////TEST İÇİN YANLIŞ VERDİM UNUTMA
let audio_stream_sender = self.data_channel.microphone_stream_sender.clone();
let playing_to_base_sender =
self.communication_channel.playing_to_base_sender.clone();
let base_to_playing_receiver = self
.communication_channel
.base_to_playing_sender
.subscribe();
Command::perform(
async move {
gui_utils::start_playing_audio(
playing_audio_to_base,
base_to_playing_audio,
gui_utils::start_playing(
audio_stream_sender,
playing_to_base_sender,
base_to_playing_receiver,
)
.await
},
@ -205,18 +237,19 @@ impl Streamer {
Event::StopAudio => {
println!("Stop Audio");
self.gui_status.are_we_play_audio = Condition::Loading;
let mut playing_to_base_receiver =
self.communication_channel.playing_audio_to_base.subscribe();
let _ = self.communication_channel.base_to_playing_audio.send(false);
let playing_to_base_receiver = self
.communication_channel
.playing_to_base_sender
.subscribe();
let base_to_playing_sender =
self.communication_channel.base_to_playing_sender.clone();
Command::perform(
async move {
match playing_to_base_receiver.recv().await {
Ok(_) => State::StopAudio,
Err(err_val) => {
eprint!("Error: Communication | Playing | {}", err_val);
State::PlayingAudio
}
}
gui_utils::stop_playing(
playing_to_base_receiver,
base_to_playing_sender,
)
.await
},
Message::State,
)
@ -279,7 +312,9 @@ impl Streamer {
//let color_black = Color::from_rgb8(0, 0, 0);
let color_pink = Color::from_rgb8(255, 150, 150);
let header = text_centered("Radioxide").size(35).line_height(LineHeight::Relative(1.0));
let header = text_centered("Radioxide")
.size(35)
.line_height(LineHeight::Relative(1.0));
let connection_text = text_centered("Connection");
let recording_text = text_centered("Microphone");
@ -307,7 +342,7 @@ impl Streamer {
let record_button = match self.gui_status.are_we_record {
Condition::Active => {
recording_status_text = text_centered("Active").color(color_green);
button_with_centered_text("Start Mic").on_press(Message::Event(Event::StopRecord))
button_with_centered_text("Stop Mic").on_press(Message::Event(Event::StopRecord))
}
Condition::Loading => {
recording_status_text = text_centered("Loading").color(color_yellow);
@ -315,7 +350,7 @@ impl Streamer {
}
Condition::Passive => {
recording_status_text = text_centered("Passive").color(color_pink);
button_with_centered_text("Stop Mic").on_press(Message::Event(Event::Record))
button_with_centered_text("Start Mic").on_press(Message::Event(Event::Record))
}
};
@ -334,9 +369,7 @@ impl Streamer {
}
};
let header_content = row![header]
.width(350)
.height(50);
let header_content = row![header].width(350).height(50);
let text_content = row![
connection_text,
Rule::vertical(1),
@ -367,16 +400,17 @@ impl Streamer {
header_content,
Rule::horizontal(1),
text_content,
button_content,
status_content,
Rule::horizontal(1),
]
.spacing(20)
.width(350)
.height(300);
container(content).height(300).center_x().align_y(alignment::Vertical::Top)
container(content)
.height(300)
.center_x()
.align_y(alignment::Vertical::Top)
}
pub fn subscription(&self) -> Subscription<Message> {
iced::event::listen()
@ -393,26 +427,27 @@ impl Streamer {
)
}
fn call_closer(
streaming_to_base: Receiver<bool>,
base_to_streaming: Sender<bool>,
recording_to_base: Receiver<bool>,
base_to_recording: Sender<bool>,
playing_audio_to_base: Receiver<bool>,
base_to_playing_audio: Sender<bool>,
streaming_to_base_receiver: Receiver<bool>,
base_to_streaming_sender: Sender<bool>,
recording_to_base_receiver: Receiver<bool>,
base_to_recording_sender: Sender<bool>,
playing_to_base_receiver: Receiver<bool>,
base_to_playing_sender: Sender<bool>,
features_in_need: Features,
window_id: window::Id,
) -> Command<Message> {
Command::perform(
async move {
if features_in_need.stream {
gui_utils::disconnect(streaming_to_base, base_to_streaming).await;
gui_utils::disconnect(streaming_to_base_receiver, base_to_streaming_sender)
.await;
}
if features_in_need.record {
gui_utils::stop_recording(recording_to_base, base_to_recording).await;
gui_utils::stop_recording(recording_to_base_receiver, base_to_recording_sender)
.await;
}
if features_in_need.play_audio {
gui_utils::stop_playing_audio(playing_audio_to_base, base_to_playing_audio)
.await;
gui_utils::stop_playing(playing_to_base_receiver, base_to_playing_sender).await;
}
Event::CloseWindow(window_id)
},
@ -435,22 +470,31 @@ impl Streamer {
if self.gui_status.are_we_play_audio == Condition::Active {
features_in_need.play_audio = true;
}
let streaming_to_base = self.communication_channel.streaming_to_base.subscribe();
let base_to_streaming = self.communication_channel.base_to_streaming.clone();
let streaming_to_base_receiver = self
.communication_channel
.streaming_to_base_sender
.subscribe();
let base_to_streaming_sender = self.communication_channel.base_to_streaming_sender.clone();
let recording_to_base = self.communication_channel.recording_to_base.subscribe();
let base_to_recording = self.communication_channel.base_to_recording.clone();
let recording_to_base_receiver = self
.communication_channel
.recording_to_base_sender
.subscribe();
let base_to_recording_sender = self.communication_channel.base_to_recording_sender.clone();
let playing_audio_to_base = self.communication_channel.playing_audio_to_base.subscribe();
let base_to_playing_audio = self.communication_channel.base_to_playing_audio.clone();
let playing_to_base_receiver = self
.communication_channel
.playing_to_base_sender
.subscribe();
let base_to_playing_sender = self.communication_channel.base_to_playing_sender.clone();
Self::call_closer(
streaming_to_base,
base_to_streaming,
recording_to_base,
base_to_recording,
playing_audio_to_base,
base_to_playing_audio,
streaming_to_base_receiver,
base_to_streaming_sender,
recording_to_base_receiver,
base_to_recording_sender,
playing_to_base_receiver,
base_to_playing_sender,
features_in_need,
window_id,
)

View file

@ -1,19 +1,19 @@
use tokio::sync::broadcast::{Receiver, Sender};
use crate::{gui::State, recording, streaming, Config};
use crate::{gui::State, playing, recording, streaming, Config};
pub async fn connect(
sound_stream_receiver: Receiver<f32>,
streamer_config: Config,
streaming_to_base: Sender<bool>,
base_to_streaming: Receiver<bool>,
streaming_to_base_sender: Sender<bool>,
base_to_streaming_receiver: Receiver<bool>,
) -> State {
let mut streaming_to_base_receiver = streaming_to_base.subscribe();
let mut streaming_to_base_receiver = streaming_to_base_sender.subscribe();
tokio::spawn(streaming::connect(
sound_stream_receiver,
streamer_config,
base_to_streaming,
streaming_to_base.clone(),
base_to_streaming_receiver,
streaming_to_base_sender.clone(),
));
match streaming_to_base_receiver.recv().await {
Ok(_) => State::Connected,
@ -25,11 +25,11 @@ pub async fn connect(
}
pub async fn disconnect(
mut streaming_to_base: Receiver<bool>,
base_to_streaming: Sender<bool>,
mut streaming_to_base_receiver: Receiver<bool>,
base_to_streaming_sender: Sender<bool>,
) -> State {
let _ = base_to_streaming.send(false);
match streaming_to_base.recv().await {
let _ = base_to_streaming_sender.send(false);
match streaming_to_base_receiver.recv().await {
Ok(_) => State::Disconnected,
Err(err_val) => {
eprintln!("Error: Communication | {}", err_val);
@ -39,15 +39,15 @@ pub async fn disconnect(
}
pub async fn start_recording(
sound_stream_sender: Sender<f32>,
recording_to_base: Sender<bool>,
base_to_recording: Receiver<bool>,
microphone_stream_sender: Sender<f32>,
recording_to_base_sender: Sender<bool>,
base_to_recording_receiver: Receiver<bool>,
) -> State {
let mut recording_to_base_receiver = recording_to_base.subscribe();
let mut recording_to_base_receiver = recording_to_base_sender.subscribe();
tokio::spawn(recording::record(
sound_stream_sender.clone(),
recording_to_base.clone(),
base_to_recording,
microphone_stream_sender.clone(),
recording_to_base_sender.clone(),
base_to_recording_receiver,
));
match recording_to_base_receiver.recv().await {
@ -60,11 +60,11 @@ pub async fn start_recording(
}
pub async fn stop_recording(
mut recording_to_base: Receiver<bool>,
base_to_recording: Sender<bool>,
mut recording_to_base_receiver: Receiver<bool>,
base_to_recording_sender: Sender<bool>,
) -> State {
let _ = base_to_recording.send(false);
match recording_to_base.recv().await {
let _ = base_to_recording_sender.send(false);
match recording_to_base_receiver.recv().await {
Ok(_) => State::StopRecording,
Err(err_val) => {
eprintln!("Error: Communication | {}", err_val);
@ -73,13 +73,18 @@ pub async fn stop_recording(
}
}
pub async fn start_playing_audio(
playing_audio_to_base: Sender<bool>,
base_to_playing_audio: Receiver<bool>,
pub async fn start_playing(
audio_stream_sender: Sender<f32>,
playing_to_base_sender: Sender<bool>,
base_to_playing_receiver: Receiver<bool>,
) -> State {
//tokio::spawn(future);
let mut playing_audio_to_base_receiver = playing_audio_to_base.subscribe();
match playing_audio_to_base_receiver.recv().await {
let mut playing_to_base_receiver = playing_to_base_sender.subscribe();
tokio::spawn(playing::play(
audio_stream_sender,
playing_to_base_sender,
base_to_playing_receiver,
));
match playing_to_base_receiver.recv().await {
Ok(_) => State::PlayingAudio,
Err(err_val) => {
eprint!("Error: Communication | Playing | {}", err_val);
@ -88,16 +93,16 @@ pub async fn start_playing_audio(
}
}
pub async fn stop_playing_audio(
mut audio_to_base: Receiver<bool>,
base_to_audio: Sender<bool>,
pub async fn stop_playing(
mut playing_to_base_receiver: Receiver<bool>,
base_to_playing_sender: Sender<bool>,
) -> State {
let _ = base_to_audio.send(false);
match audio_to_base.recv().await {
Ok(_) => State::StopRecording,
let _ = base_to_playing_sender.send(false);
match playing_to_base_receiver.recv().await {
Ok(_) => State::StopAudio,
Err(err_val) => {
eprintln!("Error: Communication | {}", err_val);
State::Recording
State::PlayingAudio
}
}
}

View file

@ -1,6 +1,7 @@
pub mod gui;
pub mod gui_components;
pub mod gui_utils;
pub mod playing;
pub mod recording;
pub mod streaming;
pub mod utils;

194
streamer/src/playing.rs Normal file
View file

@ -0,0 +1,194 @@
use std::fs::File;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use ringbuf::HeapRb;
use rubato::{
Resampler, SincFixedIn, SincInterpolationParameters, SincInterpolationType, WindowFunction,
};
use symphonia::core::{
audio::{AudioBufferRef, Signal},
codecs::{DecoderOptions, CODEC_TYPE_NULL},
formats::FormatOptions,
io::MediaSourceStream,
meta::MetadataOptions,
probe::Hint,
};
use tokio::{
sync::broadcast::{Receiver, Sender},
task,
};
pub async fn play(
audio_stream_sender: Sender<f32>,
playing_to_base_sender: Sender<bool>,
mut base_to_playing_receiver: Receiver<bool>,
) {
let host = cpal::default_host();
let output_device = host.default_output_device().unwrap();
let output_device_config: cpal::StreamConfig =
output_device.default_output_config().unwrap().into();
let output_device_sample_rate = output_device_config.sample_rate.0;
let (mut audio_resampled_left, mut audio_resampled_right) =
decode_audio(output_device_sample_rate);
let total_ring_len = audio_resampled_left.len() + audio_resampled_right.len();
let (mut producer, mut receiver) = HeapRb::<f32>::new(total_ring_len).split();
for _ in 0..audio_resampled_left.clone().len() {
producer
.push(audio_resampled_left.pop().unwrap() as f32)
.unwrap();
producer
.push(audio_resampled_right.pop().unwrap() as f32)
.unwrap();
}
let output_data_fn = move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
for sample in data {
let single = match receiver.pop() {
Some(single) => {
//println!("{}", single);
single
}
None => 0.0,
};
if audio_stream_sender.receiver_count() > 0 {
let _ = audio_stream_sender.send(single);
}
*sample = single;
}
};
let output_stream = output_device
.build_output_stream(&output_device_config, output_data_fn, err_fn, None)
.unwrap();
output_stream.play().unwrap();
tokio::spawn(let_the_base_know(playing_to_base_sender.clone()));
task::block_in_place(|| {
let _ = base_to_playing_receiver.blocking_recv();
});
output_stream.pause().unwrap();
drop(output_stream);
tokio::spawn(let_the_base_know(playing_to_base_sender));
}
fn err_fn(err: cpal::StreamError) {
eprintln!("Something Happened: {}", err);
}
async fn let_the_base_know(playing_to_base_sender: Sender<bool>) {
let _ = playing_to_base_sender.send(true);
}
fn decode_audio(output_device_sample_rate: u32) -> (Vec<f64>, Vec<f64>) {
let file = File::open("music.mp3").unwrap();
let mut audio_decoded_left = vec![];
let mut audio_decoded_right = vec![];
let media_source_stream = MediaSourceStream::new(Box::new(file), Default::default());
let hint = Hint::new();
let metadata_options = MetadataOptions::default();
let format_options = FormatOptions::default();
let probed = symphonia::default::get_probe()
.format(
&hint,
media_source_stream,
&format_options,
&metadata_options,
)
.unwrap();
let mut format = probed.format;
let track = format
.tracks()
.iter()
.find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
.unwrap();
let audio_sample_rate = track.codec_params.sample_rate.unwrap();
DecoderOptions::default();
let decoder_options = DecoderOptions::default();
let mut decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &decoder_options)
.unwrap();
let track_id = track.id;
loop {
let packet = match format.next_packet() {
Ok(packet) => packet,
Err(_) => {
break;
}
};
while !format.metadata().is_latest() {
format.metadata().pop();
}
if packet.track_id() != track_id {
continue;
}
match decoder.decode(&packet) {
Ok(decoded) => match decoded {
AudioBufferRef::F32(buf) => {
for (left, right) in buf.chan(0).iter().zip(buf.chan(1).iter()) {
audio_decoded_left.push(*left as f64);
audio_decoded_right.push(*right as f64);
}
}
_ => {}
},
Err(_) => {
//eprintln!("Error: Sample Decode | {}", err_val);
println!("End ?");
}
}
}
let params = SincInterpolationParameters {
sinc_len: 256,
f_cutoff: 0.95,
interpolation: SincInterpolationType::Linear,
oversampling_factor: 256,
window: WindowFunction::BlackmanHarris2,
};
let mut resampler = SincFixedIn::<f64>::new(
output_device_sample_rate as f64 / audio_sample_rate as f64,
2.0,
params,
audio_decoded_left.len(),
2,
)
.unwrap();
let audio_decoded_channes_combined =
vec![audio_decoded_left.clone(), audio_decoded_right.clone()];
let audio_resampled = resampler
.process(&audio_decoded_channes_combined, None)
.unwrap();
let mut audio_resampled_left = vec![];
let mut audio_resampled_right = vec![];
for sample in &audio_resampled[0] {
audio_resampled_left.push(*sample);
}
for sample in &audio_resampled[1] {
audio_resampled_right.push(*sample);
}
audio_resampled_left.reverse();
audio_resampled_right.reverse();
(audio_resampled_left, audio_resampled_right)
}