Source code for CodeEntropy.levels.graph.frame_dag

"""Frame-local DAG execution.

This module defines the frame-scoped DAG used during the MAP stage of the
hierarchy workflow. Each selected frame is processed independently to produce
frame-local observable outputs, which are reduced outside this DAG.

FrameGraph owns trajectory positioning. It does not own scheduling, chunking, or
reduction.
"""

from __future__ import annotations

from typing import Any

import networkx as nx

from CodeEntropy.levels.nodes.covariance import FrameCovarianceNode


class FrameGraph:
    """Run the frame-scoped directed acyclic graph, one frame at a time.

    Nodes are registered by name in a ``networkx`` directed graph and are
    executed in topological order against a per-frame context dictionary.
    """

    def __init__(self, universe_operations: Any | None = None) -> None:
        """Create an empty frame-local DAG.

        Args:
            universe_operations: Optional universe-operation adapter. It is
                stored on the instance; no method of this class reads it.
        """
        self._universe_operations = universe_operations
        # Topology lives in the graph; the runnable objects live in the
        # name -> node mapping alongside it.
        self._graph = nx.DiGraph()
        self._nodes: dict[str, Any] = {}

    def build(self) -> FrameGraph:
        """Register the default frame-local topology.

        The default graph holds a single ``frame_covariance`` node with no
        upstream dependencies.

        Returns:
            This ``FrameGraph`` instance, so calls can be chained.
        """
        self._add(name="frame_covariance", node=FrameCovarianceNode())
        return self

    def execute_frame(self, shared_data: dict[str, Any], frame_index: int) -> Any:
        """Position the frame source on one frame and run every node for it.

        Args:
            shared_data: Shared workflow data; must contain ``frame_source``,
                an object exposing ``seek`` and ``universe.trajectory``.
            frame_index: Index of the frame to process. It is coerced with
                ``int`` before being handed to ``frame_source.seek``.

        Returns:
            The value stored under ``frame_covariance`` in the frame context
            once all nodes have run (presumably written by the covariance
            node; it is ``None`` if no node sets it).

        Raises:
            KeyError: If ``shared_data`` has no ``frame_source`` entry.
            IndexError: If ``frame_source.seek`` raises ``IndexError``; it is
                re-raised with a message reporting the trajectory length.
        """
        source = shared_data["frame_source"]
        frame_index = int(frame_index)

        try:
            source.seek(frame_index)
        except IndexError as exc:
            # Only look up the trajectory length on the failure path.
            n_frames = len(source.universe.trajectory)
            message = (
                f"Frame index {frame_index} is outside analysis trajectory bounds "
                f"for trajectory with {n_frames} frames."
            )
            raise IndexError(message) from exc

        frame_ctx = self._make_frame_ctx(
            shared_data=shared_data,
            frame_index=frame_index,
        )
        # Iterate the topological order lazily so every node runs only after
        # the nodes it depends on.
        for current in nx.topological_sort(self._graph):
            runner = self._nodes[current]
            runner.run(frame_ctx)

        return frame_ctx["frame_covariance"]

    def _add(self, name: str, node: Any, deps: list[str] | None = None) -> None:
        """Register a node under ``name`` and wire its dependency edges.

        Args:
            name: Key under which the node is stored and added to the graph.
            node: Object whose ``run`` method is called during execution.
            deps: Optional names of nodes that must run before ``name``; an
                edge ``dep -> name`` is added for each one.
        """
        self._nodes[name] = node
        self._graph.add_node(name)

        if not deps:
            return
        for upstream in deps:
            self._graph.add_edge(upstream, name)

    @staticmethod
    def _make_frame_ctx(
        shared_data: dict[str, Any],
        frame_index: int,
    ) -> dict[str, Any]:
        """Assemble the context dictionary passed to every frame-local node.

        Args:
            shared_data: Shared workflow data, stored by reference under
                ``shared``.
            frame_index: Index of the frame being executed.

        Returns:
            A new dictionary with the keys ``shared``, ``frame_index`` and
            ``frame_covariance``; the last one starts out as ``None``.
        """
        frame_ctx: dict[str, Any] = {
            "shared": shared_data,
            "frame_index": frame_index,
        }
        # Placeholder slot for the covariance result read back by the caller.
        frame_ctx["frame_covariance"] = None
        return frame_ctx