Python Streaming API
ZMQ-backed streaming: PUB/SUB, PUSH/PULL, and REQ/REP socket pairs,
all built on the native C dp_* streaming functions. All socket
types support the context manager protocol.
Source:
src/doppler/stream/__init__.py
stream
stream — ZMQ-based IQ streaming types.
CI32
module-attribute
Complex int32 — interleaved int32_t I/Q (8 bytes/sample).
Each complex sample occupies two consecutive int32_t elements in the
array: element 2k is I, element 2k+1 is Q. For n complex samples the
send functions expect a C-contiguous numpy.int32 array of length
2*n; recv returns the same flat layout.
Wire value: 0.
Examples:
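A minimal sketch of the interleaved layout described above, using only NumPy. The helper names `pack_ci32` and `unpack_ci32` are illustrative assumptions and are not part of doppler.stream.

```python
import numpy as np


def pack_ci32(i_vals, q_vals):
    """Interleave I and Q into a flat, C-contiguous int32 array (hypothetical helper)."""
    i_arr = np.asarray(i_vals, dtype=np.int32)
    q_arr = np.asarray(q_vals, dtype=np.int32)
    if i_arr.ndim != 1 or i_arr.shape != q_arr.shape:
        raise ValueError("I and Q must be 1-D and the same length")
    flat = np.empty(2 * i_arr.size, dtype=np.int32)
    flat[0::2] = i_arr   # element 2k holds I
    flat[1::2] = q_arr   # element 2k+1 holds Q
    return flat


def unpack_ci32(flat):
    """Split a flat interleaved array back into (I, Q) views."""
    flat = np.asarray(flat, dtype=np.int32)
    if flat.size % 2:
        raise ValueError("interleaved array must have even length")
    return flat[0::2], flat[1::2]


# Three complex samples become six int32 elements (8 bytes per sample).
flat = pack_ci32([1, 3, 5], [2, 4, 6])
i_out, q_out = unpack_ci32(flat)
```

An array built this way has length 2*n and is C-contiguous, which is the shape the send functions are documented to expect.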
CF64
module-attribute
CF128
module-attribute
Complex long double — long double _Complex (32 bytes/sample).
Sent and received as numpy.clongdouble (complex256 on x86-64
Linux, where long double is 80-bit extended precision stored in 16
bytes, so each I/Q pair occupies 32 bytes on the wire).
Wire value: 2.
Examples:
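Because the size of long double is platform-dependent, it can be worth checking the per-sample size at runtime. The sketch below uses only NumPy and makes no assumption about the doppler API.

```python
import numpy as np

component = np.dtype(np.longdouble)    # one long double (I or Q)
sample = np.dtype(np.clongdouble)      # one complex sample (I and Q)

# A complex sample is two long doubles stored back to back.
bytes_per_sample = sample.itemsize

# Two complex samples occupy two of those slots.
x = np.array([1 + 2j, 3 + 4j], dtype=np.clongdouble)
total_bytes = x.nbytes
```

On x86-64 Linux `bytes_per_sample` is typically 32; other platforms may report a different value.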
Publisher
ZMQ PUB socket — one-to-many broadcast of signal frames.
Wraps dp_pub_t. Each :meth:send call emits a two-frame ZMQ
multipart message: a dp_header_t frame followed by a raw data
frame. Multiple :class:Subscriber sockets can connect to one
Publisher; each subscriber receives every frame. Slow subscribers
drop frames according to ZMQ's high-watermark (HWM) policy —
there is no back-pressure.
The socket binds to endpoint; Subscribers connect to it.
Use "ipc:///tmp/feed" for intra-host transfers (lower latency
than TCP) and "tcp://*:PORT" for inter-host.
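The high-watermark behaviour described above can be pictured with a toy model. This is not ZMQ code: `ToySubscriberQueue` is a hypothetical stand-in for the per-subscriber outbound queue, and the HWM value of 4 is arbitrary.

```python
from collections import deque


class ToySubscriberQueue:
    """Toy model of one subscriber's queue on a PUB socket (not ZMQ itself)."""

    def __init__(self, hwm):
        self.hwm = hwm
        self.queue = deque()
        self.dropped = 0

    def offer(self, frame):
        # The publisher never blocks: if the queue is at the high-water
        # mark, the frame is lost for this subscriber only.
        if len(self.queue) >= self.hwm:
            self.dropped += 1
            return False
        self.queue.append(frame)
        return True


fast = ToySubscriberQueue(hwm=4)
slow = ToySubscriberQueue(hwm=4)

for seq in range(10):
    fast.offer(seq)
    slow.offer(seq)
    fast.queue.popleft()        # the fast subscriber drains every frame
    if seq % 5 == 0:
        slow.queue.popleft()    # the slow subscriber drains only occasionally
```

The fast consumer sees every frame, while the slow one silently loses frames once its queue fills. Consumers that must not miss data are better served by the PUSH/PULL pair.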
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to bind, e.g. | required |
| sample_type | int | Wire encoding. One of :data: | ... |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
| RuntimeError | If the ZMQ context or socket cannot be created (e.g. the port is already in use). |
Examples:
Construct and verify the type:
>>> from doppler.stream import Publisher, CF64
>>> pub = Publisher("tcp://*:19100", CF64)
>>> type(pub).__name__
'Publisher'
>>> pub.close()
Context-manager form (preferred — ensures close on exception):
Full send round-trip with a :class:Subscriber (requires a live
ZMQ connection and a brief warm-up sleep):
>>> import numpy as np, time
>>> from doppler.stream import Subscriber
>>> pub = Publisher("tcp://*:5556", CF64)
>>> sub = Subscriber("tcp://localhost:5556")
>>> time.sleep(0.1)
>>> pub.send(np.array([1+2j, 3+4j],
... dtype=np.complex128),
... sample_rate=int(1e6),
... center_freq=int(2.4e9))
>>> samples, hdr = sub.recv(timeout_ms=2000)
>>> pub.close(); sub.close()
__init__
Create a Publisher socket and bind to endpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint string to bind, e.g. | required |
| sample_type | int | Wire encoding: :data: | ... |
__enter__
send
Broadcast one block of samples to all connected Subscribers.
Constructs a dp_header_t with the supplied metadata (plus an
auto-generated timestamp_ns from CLOCK_REALTIME and a
per-socket monotonically increasing sequence number), then
sends a two-frame ZMQ multipart message: header frame followed
by raw sample bytes. The call releases the GIL while blocked in
ZMQ, so other Python threads can run concurrently.
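The two-frame message described above can be sketched in plain Python. The real dp_header_t layout is not shown in this document, so the dict below is a stand-in: `sequence` and `num_bytes` are hypothetical field names, and `ToyFramer` is not part of doppler.stream.

```python
import itertools
import struct
import time


class ToyFramer:
    """Builds [header, payload] frame pairs; a stand-in, not dp_header_t."""

    def __init__(self):
        self._seq = itertools.count()   # per-socket sequence counter

    def frames(self, payload, sample_rate=0.0, center_freq=0.0):
        header = {
            "sequence": next(self._seq),      # hypothetical field name
            "timestamp_ns": time.time_ns(),   # wall clock, like CLOCK_REALTIME
            "sample_rate": sample_rate,
            "center_freq": center_freq,
            "num_bytes": len(payload),        # hypothetical field name
        }
        return [header, bytes(payload)]       # frame 1: header, frame 2: data


framer = ToyFramer()
payload = struct.pack("<4d", 1.0, 2.0, 3.0, 4.0)   # two complex128 samples
first = framer.frames(payload, sample_rate=1e6, center_freq=2.4e9)
second = framer.frames(payload)
```

Each call yields exactly two frames, and the sequence number advances by one per call, which lets a receiver detect dropped frames.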
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| samples | ndarray | C-contiguous array whose dtype must match the socket's | required |
| sample_rate | float | Samples per second written into the header (default 0). | 0 |
| center_freq | float | Centre frequency in Hz written into the header (default 0). | 0 |
Raises:
| Type | Description |
|---|---|
| TypeError | If |
| ValueError | If |
| RuntimeError | If the ZMQ send fails. |
Examples:
close
Destroy the ZMQ socket and release all resources.
Calls dp_pub_destroy(). Safe to call multiple times —
subsequent calls are no-ops. After close() the object
must not be used for sending.
Examples:
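The close and context-manager contract described above can be illustrated with a stand-in class. `ToySocket` is hypothetical and owns no native handle. Raising RuntimeError on use after close is an assumption made for the sketch, not documented behaviour.

```python
class ToySocket:
    """Stand-in showing the close/context-manager contract (not a real socket)."""

    def __init__(self):
        self.closed = False
        self.close_calls = 0

    def close(self):
        self.close_calls += 1
        if self.closed:
            return             # second and later calls are no-ops
        self.closed = True     # real code would release the native handle here

    def send(self, data):
        if self.closed:
            raise RuntimeError("socket is closed")   # assumed behaviour
        return len(data)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False           # do not swallow exceptions


try:
    with ToySocket() as sock:
        sock.send(b"abc")
        raise ValueError("simulated failure inside the block")
except ValueError:
    pass

sock.close()                   # safe: already closed by __exit__
```

The `with` form closes the socket even though the block raised, and the extra `close()` afterwards has no effect.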
Subscriber
ZMQ SUB socket — receives signal frames from a :class:Publisher.
Wraps dp_sub_t. Connects to the Publisher's endpoint. The
socket subscribes to all topics (empty ZMQ topic filter), so it
receives every frame the Publisher sends.
Unlike :class:Pull, a single Subscriber socket connects to exactly
one Publisher. For fan-in (receiving from multiple publishers) or
load-balanced consumption, use :class:Pull.
The recv path is zero-copy: the returned NumPy array's memory is
owned by an internal dp_msg_t handle. The ZMQ buffer is freed
only when the array (and all views of it) are garbage-collected.
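The buffer-lifetime rule above has a close analogue in the standard library: a memoryview pins its underlying buffer until every view is released. The sketch below uses a bytearray as a stand-in for the message buffer and does not touch doppler or ZMQ.

```python
import struct

# Stand-in for a received message buffer holding four float64 values.
buf = bytearray(struct.pack("4d", 1.0, 2.0, 3.0, 4.0))

raw = memoryview(buf)     # zero-copy handle onto the buffer
view = raw.cast("d")      # reinterpret the same bytes as doubles

# A write to the buffer is visible through the view: no copy was made.
buf[0:8] = struct.pack("d", 9.0)
first_value = view[0]

# While a view exists, the buffer is pinned and cannot be resized.
try:
    buf.extend(b"\x00")
    pinned = False
except BufferError:
    pinned = True

# Once every view is released, the buffer is free to change.
view.release()
raw.release()
buf.extend(b"\x00")
```

The practical consequence for received arrays is the same: holding on to an array, or any slice of it, keeps the whole message buffer alive, so copy out the part you need if you intend to keep it for long.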
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to connect to, e.g. | required |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the ZMQ context or socket cannot be created. |
Examples:
>>> from doppler.stream import Subscriber
>>> sub = Subscriber("tcp://localhost:19104")
>>> type(sub).__name__
'Subscriber'
>>> sub.close()
Context-manager form:
Receive one frame (requires a live :class:Publisher):
__init__
Create a Subscriber socket and connect to endpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to connect to, e.g. | required |
__enter__
recv
Receive one signal frame from the connected Publisher.
Blocks until a frame arrives or the optional timeout expires. The returned NumPy array is a zero-copy view into the ZMQ message buffer; the buffer is freed when the array is garbage-collected.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout_ms | int | Milliseconds to wait before raising :exc: | -1 |
Returns:
| Name | Type | Description |
|---|---|---|
| samples | ndarray | Decoded sample data. dtype is |
| header | dict | Decoded |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If |
| RuntimeError | If the ZMQ recv fails for any other reason. |
Examples:
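One common way to use a finite timeout is a bounded retry loop around recv. The sketch below runs against a stand-in object, so `ToyReceiver` and `recv_with_retry` are hypothetical names. Only the `recv(timeout_ms=...)` call shape and the TimeoutError come from the documentation above.

```python
import collections


class ToyReceiver:
    """Stand-in for a socket whose recv() may time out (hypothetical)."""

    def __init__(self, outcomes):
        self._outcomes = collections.deque(outcomes)

    def recv(self, timeout_ms=-1):
        outcome = self._outcomes.popleft()
        if outcome is None:
            raise TimeoutError(f"no frame within {timeout_ms} ms")
        return outcome


def recv_with_retry(sock, timeout_ms, attempts):
    """Call recv() up to `attempts` times, re-raising the last TimeoutError."""
    for attempt in range(attempts):
        try:
            return sock.recv(timeout_ms=timeout_ms)
        except TimeoutError:
            if attempt == attempts - 1:
                raise


# Two timeouts, then a (samples, header) tuple arrives.
sock = ToyReceiver([None, None, ([1 + 2j], {"sample_rate": 1e6})])
samples, header = recv_with_retry(sock, timeout_ms=100, attempts=3)
```

A bounded loop like this keeps a consumer responsive to shutdown requests, which a single indefinitely blocking call would not.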
close
Push
ZMQ PUSH socket — pipeline sender for load-balanced distribution.
Wraps dp_push_t. Frames are distributed round-robin across all
connected :class:Pull sockets. Each frame is delivered to exactly
one Pull consumer, unlike :class:Publisher which fans out to all
subscribers.
Use the PUSH/PULL pattern when you have a pool of workers and want automatic load balancing, or when you need back-pressure (PUSH blocks when no Pull worker is ready to receive).
The socket binds to endpoint; Pull workers connect to it.
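The round-robin distribution described above can be sketched without any sockets. `distribute_round_robin` is a hypothetical helper that models only the assignment order. A real PUSH socket also takes worker readiness into account.

```python
import itertools


def distribute_round_robin(frames, workers):
    """Assign each frame to exactly one worker, cycling through them in order."""
    assignments = {name: [] for name in workers}
    for frame, name in zip(frames, itertools.cycle(workers)):
        assignments[name].append(frame)
    return assignments


# Seven frames shared among three Pull workers.
assignments = distribute_round_robin(range(7), ["pull-a", "pull-b", "pull-c"])
```

Every frame appears in exactly one worker's list, in contrast to PUB/SUB where each subscriber would receive all seven.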
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to bind, e.g. | required |
| sample_type | int | Wire encoding. One of :data: | ... |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
| RuntimeError | If the ZMQ context or socket cannot be created. |
Examples:
>>> from doppler.stream import Push, CF64
>>> push = Push("tcp://*:19108", CF64)
>>> type(push).__name__
'Push'
>>> push.close()
Context-manager form:
Round-trip with a :class:Pull worker (requires a live connection):
>>> import numpy as np, time
>>> from doppler.stream import Pull
>>> push = Push("tcp://127.0.0.1:5557", CF64)
>>> pull = Pull("tcp://127.0.0.1:5557")
>>> time.sleep(0.05)
>>> push.send(np.ones(4, dtype=np.complex128))
>>> samples, hdr = pull.recv(timeout_ms=2000)
>>> push.close(); pull.close()
__init__
Create a Push socket and bind to endpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to bind, e.g. | required |
| sample_type | int | Wire encoding: :data: | ... |
__enter__
send
Send one block of samples to the next available Pull worker.
Frames are distributed round-robin. The call blocks until a Pull socket is ready to accept the frame (back-pressure); the GIL is released while the call is blocked in ZMQ.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| samples | ndarray | C-contiguous array whose dtype must match the socket's | required |
| sample_rate | float | Samples per second written into the header (default 0). | 0 |
| center_freq | float | Centre frequency in Hz written into the header (default 0). | 0 |
Raises:
| Type | Description |
|---|---|
| TypeError | If |
| ValueError | If |
| RuntimeError | If the ZMQ send fails. |
Examples:
>>> from doppler.stream import Push, Pull, CF64
>>> import numpy as np, time
>>> push = Push("tcp://127.0.0.1:5557", CF64)
>>> pull = Pull("tcp://127.0.0.1:5557")
>>> time.sleep(0.05)
>>> push.send(np.array([1+2j, 3+4j],
... dtype=np.complex128),
... sample_rate=48000)
>>> samples, hdr = pull.recv(timeout_ms=2000)
>>> push.close(); pull.close()
close
Pull
ZMQ PULL socket — pipeline receiver for load-balanced workers.
Wraps dp_pull_t. Receives frames from a :class:Push socket.
Multiple Pull workers can share one Push sender; each frame goes to
exactly one worker (round-robin from the Push side).
The recv path is zero-copy: see :class:Subscriber for the buffer
lifetime semantics.
The socket connects to the Push endpoint; the Push socket binds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to connect to, e.g. | required |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the ZMQ context or socket cannot be created. |
Examples:
>>> from doppler.stream import Pull
>>> pull = Pull("tcp://localhost:19112")
>>> type(pull).__name__
'Pull'
>>> pull.close()
Context-manager form:
Receive one frame (requires a live :class:Push):
__init__
Create a Pull socket and connect to endpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to connect to, e.g. | required |
__enter__
recv
Receive one signal frame from the connected Push socket.
Blocks until a frame arrives or the optional timeout expires. The returned NumPy array is a zero-copy view into the ZMQ message buffer; the buffer is freed when the array is garbage-collected.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout_ms | int | Milliseconds to wait before raising :exc: | -1 |
Returns:
| Name | Type | Description |
|---|---|---|
| samples | ndarray | Decoded sample data. dtype is |
| header | dict | Decoded |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If |
| RuntimeError | If the ZMQ recv fails for any other reason. |
Examples:
close
Requester
ZMQ REQ socket — sends a request frame, then waits for a reply.
Wraps dp_req_t. The REQ/REP pattern is strictly alternating:
:meth:send must be called before :meth:recv, and :meth:recv
must complete before the next :meth:send. Violating this order
triggers a ZMQ finite-state-machine (FSM) error.
Complements :class:Replier. Use this pattern for control-plane
messages (tuning commands, metadata queries) or synchronous
signal-frame RPC where one peer processes each frame and returns a
result.
The socket connects to the Replier endpoint.
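The strict send/recv alternation described above can be modelled as a two-state machine. `ToyReqSocket` is a hypothetical stand-in that performs no I/O and echoes the request back as the reply. Raising RuntimeError mirrors the documented Raises entries, but the messages are invented.

```python
class ToyReqSocket:
    """Models only the REQ-side send/recv alternation, not real ZMQ I/O."""

    def __init__(self):
        self._awaiting_reply = False
        self._pending = None

    def send(self, request):
        if self._awaiting_reply:
            raise RuntimeError("send() called twice without recv()")
        self._pending = request
        self._awaiting_reply = True

    def recv(self):
        if not self._awaiting_reply:
            raise RuntimeError("recv() called without a prior send()")
        self._awaiting_reply = False
        reply, self._pending = self._pending, None
        return reply           # echo, standing in for a Replier's answer


req = ToyReqSocket()
req.send("tune 2.4e9")
reply = req.recv()             # legal: send, then recv

errors = []
try:
    req.recv()                 # illegal: no request is outstanding
except RuntimeError as exc:
    errors.append(str(exc))

req.send("query gain")
try:
    req.send("query rate")     # illegal: previous request still unanswered
except RuntimeError as exc:
    errors.append(str(exc))
```

Both out-of-order calls are rejected, while the send-then-recv pair succeeds. The Replier side follows the mirror-image rule, with recv first.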
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to connect to, e.g. | required |
| sample_type | int | Wire encoding of frames sent by this socket. One of :data: | ... |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
| RuntimeError | If the ZMQ context or socket cannot be created. |
Examples:
>>> from doppler.stream import Requester, CF64
>>> req = Requester("tcp://localhost:19116", CF64)
>>> type(req).__name__
'Requester'
>>> req.close()
Context-manager form:
Full REQ/REP round-trip (requires a live :class:Replier):
>>> import numpy as np, time
>>> from doppler.stream import Replier
>>> rep = Replier("tcp://*:5558", CF64)
>>> req = Requester("tcp://localhost:5558", CF64)
>>> time.sleep(0.05)
>>> req.send(np.ones(4, dtype=np.complex128),
... sample_rate=int(1e6))
>>> request, hdr = rep.recv(timeout_ms=2000)
>>> rep.send(request)
>>> reply, hdr2 = req.recv(timeout_ms=2000)
>>> req.close(); rep.close()
__init__
Create a Requester socket and connect to endpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to connect to, e.g. | required |
| sample_type | int | Wire encoding of frames sent by this socket: :data: | ... |
__enter__
send
Send a request frame to the connected :class:Replier.
The ZMQ REQ FSM requires that each :meth:send be followed by
exactly one :meth:recv before the next send. Calling
send twice in a row raises a ZMQ error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| samples | ndarray | C-contiguous array whose dtype must match the socket's | required |
| sample_rate | float | Samples per second written into the header (default 0). | 0 |
| center_freq | float | Centre frequency in Hz written into the header (default 0). | 0 |
Raises:
| Type | Description |
|---|---|
| TypeError | If |
| ValueError | If |
| RuntimeError | If the ZMQ send fails (including FSM violation). |
Examples:
recv
Receive the reply frame from the :class:Replier.
Must be called after :meth:send; calling without a prior
send triggers a ZMQ FSM error. Blocks until the reply
arrives or the timeout expires.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout_ms | int | Milliseconds to wait before raising :exc: | -1 |
Returns:
| Name | Type | Description |
|---|---|---|
| samples | ndarray | Decoded reply data. dtype mirrors the :class: |
| header | dict | Decoded |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If |
| RuntimeError | If the ZMQ recv fails (including FSM violation from calling |
Examples:
close
Replier
ZMQ REP socket — receives a request frame, then sends a reply.
Wraps dp_rep_t. The REQ/REP pattern is strictly alternating:
:meth:recv must be called first to consume a request, then
:meth:send emits the reply. Violating this order triggers a ZMQ
FSM error.
Complements :class:Requester. Use for control-plane responses or
signal-frame RPC where the Replier processes each frame and returns
a result synchronously.
The socket binds to endpoint; Requesters connect to it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to bind, e.g. | required |
| sample_type | int | Wire encoding of frames sent by this socket (the reply). One of :data: | ... |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
| RuntimeError | If the ZMQ context or socket cannot be created. |
Examples:
>>> from doppler.stream import Replier, CF64
>>> rep = Replier("tcp://*:19120", CF64)
>>> type(rep).__name__
'Replier'
>>> rep.close()
Context-manager form:
Single REQ/REP round-trip, seen from the Replier side (requires a live :class:Requester):
>>> from doppler.stream import Requester
>>> import numpy as np, time
>>> rep = Replier("tcp://*:5558", CF64)
>>> req = Requester("tcp://localhost:5558", CF64)
>>> time.sleep(0.05)
>>> req.send(np.ones(4, dtype=np.complex128))
>>> request, hdr = rep.recv(timeout_ms=2000)
>>> rep.send(request, sample_rate=hdr["sample_rate"])
>>> reply, _ = req.recv(timeout_ms=2000)
>>> req.close(); rep.close()
__init__
Create a Replier socket and bind to endpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| endpoint | str | ZMQ endpoint to bind, e.g. | required |
| sample_type | int | Wire encoding of reply frames sent by this socket: :data: | ... |
__enter__
recv
Receive one request frame from the :class:Requester.
Must be called before :meth:send; calling send before
recv triggers a ZMQ FSM error. Blocks until a request
arrives or the timeout expires.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout_ms | int | Milliseconds to wait before raising :exc: | -1 |
Returns:
| Name | Type | Description |
|---|---|---|
| samples | ndarray | Decoded request data. dtype mirrors the :class: |
| header | dict | Decoded |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If |
| RuntimeError | If the ZMQ recv fails (including FSM violation from calling |
Examples:
send
Send the reply frame back to the :class:Requester.
Must be called after :meth:recv; calling without a prior
recv triggers a ZMQ FSM error. After this call returns,
the Replier is ready to recv the next request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| samples | ndarray | C-contiguous array whose dtype must match the socket's | required |
| sample_rate | float | Samples per second written into the reply header (default 0). | 0 |
| center_freq | float | Centre frequency in Hz written into the reply header (default 0). | 0 |
Raises:
| Type | Description |
|---|---|
| TypeError | If |
| ValueError | If |
| RuntimeError | If the ZMQ send fails (including FSM violation from calling |
Examples:
close
get_timestamp_ns
Current wall-clock time in nanoseconds since the UNIX epoch.
Calls clock_gettime(CLOCK_REALTIME) in the C layer. Useful for
stamping outgoing frames when the caller does not supply its own
timestamp_ns, or for computing round-trip latency from the value
returned in the received header dict.
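The latency calculation mentioned above amounts to a subtraction of two nanosecond timestamps. In the sketch below, `elapsed_ns` is a hypothetical helper, the header is a plain dict with a `timestamp_ns` key, and `time.time_ns()` stands in for get_timestamp_ns since both report wall-clock nanoseconds since the UNIX epoch.

```python
import time


def elapsed_ns(header, now_ns=None):
    """Nanoseconds between the header's timestamp and `now_ns` (hypothetical helper)."""
    if now_ns is None:
        now_ns = time.time_ns()    # wall clock, same epoch as CLOCK_REALTIME
    return now_ns - header["timestamp_ns"]


# Deterministic check with hand-picked values: 250 microseconds apart.
header = {"timestamp_ns": 1_700_000_000_000_000_000}
latency = elapsed_ns(header, now_ns=1_700_000_000_000_250_000)

# Live reading against a timestamp taken just now.
stamp = time.time_ns()
live_latency = elapsed_ns({"timestamp_ns": stamp})
```

Between two hosts the result is only as good as their clock synchronisation, because both ends read their own wall clock.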
Returns:
| Type | Description |
|---|---|
| int | Non-negative nanosecond timestamp. CLOCK_REALTIME is a wall clock, so the value can step backwards if the system time is adjusted (for example by NTP or a manual clock change); it is not guaranteed to be monotonic. |
Examples: