Python Ring Buffer API

Lock-free single-producer, single-consumer (SPSC) ring buffers backed by dp_buffer_*. The buffer memory is mapped twice, back to back, in virtual memory, so the consumer always sees a contiguous window across the wrap boundary: reads are zero-copy and need no wrap-around branch.

Source: src/doppler/buffer/__init__.py
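
Double-mapping rests on one property of virtual memory: the same physical pages can be mapped at more than one address. The following is a minimal sketch of that property only, using Python's standard mmap module on a temporary file. It is not how dp_buffer_* is implemented. A real double-mapped ring also places the second mapping immediately after the first, which needs address control that Python's mmap module does not expose.

```python
import mmap
import tempfile

PAGE = mmap.PAGESIZE  # mappings are page-granular

with tempfile.TemporaryFile() as f:
    f.truncate(PAGE)                       # backing store: one page
    first = mmap.mmap(f.fileno(), PAGE)    # mapping A
    second = mmap.mmap(f.fileno(), PAGE)   # mapping B of the same pages

    first[0:4] = b"IQIQ"                   # write through mapping A ...
    aliased = bytes(second[0:4])           # ... and read it back through mapping B

    first.close()
    second.close()

print(aliased)
```

With the two mappings adjacent, a read that starts near the end of the first mapping simply continues into the second, which is why the consumer never needs to split a window at the wrap boundary.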


Buffer types

| Class | NumPy dtype | Bytes/sample | Min capacity |
|---|---|---|---|
| F32Buffer | complex64 | 8 | 512 samples |
| F64Buffer | complex128 | 16 | 256 samples |
| I16Buffer | int16 (shape (n, 2)) | 4 | 1024 samples |

The minimum capacity is a page-alignment constraint of the double-mapping trick: each minimum in the table works out to 4096 bytes (for example, 512 samples × 8 bytes). Requested capacities are rounded up to the next power of two.
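
The rounding rule can be sketched in a few lines. This is an illustration, not the library's code: the name effective_capacity is hypothetical, the 4096-byte page is inferred from the table (512 × 8 = 256 × 16 = 1024 × 4), and the sketch assumes that requests below the minimum are raised to it rather than rejected.

```python
PAGE_BYTES = 4096  # assumption: inferred from the minimum capacities in the table


def effective_capacity(requested: int, bytes_per_sample: int) -> int:
    """Return the capacity after applying the minimum and power-of-two rounding."""
    minimum = PAGE_BYTES // bytes_per_sample       # smallest page-aligned size
    wanted = max(requested, minimum)
    return 1 << (wanted - 1).bit_length()          # next power of two >= wanted


for name, size in (("F32Buffer", 8), ("F64Buffer", 16), ("I16Buffer", 4)):
    print(name, effective_capacity(1000, size), effective_capacity(5000, size))
```

Under these assumptions a request for 1000 samples becomes 1024 for every buffer type, and a request for 5000 becomes 8192.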


Threading model

One producer thread calls write; one consumer thread calls wait / consume. write is non-blocking and drops samples if the buffer is full. wait blocks the consumer and releases the GIL so the producer can run concurrently.
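
The write / wait / consume contract can be modelled in pure Python to make the semantics concrete. ModelRing below is a hypothetical teaching stand-in: it uses a lock and a condition variable (so it is not lock-free), returns copies instead of zero-copy views, and assumes that a write which does not fit is dropped as a whole block and counted in dropped.

```python
import threading
from collections import deque


class ModelRing:
    """Toy model of the documented contract; not the doppler implementation."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.dropped = 0
        self._items = deque()
        self._cond = threading.Condition()

    def write(self, block):
        """Non-blocking: store the block or drop it, never wait."""
        with self._cond:
            if len(self._items) + len(block) > self.capacity:
                self.dropped += len(block)      # assumption: whole block is dropped
                return False
            self._items.extend(block)
            self._cond.notify()
            return True

    def wait(self, n):
        """Block until n items are stored, then return them without removing them."""
        with self._cond:
            self._cond.wait_for(lambda: len(self._items) >= n)
            return [self._items[i] for i in range(n)]

    def consume(self, n):
        """Release the n oldest items so the producer can reuse the space."""
        with self._cond:
            for _ in range(n):
                self._items.popleft()


ring = ModelRing(8)
results = []


def consumer():
    view = ring.wait(4)          # blocks until the producer has written 4 items
    results.append(sum(view))
    ring.consume(4)


thread = threading.Thread(target=consumer)
thread.start()
first_ok = ring.write([1, 2, 3, 4])      # wakes the consumer
thread.join()

fill_ok = ring.write(list(range(8)))     # fills the ring exactly
overflow_ok = ring.write([99])           # no room left: dropped
print(first_ok, fill_ok, overflow_ok, ring.dropped, results)
```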


Examples

Producer / consumer (threaded)

from doppler.buffer import F32Buffer
import numpy as np
import threading

buf = F32Buffer(4096)

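# iq_source and process are placeholders for your own sample source and handler.
def producer():
    for block in iq_source:                         # complex64 arrays
        buf.write(block)                            # non-blocking; returns False if dropped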

def consumer():
    while True:
        view = buf.wait(1024)                       # blocks; zero-copy
        process(view)
        buf.consume(1024)

t_prod = threading.Thread(target=producer, daemon=True)
t_cons = threading.Thread(target=consumer, daemon=True)
t_prod.start()
t_cons.start()

Check available samples without blocking

if buf.available >= 1024:
    view = buf.wait(1024)
    process(view)
    buf.consume(1024)

I16Buffer — raw ADC samples

I16Buffer stores interleaved int16 IQ pairs. The returned array from wait has shape (n, 2): column 0 is I, column 1 is Q.

from doppler.buffer import I16Buffer
import numpy as np

buf = I16Buffer(4096)
# adc_bytes is a placeholder for raw interleaved int16 bytes read from the ADC.
raw = np.frombuffer(adc_bytes, dtype=np.int16).reshape(-1, 2)
buf.write(raw)

view = buf.wait(1024)       # shape (1024, 2), dtype int16
I = view[:, 0]
Q = view[:, 1]
buf.consume(1024)
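
A common next step is turning the interleaved pairs into complex samples. The sketch below uses only the standard library, so it runs without doppler or NumPy. The function name iq_pairs_to_complex and the 32768 full-scale divisor are assumptions for illustration; the right scaling depends on the ADC.

```python
from array import array

FULL_SCALE = 32768.0  # assumption: normalise int16 to roughly [-1.0, 1.0)


def iq_pairs_to_complex(interleaved):
    """Convert [I0, Q0, I1, Q1, ...] int16 values to a list of complex samples."""
    i_vals = interleaved[0::2]   # even positions: in-phase
    q_vals = interleaved[1::2]   # odd positions: quadrature
    return [complex(i, q) / FULL_SCALE for i, q in zip(i_vals, q_vals)]


raw = array("h", [16384, -16384, 0, 32767])   # two IQ pairs
samples = iq_pairs_to_complex(raw)
print(samples)
```

On the (n, 2) NumPy view returned by wait, the equivalent unscaled expression is view[:, 0] + 1j * view[:, 1].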

Capacity and overflow

buf = F32Buffer(1024)
print(buf.capacity)         # 1024 (or next power of two)

ok = buf.write(np.ones(1024, dtype=np.complex64))
print(ok)                   # True if written, False if dropped
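
Because write reports a drop instead of blocking, the caller decides what to do with a rejected block. One option is a bounded retry, sketched below. write_with_retry and FullTwiceBuffer are hypothetical names; the helper relies only on the documented write(arr) -> bool behaviour. Whether each failed attempt also increments dropped is not stated in this page, so treat that counter with care if you retry.

```python
import time


def write_with_retry(buf, block, attempts=3, delay_s=0.001):
    """Call buf.write(block) until it succeeds or the attempts run out."""
    for _ in range(attempts):
        if buf.write(block):
            return True
        time.sleep(delay_s)      # give the consumer time to free space
    return False


class FullTwiceBuffer:
    """Hypothetical stand-in: rejects the first two writes, then accepts."""

    def __init__(self):
        self.calls = 0

    def write(self, block):
        self.calls += 1
        return self.calls > 2


stub = FullTwiceBuffer()
print(write_with_retry(stub, [0j] * 4), stub.calls)
```

For live streams, dropping and moving on is often preferable to retrying, since a stalled producer may lose newer samples instead.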

F32Buffer

Lock-free SPSC ring buffer for complex64 (CF32) samples.

Wraps dp_buffer_cf32_t. Uses virtual-memory double-mapping so the consumer always sees a contiguous window across the wrap boundary. One thread writes; one thread reads.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| capacity | int | Buffer size in complex samples. Rounded up to the next power of two; minimum 512 samples (page-alignment constraint). | required |

Examples:

>>> from doppler.buffer import F32Buffer
>>> import numpy as np
>>> buf = F32Buffer(1024)
>>> buf.capacity
1024
>>> buf.write(np.ones(512, dtype=np.complex64))
True

capacity property

capacity: int

Buffer capacity in complex samples.

dropped property

dropped: int

Samples dropped due to buffer overrun.

write

write(arr: NDArray[complex64]) -> bool

Non-blocking write. Returns True on success, False if full.

wait

wait(n: int) -> NDArray[np.complex64]

Block until n samples are available; return zero-copy view.

You must call consume when done with the view.

consume

consume(n: int = ...) -> None

Release n samples (defaults to the count from the last wait).
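
Since every wait has to be paired with a consume, it can help to tie the two together so the release happens even when processing raises. The context manager below is a sketch built only on the documented wait(n) and consume(n) calls; window and RecordingBuffer are hypothetical names, and the stub exists only so the example runs without doppler.

```python
from contextlib import contextmanager


@contextmanager
def window(buf, n):
    """Yield a view of n samples and always release them afterwards."""
    view = buf.wait(n)
    try:
        yield view
    finally:
        buf.consume(n)           # runs even if the body raises


class RecordingBuffer:
    """Hypothetical stand-in that records the calls made on it."""

    def __init__(self):
        self.log = []

    def wait(self, n):
        self.log.append(("wait", n))
        return list(range(n))

    def consume(self, n):
        self.log.append(("consume", n))


stub = RecordingBuffer()
try:
    with window(stub, 4) as view:
        raise ValueError("processing failed")
except ValueError:
    pass
print(stub.log)
```

With a real buffer the view should presumably not be used after the with block ends, because the released region may be overwritten by the producer.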

destroy

destroy() -> None

Unmap and release the buffer.


F64Buffer

Lock-free SPSC ring buffer for complex128 (CF64) samples.

Wraps dp_buffer_cf64_t. Same double-mapping design as F32Buffer; minimum 256 samples.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| capacity | int | Buffer size in complex samples. | required |

Examples:

>>> from doppler.buffer import F64Buffer
>>> import numpy as np
>>> buf = F64Buffer(512)
>>> buf.capacity
512

capacity property

capacity: int

Buffer capacity in complex samples.

dropped property

dropped: int

Samples dropped due to buffer overrun.

write

write(arr: NDArray[complex128]) -> bool

Non-blocking write. Returns True on success, False if full.

wait

wait(n: int) -> NDArray[np.complex128]

Block until n samples are available; return zero-copy view.

consume

consume(n: int = ...) -> None

Release n samples.

destroy

destroy() -> None

Unmap and release the buffer.


I16Buffer

Lock-free SPSC ring buffer for interleaved int16 IQ pairs.

Wraps dp_buffer_i16_t. The wait view has shape (n, 2): column 0 is I, column 1 is Q. Minimum 1024 samples.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| capacity | int | Buffer size in IQ sample pairs. | required |

Examples:

>>> from doppler.buffer import I16Buffer
>>> import numpy as np
>>> buf = I16Buffer(2048)
>>> buf.capacity
2048

capacity property

capacity: int

Buffer capacity in IQ sample pairs.

dropped property

dropped: int

Samples dropped due to buffer overrun.

write

write(arr: NDArray[int16]) -> bool

Non-blocking write of a shape (n, 2) int16 array. Returns True on success, False if full.

wait

wait(n: int) -> NDArray[np.int16]

Block until n IQ pairs are available; return shape (n, 2) view.

consume

consume(n: int = ...) -> None

Release n IQ pairs.

destroy

destroy() -> None

Unmap and release the buffer.