Source code for CodeEntropy.entropy.graph
"""Entropy graph orchestration.
This module defines `EntropyGraph`, a small directed acyclic graph (DAG) that
executes entropy calculation nodes in dependency order.
The graph is intentionally simple:
* Vibrational entropy
* Configurational entropy
* Aggregation of results
The nodes themselves encapsulate the detailed calculations.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
import networkx as nx
from CodeEntropy.entropy.nodes.aggregate import AggregateEntropyNode
from CodeEntropy.entropy.nodes.configurational import ConfigurationalEntropyNode
from CodeEntropy.entropy.nodes.orientational import OrientationalEntropyNode
from CodeEntropy.entropy.nodes.vibrational import VibrationalEntropyNode
logger = logging.getLogger(__name__)
SharedData = dict[str, Any]
[docs]
@dataclass(frozen=True)
class NodeSpec:
"""Specification for a node within the entropy graph.
Attributes:
name: Unique node name.
node: Node instance. Must implement `run(shared_data, **kwargs)`.
deps: Optional list of node names that must run before this node.
"""
name: str
node: Any
deps: tuple[str, ...] = ()
[docs]
class EntropyGraph:
"""Build and execute the entropy calculation DAG.
The graph is built once via `build()` and executed via `execute()`.
Examples:
graph = EntropyGraph().build()
results = graph.execute(shared_data)
"""
def __init__(self) -> None:
"""Initialize an empty entropy graph."""
self._graph: nx.DiGraph = nx.DiGraph()
self._nodes: dict[str, Any] = {}
[docs]
def build(self) -> EntropyGraph:
"""Populate the graph with the standard entropy workflow.
Returns:
Self for fluent chaining.
"""
specs = (
NodeSpec("vibrational_entropy", VibrationalEntropyNode()),
NodeSpec("configurational_entropy", ConfigurationalEntropyNode()),
NodeSpec("orientational_entropy", OrientationalEntropyNode()),
NodeSpec(
"aggregate_entropy",
AggregateEntropyNode(),
deps=(
"vibrational_entropy",
"configurational_entropy",
"orientational_entropy",
),
),
)
for spec in specs:
self._add_node(spec)
return self
[docs]
def execute(
self, shared_data: SharedData, *, progress: object | None = None
) -> dict[str, Any]:
"""Execute the entropy graph in topological order.
Nodes are executed in dependency order (topological sort). Each node reads
from and may mutate `shared_data`. Dict-like outputs returned by nodes are
merged into a single results dictionary.
This method intentionally does *not* create a progress bar/task for the
entropy graph itself because the graph is typically very fast. If a progress
sink is provided, it is forwarded to nodes that accept it.
Args:
shared_data: Mutable shared data dictionary passed to each node.
progress: Optional progress sink (e.g., from ResultsReporter.progress()).
Forwarded to node `run()` methods that accept a `progress` keyword.
Returns:
Dictionary containing merged dict outputs produced by nodes. On key
collision, later nodes overwrite earlier keys.
Raises:
KeyError: If a node name is missing from the internal node registry.
"""
results: dict[str, Any] = {}
for node_name in nx.topological_sort(self._graph):
node = self._nodes[node_name]
if progress is not None:
try:
out = node.run(shared_data, progress=progress)
except TypeError:
out = node.run(shared_data)
else:
out = node.run(shared_data)
if isinstance(out, dict):
results.update(out)
return results
def _add_node(self, spec: NodeSpec) -> None:
"""Add a node and its dependencies to the graph.
Args:
spec: Node specification.
Raises:
ValueError: If a duplicate node name is added.
"""
if spec.name in self._nodes:
raise ValueError(f"Duplicate node name: {spec.name}")
self._nodes[spec.name] = spec.node
self._graph.add_node(spec.name)
for dep in spec.deps:
self._graph.add_edge(dep, spec.name)