Streaming API

class atomicds.streaming.rheed_stream.RHEEDStreamer(api_key: str, endpoint: str | None = None, verbosity: int | None = None) → None

Bases: object

High-performance RHEED frame streaming client.

Bridges Python to a Rust/PyO3 backend for efficient, concurrent packaging and upload of uint8 grayscale frames to the Atomscale platform.

Parameters:
  • api_key (str)

  • endpoint (str | None)

  • verbosity (int | None)
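The streamer uploads uint8 grayscale frames, so frames from a higher bit-depth or colour camera need converting first. The helper below is a minimal sketch of one way to do that with NumPy. The name to_uint8_gray, the max_value parameter, and the unweighted channel average are assumptions made for illustration; they are not part of the atomicds API.

```python
import numpy as np


def to_uint8_gray(frame: np.ndarray, max_value: float) -> np.ndarray:
    """Convert one (H, W) or (H, W, C) frame to an (H, W) uint8 array.

    Hypothetical helper: max_value is the intensity that should map to 255
    (for example 65535 for a 16-bit camera).
    """
    if max_value <= 0:
        raise ValueError("max_value must be positive")
    arr = np.asarray(frame, dtype=np.float64)
    if arr.ndim == 3:
        # Unweighted mean over the channel axis (an assumption, not a platform rule).
        arr = arr.mean(axis=-1)
    # Fixed scaling keeps relative intensities comparable from frame to frame.
    scaled = np.clip(arr / max_value, 0.0, 1.0) * 255.0
    return np.rint(scaled).astype(np.uint8)
```

A fixed max_value is used instead of per-frame min/max normalization so that intensity changes between frames are preserved.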

initialize(fps: float, rotations_per_min: float, chunk_size: int, stream_name: str | None = None) → str
Parameters:
  • fps (float)

  • rotations_per_min (float)

  • chunk_size (int)

  • stream_name (str | None)

Return type:

str
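If chunk_size is read as the number of frames per uploaded chunk (an assumption based on the Usage example, where chunk_size=240 at fps=120.0 would correspond to two seconds of frames), a chunk size can be derived from a target chunk duration. The helper name chunk_size_for is hypothetical.

```python
def chunk_size_for(fps: float, seconds_per_chunk: float) -> int:
    """Return a frame count per chunk for a target chunk duration.

    Hypothetical helper; assumes chunk_size counts frames per chunk.
    """
    if fps <= 0 or seconds_per_chunk <= 0:
        raise ValueError("fps and seconds_per_chunk must be positive")
    # Never return fewer than one frame per chunk.
    return max(1, round(fps * seconds_per_chunk))
```

The result can be passed as the chunk_size argument of initialize().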

run(data_id: str, frames_iter: Iterable[ndarray[tuple[Any, ...], dtype[uint8]]]) → None
Parameters:
  • data_id (str)

  • frames_iter (Iterable[ndarray[tuple[Any, ...], dtype[uint8]]])

Return type:

None
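run() takes any iterable of uint8 arrays. When frames arrive one at a time, a generator can group them into (N, H, W) stacks before they are handed over. The sketch below assumes that a shorter final chunk is acceptable to the platform, which this page does not state; iter_chunks is a hypothetical helper name.

```python
from typing import Iterable, Iterator

import numpy as np


def iter_chunks(frames: Iterable[np.ndarray], chunk_size: int) -> Iterator[np.ndarray]:
    """Group single (H, W) uint8 frames into (N, H, W) stacks of up to chunk_size."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    buffer = []
    for frame in frames:
        if frame.dtype != np.uint8:
            # Fail loudly instead of casting silently and corrupting intensities.
            raise TypeError(f"expected uint8 frame, got {frame.dtype}")
        buffer.append(frame)
        if len(buffer) == chunk_size:
            yield np.stack(buffer)
            buffer = []
    if buffer:
        # Trailing partial chunk (assumed to be accepted; drop it if not).
        yield np.stack(buffer)
```

A call such as streamer.run(data_id, iter_chunks(camera_frames, 240)) would then stream the grouped frames, where camera_frames stands in for your own frame source.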

push(data_id: str, chunk_idx: int, frames: ndarray[tuple[Any, ...], dtype[uint8]]) → None
Parameters:
  • data_id (str)

  • chunk_idx (int)

  • frames (ndarray[tuple[Any, ...], dtype[uint8]])

Return type:

None

finalize(data_id: str) → None
Parameters:
  • data_id (str)

Return type:

None
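For the manual path, push() and finalize() can be wrapped so that chunk indices stay sequential and the stream is always closed. This is a sketch, not the library's own behaviour: push_all is a hypothetical helper, indices are assumed to start at 0 as in the Usage example, and whether finalize() should still be called after a failed push is an assumption.

```python
from typing import Any, Iterable


def push_all(streamer: Any, data_id: str, chunks: Iterable[Any]) -> int:
    """Push chunks in order with 0-based indices, then finalize.

    Returns the number of chunks pushed successfully. The streamer argument
    only needs push(data_id, chunk_idx, frames) and finalize(data_id).
    """
    pushed = 0
    try:
        for idx, chunk in enumerate(chunks):
            streamer.push(data_id, idx, chunk)
            pushed = idx + 1
    finally:
        # Assumption: the stream should be finalized even if a push raised.
        streamer.finalize(data_id)
    return pushed
```

If finalizing a partially uploaded stream is not wanted, replace the try/finally with a plain call to finalize() after the loop.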

Usage

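import numpy as np

from atomicds.streaming.rheed_stream import RHEEDStreamer

streamer = RHEEDStreamer(api_key="...")
data_id = streamer.initialize(
    fps=120.0,
    rotations_per_min=0.0,  # stationary
    chunk_size=240,
    stream_name="My RHEED Stream",
)


# Generator or iterator yielding (N,H,W) or (H,W) uint8 frames
def chunk_source():
    # Placeholder data: replace with arrays from your camera or file reader.
    # The chunk count (5) and frame size (300x300) are arbitrary.
    for _ in range(5):
        yield np.zeros((240, 300, 300), dtype=np.uint8)


# Option 1: let run() consume the whole iterator
streamer.run(data_id, chunk_source())

# Option 2 (use instead of run(), not after it): push chunks manually
# for idx, frame_chunk in enumerate(chunk_source()):
#     streamer.push(data_id, idx, frame_chunk)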

streamer.finalize(data_id)