Python Streaming API
ZMQ-backed streaming — PUB/SUB, PUSH/PULL, and REQ/REP socket pairs,
all backed by the native C dp_* streaming functions. All socket
types support the context manager protocol.
Source: python/dsp/doppler/stream/_stream.pyi
stream
doppler.stream — ZMQ-based signal streaming.
Wraps the dp_stream C extension. All patterns use a two-frame
ZMQ multipart message: a header frame (SIGS magic + dp_header_t)
and a data frame. C transmitters and Python subscribers are fully
interoperable.
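The two-frame layout can be illustrated with the standard library alone. The sketch below only shows the general shape of a header frame followed by a data frame. The 4-byte `SIGS` magic comes from the description above, but the field list, field order, and byte order in `HYPOTHETICAL_HEADER` are invented for illustration and are not the real `dp_header_t` layout. The numeric sample-type code used in the example is also a placeholder.

```python
import struct

MAGIC = b"SIGS"  # magic prefix named in the module description

# HYPOTHETICAL header layout (NOT the real dp_header_t):
# sample_type (uint32), sample_rate (float64), center_freq (float64),
# num_samples (uint64); little-endian, no padding.
HYPOTHETICAL_HEADER = struct.Struct("<IddQ")


def build_frames(sample_type, sample_rate, center_freq, num_samples, payload):
    """Return [header_frame, data_frame] as two bytes objects."""
    header = MAGIC + HYPOTHETICAL_HEADER.pack(
        sample_type, sample_rate, center_freq, num_samples
    )
    return [header, bytes(payload)]


def parse_frames(frames):
    """Check the magic and unpack the hypothetical header fields."""
    if len(frames) != 2:
        raise ValueError("expected exactly two frames")
    header, data = frames
    if header[:4] != MAGIC:
        raise ValueError("bad magic")
    sample_type, sample_rate, center_freq, num_samples = (
        HYPOTHETICAL_HEADER.unpack(header[4:])
    )
    meta = {
        "sample_type": sample_type,
        "sample_rate": sample_rate,
        "center_freq": center_freq,
        "num_samples": num_samples,
    }
    return meta, data


# Two complex samples (1+2j, 3+4j) as interleaved float64 pairs.
payload = struct.pack("<4d", 1.0, 2.0, 3.0, 4.0)
frames = build_frames(1, 1e6, 2.4e9, 2, payload)  # 1 is a placeholder type code
meta, data = parse_frames(frames)
print(meta, len(data))
```

Keeping the metadata in its own frame means a receiver can inspect the header and reject a message before touching the sample payload.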
Patterns
PUB / SUB
One-to-many broadcast. Publisher → Subscriber.
PUSH / PULL
Pipeline (load-balanced). Push → Pull.
REQ / REP
Request–reply. Requester ↔ Replier.
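The difference between the broadcast and pipeline patterns can be sketched as a toy model without any sockets. The functions `broadcast` and `pipeline` below are hypothetical illustrations, not part of `doppler.stream`. They only model the delivery semantics: every subscriber sees every frame under PUB/SUB, while PUSH/PULL hands each frame to exactly one worker, here in round-robin order. Real delivery also depends on connection timing and queue limits, which this sketch ignores.

```python
from itertools import cycle


def broadcast(frames, n_subscribers):
    """PUB/SUB semantics: every subscriber receives every frame."""
    inboxes = [[] for _ in range(n_subscribers)]
    for frame in frames:
        for inbox in inboxes:
            inbox.append(frame)
    return inboxes


def pipeline(frames, n_workers):
    """PUSH/PULL semantics: each frame goes to exactly one worker."""
    inboxes = [[] for _ in range(n_workers)]
    # zip stops when frames is exhausted; cycle repeats the workers in order.
    for frame, inbox in zip(frames, cycle(inboxes)):
        inbox.append(frame)
    return inboxes


frames = ["f0", "f1", "f2", "f3"]
print(broadcast(frames, 2))  # both inboxes hold all four frames
print(pipeline(frames, 2))   # the frames are split between the two inboxes
```

In practice this means PUB/SUB suits monitoring or fan-out to independent consumers, while PUSH/PULL suits splitting a processing load across workers.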
Sample types
CI32, CF64, CF128
Examples:
>>> from doppler.stream import Publisher, Subscriber, CF64
>>> import numpy as np
>>>
>>> samples = np.array([1+2j, 3+4j], dtype=np.complex128)
>>> with Publisher("tcp://*:5556", CF64) as pub:
...     pub.send(samples, sample_rate=int(1e6), center_freq=int(2.4e9))
Publisher
ZMQ PUB socket — broadcasts signal frames to all subscribers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| address | str | ZMQ endpoint, e.g. | required |
| sample_type | int | One of :data: | required |
send
send(samples: NDArray[Any], *, sample_rate: float = 0.0, center_freq: float = 0.0) -> None
Broadcast a signal frame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| samples | NDArray[Any] | 1-D array of CI32 (int32×2), CF64 (complex128), or CF128 samples matching the socket's sample_type. | required |
| sample_rate | float | Samples per second (stored in header; 0 = unspecified). | 0.0 |
| center_freq | float | Centre frequency in Hz (stored in header; 0 = unspecified). | 0.0 |
Subscriber
ZMQ SUB socket — receives signal frames from a publisher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| address | str | ZMQ endpoint, e.g. | required |
recv
recv(*, timeout_ms: int = -1) -> tuple[NDArray[Any], dict[str, Any]]
Receive one signal frame (zero-copy).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout_ms | int | Milliseconds to wait; | -1 |
Returns:
| Name | Type | Description |
|---|---|---|
| samples | NDArray[Any] | 1-D NumPy array backed by the ZMQ buffer. dtype matches the sender's sample_type (complex128 for CF64, etc.). |
| header | dict[str, Any] | Dict with keys |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If timeout_ms expires before a frame arrives. |
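A receive loop usually needs to decide what to do when `recv` times out. The sketch below shows one possible shape for such a loop. So that it runs without the extension or a live publisher, it uses `FakeSubscriber`, a hypothetical stand-in that mimics only the documented `recv(*, timeout_ms)` signature and the documented `TimeoutError`. The `drain` helper and the `index` header key are likewise placeholders, not part of the real API.

```python
from collections import deque


class FakeSubscriber:
    """Hypothetical stand-in mimicking the documented recv() signature."""

    def __init__(self, frames):
        self._frames = deque(frames)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False  # do not swallow exceptions

    def recv(self, *, timeout_ms=-1):
        if not self._frames:
            # Mirrors the documented behaviour: TimeoutError on expiry.
            raise TimeoutError("no frame arrived within timeout")
        return self._frames.popleft()


def drain(sub, timeout_ms=100, max_timeouts=1):
    """Collect frames until recv() times out max_timeouts times in a row."""
    received = []
    misses = 0
    while misses < max_timeouts:
        try:
            samples, header = sub.recv(timeout_ms=timeout_ms)
        except TimeoutError:
            misses += 1
            continue
        misses = 0  # a successful receive resets the miss counter
        received.append((samples, header))
    return received


# Placeholder frames: (samples, header) tuples with an illustrative key.
queued = [([1 + 2j], {"index": 0}), ([3 + 4j], {"index": 1})]
with FakeSubscriber(queued) as sub:
    frames = drain(sub)
print(len(frames))
```

With a real Subscriber the same loop shape should apply, with a finite `timeout_ms` so the loop can notice when the publisher has gone quiet.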
Push
ZMQ PUSH socket — sends frames to a PULL socket (pipeline).
Pull
ZMQ PULL socket — receives frames from a PUSH socket (pipeline).
Requester
ZMQ REQ socket — sends a request and waits for one reply.
Replier
ZMQ REP socket — receives a request and sends one reply.
get_timestamp_ns
get_timestamp_ns() -> int
Return the current time as nanoseconds since the Unix epoch.
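A nanosecond epoch timestamp is often easier to read once converted to a calendar time. The sketch below uses the standard library's `time.time_ns()` as a stand-in source. That it reads the same clock as `get_timestamp_ns` is an assumption. The `ns_to_datetime` helper is hypothetical and not part of `doppler.stream`.

```python
import time
from datetime import datetime, timezone

NS_PER_SECOND = 1_000_000_000


def ns_to_datetime(timestamp_ns):
    """Convert nanoseconds since the Unix epoch to an aware UTC datetime.

    datetime has microsecond resolution, so digits below one
    microsecond are truncated.
    """
    seconds, nanos = divmod(timestamp_ns, NS_PER_SECOND)
    whole = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return whole.replace(microsecond=nanos // 1000)


now_ns = time.time_ns()  # stdlib stand-in for get_timestamp_ns()
print(ns_to_datetime(now_ns).isoformat())
```

Splitting with integer `divmod` rather than dividing by `1e9` avoids the floating-point rounding that would otherwise discard the low-order digits.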