feat: add lock-free SPSC ring buffer for audio streaming

AtomicU32-based storage avoids unsafe while maintaining SPSC guarantees.
Capacity: 4096 samples (~85ms at 48kHz). Exported from crate root.
This commit is contained in:
2026-03-13 16:21:19 +03:00
parent 5895344f6f
commit bddc144c27
3 changed files with 127 additions and 1 deletions

View File

@@ -28,7 +28,7 @@ pub use runtime::{
AudioMixer, AudioOutput, ClientRuntime, EmulationState, FRAME_HEIGHT, FRAME_RGBA_BYTES, AudioMixer, AudioOutput, ClientRuntime, EmulationState, FRAME_HEIGHT, FRAME_RGBA_BYTES,
FRAME_WIDTH, FrameClock, FramePacer, HostConfig, InputProvider, JOYPAD_BUTTON_ORDER, FRAME_WIDTH, FrameClock, FramePacer, HostConfig, InputProvider, JOYPAD_BUTTON_ORDER,
JOYPAD_BUTTONS_COUNT, JoypadButton, JoypadButtons, NesRuntime, NoopClock, NullAudio, NullInput, JOYPAD_BUTTONS_COUNT, JoypadButton, JoypadButtons, NesRuntime, NoopClock, NullAudio, NullInput,
NullVideo, PacingClock, RuntimeError, RuntimeHostLoop, SAVE_STATE_VERSION, VideoMode, NullVideo, PacingClock, RingBuffer, RuntimeError, RuntimeHostLoop, SAVE_STATE_VERSION, VideoMode,
VideoOutput, button_pressed, set_button_pressed, VideoOutput, button_pressed, set_button_pressed,
}; };

View File

@@ -5,6 +5,7 @@ mod constants;
mod core; mod core;
mod error; mod error;
mod host; mod host;
pub mod ring_buffer;
mod state; mod state;
mod timing; mod timing;
mod types; mod types;
@@ -12,6 +13,7 @@ mod types;
#[cfg(feature = "adapter-api")] #[cfg(feature = "adapter-api")]
pub use adapters::{AudioAdapter, ClockAdapter, InputAdapter, VideoAdapter}; pub use adapters::{AudioAdapter, ClockAdapter, InputAdapter, VideoAdapter};
pub use audio::AudioMixer; pub use audio::AudioMixer;
pub use ring_buffer::RingBuffer;
pub use constants::{FRAME_HEIGHT, FRAME_RGBA_BYTES, FRAME_WIDTH, SAVE_STATE_VERSION}; pub use constants::{FRAME_HEIGHT, FRAME_RGBA_BYTES, FRAME_WIDTH, SAVE_STATE_VERSION};
pub use core::NesRuntime; pub use core::NesRuntime;
pub use error::RuntimeError; pub use error::RuntimeError;

124
src/runtime/ring_buffer.rs Normal file
View File

@@ -0,0 +1,124 @@
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
pub struct RingBuffer {
buffer: Box<[AtomicU32]>,
capacity: usize,
head: AtomicUsize,
tail: AtomicUsize,
}
impl RingBuffer {
pub fn new(capacity: usize) -> Self {
assert!(capacity > 0);
let buffer: Vec<AtomicU32> = (0..capacity).map(|_| AtomicU32::new(0)).collect();
Self {
buffer: buffer.into_boxed_slice(),
capacity,
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
}
}
pub fn push(&self, samples: &[f32]) -> usize {
let head = self.head.load(Ordering::Relaxed);
let tail = self.tail.load(Ordering::Acquire);
let available = self.capacity - self.len_internal(head, tail) - 1;
let to_write = samples.len().min(available);
for (i, sample) in samples.iter().enumerate().take(to_write) {
let idx = (head + i) % self.capacity;
self.buffer[idx].store(sample.to_bits(), Ordering::Relaxed);
}
self.head
.store((head + to_write) % self.capacity, Ordering::Release);
to_write
}
pub fn pop(&self, out: &mut [f32]) -> usize {
let tail = self.tail.load(Ordering::Relaxed);
let head = self.head.load(Ordering::Acquire);
let available = self.len_internal(head, tail);
let to_read = out.len().min(available);
for (i, slot) in out.iter_mut().enumerate().take(to_read) {
let idx = (tail + i) % self.capacity;
*slot = f32::from_bits(self.buffer[idx].load(Ordering::Relaxed));
}
self.tail
.store((tail + to_read) % self.capacity, Ordering::Release);
to_read
}
/// Clear the buffer. Must only be called when no concurrent push/pop
/// is in progress (e.g., when the audio stream is paused or dropped).
pub fn clear(&self) {
self.tail.store(0, Ordering::SeqCst);
self.head.store(0, Ordering::SeqCst);
}
fn len_internal(&self, head: usize, tail: usize) -> usize {
if head >= tail {
head - tail
} else {
self.capacity - tail + head
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn push_pop_basic() {
let rb = RingBuffer::new(8);
let input = [1.0, 2.0, 3.0];
assert_eq!(rb.push(&input), 3);
let mut out = [0.0; 3];
assert_eq!(rb.pop(&mut out), 3);
assert_eq!(out, [1.0, 2.0, 3.0]);
}
#[test]
fn underrun_returns_zero_count() {
let rb = RingBuffer::new(8);
let mut out = [0.0; 4];
assert_eq!(rb.pop(&mut out), 0);
}
#[test]
fn overrun_drops_new_samples() {
let rb = RingBuffer::new(4); // usable capacity = 3
let input = [1.0, 2.0, 3.0, 4.0, 5.0];
let written = rb.push(&input);
assert_eq!(written, 3);
let mut out = [0.0; 3];
rb.pop(&mut out);
assert_eq!(out, [1.0, 2.0, 3.0]);
}
#[test]
fn clear_resets() {
let rb = RingBuffer::new(8);
rb.push(&[1.0, 2.0]);
rb.clear();
let mut out = [0.0; 2];
assert_eq!(rb.pop(&mut out), 0);
}
#[test]
fn wraparound() {
let rb = RingBuffer::new(4); // usable = 3
rb.push(&[1.0, 2.0, 3.0]);
let mut out = [0.0; 2];
rb.pop(&mut out);
assert_eq!(out, [1.0, 2.0]);
rb.push(&[4.0, 5.0]);
let mut out2 = [0.0; 3];
let read = rb.pop(&mut out2);
assert_eq!(read, 3);
assert_eq!(out2, [3.0, 4.0, 5.0]);
}
}