Source code for CodeEntropy.levels.frame_dag

"""Frame-local DAG execution.

This module defines the frame-scoped DAG used during the MAP stage of the
hierarchy workflow. Each frame is processed independently to produce
frame-local outputs (e.g., axes and covariance data), which are later reduced
outside this DAG.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any

import networkx as nx

from CodeEntropy.levels.nodes.covariance import FrameCovarianceNode

logger = logging.getLogger(__name__)


@dataclass
class FrameContext:
    """Typed record describing the state available while one frame is processed.

    The fields mirror the keys of the frame context that nodes operate on.
    Note that ``FrameGraph`` in this module hands its nodes a plain ``dict``
    with the ``shared``/``frame_index``/``frame_covariance`` keys rather than
    an instance of this class.

    Attributes:
        shared: Shared workflow data (mutated across the full workflow).
        frame_index: Index of the trajectory frame being processed.
        frame_covariance: Frame-local covariance output produced by
            ``FrameCovarianceNode``; ``None`` until it has been produced.
        data: Optional frame-local scratch space for nodes; ``None`` when
            unused.
    """

    # Required inputs: the workflow-wide state and the frame being processed.
    shared: dict[str, Any]
    frame_index: int

    # Optional slots that start out empty.
    frame_covariance: Any = None
    data: dict[str, Any] | None = None
class FrameGraph:
    """Run a small directed acyclic graph of nodes against a single frame.

    One execution corresponds to one trajectory frame. Every node receives the
    same frame context dictionary: shared workflow inputs are reachable through
    ``ctx["shared"]``, while node results are expected to be written into the
    frame context itself rather than into the shared data.

    Expected node outputs:
        - "frame_covariance"
    """

    def __init__(self, universe_operations: Any | None = None) -> None:
        """Create an empty graph with no registered nodes.

        Args:
            universe_operations: Optional adapter providing universe operations
                used by frame-level nodes (e.g., selections / molecule
                containers). This constructor only stores it on the instance.
        """
        # Node objects keyed by name; the DiGraph only tracks names and edges.
        self._nodes: dict[str, Any] = {}
        self._graph = nx.DiGraph()
        self._universe_operations = universe_operations

    def build(self) -> FrameGraph:
        """Register the default frame-level nodes.

        The default topology consists of a single ``"frame_covariance"`` node
        with no dependencies.

        Returns:
            This instance, so that calls can be chained.
        """
        covariance_node = FrameCovarianceNode()
        self._add("frame_covariance", covariance_node)
        return self

    def execute_frame(self, shared_data: dict[str, Any], frame_index: int) -> Any:
        """Position the trajectory on one frame and run every node for it.

        The graph itself seeks the frame source to ``frame_index`` before any
        node runs, so callers pass explicit frame indices and do not need to
        (and must not) rely on hidden MDAnalysis cursor state.

        Args:
            shared_data: Shared workflow data dictionary. Must contain
                ``"frame_source"``.
            frame_index: Frame index valid for the active analysis universe;
                it is coerced with ``int()`` before use. During this migration
                stage this is local to the frame-reduced universe.

        Returns:
            The value stored under ``"frame_covariance"`` in the frame context
            once all nodes have run (``None`` if no node wrote it).

        Raises:
            KeyError: If ``"frame_source"`` is missing from ``shared_data``.
            IndexError: If seeking to ``frame_index`` raises ``IndexError``;
                re-raised with a message that includes the trajectory length.
        """
        source = shared_data["frame_source"]
        frame_index = int(frame_index)

        try:
            source.seek(frame_index)
        except IndexError as exc:
            # Look up the trajectory length only on failure, for the message.
            n_frames = len(source.universe.trajectory)
            message = (
                f"Frame index {frame_index} is outside analysis trajectory bounds "
                f"for trajectory with {n_frames} frames."
            )
            raise IndexError(message) from exc

        frame_ctx = self._make_frame_ctx(
            shared_data=shared_data,
            frame_index=frame_index,
        )

        # Dependencies run before their dependants.
        for name in nx.topological_sort(self._graph):
            logger.debug("[FrameGraph] running %s @ frame=%s", name, frame_index)
            node = self._nodes[name]
            node.run(frame_ctx)

        return frame_ctx["frame_covariance"]

    def _add(self, name: str, node: Any, deps: list[str] | None = None) -> None:
        """Register ``node`` under ``name`` and wire up its dependencies.

        Args:
            name: Graph node name, also used as the key for looking up ``node``.
            node: Object to run for this graph node; ``execute_frame`` calls
                its ``run(ctx)`` method.
            deps: Names of nodes that must run before ``name``. An edge
                ``dep -> name`` is added for each entry. ``None`` or an empty
                list means no dependencies.
        """
        self._nodes[name] = node
        self._graph.add_node(name)

        if not deps:
            return

        for upstream in deps:
            self._graph.add_edge(upstream, name)

    @staticmethod
    def _make_frame_ctx(
        shared_data: dict[str, Any], frame_index: int
    ) -> dict[str, Any]:
        """Build the dictionary handed to each node for one frame.

        Notes:
            - ``shared_data`` is stored by reference under ``"shared"``; it is
              not copied.
            - The context is intentionally frame-scoped and should not be used
              as a replacement for shared workflow state.

        Args:
            shared_data: Shared workflow data dict.
            frame_index: Frame index to record in the context, stored as given.

        Returns:
            A new dict with the keys ``"shared"``, ``"frame_index"`` and
            ``"frame_covariance"`` (initialised to ``None``).
        """
        frame_ctx: dict[str, Any] = {
            "shared": shared_data,
            "frame_index": frame_index,
            # Output slot; starts empty and is read back by execute_frame.
            "frame_covariance": None,
        }
        return frame_ctx