Source code for CodeEntropy.entropy.nodes.aggregate

"""Aggregates entropy outputs produced by upstream DAG nodes."""

from __future__ import annotations

from collections.abc import Mapping, MutableMapping
from dataclasses import dataclass
from typing import Any

EntropyResults = dict[str, Any]


[docs] @dataclass(frozen=True, slots=True) class AggregateEntropyNode: """Aggregate entropy results into a single shared output object. This node is intentionally small and single-purpose: it gathers previously-computed entropy components from `shared_data` and writes a canonical `shared_data["entropy_results"]` mapping. Attributes: vibrational_key: Key in `shared_data` where vibrational entropy is stored. configurational_key: Key in `shared_data` where configurational entropy is stored. output_key: Key in `shared_data` where the aggregated mapping is written. """ vibrational_key: str = "vibrational_entropy" configurational_key: str = "configurational_entropy" orientational_key: str = "orientational_entropy" output_key: str = "entropy_results"
[docs] def run( self, shared_data: MutableMapping[str, Any], **_: Any ) -> dict[str, EntropyResults]: """Run the aggregation step. Args: shared_data: Shared workflow state. Must contain (or may contain) keys for vibrational and configurational entropy results. Returns: A dict containing a single key, `"entropy_results"`, which maps to the aggregated results dict. Side Effects: Writes the aggregated results to `shared_data[self.output_key]`. Notes: This node does not validate the shapes/types of upstream results. Validation should live with the producer nodes (single responsibility). """ results = self._collect_entropy_results(shared_data) shared_data[self.output_key] = results return {self.output_key: results}
def _collect_entropy_results( self, shared_data: Mapping[str, Any] ) -> EntropyResults: """Collect entropy results from shared data. Args: shared_data: Shared workflow state. Returns: A mapping with keys `"vibrational_entropy"` and `"configurational_entropy"`. """ return { "vibrational_entropy": self._get_optional( shared_data, self.vibrational_key ), "configurational_entropy": self._get_optional( shared_data, self.configurational_key ), "orientational_entropy": self._get_optional( shared_data, self.orientational_key ), } @staticmethod def _get_optional(shared_data: Mapping[str, Any], key: str) -> Any | None: """Fetch an optional value from shared data. Args: shared_data: Shared workflow state. key: Key to fetch. Returns: The value if present, otherwise None. """ return shared_data.get(key)