Python Streaming API

ZMQ-backed streaming — PUB/SUB, PUSH/PULL, and REQ/REP socket pairs, all backed by the native C dp_* streaming functions. All socket types support the context manager protocol.

Source: src/doppler/stream/__init__.py


stream

stream — ZMQ-based IQ streaming types.

CI32 module-attribute

CI32: int

Complex int32 — interleaved int32_t I/Q (8 bytes/sample).

Each complex sample occupies two consecutive int32_t elements in the array: element 2k is I, element 2k+1 is Q. For n complex samples the send functions expect a C-contiguous numpy.int32 array of length 2*n; recv returns the same flat layout.
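
The interleaved layout described above can be produced from a complex array with plain NumPy slicing. This is a minimal sketch only: the helper names complex_to_ci32 and ci32_to_complex are hypothetical and not part of doppler.stream, and rounding to the nearest integer is an assumption about how a caller might quantise.

```python
import numpy as np


def complex_to_ci32(z: np.ndarray) -> np.ndarray:
    """Pack n complex samples into a flat int32 array of length 2*n."""
    z = np.asarray(z)
    flat = np.empty(2 * z.size, dtype=np.int32)
    flat[0::2] = np.round(z.real).astype(np.int32)  # element 2k holds I
    flat[1::2] = np.round(z.imag).astype(np.int32)  # element 2k+1 holds Q
    return flat


def ci32_to_complex(flat: np.ndarray) -> np.ndarray:
    """Unpack a flat interleaved int32 array back into complex128."""
    flat = np.asarray(flat, dtype=np.int32)
    return flat[0::2].astype(np.float64) + 1j * flat[1::2].astype(np.float64)


iq = np.array([1 + 2j, -3 + 4j, 5 - 6j])
flat = complex_to_ci32(iq)
print(flat.tolist())  # [1, 2, -3, 4, 5, -6]
```

np.empty returns a C-contiguous array, so the result already satisfies the contiguity requirement.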

Wire value: 0.

Examples:

>>> from doppler.stream import CI32
>>> CI32
0

CF64 module-attribute

CF64: int

Complex float64 — double _Complex (16 bytes/sample).

Sent and received as numpy.complex128. Default sample type for all sender socket types.

Wire value: 1.

Examples:

>>> from doppler.stream import CF64
>>> CF64
1

CF128 module-attribute

CF128: int

Complex long double — long double _Complex (32 bytes/sample).

Sent and received as numpy.clongdouble (complex256 on x86-64 Linux, where long double is 80-bit extended precision stored in 16 bytes, giving 32 bytes per complex sample on the wire).
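
The size of numpy.clongdouble depends on the platform and compiler, so it may be worth checking it before relying on a particular sample size. The snippet below is a sketch using only NumPy; how doppler itself behaves on platforms with a different long double is not covered here.

```python
import numpy as np

component_bytes = np.dtype(np.longdouble).itemsize
sample_bytes = np.dtype(np.clongdouble).itemsize

print(f"long double: {component_bytes} bytes, complex sample: {sample_bytes} bytes")

if sample_bytes != 32:
    # Some platforms map long double to a narrower type.
    print("numpy.clongdouble is not 32 bytes on this platform")
```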

Wire value: 2.

Examples:

>>> from doppler.stream import CF128
>>> CF128
2

Publisher

ZMQ PUB socket — one-to-many broadcast of signal frames.

Wraps dp_pub_t. Each :meth:send call emits a two-frame ZMQ multipart message: a dp_header_t frame followed by a raw data frame. Multiple :class:Subscriber sockets can connect to one Publisher; each subscriber receives every frame. Slow subscribers drop frames according to ZMQ's high-watermark (HWM) policy — there is no back-pressure.

The socket binds to endpoint; Subscribers connect to it. Use "ipc:///tmp/feed" for intra-host transfers (lower latency than TCP) and "tcp://*:PORT" for inter-host.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to bind, e.g. "tcp://*:5556" or "ipc:///tmp/iq.sock".

required
sample_type int

Wire encoding. One of :data:CI32, :data:CF64 (default), :data:CF128. Raises :exc:ValueError for any other value.

...

Raises:

Type Description
ValueError

If sample_type is not one of the three supported values.

RuntimeError

If the ZMQ context or socket cannot be created (e.g. the port is already in use).

Examples:

Construct, verify the type, and close:

>>> from doppler.stream import Publisher, CF64
>>> pub = Publisher("tcp://*:19100", CF64)
>>> type(pub).__name__
'Publisher'
>>> pub.close()

Context-manager form (preferred — ensures close on exception):

>>> with Publisher("tcp://*:19101", CF64) as pub:
...     type(pub).__name__
'Publisher'

Full send round-trip with a :class:Subscriber (requires a live ZMQ connection and a brief warm-up sleep):

>>> import numpy as np, time
>>> from doppler.stream import Subscriber
>>> pub = Publisher("tcp://*:5556", CF64)
>>> sub = Subscriber("tcp://localhost:5556")
>>> time.sleep(0.1)
>>> pub.send(np.array([1+2j, 3+4j],
...          dtype=np.complex128),
...          sample_rate=int(1e6),
...          center_freq=int(2.4e9))
>>> samples, hdr = sub.recv(timeout_ms=2000)
>>> pub.close(); sub.close()

__init__

__init__(endpoint: str, sample_type: int = ...) -> None

Create a Publisher socket and bind to endpoint.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint string to bind, e.g. "tcp://*:5556" or "ipc:///tmp/feed".

required
sample_type int

Wire encoding: :data:CI32, :data:CF64 (default), or :data:CF128.

...

__enter__

__enter__() -> Publisher

Return self for use as a context manager.

Examples:

>>> from doppler.stream import Publisher, CF64
>>> with Publisher("tcp://*:19102", CF64) as pub:
...     type(pub).__name__
'Publisher'

__exit__

__exit__(*args: object) -> None

Call :meth:close on context-manager exit.

send

send(samples: NDArray[Any], sample_rate: float = 0, center_freq: float = 0) -> None

Broadcast one block of samples to all connected Subscribers.

Constructs a dp_header_t with the supplied metadata (plus an auto-generated timestamp_ns from CLOCK_REALTIME and a per-socket monotonically increasing sequence number), then sends a two-frame ZMQ multipart message: header frame followed by raw sample bytes. The call releases the GIL while blocked in ZMQ, so other Python threads can run concurrently.

Parameters:

Name Type Description Default
samples ndarray

C-contiguous array whose dtype must match the socket's sample_type: numpy.complex128 for :data:CF64, numpy.clongdouble for :data:CF128, numpy.int32 for :data:CI32 (interleaved I/Q, length 2*n_samples).

required
sample_rate float

Samples per second written into the header (default 0).

0
center_freq float

Centre frequency in Hz written into the header (default 0).

0

Raises:

Type Description
TypeError

If samples.dtype does not match the socket's sample_type.

ValueError

If samples is not C-contiguous.

RuntimeError

If the ZMQ send fails.

Examples:

>>> from doppler.stream import Publisher, CF64
>>> import numpy as np
>>> pub = Publisher("tcp://*:5556", CF64)
>>> pub.send(np.ones(4, dtype=np.complex128),
...          sample_rate=int(48000),
...          center_freq=int(433e6))
>>> pub.close()
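
The dtype and contiguity requirements of send can also be checked before the call. The sketch below mirrors the documented rules in plain NumPy; check_block is a hypothetical helper, the constants are redefined locally from the documented wire values so the code runs without doppler installed, and rejecting an odd-length CI32 array is an assumption.

```python
import numpy as np

# Wire values as documented above (redefined locally for this sketch).
CI32, CF64, CF128 = 0, 1, 2

_EXPECTED_DTYPE = {
    CI32: np.dtype(np.int32),
    CF64: np.dtype(np.complex128),
    CF128: np.dtype(np.clongdouble),
}


def check_block(samples: np.ndarray, sample_type: int) -> int:
    """Return the number of complex samples, or raise as send is documented to."""
    if sample_type not in _EXPECTED_DTYPE:
        raise ValueError(f"unsupported sample_type: {sample_type!r}")
    expected = _EXPECTED_DTYPE[sample_type]
    if samples.dtype != expected:
        raise TypeError(f"expected dtype {expected}, got {samples.dtype}")
    if not samples.flags["C_CONTIGUOUS"]:
        raise ValueError("samples must be C-contiguous")
    if sample_type == CI32:
        if samples.size % 2:
            raise ValueError("CI32 needs an even number of int32 elements")
        return samples.size // 2  # two int32 elements per complex sample
    return samples.size


print(check_block(np.ones(4, dtype=np.complex128), CF64))  # 4
print(check_block(np.zeros(8, dtype=np.int32), CI32))      # 4
```

A strided slice such as x[::2] fails the contiguity check; np.ascontiguousarray(x[::2]) produces a copy that passes.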

close

close() -> None

Destroy the ZMQ socket and release all resources.

Calls dp_pub_destroy(). Safe to call multiple times — subsequent calls are no-ops. After close() the object must not be used for sending.

Examples:

>>> from doppler.stream import Publisher, CF64
>>> pub = Publisher("tcp://*:19103", CF64)
>>> pub.close()
>>> pub.close()  # idempotent — no error

Subscriber

ZMQ SUB socket — receives signal frames from a :class:Publisher.

Wraps dp_sub_t. Connects to the Publisher's endpoint. The socket subscribes to all topics (empty ZMQ topic filter), so it receives every frame the Publisher sends.

Unlike :class:Pull, a single Subscriber socket connects to exactly one Publisher. For fan-in (receiving from multiple publishers) or load-balanced consumption, use :class:Pull.

The recv path is zero-copy: the returned NumPy array's memory is owned by an internal dp_msg_t handle. The ZMQ buffer is freed only once the array and every view of it have been garbage-collected.
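
One consequence of this lifetime rule is that holding on to even a small slice of a received array keeps the whole message buffer alive; an explicit copy detaches it. The sketch below illustrates the idea with NumPy alone, using a bytearray as a stand-in for the ZMQ message buffer. It does not call doppler.

```python
import numpy as np

# Stand-in for a ZMQ message buffer holding four complex128 samples.
message_buffer = bytearray(np.arange(4, dtype=np.complex128).tobytes())

# Zero-copy: the array borrows the buffer's memory instead of owning it.
frame = np.frombuffer(message_buffer, dtype=np.complex128)
tail = frame[2:]  # a view; it shares the same memory

# An explicit copy owns its memory and no longer references the buffer.
kept = frame[2:].copy()

print(frame.flags["OWNDATA"], np.shares_memory(tail, frame))  # False True
print(kept.flags["OWNDATA"], np.shares_memory(kept, frame))   # True False
```

Copying costs one memcpy per block, so it is usually reserved for data that must be retained beyond the current processing step.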

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to connect to, e.g. "tcp://localhost:5556" or "ipc:///tmp/feed".

required

Raises:

Type Description
RuntimeError

If the ZMQ context or socket cannot be created.

Examples:

>>> from doppler.stream import Subscriber
>>> sub = Subscriber("tcp://localhost:19104")
>>> type(sub).__name__
'Subscriber'
>>> sub.close()

Context-manager form:

>>> with Subscriber("tcp://localhost:19105") as sub:
...     type(sub).__name__
'Subscriber'

Receive one frame (requires a live :class:Publisher):

>>> sub.recv(timeout_ms=500)
(array([1.+2.j, 3.+4.j]), {'sample_rate': 1000000.0, ...})

__init__

__init__(endpoint: str) -> None

Create a Subscriber socket and connect to endpoint.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to connect to, e.g. "tcp://localhost:5556".

required

__enter__

__enter__() -> Subscriber

Return self for use as a context manager.

Examples:

>>> from doppler.stream import Subscriber
>>> with Subscriber("tcp://localhost:19106") as sub:
...     type(sub).__name__
'Subscriber'

__exit__

__exit__(*args: object) -> None

Call :meth:close on context-manager exit.

recv

recv(timeout_ms: int = -1) -> Tuple[NDArray[Any], Dict[str, Any]]

Receive one signal frame from the connected Publisher.

Blocks until a frame arrives or the optional timeout expires. The returned NumPy array is a zero-copy view into the ZMQ message buffer; the buffer is freed when the array is garbage-collected.

Parameters:

Name Type Description Default
timeout_ms int

Milliseconds to wait before raising :exc:TimeoutError. -1 (default) blocks indefinitely; 0 returns immediately if no frame is available.

-1

Returns:

Name Type Description
samples ndarray

Decoded sample data. dtype is numpy.complex128 (:data:CF64), numpy.clongdouble (:data:CF128), or numpy.int32 flat interleaved I/Q (:data:CI32).

header dict

Decoded dp_header_t fields:

sample_rate : float
    Samples per second as reported by the sender.
center_freq : float
    Centre frequency in Hz as reported by the sender.
sample_type : int
    Wire sample type (one of :data:CI32, :data:CF64, :data:CF128).
timestamp_ns : int
    Frame timestamp (CLOCK_REALTIME nanoseconds) set by the sender at the moment of the send call.
sequence : int
    Monotonically increasing per-sender frame counter; gaps indicate dropped frames.
num_samples : int
    Number of IQ samples in the frame (len(samples) for CF64/CF128; len(samples)//2 for CI32).
protocol : int
    Wire protocol (0 = SIGS, 1 = DIFI/VITA 49).
stream_id : int
    Opaque stream identifier (0 for SIGS protocol).
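
Since gaps in sequence indicate dropped frames, a receiver can track losses by comparing consecutive header values. The following is a minimal sketch that assumes the counter increases by exactly one per frame; find_gaps is a hypothetical helper and the list of sequence numbers is made-up input, not real output.

```python
from typing import Iterable, List, Tuple


def find_gaps(sequences: Iterable[int]) -> List[Tuple[int, int]]:
    """Return (first_missing, last_missing) ranges between consecutive frames."""
    gaps = []
    previous = None
    for seq in sequences:
        if previous is not None and seq > previous + 1:
            gaps.append((previous + 1, seq - 1))
        previous = seq
    return gaps


received = [0, 1, 2, 5, 6, 9]  # hdr["sequence"] values, in arrival order
gaps = find_gaps(received)
dropped = sum(last - first + 1 for first, last in gaps)
print(gaps, dropped)  # [(3, 4), (7, 8)] 4
```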

Raises:

Type Description
TimeoutError

If timeout_ms elapses before a frame arrives.

RuntimeError

If the ZMQ recv fails for any other reason.

Examples:

>>> from doppler.stream import Subscriber
>>> with Subscriber("tcp://localhost:5556") as sub:
...     samples, hdr = sub.recv(timeout_ms=1000)
...     print(samples.dtype, hdr["sample_rate"])
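
Because recv signals an expired timeout by raising TimeoutError, a polling loop can treat it as "nothing yet" and try again. The sketch below shows one way to wrap that; recv_with_retries and FakeSocket are hypothetical names, and FakeSocket stands in for a real socket so the code runs without a live connection.

```python
from typing import Any, Callable, Optional, Tuple


def recv_with_retries(
    recv: Callable[..., Tuple[Any, dict]],
    attempts: int = 3,
    timeout_ms: int = 1000,
) -> Optional[Tuple[Any, dict]]:
    """Call recv(timeout_ms=...) up to `attempts` times; None if all time out."""
    for _ in range(attempts):
        try:
            return recv(timeout_ms=timeout_ms)
        except TimeoutError:
            continue  # nothing arrived in this window; poll again
    return None


class FakeSocket:
    """Hypothetical stand-in that times out twice, then yields a frame."""

    def __init__(self) -> None:
        self.calls = 0

    def recv(self, timeout_ms: int = -1):
        self.calls += 1
        if self.calls < 3:
            raise TimeoutError
        return [1 + 2j], {"sequence": 0}


sock = FakeSocket()
result = recv_with_retries(sock.recv, attempts=5, timeout_ms=100)
print(result, sock.calls)
```

With a real socket the bound method would be passed the same way, for example recv_with_retries(sub.recv).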

close

close() -> None

Destroy the ZMQ socket and release all resources.

Calls dp_sub_destroy(). Safe to call multiple times.

Examples:

>>> from doppler.stream import Subscriber
>>> sub = Subscriber("tcp://localhost:19107")
>>> sub.close()
>>> sub.close()  # idempotent — no error

Push

ZMQ PUSH socket — pipeline sender for load-balanced distribution.

Wraps dp_push_t. Frames are distributed round-robin across all connected :class:Pull sockets. Each frame is delivered to exactly one Pull consumer, unlike :class:Publisher which fans out to all subscribers.

Use the PUSH/PULL pattern when you have a pool of workers and want automatic load balancing, or when you need back-pressure (PUSH blocks when no Pull worker is ready to receive).

The socket binds to endpoint; Pull workers connect to it.
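
The difference between fan-out and round-robin delivery can be seen in a small pure-Python model. This is an illustration of the delivery pattern only: it uses no sockets, ignores worker readiness and back-pressure, and the function names are hypothetical.

```python
from itertools import cycle
from typing import Dict, List


def fan_out(frames: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """PUB/SUB model: every consumer receives every frame."""
    return {name: list(frames) for name in consumers}


def round_robin(frames: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """PUSH/PULL model: each frame goes to exactly one consumer, in turn."""
    delivered: Dict[str, List[int]] = {name: [] for name in consumers}
    for frame, name in zip(frames, cycle(consumers)):
        delivered[name].append(frame)
    return delivered


frames = list(range(6))
workers = ["a", "b", "c"]
print(fan_out(frames, workers)["a"])   # [0, 1, 2, 3, 4, 5]
print(round_robin(frames, workers))    # {'a': [0, 3], 'b': [1, 4], 'c': [2, 5]}
```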

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to bind, e.g. "tcp://*:5557".

required
sample_type int

Wire encoding. One of :data:CI32, :data:CF64 (default), :data:CF128. Raises :exc:ValueError for any other value.

...

Raises:

Type Description
ValueError

If sample_type is not one of the three supported values.

RuntimeError

If the ZMQ context or socket cannot be created.

Examples:

>>> from doppler.stream import Push, CF64
>>> push = Push("tcp://*:19108", CF64)
>>> type(push).__name__
'Push'
>>> push.close()

Context-manager form:

>>> with Push("tcp://*:19109", CF64) as push:
...     type(push).__name__
'Push'

Round-trip with a :class:Pull worker (requires a live connection):

>>> import numpy as np, time
>>> from doppler.stream import Pull
>>> push = Push("tcp://127.0.0.1:5557", CF64)
>>> pull = Pull("tcp://127.0.0.1:5557")
>>> time.sleep(0.05)
>>> push.send(np.ones(4, dtype=np.complex128))
>>> samples, hdr = pull.recv(timeout_ms=2000)
>>> push.close(); pull.close()

__init__

__init__(endpoint: str, sample_type: int = ...) -> None

Create a Push socket and bind to endpoint.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to bind, e.g. "tcp://*:5557".

required
sample_type int

Wire encoding: :data:CI32, :data:CF64 (default), or :data:CF128.

...

__enter__

__enter__() -> Push

Return self for use as a context manager.

Examples:

>>> from doppler.stream import Push, CF64
>>> with Push("tcp://*:19110", CF64) as push:
...     type(push).__name__
'Push'

__exit__

__exit__(*args: object) -> None

Call :meth:close on context-manager exit.

send

send(samples: NDArray[Any], sample_rate: float = 0, center_freq: float = 0) -> None

Send one block of samples to the next available Pull worker.

Frames are distributed round-robin. The call blocks until a Pull socket is ready to accept the frame (back-pressure); the GIL is released while the call is blocked in ZMQ, so other Python threads can keep running.

Parameters:

Name Type Description Default
samples ndarray

C-contiguous array whose dtype must match the socket's sample_type: numpy.complex128 for :data:CF64, numpy.clongdouble for :data:CF128, numpy.int32 for :data:CI32 (interleaved I/Q, length 2*n).

required
sample_rate float

Samples per second written into the header (default 0).

0
center_freq float

Centre frequency in Hz written into the header (default 0).

0

Raises:

Type Description
TypeError

If samples.dtype does not match the socket's sample_type.

ValueError

If samples is not C-contiguous.

RuntimeError

If the ZMQ send fails.

Examples:

>>> from doppler.stream import Push, Pull, CF64
>>> import numpy as np, time
>>> push = Push("tcp://127.0.0.1:5557", CF64)
>>> pull = Pull("tcp://127.0.0.1:5557")
>>> time.sleep(0.05)
>>> push.send(np.array([1+2j, 3+4j],
...           dtype=np.complex128),
...           sample_rate=int(48000))
>>> samples, hdr = pull.recv(timeout_ms=2000)
>>> push.close(); pull.close()

close

close() -> None

Destroy the ZMQ socket and release all resources.

Calls dp_push_destroy(). Safe to call multiple times.

Examples:

>>> from doppler.stream import Push, CF64
>>> push = Push("tcp://*:19111", CF64)
>>> push.close()
>>> push.close()  # idempotent — no error

Pull

ZMQ PULL socket — pipeline receiver for load-balanced workers.

Wraps dp_pull_t. Receives frames from a :class:Push socket. Multiple Pull workers can share one Push sender; each frame goes to exactly one worker (round-robin from the Push side).

The recv path is zero-copy: see :class:Subscriber for the buffer lifetime semantics.

The socket connects to the Push endpoint; the Push socket binds.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to connect to, e.g. "tcp://localhost:5557".

required

Raises:

Type Description
RuntimeError

If the ZMQ context or socket cannot be created.

Examples:

>>> from doppler.stream import Pull
>>> pull = Pull("tcp://localhost:19112")
>>> type(pull).__name__
'Pull'
>>> pull.close()

Context-manager form:

>>> with Pull("tcp://localhost:19113") as pull:
...     type(pull).__name__
'Pull'

Receive one frame (requires a live :class:Push):

>>> pull.recv(timeout_ms=500)
(array([1.+2.j, ...]), {'sample_rate': ..., ...})

__init__

__init__(endpoint: str) -> None

Create a Pull socket and connect to endpoint.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to connect to, e.g. "tcp://localhost:5557".

required

__enter__

__enter__() -> Pull

Return self for use as a context manager.

Examples:

>>> from doppler.stream import Pull
>>> with Pull("tcp://localhost:19114") as pull:
...     type(pull).__name__
'Pull'

__exit__

__exit__(*args: object) -> None

Call :meth:close on context-manager exit.

recv

recv(timeout_ms: int = -1) -> Tuple[NDArray[Any], Dict[str, Any]]

Receive one signal frame from the connected Push socket.

Blocks until a frame arrives or the optional timeout expires. The returned NumPy array is a zero-copy view into the ZMQ message buffer; the buffer is freed when the array is garbage-collected.

Parameters:

Name Type Description Default
timeout_ms int

Milliseconds to wait before raising :exc:TimeoutError. -1 (default) blocks indefinitely; 0 returns immediately if no frame is available.

-1

Returns:

Name Type Description
samples ndarray

Decoded sample data. dtype is numpy.complex128 (:data:CF64), numpy.clongdouble (:data:CF128), or numpy.int32 flat interleaved I/Q (:data:CI32).

header dict

Decoded dp_header_t fields — see :meth:Subscriber.recv for the full key list.

Raises:

Type Description
TimeoutError

If timeout_ms elapses before a frame arrives.

RuntimeError

If the ZMQ recv fails for any other reason.

Examples:

>>> from doppler.stream import Pull
>>> with Pull("tcp://localhost:5557") as pull:
...     samples, hdr = pull.recv(timeout_ms=1000)
...     print(samples.dtype, hdr["num_samples"])

close

close() -> None

Destroy the ZMQ socket and release all resources.

Calls dp_pull_destroy(). Safe to call multiple times.

Examples:

>>> from doppler.stream import Pull
>>> pull = Pull("tcp://localhost:19115")
>>> pull.close()
>>> pull.close()  # idempotent — no error

Requester

ZMQ REQ socket — sends a request frame, then waits for a reply.

Wraps dp_req_t. The REQ/REP pattern is strictly alternating: :meth:send must be called before :meth:recv, and :meth:recv must complete before the next :meth:send. Violating this order triggers a ZMQ FSM error.

Complements :class:Replier. Use this pattern for control-plane messages (tuning commands, metadata queries) or synchronous signal-frame RPC where one peer processes each frame and returns a result.

The socket connects to the Replier endpoint.
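
The strict send/recv alternation can be pictured as a two-state machine. The class below is a toy model for illustration, not the real socket: ReqStateModel is a hypothetical name, and it raises RuntimeError only because that is the exception the send and recv methods are documented to raise on an FSM violation.

```python
class ReqStateModel:
    """Toy model of the REQ send/recv alternation (no networking involved)."""

    def __init__(self) -> None:
        self._awaiting_reply = False

    def send(self) -> None:
        if self._awaiting_reply:
            raise RuntimeError("send called twice without an intervening recv")
        self._awaiting_reply = True

    def recv(self) -> None:
        if not self._awaiting_reply:
            raise RuntimeError("recv called without a prior send")
        self._awaiting_reply = False


req = ReqStateModel()
req.send()
req.recv()      # legal: send, then recv
req.send()
try:
    req.send()  # illegal: second send before recv
except RuntimeError as exc:
    print("rejected:", exc)
```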

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to connect to, e.g. "tcp://localhost:5558".

required
sample_type int

Wire encoding of frames sent by this socket. One of :data:CI32, :data:CF64 (default), :data:CF128. The reply frame's type is determined by the :class:Replier.

...

Raises:

Type Description
ValueError

If sample_type is not one of the three supported values.

RuntimeError

If the ZMQ context or socket cannot be created.

Examples:

>>> from doppler.stream import Requester, CF64
>>> req = Requester("tcp://localhost:19116", CF64)
>>> type(req).__name__
'Requester'
>>> req.close()

Context-manager form:

>>> with Requester("tcp://localhost:19117", CF64) as req:
...     type(req).__name__
'Requester'

Full REQ/REP round-trip (requires a live :class:Replier):

>>> import numpy as np, time
>>> from doppler.stream import Replier
>>> rep = Replier("tcp://*:5558", CF64)
>>> req = Requester("tcp://localhost:5558", CF64)
>>> time.sleep(0.05)
>>> req.send(np.ones(4, dtype=np.complex128),
...          sample_rate=int(1e6))
>>> request, hdr = rep.recv(timeout_ms=2000)
>>> rep.send(request)
>>> reply, hdr2 = req.recv(timeout_ms=2000)
>>> req.close(); rep.close()

__init__

__init__(endpoint: str, sample_type: int = ...) -> None

Create a Requester socket and connect to endpoint.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to connect to, e.g. "tcp://localhost:5558".

required
sample_type int

Wire encoding of frames sent by this socket: :data:CI32, :data:CF64 (default), or :data:CF128.

...

__enter__

__enter__() -> Requester

Return self for use as a context manager.

Examples:

>>> from doppler.stream import Requester, CF64
>>> with Requester("tcp://localhost:19118", CF64) as req:
...     type(req).__name__
'Requester'

__exit__

__exit__(*args: object) -> None

Call :meth:close on context-manager exit.

send

send(samples: NDArray[Any], sample_rate: float = 0, center_freq: float = 0) -> None

Send a request frame to the connected :class:Replier.

The ZMQ REQ FSM requires that each :meth:send be followed by exactly one :meth:recv before the next send. Calling send twice in a row raises a ZMQ error.

Parameters:

Name Type Description Default
samples ndarray

C-contiguous array whose dtype must match the socket's sample_type: numpy.complex128 for :data:CF64, numpy.clongdouble for :data:CF128, numpy.int32 for :data:CI32.

required
sample_rate float

Samples per second written into the header (default 0).

0
center_freq float

Centre frequency in Hz written into the header (default 0).

0

Raises:

Type Description
TypeError

If samples.dtype does not match the socket's sample_type.

ValueError

If samples is not C-contiguous.

RuntimeError

If the ZMQ send fails (including FSM violation).

Examples:

>>> from doppler.stream import Requester, CF64
>>> import numpy as np
>>> req = Requester("tcp://localhost:5558", CF64)
>>> req.send(np.ones(4, dtype=np.complex128),
...          sample_rate=int(1e6))

recv

recv(timeout_ms: int = -1) -> Tuple[NDArray[Any], Dict[str, Any]]

Receive the reply frame from the :class:Replier.

Must be called after :meth:send; calling without a prior send triggers a ZMQ FSM error. Blocks until the reply arrives or the timeout expires.

Parameters:

Name Type Description Default
timeout_ms int

Milliseconds to wait before raising :exc:TimeoutError. -1 (default) blocks indefinitely.

-1

Returns:

Name Type Description
samples ndarray

Decoded reply data. dtype mirrors the :class:Replier's sample_type.

header dict

Decoded dp_header_t fields — see :meth:Subscriber.recv for the full key list.

Raises:

Type Description
TimeoutError

If timeout_ms elapses before the reply arrives.

RuntimeError

If the ZMQ recv fails (including FSM violation from calling recv without a prior send).

Examples:

>>> from doppler.stream import Requester, CF64
>>> import numpy as np
>>> req = Requester("tcp://localhost:5558", CF64)
>>> req.send(np.ones(4, dtype=np.complex128))
>>> reply, hdr = req.recv(timeout_ms=2000)

close

close() -> None

Destroy the ZMQ socket and release all resources.

Calls dp_req_destroy(). Safe to call multiple times.

Examples:

>>> from doppler.stream import Requester, CF64
>>> req = Requester("tcp://localhost:19119", CF64)
>>> req.close()
>>> req.close()  # idempotent — no error

Replier

ZMQ REP socket — receives a request frame, then sends a reply.

Wraps dp_rep_t. The REQ/REP pattern is strictly alternating: :meth:recv must be called first to consume a request, then :meth:send emits the reply. Violating this order triggers a ZMQ FSM error.

Complements :class:Requester. Use for control-plane responses or signal-frame RPC where the Replier processes each frame and returns a result synchronously.

The socket binds to endpoint; Requesters connect to it.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to bind, e.g. "tcp://*:5558".

required
sample_type int

Wire encoding of frames sent by this socket (the reply). One of :data:CI32, :data:CF64 (default), :data:CF128. The request frame's type is determined by the :class:Requester.

...

Raises:

Type Description
ValueError

If sample_type is not one of the three supported values.

RuntimeError

If the ZMQ context or socket cannot be created.

Examples:

>>> from doppler.stream import Replier, CF64
>>> rep = Replier("tcp://*:19120", CF64)
>>> type(rep).__name__
'Replier'
>>> rep.close()

Context-manager form:

>>> with Replier("tcp://*:19121", CF64) as rep:
...     type(rep).__name__
'Replier'

Full REQ/REP server loop (requires a live :class:Requester):

>>> from doppler.stream import Requester
>>> import numpy as np, time
>>> rep = Replier("tcp://*:5558", CF64)
>>> req = Requester("tcp://localhost:5558", CF64)
>>> time.sleep(0.05)
>>> req.send(np.ones(4, dtype=np.complex128))
>>> request, hdr = rep.recv(timeout_ms=2000)
>>> rep.send(request, sample_rate=hdr["sample_rate"])
>>> reply, _ = req.recv(timeout_ms=2000)
>>> req.close(); rep.close()

__init__

__init__(endpoint: str, sample_type: int = ...) -> None

Create a Replier socket and bind to endpoint.

Parameters:

Name Type Description Default
endpoint str

ZMQ endpoint to bind, e.g. "tcp://*:5558".

required
sample_type int

Wire encoding of reply frames sent by this socket: :data:CI32, :data:CF64 (default), or :data:CF128.

...

__enter__

__enter__() -> Replier

Return self for use as a context manager.

Examples:

>>> from doppler.stream import Replier, CF64
>>> with Replier("tcp://*:19122", CF64) as rep:
...     type(rep).__name__
'Replier'

__exit__

__exit__(*args: object) -> None

Call :meth:close on context-manager exit.

recv

recv(timeout_ms: int = -1) -> Tuple[NDArray[Any], Dict[str, Any]]

Receive one request frame from the :class:Requester.

Must be called before :meth:send; calling send before recv triggers a ZMQ FSM error. Blocks until a request arrives or the timeout expires.

Parameters:

Name Type Description Default
timeout_ms int

Milliseconds to wait before raising :exc:TimeoutError. -1 (default) blocks indefinitely.

-1

Returns:

Name Type Description
samples ndarray

Decoded request data. dtype mirrors the :class:Requester's sample_type.

header dict

Decoded dp_header_t fields — see :meth:Subscriber.recv for the full key list.

Raises:

Type Description
TimeoutError

If timeout_ms elapses before a request arrives.

RuntimeError

If the ZMQ recv fails (including FSM violation from calling recv twice in a row).

Examples:

>>> from doppler.stream import Replier, CF64
>>> rep = Replier("tcp://*:5558", CF64)
>>> request, hdr = rep.recv(timeout_ms=5000)
>>> rep.send(request,
...          sample_rate=hdr["sample_rate"])

send

send(samples: NDArray[Any], sample_rate: float = 0, center_freq: float = 0) -> None

Send the reply frame back to the :class:Requester.

Must be called after :meth:recv; calling without a prior recv triggers a ZMQ FSM error. After this call returns, the Replier is ready to recv the next request.

Parameters:

Name Type Description Default
samples ndarray

C-contiguous array whose dtype must match the socket's sample_type: numpy.complex128 for :data:CF64, numpy.clongdouble for :data:CF128, numpy.int32 for :data:CI32.

required
sample_rate float

Samples per second written into the reply header (default 0).

0
center_freq float

Centre frequency in Hz written into the reply header (default 0).

0

Raises:

Type Description
TypeError

If samples.dtype does not match the socket's sample_type.

ValueError

If samples is not C-contiguous.

RuntimeError

If the ZMQ send fails (including FSM violation from calling send before recv).

Examples:

>>> from doppler.stream import Replier, CF64
>>> import numpy as np
>>> rep = Replier("tcp://*:5558", CF64)
>>> request, hdr = rep.recv(timeout_ms=5000)
>>> rep.send(np.zeros_like(request),
...          sample_rate=hdr["sample_rate"])

close

close() -> None

Destroy the ZMQ socket and release all resources.

Calls dp_rep_destroy(). Safe to call multiple times.

Examples:

>>> from doppler.stream import Replier, CF64
>>> rep = Replier("tcp://*:19123", CF64)
>>> rep.close()
>>> rep.close()  # idempotent — no error

get_timestamp_ns

get_timestamp_ns() -> int

Current wall-clock time in nanoseconds since the UNIX epoch.

Calls clock_gettime(CLOCK_REALTIME) in the C layer. Useful for stamping outgoing frames when the caller does not supply its own timestamp_ns, or for computing round-trip latency from the value returned in the received header dict.

Returns:

Type Description
int

Non-negative nanosecond timestamp. Because CLOCK_REALTIME follows the system wall clock, the value is not guaranteed to be monotonic: it can jump forward or backward when the clock is adjusted (for example by NTP or a manual time change).

Examples:

>>> from doppler.stream import get_timestamp_ns
>>> t = get_timestamp_ns()
>>> isinstance(t, int)
True
>>> t > 0
True
