"""Benchmark evaluator module for SageMaker Model Evaluation.
This module provides benchmark evaluation capabilities for SageMaker models, supporting
various standard benchmarks like MMLU, BBH, MATH, and others. It handles benchmark
configuration, validation, and execution of evaluation pipelines.
"""
from __future__ import absolute_import
import logging
import re
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Type, Union
from pydantic import BaseModel, Field, validator
from sagemaker.core.resources import ModelPackageGroup
from .base_evaluator import BaseEvaluator
from .constants import EvalType
from .execution import EvaluationPipelineExecution
from sagemaker.core.telemetry.telemetry_logging import _telemetry_emitter
from sagemaker.core.telemetry.constants import Feature
from sagemaker.train.constants import get_sagemaker_hub_name
_logger = logging.getLogger(__name__)
def _is_placeholder(value) -> bool:
"""Check if a value is an unresolved template placeholder like '{{key}}'."""
return isinstance(value, str) and "{{" in value and "}}" in value
# Internal enums and classes - not meant for direct user access
class _Benchmark(str, Enum):
"""Internal benchmark types for model evaluation"""
MMLU = "mmlu"
MMLU_PRO = "mmlu_pro"
BBH = "bbh"
GPQA = "gpqa"
MATH = "math"
STRONG_REJECT = "strong_reject"
IFEVAL = "ifeval"
MMMU = "mmmu"
LLM_JUDGE = "llm_judge"
# Internal benchmark configuration mapping - using plain dictionaries
_BENCHMARK_CONFIG: Dict[_Benchmark, Dict[str, Any]] = {
_Benchmark.MMLU: {
"modality": "Text",
"description": "Multi-task Language Understanding – Tests knowledge across 57 subjects.",
"metrics": ["accuracy"],
"strategy": "zs_cot",
"subtask_available": True,
"subtasks": [
"abstract_algebra", "anatomy", "astronomy", "business_ethics",
"clinical_knowledge", "college_biology", "college_chemistry",
"college_computer_science", "college_mathematics", "college_medicine",
"college_physics", "computer_security", "conceptual_physics",
"econometrics", "electrical_engineering", "elementary_mathematics",
"formal_logic", "global_facts", "high_school_biology",
"high_school_chemistry", "high_school_computer_science",
"high_school_european_history", "high_school_geography",
"high_school_government_and_politics", "high_school_macroeconomics",
"high_school_mathematics", "high_school_microeconomics",
"high_school_physics", "high_school_psychology",
"high_school_statistics", "high_school_us_history",
"high_school_world_history", "human_aging", "human_sexuality",
"international_law", "jurisprudence", "logical_fallacies",
"machine_learning", "management", "marketing", "medical_genetics",
"miscellaneous", "moral_disputes", "moral_scenarios", "nutrition",
"philosophy", "prehistory", "professional_accounting",
"professional_law", "professional_medicine", "professional_psychology",
"public_relations", "security_studies", "sociology",
"us_foreign_policy", "virology", "world_religions"
]
},
_Benchmark.MMLU_PRO: {
"modality": "Text",
"description": "MMLU – Professional Subset – Focuses on professional domains such as law, medicine, accounting, and engineering.",
"metrics": ["accuracy"],
"strategy": "zs_cot",
"subtask_available": False,
"subtasks": None
},
_Benchmark.BBH: {
"modality": "Text",
"description": "Advanced Reasoning Tasks – A collection of challenging problems that test higher-level cognitive and problem-solving skills.",
"metrics": ["accuracy"],
"strategy": "fs_cot",
"subtask_available": True,
"subtasks": [
"boolean_expressions", "causal_judgement", "date_understanding",
"disambiguation_qa", "dyck_languages", "formal_fallacies",
"geometric_shapes", "hyperbaton", "logical_deduction_five_objects",
"logical_deduction_seven_objects", "logical_deduction_three_objects",
"movie_recommendation", "multistep_arithmetic_two", "navigate",
"object_counting", "penguins_in_a_table",
"reasoning_about_colored_objects", "ruin_names",
"salient_translation_error_detection", "snarks",
"sports_understanding", "temporal_sequences",
"tracking_shuffled_objects_five_objects",
"tracking_shuffled_objects_seven_objects",
"tracking_shuffled_objects_three_objects", "web_of_lies",
"word_sorting"
]
},
_Benchmark.GPQA: {
"modality": "Text",
"description": "General Physics Question Answering – Assesses comprehension of physics concepts and related problem-solving abilities.",
"metrics": ["accuracy"],
"strategy": "zs_cot",
"subtask_available": False,
"subtasks": None
},
_Benchmark.MATH: {
"modality": "Text",
"description": "Mathematical Problem Solving – Measures mathematical reasoning across topics including algebra, calculus, and word problems.",
"metrics": ["exact_match"],
"strategy": "zs_cot",
"subtask_available": True,
"subtasks": [
"algebra", "counting_and_probability", "geometry",
"intermediate_algebra", "number_theory", "prealgebra",
"precalculus"
]
},
_Benchmark.STRONG_REJECT: {
"modality": "Text",
"description": "Quality-Control Task – Tests the model's ability to detect and reject inappropriate, harmful, or incorrect content.",
"metrics": ["deflection"],
"strategy": "zs",
"subtask_available": True,
"subtasks": None # Documentation doesn't specify subtasks for strong_reject
},
_Benchmark.IFEVAL: {
"modality": "Text",
"description": "Instruction-Following Evaluation – Gauges how accurately a model follows given instructions and completes tasks to specification.",
"metrics": ["accuracy"],
"strategy": "zs",
"subtask_available": False,
"subtasks": None
},
_Benchmark.MMMU: {
"modality": "Multi-Modal",
"description": "Massive Multidiscipline Multimodal Understanding (MMMU) – College-level benchmark comprising multiple-choice and open-ended questions from 30 disciplines.",
"metrics": ["accuracy"],
"strategy": "zs_cot",
"subtask_available": True,
"subtasks": [
"Accounting", "Agriculture", "Architecture_and_Engineering",
"Art", "Art_Theory", "Basic_Medical_Science", "Biology",
"Chemistry", "Clinical_Medicine", "Computer_Science", "Design",
"Diagnostics_and_Laboratory_Medicine", "Economics", "Electronics",
"Energy_and_Power", "Finance", "Geography", "History",
"Literature", "Manage", "Marketing", "Materials", "Math",
"Mechanical_Engineering", "Music", "Pharmacy", "Physics",
"Psychology", "Public_Health", "Sociology"
]
},
_Benchmark.LLM_JUDGE: {
"modality": "Text",
"description": "LLM-as-a-Judge - Uses a user-selected judge model to judge a set of customer-provided inference responses.",
"metrics": ["all"],
"strategy": "judge",
"subtask_available": False,
"subtasks": None
},
}
# Public utility methods
[docs]
def get_benchmarks() -> Type[_Benchmark]:
"""Get the Benchmark enum for selecting available benchmarks.
This utility method provides access to the internal Benchmark enum,
allowing users to reference available benchmarks without directly
accessing internal implementation details.
Returns:
Type[_Benchmark]: The Benchmark enum class containing all available benchmarks.
Example:
.. code:: python
Benchmark = get_benchmarks()
evaluator = BenchMarkEvaluator(
benchmark=Benchmark.MMLU,
sagemaker_session=session,
s3_output_path="s3://bucket/output"
)
Note:
In the future, this will be extended to dynamically generate the
enum from a backend API call to fetch the latest available benchmarks.
"""
return _Benchmark
[docs]
def get_benchmark_properties(benchmark: _Benchmark) -> Dict[str, Any]:
"""Get properties for a specific benchmark.
This utility method returns the properties associated with a given benchmark
as a dictionary, including information about modality, metrics, strategy,
and available subtasks.
Args:
benchmark (_Benchmark): The benchmark to get properties for (from ``get_benchmarks()``).
Returns:
Dict[str, Any]: Dictionary containing benchmark properties with keys:
- ``modality`` (str): The modality type (e.g., "Text", "Multi-Modal")
- ``description`` (str): Description of the benchmark
- ``metrics`` (list[str]): List of supported metrics
- ``strategy`` (str): The evaluation strategy used
- ``subtask_available`` (bool): Whether subtasks are supported
- ``subtasks`` (Optional[list[str]]): List of available subtasks, if applicable
Raises:
ValueError: If the provided benchmark is not found in the configuration.
Example:
.. code:: python
Benchmark = get_benchmarks()
props = get_benchmark_properties(Benchmark.MMLU)
print(props['description'])
# 'Multi-task Language Understanding – Tests knowledge across 57 subjects.'
print(props['subtasks'][:3])
# ['abstract_algebra', 'anatomy', 'astronomy']
Note:
In the future, this will be extended to dynamically fetch benchmark
properties from a backend API call instead of using the internal static configuration.
"""
config = _BENCHMARK_CONFIG.get(benchmark)
if config is None:
raise ValueError(
f"Benchmark '{benchmark.value}' not found in configuration. "
f"Available benchmarks: {', '.join(b.value for b in _BENCHMARK_CONFIG.keys())}"
)
# Return a copy of the configuration dictionary
return config.copy()
[docs]
class BenchMarkEvaluator(BaseEvaluator):
"""Benchmark evaluator for standard model evaluation tasks.
This evaluator accepts a benchmark enum and automatically deduces the appropriate
metrics, strategy, and subtask availability based on the benchmark configuration.
Supports various standard benchmarks like MMLU, BBH, MATH, MMMU, and others.
Attributes:
benchmark (_Benchmark): Benchmark type from the Benchmark enum obtained via ``get_benchmarks()``.
Required. Use get_benchmarks() to access available benchmark types.
subtasks (Optional[Union[str, list[str]]]): Benchmark subtask(s) to evaluate. Defaults to
'ALL' for benchmarks that support subtasks. Can be a single subtask string, a list of
subtasks, or 'ALL' to run all subtasks. For benchmarks without subtask support, must
be None.
mlflow_resource_arn (Optional[str]): ARN of the MLflow tracking server for experiment tracking.
Optional. If not provided, the system will attempt to resolve it using the default
MLflow app experience (checks domain match, account default, or creates a new app).
Format: arn:aws:sagemaker:region:account:mlflow-tracking-server/name
evaluate_base_model (bool): Whether to evaluate the base model in addition to the custom
model. Set to False to skip base model evaluation and only evaluate the custom model.
Defaults to True (evaluates both models).
recipe (Optional[str]): Path to a user recipe YAML file (local or S3 URI) for evaluation
configuration. Optional. When provided, values are merged with overrides.
overrides (Optional[Dict[str, Any]]): Programmatic overrides dict (nested structure).
These take highest precedence over recipe file and base defaults.
region (Optional[str]): AWS region. Inherited from BaseEvaluator.
sagemaker_session (Optional[Any]): SageMaker session object. Inherited from BaseEvaluator.
model (Union[str, Any]): Model for evaluation. Inherited from BaseEvaluator.
base_eval_name (Optional[str]): Base name for evaluation jobs. Inherited from BaseEvaluator.
s3_output_path (str): S3 location for evaluation outputs. Inherited from BaseEvaluator.
mlflow_experiment_name (Optional[str]): MLflow experiment name. Inherited from BaseEvaluator.
mlflow_run_name (Optional[str]): MLflow run name. Inherited from BaseEvaluator.
networking (Optional[VpcConfig]): VPC configuration. Inherited from BaseEvaluator.
kms_key_id (Optional[str]): KMS key ID for encryption. Inherited from BaseEvaluator.
model_package_group (Optional[Union[str, ModelPackageGroup]]): Model package group.
Inherited from BaseEvaluator.
Example:
.. code:: python
# Get available benchmarks
Benchmark = get_benchmarks()
# Create evaluator with benchmark and subtasks
evaluator = BenchMarkEvaluator(
benchmark=Benchmark.MMLU,
subtasks=["abstract_algebra", "anatomy", "astronomy"],
model="llama3-2-1b-instruct",
s3_output_path="s3://bucket/outputs/",
mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
)
# Run evaluation with configured subtasks
execution = evaluator.evaluate()
execution.wait()
# Or override subtasks at evaluation time
execution = evaluator.evaluate(subtask="abstract_algebra")
# With recipe file for evaluation config
evaluator = BenchMarkEvaluator(
benchmark=Benchmark.MMLU,
model="amazon-nova-pro-v2",
s3_output_path="s3://bucket/outputs/",
recipe="./eval-recipes/nova-pro-mmlu.yaml",
overrides={"inference": {"max_new_tokens": 4096, "temperature": 0}},
)
resolved = evaluator.get_resolved_recipe()
"""
benchmark: _Benchmark
subtasks: Optional[Union[str, List[str]]] = None
evaluate_base_model: bool = False
_hyperparameters: Optional[Any] = None
@validator('benchmark')
def _validate_benchmark_model_compatibility(cls, v, values):
"""Validate that benchmark is compatible with model type (Nova vs non-Nova)"""
from ..common_utils.recipe_utils import _is_nova_model
# Get resolved model info if available
resolved_info = values.get('_resolved_model_info')
if resolved_info and resolved_info.base_model_name:
base_model_name = resolved_info.base_model_name
is_nova = _is_nova_model(base_model_name)
benchmark_value = v.value
# mmmu is only allowed for Nova models
if benchmark_value == "mmmu" and not is_nova:
raise ValueError(
f"Benchmark 'mmmu' is only supported for Nova models. "
f"The current model '{base_model_name}' is not a Nova model."
)
# llm_judge is not allowed for Nova models
if benchmark_value == "llm_judge" and is_nova:
raise ValueError(
f"Benchmark 'llm_judge' is not supported for Nova models. "
f"The current model '{base_model_name}' is a Nova model."
)
return v
@validator('subtasks', always=True)
def _validate_subtasks(cls, v, values):
"""Validate that subtasks is provided when required and in correct format"""
if 'benchmark' in values:
benchmark = values['benchmark']
config = _BENCHMARK_CONFIG.get(benchmark)
if config and config.get("subtask_available"):
# Default to "ALL" if not provided for benchmarks that support subtasks
if v is None:
return "ALL"
# Validate format
if isinstance(v, list):
if len(v) == 0:
raise ValueError(
f"Subtask list cannot be empty for benchmark '{benchmark.value}'. "
f"Provide at least one subtask or use 'ALL'."
)
# Validate each subtask in the list
for subtask in v:
if not isinstance(subtask, str):
raise ValueError(
f"All subtasks in the list must be strings. "
f"Found {type(subtask).__name__}: {subtask}"
)
# Validate against available subtasks if defined
if config.get("subtasks") and subtask not in config["subtasks"]:
raise ValueError(
f"Invalid subtask '{subtask}' for benchmark '{benchmark.value}'. "
f"Available subtasks: {', '.join(config['subtasks'])}"
)
elif isinstance(v, str):
# Skip validation for "ALL" keyword
if v.upper() != "ALL":
# Validate single subtask against available subtasks if defined
if config.get("subtasks") and v not in config["subtasks"]:
raise ValueError(
f"Invalid subtask '{v}' for benchmark '{benchmark.value}'. "
f"Available subtasks: {', '.join(config['subtasks'])}"
)
else:
raise ValueError(
f"Subtask must be a string, a list of strings, or 'ALL'. "
f"Got {type(v).__name__}"
)
if config and not config.get("subtask_available") and v is not None:
raise ValueError(
f"Subtask is not supported for benchmark '{benchmark.value}'. "
f"Please set subtasks to None."
)
return v
def _get_eval_recipe_display_name_filter(self) -> str:
"""Prefer 'general text benchmark' recipes for BenchMarkEvaluator."""
return "benchmark"
@property
@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="BenchMarkEvaluator.hyperparameters")
def hyperparameters(self):
"""Get evaluation hyperparameters as a FineTuningOptions object.
This property provides access to evaluation hyperparameters with validation,
type checking, and user-friendly information display. Hyperparameters are
lazily loaded from the JumpStart Hub when first accessed.
Returns:
FineTuningOptions: Dynamic object with evaluation hyperparameters
Raises:
ValueError: If base model name is not available or if hyperparameters cannot be loaded
Example:
.. code:: python
evaluator = BenchMarkEvaluator(...)
# Access current values
print(evaluator.hyperparameters.temperature)
# Modify values (with validation)
evaluator.hyperparameters.temperature = 0.5
# Get as dictionary
params = evaluator.hyperparameters.to_dict()
# Display parameter information
evaluator.hyperparameters.get_info()
evaluator.hyperparameters.get_info('temperature')
"""
if self._hyperparameters is None:
from ..common import FineTuningOptions
from ..common_utils.recipe_utils import _get_evaluation_override_params, _extract_eval_override_options, _is_nova_model
# Get the hub content name from the base model
hub_content_name = self._base_model_name
if not hub_content_name:
raise ValueError(
"Base model name not available. Cannot load hyperparameters. "
"Ensure base_model is properly configured. "
"The base_model parameter must be set to a valid model identifier (e.g., JumpStart model ID, "
"model package ARN, or model ARN) to enable hyperparameter configuration."
)
# Get region
# region = (self.sagemaker_session.boto_region_name
# if hasattr(self.sagemaker_session, 'boto_region_name')
# else 'us-west-2')
region = self.region
# Determine evaluation type based on model and task
evaluation_type = "DeterministicEvaluation" # Default for non-Nova models
if _is_nova_model(hub_content_name):
# For Nova models, evaluation type depends on the task
task = self.benchmark.value
if task == "mmmu":
evaluation_type = "DeterministicMultiModalBenchmark"
else:
evaluation_type = "DeterministicTextBenchmark"
# Fetch override parameters from hub (let exceptions propagate)
_logger.info(f"Fetching evaluation override parameters for hyperparameters property")
# Extract boto_session from sagemaker_core Session
# HubContent.get() in recipe_utils expects boto3 session, not sagemaker_core Session
boto_session = (self.sagemaker_session.boto_session
if hasattr(self.sagemaker_session, 'boto_session')
else self.sagemaker_session)
override_params = _get_evaluation_override_params(
hub_content_name=hub_content_name,
hub_name=get_sagemaker_hub_name(),
evaluation_type=evaluation_type,
region=region,
session=boto_session
)
# Extract full parameter specifications
configurable_params = _extract_eval_override_options(override_params, return_full_spec=True)
# Create FineTuningOptions object from full specifications
self._hyperparameters = FineTuningOptions(configurable_params)
return self._hyperparameters
def _resolve_subtask_for_evaluation(self, subtask: Optional[Union[str, List[str]]]) -> Optional[Union[str, List[str]]]:
"""Resolve and validate subtask for evaluation.
Args:
subtask: Subtask parameter from evaluate() call
Returns:
Optional[Union[str, List[str]]]: Resolved subtask (uses constructor value if not provided)
Raises:
ValueError: If subtask is invalid for the benchmark
"""
# Use provided subtask or fall back to constructor subtasks
eval_subtask = subtask if subtask is not None else self.subtasks
if eval_subtask is None or (isinstance(eval_subtask, str) and eval_subtask.upper() == "ALL"):
#TODO : Check All Vs None subtask for evaluation
return None
# Validate the subtask
config = _BENCHMARK_CONFIG.get(self.benchmark)
if config and config.get("subtask_available"):
if isinstance(eval_subtask, str):
if eval_subtask.upper() != "ALL" and config.get("subtasks") and eval_subtask not in config["subtasks"]:
raise ValueError(
f"Invalid subtask '{eval_subtask}' for benchmark '{self.benchmark.value}'. "
f"Available subtasks: {', '.join(config['subtasks'])}"
)
elif isinstance(eval_subtask, list):
if len(eval_subtask) == 0:
raise ValueError(
f"Subtask list cannot be empty for benchmark '{self.benchmark.value}'. "
f"Provide at least one subtask or use 'ALL'."
)
# Validate each subtask in the list
for st in eval_subtask:
if config.get("subtasks") and st not in config["subtasks"]:
raise ValueError(
f"Invalid subtask '{st}' for benchmark '{self.benchmark.value}'. "
f"Available subtasks: {', '.join(config['subtasks'])}"
)
return eval_subtask
def _get_benchmark_template_additions(self, eval_subtask: Optional[Union[str, List[str]]],
config: Dict[str, Any]) -> dict:
"""Get benchmark-specific template context additions.
Args:
eval_subtask: Resolved subtask value
config: Benchmark configuration dictionary
Returns:
dict: Benchmark-specific template context fields
"""
from ..common_utils.recipe_utils import _is_nova_model
# Get effective hyperparameters (recipe/overrides take precedence if provided)
configured_params = self._get_effective_hyperparameters()
_logger.info(f"Using configured hyperparameters: {configured_params}")
# Determine if this is a Nova model
is_nova = _is_nova_model(self._base_model_name)
metric_key = 'metric' if is_nova else 'evaluation_metric'
# Build benchmark-specific context
benchmark_context = {
'task': self.benchmark.value,
'strategy': config["strategy"],
metric_key: config["metrics"][0] if config.get("metrics") else 'accuracy',
'evaluate_base_model': self.evaluate_base_model,
}
if isinstance(eval_subtask, str):
benchmark_context['subtask'] = eval_subtask
elif isinstance(eval_subtask, list):
# Convert list to comma-separated string
benchmark_context['subtask'] = ','.join(eval_subtask)
# Add all configured hyperparameters
for key in configured_params.keys():
benchmark_context[key] = configured_params[key]
return benchmark_context
[docs]
@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="BenchMarkEvaluator.evaluate")
def evaluate(self, subtask: Optional[Union[str, List[str]]] = None) -> EvaluationPipelineExecution:
"""Create and start a benchmark evaluation job.
Supports multiple compute backends via the ``compute`` parameter set at
construction time:
- **Serverless** (default): Runs via SageMaker Pipelines.
- **SMTJ**: Runs on user-managed instances via ModelTrainer.
- **HyperPod**: Submits to a HyperPod cluster via the HyperPod CLI.
Args:
subtask (Optional[Union[str, list[str]]]): Optional subtask(s) to evaluate. If not provided,
uses the subtasks from constructor. Can be a single subtask string, a list of
subtasks, or 'ALL' to run all subtasks.
Returns:
EvaluationPipelineExecution: The created benchmark evaluation execution.
Example:
.. code:: python
Benchmark = get_benchmarks()
evaluator = BenchMarkEvaluator(
benchmark=Benchmark.MMLU,
subtasks="ALL",
model="llama3-2-1b-instruct",
s3_output_path="s3://bucket/outputs/"
)
# Evaluate single subtask
execution = evaluator.evaluate(subtask="abstract_algebra")
# Evaluate multiple subtasks
execution = evaluator.evaluate(subtask=["abstract_algebra", "anatomy"])
# Evaluate all subtasks (uses constructor default)
execution = evaluator.evaluate()
"""
from sagemaker.core.training.configs import Compute, HyperPodCompute
# Dispatch based on compute type
# Validate platform compatibility (HP checkpoints must eval on HP, SMTJ on SMTJ)
from sagemaker.train.common_utils.finetune_utils import validate_eval_platform_compatibility
model_info = self._get_resolved_model_info()
model_path = getattr(model_info, 's3_model_path', None) if model_info else None
validate_eval_platform_compatibility(model_path, self.compute)
if isinstance(self.compute, Compute) and not isinstance(self.compute, HyperPodCompute):
return self._evaluate_serverful_smtj(subtask=subtask)
elif isinstance(self.compute, HyperPodCompute):
return self._evaluate_hyperpod(subtask=subtask)
# Default: serverless compute via SageMaker Pipelines
# S3 checkpoint paths are not supported on serverless — require SMTJ or HyperPod compute
from sagemaker.train.common_utils.model_resolution import _ModelType
info = self._get_resolved_model_info()
if info and info.model_type == _ModelType.S3_CHECKPOINT:
raise ValueError(
"S3 checkpoint paths cannot be used with serverless evaluation. "
"Please provide a 'compute' parameter (e.g., TrainingJobCompute or HyperPodCompute) "
"to run evaluation on dedicated instances."
)
from .pipeline_templates import DETERMINISTIC_TEMPLATE, DETERMINISTIC_TEMPLATE_BASE_MODEL_ONLY
# Resolve and validate subtask
eval_subtask = self._resolve_subtask_for_evaluation(subtask)
# Get benchmark configuration
config = _BENCHMARK_CONFIG.get(self.benchmark)
# Get AWS execution context (role ARN, region, account ID)
aws_context = self._get_aws_execution_context()
# Resolve model artifacts
artifacts = self._resolve_model_artifacts(aws_context['region'])
# Get or infer model_package_group ARN (handles all cases internally)
model_package_group_arn = self._get_model_package_group_arn()
# Log resolved model information for debugging
_logger.info(f"Resolved model info - base_model_name: {self._base_model_name}, base_model_arn: {self._base_model_arn}, source_model_package_arn: {self._source_model_package_arn}")
# Build base template context
template_context = self._get_base_template_context(
role_arn=aws_context['role_arn'],
region=aws_context['region'],
account_id=aws_context['account_id'],
model_package_group_arn=model_package_group_arn,
resolved_model_artifact_arn=artifacts['resolved_model_artifact_arn']
)
# Add benchmark-specific template additions
benchmark_additions = self._get_benchmark_template_additions(eval_subtask, config)
template_context.update(benchmark_additions)
# Add VPC and KMS configuration
template_context = self._add_vpc_and_kms_to_context(template_context)
# Select appropriate template
template_str = self._select_template(
DETERMINISTIC_TEMPLATE_BASE_MODEL_ONLY,
DETERMINISTIC_TEMPLATE
)
# Render pipeline definition
pipeline_definition = self._render_pipeline_definition(template_str, template_context)
# Generate execution name
name = self.base_eval_name or f"benchmark-eval-{self.benchmark.value}"
# Start execution
return self._start_execution(
eval_type=EvalType.BENCHMARK,
name=name,
pipeline_definition=pipeline_definition,
role_arn=aws_context['role_arn'],
region=aws_context['region']
)
[docs]
@classmethod
@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="BenchMarkEvaluator.get_all")
def get_all(
cls,
session: Optional[Any] = None,
region: Optional[str] = None
) -> Iterator[EvaluationPipelineExecution]:
"""Get all benchmark evaluation executions.
Uses ``EvaluationPipelineExecution.get_all()`` to retrieve all benchmark
evaluation executions as an iterator.
Args:
session (Optional[Any]): Optional boto3 session. If not provided, will be inferred.
region (Optional[str]): Optional AWS region. If not provided, will be inferred.
Yields:
EvaluationPipelineExecution: Benchmark evaluation execution instances.
Example:
.. code:: python
# Get all benchmark evaluations as iterator
eval_iter = BenchMarkEvaluator.get_all()
all_executions = list(eval_iter)
# Or iterate directly
for execution in BenchMarkEvaluator.get_all():
print(f"{execution.name}: {execution.status.overall_status}")
# With specific session/region
eval_iter = BenchMarkEvaluator.get_all(session=my_session, region='us-west-2')
all_executions = list(eval_iter)
"""
# Use EvaluationPipelineExecution.get_all() with BENCHMARK eval_type
# This returns a generator, so we yield from it
yield from EvaluationPipelineExecution.get_all(
eval_type=EvalType.BENCHMARK,
session=session,
region=region
)
def _evaluate_serverful_smtj(self, subtask=None):
"""Execute benchmark evaluation on SMTJ compute via ModelTrainer.
Fetches the evaluation recipe template from SageMaker Hub (filtered by
Type=Evaluation), injects benchmark-specific parameters, and launches
via ModelTrainer.from_recipe(). Follows the same Hub lookup pattern as
the Nova Forge SDK.
"""
from sagemaker.train.utils import _get_unique_name
# --- Validate platform compatibility ---
# HyperPod-trained checkpoints cannot be evaluated on SMTJ
from sagemaker.train.common_utils.model_resolution import _ModelType, _detect_checkpoint_platform, _CheckpointPlatform
info = self._get_resolved_model_info()
if info and info.model_type == _ModelType.S3_CHECKPOINT and info.s3_model_path:
checkpoint_platform = _detect_checkpoint_platform(info.s3_model_path)
if checkpoint_platform == _CheckpointPlatform.HYPERPOD:
raise ValueError(
f"HyperPod-trained checkpoints cannot be evaluated on SMTJ compute. "
f"The checkpoint at '{info.s3_model_path}' was trained on HyperPod. "
f"Please use HyperPodCompute for evaluation instead:\n\n"
f" compute=HyperPodCompute(\n"
f" cluster_name='your-cluster',\n"
f" namespace='kubeflow',\n"
f" instance_type='ml.p5.48xlarge',\n"
f" node_count=1,\n"
f" )"
)
# --- Common setup ---
sagemaker_session, role, region = self._get_smtj_session_and_role()
# --- Find SMTJ evaluation recipe ---
smtj_eval_recipes = self._get_smtj_eval_recipes(sagemaker_session, region)
# For standard benchmarks (MMLU, BBH, etc.), filter for "general text benchmark"
benchmark_recipes = [
r for r in smtj_eval_recipes
if "general text benchmark" in r.get("DisplayName", "").lower()
]
# Fall back to first available SMTJ eval recipe if no benchmark match
recipe_metadata = benchmark_recipes[0] if benchmark_recipes else smtj_eval_recipes[0]
# Get recipe template S3 URI and image URI
training_image = self.training_image or recipe_metadata.get("SmtjImageUri")
# --- Download and load recipe ---
recipe_dict, recipe_tmp_path = self._download_and_load_recipe(
recipe_metadata["SmtjRecipeTemplateS3Uri"], sagemaker_session
)
# --- Fetch the Hub-declared overridable field set + defaults ---
# The recipe's SmtjOverrideParamsS3Uri declares which fields it accepts
# and their defaults/types. Driving the field set from here (instead of a
# hardcoded list) means a model whose recipe exposes different fields is
# handled automatically. Mirrors the Nova Forge SDK RecipeBuilder.
override_spec = self._download_eval_override_spec(recipe_metadata, sagemaker_session)
# --- Inject benchmark-specific fields ---
config = _BENCHMARK_CONFIG.get(self.benchmark)
base_job_name = self.base_eval_name or f"eval-{self.benchmark.value}"
task_value = str(self.benchmark.value)
strategy_value = config["strategy"]
metric_value = config["metrics"][0] if config.get("metrics") else "accuracy"
# --- Resolve subtask value ---
eval_subtask = self._resolve_subtask_for_evaluation(subtask)
if eval_subtask:
subtask_value = ",".join(eval_subtask) if isinstance(eval_subtask, list) else eval_subtask
else:
subtask_value = ""
# --- Resolve model path (fine-tuned checkpoint or OSS base weights) ---
# OSS eval requires model_name_or_path to point at the base model weights
# when there is no fine-tuned checkpoint; Nova eval resolves via model_type.
# OSS artifacts are delivered via a dedicated "model" input channel so the
# container's checkpoints/hf_merged resolution runs against a local mount
# (reproducing the serverless experience); Nova keeps the raw S3 path.
model_path, model_channel = self._resolve_eval_model_input(
sagemaker_session, region
)
if not model_path and self._source_model_package_arn:
raise ValueError(
f"Could not resolve S3 model artifacts path from model package "
f"'{self._source_model_package_arn}'. SMTJ evaluation requires an S3 "
f"checkpoint path. Ensure the model package was created from a SMTJ "
f"training job with accessible model artifacts."
)
# --- Build the SDK-derived semantic values ---
# The metric field name differs across recipe families (Nova: 'metric',
# OpenWeights: 'evaluation_metric'); set both aliases so the value lands
# in whichever leaf the recipe actually declares. Likewise infra fields:
# Nova recipes use 'output_s3_path', the OSS eval recipe uses 'output_path'
# and also carries 'base_model_name' / 'instance_count' (run.replicas).
# The eval container requires non-empty experiment and run names whenever
# an MLflow tracking URI is set (OSS); Nova tolerates empty names but
# still benefits from a meaningful default. Default both to base_job_name
# for a consistent MLflow experience. See
# BaseEvaluator._resolve_mlflow_tracking_fields.
mlflow_tracking_uri, mlflow_experiment_name, mlflow_run_name = (
self._resolve_mlflow_tracking_fields(base_job_name)
)
semantic_values = {
"name": _get_unique_name(base_job_name),
"output_s3_path": self.s3_output_path or "",
"output_path": self.s3_output_path or "",
"base_model_name": self._base_model_name or "",
"instance_count": self.compute.instance_count,
"kms_key_id": self.kms_key_id or "",
"mlflow_tracking_uri": mlflow_tracking_uri,
"mlflow_experiment_name": mlflow_experiment_name,
"mlflow_run_name": mlflow_run_name,
"task": task_value,
"strategy": strategy_value,
"metric": metric_value,
"evaluation_metric": metric_value,
"subtask": subtask_value,
# Standard benchmarks do not use a custom dataset or Lambda processor.
"data_s3_path": "",
"lambda_arn": "",
"preset_reward_function": "",
}
if model_path:
semantic_values["model_name_or_path"] = model_path
# --- Merge: spec defaults < semantic values < user overrides ---
value_map = self._build_eval_value_map(
override_spec, semantic_values=semantic_values, user_overrides=self.overrides
)
# --- Inject values by leaf-key name / placeholder (schema-agnostic) ---
self._apply_eval_recipe_values(recipe_dict, value_map)
# Nova recipes carry infra fields in a `run` section; ensure they are
# present even if the template omitted a key (preserves prior behavior).
# Scoped to Nova: OSS recipes use different infra field names
# (output_path, output.mlflow_*), which the spec/injection already
# covered — adding Nova-style keys here would pollute the OSS recipe.
from ..common_utils.recipe_utils import _is_nova_model
if "run" in recipe_dict and _is_nova_model(self._base_model_name):
run = recipe_dict["run"]
run.setdefault("name", semantic_values["name"])
run.setdefault("output_s3_path", semantic_values["output_s3_path"])
run.setdefault("mlflow_tracking_uri", semantic_values["mlflow_tracking_uri"])
run.setdefault("mlflow_experiment_name", semantic_values["mlflow_experiment_name"])
run.setdefault("mlflow_run_name", semantic_values["mlflow_run_name"])
if model_path:
run.setdefault("model_name_or_path", model_path)
# --- Common: resolve any remaining inference placeholders ---
# Fallback for inference fields the override spec did not declare.
self._resolve_inference_placeholders(recipe_dict)
# Blank any remaining optional infra placeholders (e.g. tensorboard dir,
# mlflow run id) so the recipe has no unresolved {{...}} tokens.
blanked = self._blank_unresolved_placeholders(recipe_dict)
if blanked:
_logger.warning(
f"Blanked unresolved eval recipe placeholders (not declared in the "
f"override spec or set by the SDK): {blanked}"
)
# --- Common: write recipe and submit ---
input_data_config = [model_channel] if model_channel else None
return self._write_and_submit_smtj_recipe(
recipe_dict, recipe_tmp_path, training_image, sagemaker_session, role, base_job_name,
input_data_config=input_data_config,
)
def _evaluate_hyperpod(self, subtask=None):
"""Execute benchmark evaluation on HyperPod cluster.
Builds benchmark-specific override parameters (eval task, subtask)
and delegates to BaseEvaluator._submit_hyperpod_eval_job().
"""
override_parameters = {}
# Eval task config
eval_subtask = self._resolve_subtask_for_evaluation(subtask)
override_parameters["recipes.evaluation.task"] = str(self.benchmark.value)
if eval_subtask:
if isinstance(eval_subtask, list):
override_parameters["recipes.evaluation.subtask"] = ",".join(eval_subtask)
else:
override_parameters["recipes.evaluation.subtask"] = eval_subtask
# User-provided overrides (e.g. inference params)
if self.overrides:
override_parameters.update(self.overrides)
return self._submit_hyperpod_eval_job(
override_parameters=override_parameters,
base_job_name=f"eval-{self.benchmark.value}",
)