# Source code for CodeEntropy.levels.execution.policy
"""Internal policy for hierarchy-level frame map-reduce execution.
Users provide compute resources. CodeEntropy keeps scheduling choices such as
chunk size and in-flight task limits internal so the public configuration remains
stable and simple.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Any
# [docs]
@dataclass(frozen=True)
class ExecutionPolicy:
    """Internal policy for scalable, deterministic frame execution.

    Every value returned by this policy is derived only from the policy fields
    and the worker settings found in ``shared_data``, so repeated calls with the
    same inputs give the same answer.

    Attributes:
        target_frame_chunks_per_worker: Number of frame chunks to aim for per
            requested worker when sizing chunks.
        min_frame_chunk_size: Lower bound applied to the computed chunk size.
        max_frame_chunk_size: Upper bound applied to the computed chunk size.
        max_frame_in_flight_multiplier: Factor multiplied by the requested
            worker count to cap the number of simultaneously active tasks.
    """

    target_frame_chunks_per_worker: int = 2
    min_frame_chunk_size: int = 1
    max_frame_chunk_size: int = 32
    max_frame_in_flight_multiplier: int = 1

    def requested_worker_count(self, shared_data: dict[str, Any]) -> int:
        """Return the requested worker-process count.

        Args:
            shared_data: Shared workflow data that may contain ``args`` with
                local Dask or HPC worker settings.

        Returns:
            The requested worker count, clamped to at least one.
        """
        args = shared_data.get("args")

        # An explicit Dask worker count takes precedence over HPC settings.
        dask_workers = getattr(args, "dask_workers", None)
        if dask_workers is not None:
            return max(1, int(dask_workers))

        if bool(getattr(args, "hpc", False)):
            # ``or 1`` maps missing/None/0 settings to a single node/process.
            hpc_nodes = max(1, int(getattr(args, "hpc_nodes", 1) or 1))
            hpc_processes = max(1, int(getattr(args, "hpc_processes", 1) or 1))
            return hpc_nodes * hpc_processes

        # No parallel settings found: serial execution.
        return 1

    def frame_chunk_size(self, shared_data: dict[str, Any], *, n_frames: int) -> int:
        """Choose a deterministic frame chunk size.

        The frames are divided so that each requested worker receives roughly
        ``target_frame_chunks_per_worker`` chunks, and the result is then
        clamped to the policy bounds.

        Args:
            shared_data: Shared workflow data used to infer requested worker
                count.
            n_frames: Number of selected frames for the current run. Values
                below one are treated as one.

        Returns:
            The frame chunk size clamped between the policy minimum and
            maximum, and never smaller than one. If the policy minimum exceeds
            the maximum, the minimum wins.
        """
        n_frames = max(1, int(n_frames))
        workers = self.requested_worker_count(shared_data)
        target_chunks = max(1, workers * int(self.target_frame_chunks_per_worker))
        chunk_size = math.ceil(n_frames / target_chunks)
        clamped = max(
            int(self.min_frame_chunk_size),
            min(int(self.max_frame_chunk_size), int(chunk_size)),
        )
        # A chunk size below one is never usable; floor it so a misconfigured
        # policy (non-positive bounds) cannot produce zero-length chunks. This
        # mirrors the ">= 1" guarantee of the other policy methods.
        return max(1, clamped)

    def max_frame_in_flight_tasks(
        self,
        shared_data: dict[str, Any],
        *,
        n_chunks: int,
    ) -> int:
        """Choose the maximum number of active frame-chunk tasks.

        Args:
            shared_data: Shared workflow data used to infer requested worker
                count.
            n_chunks: Number of frame chunks available for submission.

        Returns:
            The number of frame-chunk tasks allowed to be active at once:
            the smaller of ``n_chunks`` and ``workers * multiplier``, but
            always at least one.
        """
        workers = self.requested_worker_count(shared_data)
        max_in_flight = workers * int(self.max_frame_in_flight_multiplier)
        return max(1, min(int(n_chunks), int(max_in_flight)))