Source code for atomicds.timeseries.polling

# polling.py
from __future__ import annotations

import asyncio
import random
import threading
import time
from collections.abc import AsyncIterator, Callable, Iterator
from typing import Any

from pandas import DataFrame

from atomicds.timeseries.registry import get_provider

Result = DataFrame
DistinctFn = Callable[[Result], Any]
Predicate = Callable[[Result], bool]
ErrorHandler = Callable[[BaseException], None]


def _fetch_result(client, data_id: str, last_n: int | None) -> Result:
    """Build a result via provider -> fetch_raw -> to_dataframe.

    Args:
        client: API client instance passed to the provider.
        data_id: Identifier of the resource to fetch.
        last_n: Last number of entries to poll for

    Returns:
        Any: The provider-converted result (typically a pandas.DataFrame).
    """
    provider = get_provider("rheed")
    kwargs = {"last_n": last_n} if last_n is not None else {}
    raw = provider.fetch_raw(client, data_id, **kwargs)
    return provider.to_dataframe(raw)


def _drift_corrected_sleep(next_tick: float, interval: float) -> float:
    """Compute a non-negative sleep delay to keep a fixed polling cadence.

    This helper calculates how long to sleep to reach `next_tick`, and if
    the caller is behind schedule, it computes a catch-up strategy that
    preserves the target cadence (i.e., it skips missed ticks rather than
    accumulating delay).

    Args:
        next_tick: Target monotonic time (seconds) for the next poll.
        interval: Desired polling interval in seconds.

    Returns:
        float: Non-negative delay (seconds) to sleep before the next poll.
    """
    now = time.monotonic()
    delay = next_tick - now
    if delay < 0:
        missed = int((-delay) // interval)
        next_tick += missed * interval
        delay = next_tick - time.monotonic()
    return max(0.0, delay)


[docs] def iter_poll( client, data_id: str, *, interval: float = 1.0, last_n: int | None = None, distinct_by: DistinctFn | None = None, until: Predicate | None = None, max_polls: int | None = None, fire_immediately: bool = True, jitter: float = 0.0, on_error: ErrorHandler | None = None, ) -> Iterator[Result]: """ Yield time series results at a fixed cadence. Supports deduplication (via a key extractor), stop conditions, bounded polling, optional jitter, and non-fatal error handling. Args: client: API client instance forwarded to the provider. data_id: Identifier to fetch data for. last_n: Last number of time series data points to poll. None is all. interval: Seconds between polls. Defaults to 1.0. distinct_by: Optional function mapping a result to a hashable key for deduping. If provided, only results with a new key are yielded. until: Optional predicate; stop when it returns True for a result. max_polls: Optional maximum number of polls before stopping. fire_immediately: If True, perform the first poll immediately; otherwise wait one interval before the first poll. Defaults to True. jitter: Optional random delay (0..jitter) added to each sleep to avoid thundering herds. Clamped at `interval`. Defaults to 0.0. on_error: Optional error handler called with the raised exception when a poll fails. Errors are swallowed so polling continues. Yields: Any: Each (optionally deduped) time series data frame result. Notes: - Uses drift-corrected scheduling to maintain the requested cadence even if individual polls are slow. - Stops when `until` is satisfied or `max_polls` is reached (if set). """ last_key = object() polls = 0 next_tick = time.monotonic() if not fire_immediately: next_tick += interval while True: polls += 1 try: result = _fetch_result(client, data_id, last_n) except BaseException as exc: if on_error: on_error(exc) else: key = distinct_by(result) if distinct_by else object() if distinct_by is None or key != last_key: last_key = key yield result if until and until(result): return if max_polls and polls >= max_polls: return # timing next_tick += interval delay = _drift_corrected_sleep(next_tick, interval) if jitter: delay += random.uniform(0, max(0.0, min(jitter, interval))) time.sleep(delay)
[docs] async def aiter_poll( client, data_id: str, *, interval: float = 1.0, last_n: int | None = None, distinct_by: DistinctFn | None = None, until: Predicate | None = None, max_polls: int | None = None, fire_immediately: bool = True, jitter: float = 0.0, on_error: ErrorHandler | None = None, ) -> AsyncIterator[Result]: """ Asynchronously yield time series results without blocking the loop. Uses the the same semantics as `iter_poll`. Args: client: API client instance forwarded to the provider. data_id: Identifier to fetch data for. interval: Seconds between polls. Defaults to 1.0. last_n: Last number of time series data points to poll. None is all. distinct_by: Optional function mapping a result to a hashable key for deduping. If provided, only results with a new key are yielded. until: Optional predicate; stop when it returns True for a result. max_polls: Optional maximum number of polls before stopping. fire_immediately: If True, perform the first poll immediately; otherwise wait one interval before the first poll. Defaults to True. jitter: Optional random delay (0..jitter) added to each sleep to avoid thundering herds. Clamped at `interval`. Defaults to 0.0. on_error: Optional error handler called with the raised exception when a poll fails. Errors are swallowed so polling continues. Yields: Any: Each (optionally deduped) time series data frame result. Notes: - Uses `asyncio.to_thread` so provider calls never block the event loop. - Drift-corrected scheduling preserves cadence even with slow polls. - Stops when `until` is satisfied or `max_polls` is reached (if set). """ loop = asyncio.get_running_loop() last_key = object() polls = 0 next_tick = loop.time() if not fire_immediately: next_tick += interval while True: polls += 1 try: result = await asyncio.to_thread(_fetch_result, client, data_id, last_n) except BaseException as exc: if on_error: on_error(exc) else: key = distinct_by(result) if distinct_by else object() if distinct_by is None or key != last_key: last_key = key yield result if until and until(result): return if max_polls and polls >= max_polls: return # timing next_tick += interval delay = next_tick - loop.time() if delay < 0: missed = int((-delay) // interval) + 1 next_tick += missed * interval delay = next_tick - loop.time() if jitter: delay += random.uniform(0, max(0.0, min(jitter, interval))) await asyncio.sleep(delay)
[docs] def start_polling_thread( client, data_id: str, *, interval: float = 1.0, last_n: int | None = None, on_result: Callable[[Result], None], **kwargs, ) -> threading.Event: """Start the sync poller in a background thread and stream results to a callback. Wraps `iter_poll` in a daemon thread and invokes `on_result(result)` for each yielded item. Returns a `threading.Event` that can be set to stop polling gracefully. Args: client: API client instance forwarded to the provider. data_id: Identifier to fetch data for. interval: Seconds between polls. Defaults to 1.0. last_n: Last number of time series data points to poll for. None is all. on_result: Callback invoked with each yielded result. **kwargs: Additional keyword arguments forwarded to `iter_poll` (e.g., `distinct_by`, `until`, `max_polls`, `fire_immediately`, `jitter`, `on_error`). Returns: threading.Event: Event that, when set, requests the polling thread to stop. """ stop = threading.Event() def _runner(): for res in iter_poll( client, data_id, interval=interval, last_n=last_n, **kwargs ): if stop.is_set(): break on_result(res) t = threading.Thread(target=_runner, name=f"poll:{data_id}", daemon=True) t.start() return stop
[docs] def start_polling_task( client, data_id: str, *, interval: float = 1.0, last_n: int | None = None, on_result: Callable[[Result], Any] | None = None, **kwargs, ) -> asyncio.Task[None]: """Start the async poller as an `asyncio.Task` and stream results to a callback. Wraps `aiter_poll` in a background Task. If `on_result` returns a coroutine, it will be awaited before the next iteration. Args: client: API client instance forwarded to the provider. data_id: Identifier to fetch data for. interval: Seconds between polls. Defaults to 1.0. last_n: Last number of time series data points to poll for. None is all. on_result: Optional callback invoked with each yielded result. If it returns a coroutine, it will be awaited. **kwargs: Additional keyword arguments forwarded to `aiter_poll` (e.g., `distinct_by`, `until`, `max_polls`, `fire_immediately`, `jitter`, `on_error`). Returns: asyncio.Task[None]: A created and started Task. Cancel it to stop polling. Raises: RuntimeError: If no running event loop is available when called. """ async def _runner(): async for res in aiter_poll( client, data_id, interval=interval, last_n=last_n, **kwargs ): if on_result is None: continue maybe = on_result(res) if asyncio.iscoroutine(maybe): await maybe return asyncio.create_task(_runner(), name=f"poll:{data_id}")