Skip to content

Python Streaming API

ZMQ-backed streaming — PUB/SUB, PUSH/PULL, and REQ/REP socket pairs, all backed by the native C dp_* streaming functions. All socket types support the context manager protocol.

Source: python/dsp/doppler/stream/_stream.pyi


stream

doppler.stream — ZMQ-based signal streaming.

Wraps the dp_stream C extension. All patterns use a two-frame ZMQ multipart message: a header frame (SIGS magic + dp_header_t) and a data frame. C transmitters and Python subscribers are fully interoperable.

Patterns

PUB / SUB One-to-many broadcast. :class:Publisher → :class:Subscriber. PUSH / PULL Pipeline (load-balanced). :class:Push → :class:Pull. REQ / REP Request–reply. :class:Requester ↔ :class:Replier.

Sample types

:data:CI32, :data:CF64, :data:CF128

Examples:

>>> from doppler.stream import Publisher, Subscriber, CF64
>>> import numpy as np
>>>
>>> samples = np.array([1+2j, 3+4j], dtype=np.complex128)
>>> with Publisher("tcp://*:5556", CF64) as pub:
...     pub.send(samples, sample_rate=int(1e6), center_freq=int(2.4e9))

Publisher

ZMQ PUB socket — broadcasts signal frames to all subscribers.

Parameters:

Name Type Description Default
address str

ZMQ endpoint, e.g. "tcp://*:5555".

required
sample_type int

One of :data:CI32, :data:CF64, :data:CF128.

required

send

send(samples: NDArray[Any], *, sample_rate: float = 0.0, center_freq: float = 0.0) -> None

Broadcast a signal frame.

Parameters:

Name Type Description Default
samples NDArray[Any]

1-D array of CI32 (int32×2), CF64 (complex128), or CF128 samples matching the socket's sample_type.

required
sample_rate float

Samples per second (stored in header; 0 = unspecified).

0.0
center_freq float

Centre frequency in Hz (stored in header; 0 = unspecified).

0.0

Subscriber

ZMQ SUB socket — receives signal frames from a publisher.

Parameters:

Name Type Description Default
address str

ZMQ endpoint, e.g. "tcp://localhost:5555".

required

recv

recv(*, timeout_ms: int = -1) -> tuple[NDArray[Any], dict[str, Any]]

Receive one signal frame (zero-copy).

Parameters:

Name Type Description Default
timeout_ms int

Milliseconds to wait; -1 (default) blocks indefinitely.

-1

Returns:

Name Type Description
samples NDArray[Any]

1-D NumPy array backed by the ZMQ buffer. dtype matches the sender's sample_type (complex128 for CF64, etc.).

header dict[str, Any]

Dict with keys sample_type, sample_rate, center_freq, timestamp_ns, num_samples.

Raises:

Type Description
TimeoutError

If timeout_ms expires before a frame arrives.

Push

ZMQ PUSH socket — sends frames to a PULL socket (pipeline).

Pull

ZMQ PULL socket — receives frames from a PUSH socket (pipeline).

Requester

ZMQ REQ socket — sends a request and waits for one reply.

Replier

ZMQ REP socket — receives a request and sends one reply.

get_timestamp_ns

get_timestamp_ns() -> int

Return the current time as nanoseconds since the Unix epoch.