Source code for CodeEntropy.levels.frame_dag
"""Frame-local DAG execution.
This module defines the frame-scoped DAG used during the MAP stage of the
hierarchy workflow. Each frame is processed independently to produce
frame-local outputs (e.g., axes and covariance data), which are later reduced
outside this DAG.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
import networkx as nx
from CodeEntropy.levels.nodes.covariance import FrameCovarianceNode
logger = logging.getLogger(__name__)
[docs]
@dataclass
class FrameContext:
    """Bundle the state associated with processing one trajectory frame.

    Attributes:
        shared: Workflow-wide data dictionary; it is held by reference, so
            changes made through it are visible to every other holder.
        frame_index: Index of the trajectory frame being processed.
        frame_covariance: Frame-local covariance output produced by
            FrameCovarianceNode. Defaults to ``None`` until a value is set.
        data: Optional frame-local scratch space for nodes. Defaults to
            ``None`` (no scratch dictionary is created automatically).
    """

    # Required fields (no defaults) come first, as dataclasses demand.
    shared: dict[str, Any]
    frame_index: int

    # Optional fields, both unset (``None``) on construction.
    frame_covariance: Any = None
    data: dict[str, Any] | None = None
[docs]
class FrameGraph:
    """Execute a frame-local directed acyclic graph.

    The graph is run once per trajectory frame. Nodes receive a plain ``dict``
    context: they may read shared inputs from ``ctx["shared"]`` and must write
    only frame-local outputs into the frame context.

    Expected node outputs:
        - "frame_covariance"
    """

    def __init__(self, universe_operations: Any | None = None) -> None:
        """Initialise a FrameGraph.

        Args:
            universe_operations: Optional adapter providing universe operations
                used by frame-level nodes (e.g., selections / molecule
                containers). It is only stored on the instance here.
        """
        self._universe_operations = universe_operations
        # Dependency topology (node names only) ...
        self._graph = nx.DiGraph()
        # ... and the runnable node objects, keyed by the same names.
        self._nodes: dict[str, Any] = {}

    def build(self) -> FrameGraph:
        """Build the default frame DAG topology.

        The default topology is a single ``"frame_covariance"`` node with no
        dependencies.

        Returns:
            Self, to allow fluent chaining.
        """
        self._add("frame_covariance", FrameCovarianceNode())
        return self

    def execute_frame(self, shared_data: dict[str, Any], frame_index: int) -> Any:
        """Execute the frame DAG for one selected analysis frame.

        FrameGraph owns trajectory positioning for frame-local execution: it
        seeks ``shared_data["frame_source"]`` to ``frame_index`` before any node
        runs. Higher-level orchestration passes explicit frame indices but must
        not rely on hidden MDAnalysis cursor state.

        Args:
            shared_data: Shared workflow data dictionary. Must contain
                ``"frame_source"``.
            frame_index: Frame index valid for the active analysis universe.
                During this migration stage this is local to the frame-reduced
                universe. The value is coerced with ``int()`` before use.

        Returns:
            The value stored under ``"frame_covariance"`` in the frame context
            once every node has run (``None`` if no node wrote it).

        Raises:
            KeyError: If ``"frame_source"`` is missing from ``shared_data``.
            IndexError: If seeking to ``frame_index`` fails with an
                ``IndexError``. The original error is chained as the cause.
        """
        frame_source = shared_data["frame_source"]
        frame_index = int(frame_index)

        try:
            frame_source.seek(frame_index)
        except IndexError as exc:
            # The frame count is only used to enrich the message. Looking it up
            # must never replace the IndexError callers are told to expect, so
            # a source that cannot report its length degrades to a vaguer
            # message instead of raising AttributeError/TypeError from here.
            try:
                n_frames: Any = len(frame_source.universe.trajectory)
            except (AttributeError, TypeError):
                n_frames = "an unknown number of"
            raise IndexError(
                f"Frame index {frame_index} is outside analysis trajectory bounds "
                f"for trajectory with {n_frames} frames."
            ) from exc

        ctx = self._make_frame_ctx(
            shared_data=shared_data,
            frame_index=frame_index,
        )

        # Topological order guarantees that each node runs after all of the
        # nodes it was registered as depending on.
        for node_name in nx.topological_sort(self._graph):
            logger.debug("[FrameGraph] running %s @ frame=%s", node_name, frame_index)
            self._nodes[node_name].run(ctx)

        return ctx["frame_covariance"]

    def _add(self, name: str, node: Any, deps: list[str] | None = None) -> None:
        """Register a node and its dependencies in the DAG.

        Args:
            name: Unique node name; re-registering a name replaces its node.
            node: Object exposing ``run(ctx)``.
            deps: Names of nodes that must run before ``name``. ``None`` or an
                empty list means the node has no dependencies.
        """
        self._nodes[name] = node
        self._graph.add_node(name)
        for dep in deps or []:
            # Edge direction dep -> name makes ``dep`` sort (and run) first.
            self._graph.add_edge(dep, name)

    @staticmethod
    def _make_frame_ctx(
        shared_data: dict[str, Any], frame_index: int
    ) -> dict[str, Any]:
        """Create a frame context dictionary for node execution.

        Notes:
            - The context includes a reference to `shared_data` via "shared"
              (the dictionary is not copied).
            - The context is intentionally frame-scoped and should not be used
              as a replacement for shared workflow state.

        Args:
            shared_data: Shared workflow data dict.
            frame_index: Frame index being processed, stored unchanged under
                "frame_index".

        Returns:
            Frame context dict with the keys "shared", "frame_index" and
            "frame_covariance" (initialised to ``None``).
        """
        return {
            "shared": shared_data,
            "frame_index": frame_index,
            "frame_covariance": None,
        }