Implement room connection, handle local and remote audio streams, refactor client state management #40

Merged
puregarlic merged 18 commits from issue/3 into main 2026-02-25 10:21:08 -08:00
Owner

This is the big PR that makes the whole program work. First, it restructures the client application to use multiple managed values instead of a single AppState struct. It streamlines configuration management by delegating it to a ConfigManager, which enables other managers to react to value changes. This PR further allows the Tauri client to connect to a microclimate channel, backed by a LiveKit room. Finally, it enables the client to send microphone data to other participants in the room, and handles sending remote audio tracks to a local playback device.

There was quite a bit of LLM involved in this one, mostly due to complexity introduced by the desire to retain the dasp crate's Signal API, which does not implement Send, and so was problematic for asynchronous tasks.

Also of note is the specified target directory in .cargo/config.toml. After adding the livekit crate to the Tauri project, I encountered issues with paths that were longer than Windows' supported 255 character-length paths. I had to shorten the target directory path as much as possible, and it resolved my issue. Any other attempted solutions would be welcome, though, because I don't really like this one all that much.

I have tested and validated that audio publishing and subscription works on my local machine, with two separate instances of the client application. Let me know if you want a working .env with LiveKit credentials you can use to test.

Closes #3
Closes #4
Closes #5
Closes #37
Closes #41

This is the big PR that makes the whole program work. First, it restructures the client application to use multiple managed values instead of a single `AppState` struct. It streamlines configuration management by delegating it to a `ConfigManager`, which enables other managers to react to value changes. This PR further allows the Tauri client to connect to a microclimate channel, backed by a LiveKit room. Finally, it enables the client to send microphone data to other participants in the room, and handles sending remote audio tracks to a local playback device. There was quite a bit of LLM involved in this one, mostly due to complexity introduced by the desire to retain the `dasp` crate's `Signal` API, which does not implement `Send`, and so was problematic for asynchronous tasks. Also of note is the specified target directory in `.cargo/config.toml`. After adding the `livekit` crate to the Tauri project, I encountered issues with paths that were longer than Windows' supported 255 character-length paths. I had to shorten the target directory path as much as possible, and it resolved my issue. Any other attempted solutions would be welcome, though, because I don't really like this one all that much. I have tested and validated that audio publishing and subscription works on my local machine, with two separate instances of the client application. Let me know if you want a working `.env` with LiveKit credentials you can use to test. Closes #3 Closes #4 Closes #5 Closes #37 Closes #41
seb approved these changes 2026-02-24 18:05:09 -08:00
@ -32,0 +38,4 @@
/// `Sized`, so all dasp adapters (`.map()`, `.scale_amp()`, rate
/// converters, etc.) work on it directly.
#[instrument(skip(self))]
pub async fn get_signal(&self) -> Result<(BoxedSignal<i16>, u32), AudioDeviceError> {
Collaborator

just from personal experience, if there is any chance at all of a tuple type ever changing/getting longer in the future you should give it a type name

just from personal experience, if there is any chance at all of a tuple type ever changing/getting longer in the future you should give it a type name
puregarlic marked this conversation as resolved
@ -127,3 +131,4 @@
sample_rate,
})
}
Collaborator

I don't like getters in rust

I don't like getters in rust
puregarlic marked this conversation as resolved
@ -0,0 +31,4 @@
}
// SAFETY: The inner Box holds a `Send` trait object.
unsafe impl<F> Send for BoxedSignal<F> {}
Collaborator

compiles without the unsafe impl. I'm not sure this does anything. Signal implements send with the + Send invocation in the typedef, which means the Box and therefore the BoxedSignal should also implement Send.

compiles without the unsafe impl. I'm not sure this does anything. Signal implements send with the `+ Send` invocation in the typedef, which means the Box and therefore the BoxedSignal should also implement Send.
puregarlic marked this conversation as resolved
@ -0,0 +85,4 @@
}
// SAFETY: Receiver<Vec<f32>> is Send.
unsafe impl Send for ReceiverSignal {}
Collaborator

same as above

same as above
puregarlic marked this conversation as resolved
@ -0,0 +45,4 @@
impl ChannelsManager {
#[instrument(name = "channels_manager_new", skip(config))]
pub fn new(config: ConfigManager) -> Arc<Self> {
let manager = Arc::new(ChannelsManager {
Collaborator

no reason to make a variable here -- the whole function can just be

pub fn new(config: ConfigManager) -> Arc<Self> {
	        Arc::new(ChannelsManager {
	            config,
	            client: Arc::new(Mutex::new(None)),
	            room: Arc::new(Mutex::new(None)),
	            events_handle: Arc::new(Mutex::new(None)),
	            mic_task: Arc::new(Mutex::new(None)),
	        })
	    }
no reason to make a variable here -- the whole function can just be ``` pub fn new(config: ConfigManager) -> Arc<Self> { Arc::new(ChannelsManager { config, client: Arc::new(Mutex::new(None)), room: Arc::new(Mutex::new(None)), events_handle: Arc::new(Mutex::new(None)), mic_task: Arc::new(Mutex::new(None)), }) } ```
puregarlic marked this conversation as resolved
@ -0,0 +181,4 @@
let room_arc = self.room.clone();
let output_rate = output.lock().await.sample_rate();
let output_for_events = output.clone();
let events_handle = tauri::async_runtime::spawn(async move {
Collaborator

It's sort of annoying, but I'm a fan of doing functions for the bodies of async_runtime::spawn if possible, to avoid tabbing over too much. ymmv

It's sort of annoying, but I'm a fan of doing functions for the bodies of async_runtime::spawn if possible, to avoid tabbing over too much. ymmv
puregarlic marked this conversation as resolved
@ -0,0 +235,4 @@
// Spawn the mic capture pipeline:
// blocking thread → reads dasp signal, batches into 10 ms frames
// async task → receives frames, calls source.capture_frame()
let samples_per_channel = (sample_rate / 100) as usize; // 10 ms
Collaborator

avoid magic numbers imo

avoid magic numbers imo
puregarlic marked this conversation as resolved
@ -0,0 +1,19 @@
use tracing::instrument;
Collaborator

wrt my comment above, I think it'd make sense to merge this into the config manager or find some other non-mod.rs based pattern

wrt my comment above, I think it'd make sense to merge this into the config manager or find some other non-mod.rs based pattern
puregarlic marked this conversation as resolved
@ -0,0 +1,53 @@
use std::sync::Arc;
Collaborator

we should figure out how we feel about this early.

https://users.rust-lang.org/t/module-mod-rs-or-module-rs/122653

I prefer either not having mod.rs at all (calling it i.e. config_manager.rs and removing the config/ folder altogether) or only using mod.rs for reexports. there are other examples of projects that like having mod.rs contain logic, like axum. even then, they seem a little inconsistent about it.

another option is config/config_manager.rs

the main advantage of naming the file is that you don't have a bunch of files called mod.rs all over the project. I could see it either way though

we should figure out how we feel about this early. https://users.rust-lang.org/t/module-mod-rs-or-module-rs/122653 I prefer either not having mod.rs at all (calling it i.e. config_manager.rs and removing the config/ folder altogether) or only using mod.rs for reexports. there are other examples of projects that like having mod.rs contain logic, like [axum](https://github.com/tokio-rs/axum/blob/main/axum/src/handler/mod.rs). even then, they seem a little [inconsistent](https://github.com/tokio-rs/axum/blob/main/axum/src/middleware/mod.rs) about it. another option is config/config_manager.rs the main advantage of naming the file is that you don't have a bunch of files called mod.rs all over the project. I could see it either way though
puregarlic marked this conversation as resolved
@ -22,1 +28,4 @@
.setup(|app| {
setup_tracing();
let _guard = info_span!("setting up managers").entered();
Collaborator

you can just do let _ = ...

you can just do `let _ = ...`
puregarlic marked this conversation as resolved
@ -1,17 +1,15 @@
use std::sync::Arc;
Collaborator

this file and state.rs are no longer included in the project and can be deleted, assuming they're supposed to be subsumed by the new managers

this file and state.rs are no longer included in the project and can be deleted, assuming they're supposed to be subsumed by the new managers
puregarlic marked this conversation as resolved
- Extract logic from `mod.rs` files
- Extract closures to functions when spawning tasks
- Move magic numbers to constants
- Remove getter
- Clean up unincluded files
- Remove mysterious `unsafe` code
puregarlic requested review from seb 2026-02-25 09:27:17 -08:00
seb approved these changes 2026-02-25 10:11:09 -08:00
@ -79,0 +25,4 @@
}
// Start the input stream and obtain a live i16 signal.
let crate::audio::input::CapturedAudio {
Collaborator

go fish

go fish
@ -0,0 +38,4 @@
/// construction — `dasp::signal::from_iter` calls `iter.next()` immediately on
/// construction, which would block for a blocking iterator.
pub struct ReceiverSignal {
rx: mpsc::Receiver<Vec<f32>>,
Collaborator

go fish

go fish
@ -0,0 +44,4 @@
}
impl ReceiverSignal {
pub fn new(rx: mpsc::Receiver<Vec<f32>>) -> Self {
Collaborator

go fish

go fish
@ -0,0 +68,4 @@
self.current = batch.into_iter();
self.current.next().unwrap_or(0.0)
}
Err(mpsc::TryRecvError::Empty) => 0.0,
Collaborator

go fish

go fish
@ -0,0 +36,4 @@
audio: State<'_, AudioManager>,
channel_id: String,
) -> Result<(), String> {
let crate::audio::input::CapturedAudio {
Collaborator

go fish

go fish
@ -0,0 +75,4 @@
let manager = Arc::clone(&self);
let config = self.config.clone();
tauri::async_runtime::spawn(handle_config_changes(config, manager));
Collaborator

go fish

go fish
@ -0,0 +192,4 @@
}
// Spawn the room-events task with track subscription handling.
let events_handle = tauri::async_runtime::spawn(handle_room_events(
Collaborator

go fish

go fish
@ -0,0 +211,4 @@
// async task → receives frames, calls source.capture_frame()
let samples_per_channel = (sample_rate / (1000 / FRAME_DURATION_MS)) as usize;
let (frame_tx, mut frame_rx) =
tokio::sync::mpsc::channel::<Vec<i16>>(MIC_FRAME_CHANNEL_CAPACITY);
Collaborator

go fish

go fish
@ -0,0 +220,4 @@
buf.push(signal.next());
if buf.len() >= samples_per_channel {
let batch =
std::mem::replace(&mut buf, Vec::with_capacity(samples_per_channel));
Collaborator

go fish

go fish
@ -0,0 +340,4 @@
output: &Arc<Mutex<OutputDeviceManager>>,
) -> Option<(JoinHandle<()>, StreamHandle)> {
let mut stream = NativeAudioStream::new(track.rtc_track(), sample_rate as i32, MONO_CHANNEL_COUNT as i32);
let (tx, rx) = std::sync::mpsc::channel::<Vec<f32>>();
Collaborator

go fish

go fish
Sign in to join this conversation.
No description provided.