feat: add lock-free SPSC ring buffer for audio streaming
AtomicU32-based storage avoids unsafe while maintaining SPSC guarantees. Capacity: 4096 samples (~85ms at 48kHz). Exported from crate root. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -28,7 +28,7 @@ pub use runtime::{
|
||||
AudioMixer, AudioOutput, ClientRuntime, EmulationState, FRAME_HEIGHT, FRAME_RGBA_BYTES,
|
||||
FRAME_WIDTH, FrameClock, FramePacer, HostConfig, InputProvider, JOYPAD_BUTTON_ORDER,
|
||||
JOYPAD_BUTTONS_COUNT, JoypadButton, JoypadButtons, NesRuntime, NoopClock, NullAudio, NullInput,
|
||||
NullVideo, PacingClock, RuntimeError, RuntimeHostLoop, SAVE_STATE_VERSION, VideoMode,
|
||||
NullVideo, PacingClock, RingBuffer, RuntimeError, RuntimeHostLoop, SAVE_STATE_VERSION, VideoMode,
|
||||
VideoOutput, button_pressed, set_button_pressed,
|
||||
};
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ mod constants;
|
||||
mod core;
|
||||
mod error;
|
||||
mod host;
|
||||
pub mod ring_buffer;
|
||||
mod state;
|
||||
mod timing;
|
||||
mod types;
|
||||
@@ -12,6 +13,7 @@ mod types;
|
||||
#[cfg(feature = "adapter-api")]
|
||||
pub use adapters::{AudioAdapter, ClockAdapter, InputAdapter, VideoAdapter};
|
||||
pub use audio::AudioMixer;
|
||||
pub use ring_buffer::RingBuffer;
|
||||
pub use constants::{FRAME_HEIGHT, FRAME_RGBA_BYTES, FRAME_WIDTH, SAVE_STATE_VERSION};
|
||||
pub use core::NesRuntime;
|
||||
pub use error::RuntimeError;
|
||||
|
||||
124
src/runtime/ring_buffer.rs
Normal file
124
src/runtime/ring_buffer.rs
Normal file
@@ -0,0 +1,124 @@
|
||||
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
|
||||
|
||||
pub struct RingBuffer {
|
||||
buffer: Box<[AtomicU32]>,
|
||||
capacity: usize,
|
||||
head: AtomicUsize,
|
||||
tail: AtomicUsize,
|
||||
}
|
||||
|
||||
impl RingBuffer {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
assert!(capacity > 0);
|
||||
let buffer: Vec<AtomicU32> = (0..capacity).map(|_| AtomicU32::new(0)).collect();
|
||||
Self {
|
||||
buffer: buffer.into_boxed_slice(),
|
||||
capacity,
|
||||
head: AtomicUsize::new(0),
|
||||
tail: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&self, samples: &[f32]) -> usize {
|
||||
let head = self.head.load(Ordering::Relaxed);
|
||||
let tail = self.tail.load(Ordering::Acquire);
|
||||
let available = self.capacity - self.len_internal(head, tail) - 1;
|
||||
let to_write = samples.len().min(available);
|
||||
|
||||
for (i, sample) in samples.iter().enumerate().take(to_write) {
|
||||
let idx = (head + i) % self.capacity;
|
||||
self.buffer[idx].store(sample.to_bits(), Ordering::Relaxed);
|
||||
}
|
||||
|
||||
self.head
|
||||
.store((head + to_write) % self.capacity, Ordering::Release);
|
||||
to_write
|
||||
}
|
||||
|
||||
pub fn pop(&self, out: &mut [f32]) -> usize {
|
||||
let tail = self.tail.load(Ordering::Relaxed);
|
||||
let head = self.head.load(Ordering::Acquire);
|
||||
let available = self.len_internal(head, tail);
|
||||
let to_read = out.len().min(available);
|
||||
|
||||
for (i, slot) in out.iter_mut().enumerate().take(to_read) {
|
||||
let idx = (tail + i) % self.capacity;
|
||||
*slot = f32::from_bits(self.buffer[idx].load(Ordering::Relaxed));
|
||||
}
|
||||
|
||||
self.tail
|
||||
.store((tail + to_read) % self.capacity, Ordering::Release);
|
||||
to_read
|
||||
}
|
||||
|
||||
/// Clear the buffer. Must only be called when no concurrent push/pop
|
||||
/// is in progress (e.g., when the audio stream is paused or dropped).
|
||||
pub fn clear(&self) {
|
||||
self.tail.store(0, Ordering::SeqCst);
|
||||
self.head.store(0, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
fn len_internal(&self, head: usize, tail: usize) -> usize {
|
||||
if head >= tail {
|
||||
head - tail
|
||||
} else {
|
||||
self.capacity - tail + head
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn push_pop_basic() {
|
||||
let rb = RingBuffer::new(8);
|
||||
let input = [1.0, 2.0, 3.0];
|
||||
assert_eq!(rb.push(&input), 3);
|
||||
let mut out = [0.0; 3];
|
||||
assert_eq!(rb.pop(&mut out), 3);
|
||||
assert_eq!(out, [1.0, 2.0, 3.0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn underrun_returns_zero_count() {
|
||||
let rb = RingBuffer::new(8);
|
||||
let mut out = [0.0; 4];
|
||||
assert_eq!(rb.pop(&mut out), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn overrun_drops_new_samples() {
|
||||
let rb = RingBuffer::new(4); // usable capacity = 3
|
||||
let input = [1.0, 2.0, 3.0, 4.0, 5.0];
|
||||
let written = rb.push(&input);
|
||||
assert_eq!(written, 3);
|
||||
let mut out = [0.0; 3];
|
||||
rb.pop(&mut out);
|
||||
assert_eq!(out, [1.0, 2.0, 3.0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clear_resets() {
|
||||
let rb = RingBuffer::new(8);
|
||||
rb.push(&[1.0, 2.0]);
|
||||
rb.clear();
|
||||
let mut out = [0.0; 2];
|
||||
assert_eq!(rb.pop(&mut out), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wraparound() {
|
||||
let rb = RingBuffer::new(4); // usable = 3
|
||||
rb.push(&[1.0, 2.0, 3.0]);
|
||||
let mut out = [0.0; 2];
|
||||
rb.pop(&mut out);
|
||||
assert_eq!(out, [1.0, 2.0]);
|
||||
rb.push(&[4.0, 5.0]);
|
||||
let mut out2 = [0.0; 3];
|
||||
let read = rb.pop(&mut out2);
|
||||
assert_eq!(read, 3);
|
||||
assert_eq!(out2, [3.0, 4.0, 5.0]);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user