SageMaker Train

SageMaker Train#

Training capabilities including model training, hyperparameter tuning, and distributed training.

Model Training#

SageMaker Python SDK Train Module.

Distributed Training#

Distributed module.

class sagemaker.train.distributed.DistributedConfig[source]#

Bases: BaseConfig, ABC

Abstract base class for distributed training configurations.

This class defines the interface that all distributed training configurations must implement. It provides a standardized way to specify driver scripts and their locations for distributed training jobs.

abstract property driver_dir: str#

Directory containing the driver script.

This property should return the path to the directory containing the driver script, relative to the container’s working directory.

Returns:

Path to directory containing the driver script

Return type:

str

abstract property driver_script: str#

Name of the driver script.

This property should return the name of the Python script that implements the distributed training driver logic.

Returns:

Name of the driver script file

Return type:

str

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
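The interface above can be illustrated with a minimal standard-library sketch. The names DriverConfigSketch, EchoLauncher, and driver_path are hypothetical stand-ins, not SDK classes; the real DistributedConfig is pydantic-based and may behave differently.

```python
from abc import ABC, abstractmethod
import posixpath


class DriverConfigSketch(ABC):
    """Hypothetical stand-in for the abstract interface; not the SDK class."""

    @property
    @abstractmethod
    def driver_dir(self) -> str:
        """Directory holding the driver script, relative to the working directory."""

    @property
    @abstractmethod
    def driver_script(self) -> str:
        """File name of the driver script."""

    def driver_path(self) -> str:
        # Hypothetical helper: join both properties into one container path.
        return posixpath.join(self.driver_dir, self.driver_script)


class EchoLauncher(DriverConfigSketch):
    """Concrete configuration that supplies both required properties."""

    @property
    def driver_dir(self) -> str:
        return "drivers"

    @property
    def driver_script(self) -> str:
        return "echo_driver.py"


launcher = EchoLauncher()
print(launcher.driver_path())
```

Because both properties are abstract, instantiating the base class directly raises TypeError; only subclasses that define both can be created.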

class sagemaker.train.distributed.MPI(*, process_count_per_node: int | None = None, mpi_additional_options: List[str] | None = None)[source]#

Bases: DistributedConfig

MPI.

The MPI class configures a job that uses mpirun in the backend to launch distributed training.

Parameters:
  • process_count_per_node (Optional[int]) – The number of processes to run on each node in the training job. Defaults to the number of GPUs available in the container.

  • mpi_additional_options (Optional[List[str]]) – Custom MPI options to use for the training job.

property driver_dir: str#

Directory containing the driver script.

Returns:

Path to directory containing the driver script

Return type:

str

property driver_script: str#

Name of the driver script.

Returns:

Name of the driver script file

Return type:

str

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

mpi_additional_options: List[str] | None#
process_count_per_node: int | None#
class sagemaker.train.distributed.SMP(*, hybrid_shard_degree: int | None = None, sm_activation_offloading: bool | None = None, activation_loading_horizon: int | None = None, fsdp_cache_flush_warnings: bool | None = None, allow_empty_shards: bool | None = None, tensor_parallel_degree: int | None = None, context_parallel_degree: int | None = None, expert_parallel_degree: int | None = None, random_seed: int | None = None)[source]#

Bases: BaseConfig

SMP.

This class is used for configuring the SageMaker Model Parallelism v2 parameters. For more information on the model parallelism parameters, see: https://docs.aws.amazon.com/sagemaker/latest/dg/distributed-model-parallel-v2-reference.html#distributed-model-parallel-v2-reference-init-config

Parameters:
  • hybrid_shard_degree (Optional[int]) – Specifies a sharded parallelism degree for the model.

  • sm_activation_offloading (Optional[bool]) – Specifies whether to enable the SMP activation offloading implementation.

  • activation_loading_horizon (Optional[int]) – An integer specifying the activation offloading horizon type for FSDP. This is the maximum number of checkpointed or offloaded layers whose inputs can be in the GPU memory simultaneously.

  • fsdp_cache_flush_warnings (Optional[bool]) – Detects and warns if cache flushes happen in the PyTorch memory manager, because they can degrade computational performance.

  • allow_empty_shards (Optional[bool]) – Whether to allow empty shards when sharding tensors if a tensor is not divisible. This is an experimental fix for a crash during checkpointing in certain scenarios. Disabling this falls back to the original PyTorch behavior.

  • tensor_parallel_degree (Optional[int]) – Specifies a tensor parallelism degree. The value must be between 1 and world_size.

  • context_parallel_degree (Optional[int]) – Specifies the context parallelism degree. The value must be between 1 and world_size, and must be less than or equal to hybrid_shard_degree.

  • expert_parallel_degree (Optional[int]) – Specifies an expert parallelism degree. The value must be between 1 and world_size.

  • random_seed (Optional[int]) – A seed number for the random operations in distributed modules by SMP tensor parallelism or expert parallelism.
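The range constraints listed above can be checked before a job is submitted. The following is a sketch only: check_smp_degrees is a hypothetical helper, not part of the SDK, and it encodes just the rules stated in the parameter descriptions (each degree between 1 and world_size, and context_parallel_degree no larger than hybrid_shard_degree).

```python
from typing import List, Optional


def check_smp_degrees(
    world_size: int,
    hybrid_shard_degree: Optional[int] = None,
    tensor_parallel_degree: Optional[int] = None,
    context_parallel_degree: Optional[int] = None,
    expert_parallel_degree: Optional[int] = None,
) -> List[str]:
    """Return human-readable violations; an empty list means all checks passed."""
    problems = []
    bounded = {
        "tensor_parallel_degree": tensor_parallel_degree,
        "context_parallel_degree": context_parallel_degree,
        "expert_parallel_degree": expert_parallel_degree,
    }
    # Each of these degrees must lie between 1 and world_size when set.
    for name, value in bounded.items():
        if value is not None and not 1 <= value <= world_size:
            problems.append(f"{name}={value} is outside 1..{world_size}")
    # The context parallelism degree must not exceed the hybrid shard degree.
    if (
        context_parallel_degree is not None
        and hybrid_shard_degree is not None
        and context_parallel_degree > hybrid_shard_degree
    ):
        problems.append(
            f"context_parallel_degree={context_parallel_degree} exceeds "
            f"hybrid_shard_degree={hybrid_shard_degree}"
        )
    return problems


print(check_smp_degrees(16, hybrid_shard_degree=8, tensor_parallel_degree=2,
                        context_parallel_degree=4))
print(check_smp_degrees(8, hybrid_shard_degree=2, context_parallel_degree=4))
```

Unset (None) values are skipped, which mirrors the fact that every SMP parameter is optional.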

activation_loading_horizon: int | None#
allow_empty_shards: bool | None#
context_parallel_degree: int | None#
expert_parallel_degree: int | None#
fsdp_cache_flush_warnings: bool | None#
hybrid_shard_degree: int | None#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

random_seed: int | None#
sm_activation_offloading: bool | None#
tensor_parallel_degree: int | None#
class sagemaker.train.distributed.Torchrun(*, process_count_per_node: int | None = None, smp: SMP | None = None)[source]#

Bases: DistributedConfig

Torchrun.

The Torchrun class configures a job that uses torchrun or torch.distributed.launch in the backend to launch distributed training.

Parameters:
  • process_count_per_node (Optional[int]) – The number of processes to run on each node in the training job. Defaults to the number of GPUs available in the container.

  • smp (Optional[SMP]) – The SageMaker Model Parallelism v2 parameters.
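To make the process_count_per_node default concrete, the sketch below computes the resulting number of worker processes. Both functions are hypothetical helpers, and the formula (nodes multiplied by processes per node) is the usual torchrun convention rather than something this page states.

```python
from typing import Optional


def effective_process_count(process_count_per_node: Optional[int],
                            gpus_in_container: int) -> int:
    """Use the explicit value when given; otherwise fall back to the GPU count."""
    if process_count_per_node is not None:
        return process_count_per_node
    return gpus_in_container


def world_size(node_count: int, process_count_per_node: Optional[int],
               gpus_in_container: int) -> int:
    """Total number of worker processes across all nodes (assumed convention)."""
    return node_count * effective_process_count(process_count_per_node,
                                                gpus_in_container)


# Two nodes with eight GPUs each, no explicit process count.
print(world_size(node_count=2, process_count_per_node=None, gpus_in_container=8))
# Same cluster, but limited to four processes per node.
print(world_size(node_count=2, process_count_per_node=4, gpus_in_container=8))
```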

property driver_dir: str#

Directory containing the driver script.

Returns:

Path to directory containing the driver script

Return type:

str

property driver_script: str#

Name of the driver script.

Returns:

Name of the driver script file

Return type:

str

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'validate_assignment': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

process_count_per_node: int | None#
smp: SMP | None#

Multi-Turn RL Training#

MultiTurnRLTrainer — trainer for Agentic Reinforcement Fine-Tuning (Multi-Turn RL) jobs.

class sagemaker.train.multi_turn_rl_trainer.MultiTurnRLTrainer(model: str | ModelPackage, agent_env: str | CustomAgentLambda, training_dataset: str | DataSet | None = None, mlflow_app_arn: str | MlflowApp | None = None, s3_output_path: str | None = None, output_model_package_group: str | ModelPackageGroup | None = None, intermediate_checkpoint_model_package_group: str | ModelPackageGroup | None = None, validation_dataset: str | DataSet | None = None, bedrock_agentcore_qualifier: str = 'DEFAULT', mlflow_experiment_name: str | None = None, mlflow_run_name: str | None = None, networking: VpcConfig | None = None, kms_key_arn: str | None = None, accept_eula: bool = False, **kwargs)[source]#

Bases: BaseTrainer

Trainer for Agentic Reinforcement Fine-Tuning (Multi-Turn RL) jobs.

Uses the CreateJob API (not CreateTrainingJob) with a JobConfigDocument JSON string.

Example:

from sagemaker.train.multi_turn_rl_trainer import MultiTurnRLTrainer

trainer = MultiTurnRLTrainer(
    model="huggingface-reasoning-qwen3-32b",
    agent_env="arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/AGENTID",
    training_dataset="s3://my-bucket/",
    output_model_package_group="arn:aws:sagemaker:us-west-2:123456789012:model-package-group/grp",
    mlflow_app_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-app/srv",
    s3_output_path="s3://my-bucket/output/",
    accept_eula=True,
)
job = trainer.train()
Parameters:
  • model – JumpStart model ID string or JumpStart hub content Model ARN.

  • agent_env – Bedrock AgentCore ARN, agent runtime ID, Lambda ARN, or CustomAgentLambda. When a bare agent runtime ID is provided (e.g. "myRuntime-aBcDeFgHiJ"), it is resolved to the full ARN via GetAgentRuntime.

  • training_dataset – S3 URI, DataSet object, or DataSet ARN string (optional). Must be provided at __init__ or train() time.

  • mlflow_app_arn – MLflow app ARN or MlflowApp object (optional). If not specified, uses the default MLflow experience.

  • s3_output_path – S3 path for output artifacts (optional). If not specified, defaults to s3://sagemaker-<region>-<account>/output.

  • output_model_package_group – ModelPackageGroup object or ARN string (optional).

  • intermediate_checkpoint_model_package_group – ModelPackageGroup object or ARN string for intermediate checkpoints (optional). If not provided, auto-creates {model_name}-mtrl-checkpoint-mpg. Must differ from output_model_package_group.

  • validation_dataset – S3 URI, DataSet object, or DataSet ARN string (optional).

  • bedrock_agentcore_qualifier – Bedrock AgentCore qualifier (default: "DEFAULT").

  • mlflow_experiment_name – MLflow experiment name (optional).

  • mlflow_run_name – MLflow run name (optional).

  • networking – VpcConfig for the job (optional).

  • kms_key_arn – KMS key ARN for output encryption (optional).

  • accept_eula – Whether to accept the model's EULA (default: False).

  • **kwargs – Passed to BaseTrainer (sagemaker_session, role, base_job_name, tags).
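Several of the defaults described in the parameter list can be expressed as small resolution rules. The sketch below is an illustration under stated assumptions: all three functions are hypothetical, are not the SDK's implementation, and encode only what the descriptions above say.

```python
from typing import Optional


def resolve_training_dataset(init_value: Optional[str],
                             train_value: Optional[str]) -> str:
    """Prefer the value passed to train(); fall back to the constructor value."""
    dataset = train_value if train_value is not None else init_value
    if dataset is None:
        raise ValueError("training_dataset must be set in __init__ or train()")
    return dataset


def resolve_checkpoint_group(model_name: str, output_group: Optional[str],
                             checkpoint_group: Optional[str] = None) -> str:
    """Apply the documented default name and the must-differ rule."""
    if checkpoint_group is None:
        checkpoint_group = f"{model_name}-mtrl-checkpoint-mpg"
    if checkpoint_group == output_group:
        raise ValueError("checkpoint group must differ from output_model_package_group")
    return checkpoint_group


def default_output_path(region: str, account: str) -> str:
    """Documented fallback used when s3_output_path is not specified."""
    return f"s3://sagemaker-{region}-{account}/output"


print(resolve_training_dataset("s3://my-bucket/train/", None))
print(resolve_checkpoint_group("my-model", "my-output-group"))
print(default_output_path("us-west-2", "123456789012"))
```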

classmethod attach(job_name: str, session=None) AgentRFTJob[source]#

Attach to an existing Agentic RFT job by name.

Parameters:
  • job_name – The name of the job.

  • session – Optional boto3 session.

Returns:

AgentRFTJob wrapping the existing job.

static list_bedrock_agentcore_runtimes(session=None) list[dict][source]#

List Bedrock AgentCore runtimes.

Parameters:

session – Optional boto3 session.

Returns:

List of dicts, each with keys name, runtime_id, arn, and status.

static list_supported_models(session=None) list[str][source]#

Return the list of supported model names.

Queries SageMakerPublicHub to discover all models with MTRL recipes in their RecipeCollection.

Parameters:

session – Optional boto3 session.

Returns:

List of hub content model names supporting MTRL.

property output_model_package_arn: str | None#

The output model package ARN from the latest completed training job.

train(training_dataset: str | DataSet | None = None, wait: bool = True) AgentRFTJob[source]#

Launch an Agentic RFT job.

Parameters:
  • training_dataset – Training dataset override.

  • wait – If True (default), block until job reaches terminal status.

Returns:

AgentRFTJob instance for tracking the job.

AgentRFTJob — wrapper around sagemaker-core Job for AgentRFT job category.

class sagemaker.train.agent_rft_job.AgentRFTJob(job: Job)[source]#

Bases: object

Wrapper around sagemaker-core Job for AgentRFT job category.

Delegates lifecycle methods to the underlying Job and adds typed convenience properties by parsing the JobConfigDocument JSON string.

Parameters:

job – The sagemaker-core Job instance to wrap.

JOB_CATEGORY = 'AgentRFT'#
property agent_config: dict | None#

Full AgentConfig section from JobConfigDocument.

property billable_token_usage: dict | None#

Billable token usage from ServiceOutput.

Returns dict with keys: TrainTokenCount, PrefillTokenCount, SampleTokenCount.

property creation_time#
delete()[source]#

Delete the job via DeleteJob API.

description: str | None#
property end_time#
property failure_reason: str | None#
classmethod from_job(job: Job) AgentRFTJob[source]#

Create an AgentRFTJob from a sagemaker-core Job instance.

classmethod get(job_name: str, session=None) AgentRFTJob[source]#

Attach to an existing AgentRFT job by name.

Parameters:
  • job_name – The name of the job.

  • session – Optional boto3 session.

Returns:

AgentRFTJob wrapping the existing job.

classmethod get_all(session=None, **kwargs)[source]#

List all AgentRFT jobs.

Delegates to Job.get_all with job_category pre-filled. Additional keyword arguments (e.g. creation_time_after, name_contains, sort_by, sort_order, status_equals) are forwarded.

Parameters:
  • session – Optional boto3 session.

  • **kwargs – Additional filter arguments forwarded to Job.get_all.

Yields:

AgentRFTJob instances.

get_mlflow_url() str | None[source]#

Generate a fresh presigned MLflow URL for this job’s experiment/run.

In Jupyter notebooks, also renders a clickable link.

Returns:

Presigned URL string, or None if MLflow is not configured.

get_training_metrics() list[dict][source]#

Fetch per-step MTRL training metrics from MLflow.

Retrieves rollout/reward/mean, rollout/turns/mean, training/total_tokens, and training/num_trajectories for each training step and prints a summary table.

Returns:

List of dicts, one per step, with keys step, rollout/reward/mean, rollout/turns/mean, training/total_tokens, and training/num_trajectories.
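Since the return value is a plain list of dicts, it can be post-processed without extra dependencies. The sketch below picks the step with the highest mean reward; best_step is a hypothetical helper and the sample rows are made-up values that only mimic the documented keys.

```python
from typing import Dict, List


def best_step(metrics: List[Dict]) -> Dict:
    """Return the row with the highest mean rollout reward."""
    if not metrics:
        raise ValueError("no metrics available")
    return max(metrics, key=lambda row: row["rollout/reward/mean"])


# Made-up rows shaped like the documented return value.
sample = [
    {"step": 1, "rollout/reward/mean": 0.21, "rollout/turns/mean": 3.0,
     "training/total_tokens": 120000, "training/num_trajectories": 64},
    {"step": 2, "rollout/reward/mean": 0.35, "rollout/turns/mean": 2.8,
     "training/total_tokens": 118500, "training/num_trajectories": 64},
    {"step": 3, "rollout/reward/mean": 0.33, "rollout/turns/mean": 2.9,
     "training/total_tokens": 119200, "training/num_trajectories": 64},
]

top = best_step(sample)
print(top["step"], top["rollout/reward/mean"])
```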

property job_arn: str#
property job_name: str#
property job_status: str#
property last_modified_time#
property mlflow_details: dict | None#

MLflow experiment/run details from ServiceOutput.

Returns dict with keys: ExperimentName, RunName, ExperimentId, RunId.

property output_model_package_arn: str | None#

ARN of the output model package from ServiceOutput, or None.

property progress_info: dict | None#

Training progress from ServiceOutput.

Supports two formats:

  • Epoch-based: dict with MaxEpoch, StepsPerEpoch, CurrentEpoch, CurrentStep.

  • Step-only: dict with MaxSteps, CurrentStep.

Returns None if not available.
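Code that consumes progress_info has to handle both shapes as well as None. A minimal sketch follows; planned_steps is a hypothetical helper, and treating MaxEpoch multiplied by StepsPerEpoch as the total step count is an assumption rather than documented behavior.

```python
from typing import Optional, Tuple


def planned_steps(progress: Optional[dict]) -> Optional[Tuple[str, int]]:
    """Return (format, total planned steps), or None when progress is unavailable."""
    if progress is None:
        return None
    if "MaxEpoch" in progress and "StepsPerEpoch" in progress:
        # Epoch-based format: total is assumed to be epochs times steps per epoch.
        return ("epoch", progress["MaxEpoch"] * progress["StepsPerEpoch"])
    if "MaxSteps" in progress:
        # Step-only format carries the total directly.
        return ("step", progress["MaxSteps"])
    raise ValueError("unrecognized progress_info format")


print(planned_steps({"MaxEpoch": 3, "StepsPerEpoch": 50,
                     "CurrentEpoch": 1, "CurrentStep": 20}))
print(planned_steps({"MaxSteps": 200, "CurrentStep": 40}))
print(planned_steps(None))
```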

refresh()[source]#

Refresh job state from DescribeJob API.

property s3_output_path: str | None#

S3 output path from OutputDataConfig.

property secondary_status: str#
property secondary_status_transitions: list#
stop()[source]#

Stop the job via StopJob API.

property training_config: dict | None#

Full TrainingConfig section from JobConfigDocument.

wait(poll: int = 5, timeout: int | None = 3000, max_log_lines: int = 20)[source]#

Wait for job to reach terminal status.

Parameters:
  • poll – Seconds between polls.

  • timeout – Maximum seconds to wait.

  • max_log_lines – Maximum number of log lines to display. Defaults to 20.
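The poll and timeout parameters describe a standard polling loop, sketched below with an injected status callable so it runs without AWS access. wait_for_terminal and the TERMINAL set are hypothetical; the status names are assumptions, and the SDK method additionally displays log lines, which this sketch omits.

```python
import time
from typing import Callable, Optional

# Assumed terminal status names; the service's actual values may differ.
TERMINAL = {"Completed", "Failed", "Stopped"}


def wait_for_terminal(
    get_status: Callable[[], str],
    poll: float = 5,
    timeout: Optional[float] = 3000,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status until it returns a terminal status or the timeout elapses."""
    start = clock()
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        # A timeout of None means wait indefinitely.
        if timeout is not None and clock() - start >= timeout:
            raise TimeoutError(f"job still {status} after {timeout} seconds")
        sleep(poll)


# Simulated job that completes on the third poll; sleeping is stubbed out.
statuses = iter(["InProgress", "InProgress", "Completed"])
print(wait_for_terminal(lambda: next(statuses), poll=0, sleep=lambda s: None))
```

Injecting sleep and clock keeps the loop testable; in real use the defaults would apply.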

wait_for_delete()[source]#

Wait for job deletion to complete.

CustomAgentLambda — Lambda-based agent environment for Agentic RFT.

class sagemaker.train.custom_agent_lambda.CustomAgentLambda(lambda_arn: str)[source]#

Bases: object

Lambda-based agent environment for Agentic RFT.

Creates and wraps Lambda functions that serve as agent environments or bridges between SageMaker and custom agent environments (e.g., LangSmith, EKS, Fargate).

Parameters:

lambda_arn – ARN of the Lambda function.

classmethod create(source: str, function_name: str | None = None, role: str | None = None, runtime: str = 'python3.12', handler: str = 'lambda_function.handler', timeout: int = 900, memory_size: int = 256, environment: dict | None = None, sagemaker_session=None) CustomAgentLambda[source]#

Create a new Lambda function and return a CustomAgentLambda.

The source parameter accepts three formats:

  • S3 URI (s3://bucket/key.zip): deploys from an S3 artifact.

  • Local file path: reads the file, packages it as a zip, and uploads.

  • Inline code string: packages the raw code as a zip and uploads.

Detection order: S3 URI → existing local path → inline code.

Parameters:
  • source – S3 URI, local file path, or inline Python code string.

  • function_name – Lambda function name. If not provided, a unique name is generated automatically.

  • role – IAM role ARN for the Lambda execution role.

  • runtime – Lambda runtime (default: "python3.12").

  • handler – Lambda handler (default: "lambda_function.handler").

  • timeout – Lambda timeout in seconds (default: 900).

  • memory_size – Lambda memory in MB (default: 256).

  • environment – Dict of environment variables for the Lambda.

  • sagemaker_session – Optional SageMaker session for role resolution.

Returns:

CustomAgentLambda wrapping the created Lambda ARN.

Raises:

ValueError – If source is empty.
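The detection order for source (S3 URI, then existing local path, then inline code) can be sketched with the standard library. detect_source_kind and zip_inline_code are hypothetical helpers, not the SDK's implementation; the archive member name lambda_function.py is an assumption chosen to match the default handler "lambda_function.handler".

```python
import io
import os
import zipfile


def detect_source_kind(source: str) -> str:
    """Classify source in the documented order: S3 URI, local path, inline code."""
    if not source:
        raise ValueError("source must not be empty")
    if source.startswith("s3://"):
        return "s3"
    if os.path.exists(source):
        return "local"
    return "inline"


def zip_inline_code(code: str, filename: str = "lambda_function.py") -> bytes:
    """Package raw code as an in-memory zip archive with a single file."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(filename, code)
    return buffer.getvalue()


code = "def handler(event, context):\n    return {'ok': True}\n"
print(detect_source_kind("s3://bucket/key.zip"))
print(detect_source_kind(code))
print(len(zip_inline_code(code)) > 0)
```

Checking for the S3 prefix first avoids touching the file system for remote artifacts, and the inline case is the fallback because any non-empty string qualifies.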

classmethod get(lambda_arn: str) CustomAgentLambda[source]#

Wrap an existing Lambda ARN.

Validates the Lambda exists by calling GetFunction.

Parameters:

lambda_arn – ARN of an existing Lambda function.

Returns:

CustomAgentLambda wrapping the Lambda ARN.

Raises:

botocore.exceptions.ClientError – If the Lambda does not exist.

Model Evaluation#

SageMaker Model Evaluation Module.

This module provides comprehensive evaluation capabilities for SageMaker models:

Classes:
  • BaseEvaluator: Abstract base class for all evaluators

  • BenchMarkEvaluator: Standard benchmark evaluations

  • CustomScorerEvaluator: Custom scorer and preset metrics evaluations

  • LLMAsJudgeEvaluator: LLM-as-judge evaluations

  • EvaluationPipelineExecution: Pipeline-based evaluation execution implementation

  • PipelineExecutionStatus: Combined status with step details and failure reason

  • StepDetail: Individual pipeline step information

class sagemaker.train.evaluate.BaseEvaluator(*, region: str | None = None, role: str | None = None, sagemaker_session: Any | None = None, model: str | BaseTrainer | AgentRFTJob | ModelPackage, base_eval_name: str | None = None, s3_output_path: str, mlflow_resource_arn: str | None = None, mlflow_experiment_name: str | None = None, mlflow_run_name: str | None = None, networking: VpcConfig | None = None, kms_key_id: str | None = None, model_package_group: str | ModelPackageGroup | None = None)[source]#

Bases: BaseModel

Base class for SageMaker model evaluators.

Provides common functionality for all evaluators including model resolution, MLflow integration, and AWS resource configuration. Subclasses must implement the evaluate() method.

region#

AWS region for evaluation jobs. If not provided, will use SAGEMAKER_REGION env var or default region.

Type:

Optional[str]

role#

IAM execution role ARN for SageMaker pipeline and training jobs. If not provided, will be derived from the session’s caller identity. Use this when running outside SageMaker-managed environments (e.g., local notebooks, CI/CD) where the caller identity is not a SageMaker-assumable role.

Type:

Optional[str]

sagemaker_session#

SageMaker session object. If not provided, a default session will be created automatically.

Type:

Optional[Any]

model#

Model for evaluation. Can be:

  • JumpStart model ID (str): e.g., ‘llama3-2-1b-instruct’

  • ModelPackage object: A fine-tuned model package

  • ModelPackage ARN (str): e.g., ‘arn:aws:sagemaker:region:account:model-package/name/version’

  • BaseTrainer object: A completed training job (i.e., it must have _latest_training_job with output_model_package_arn populated)

Type:

Union[str, Any]

base_eval_name#

Optional base name for evaluation jobs. This name is used as the PipelineExecutionDisplayName when creating the SageMaker pipeline execution. The actual display name will be “{base_eval_name}-{timestamp}”. This parameter can be used to cross-reference the pipeline execution ARN with a human-readable display name in the SageMaker console. If not provided, a unique name will be generated automatically in the format “eval-{model_name}-{uuid}”.

Type:

Optional[str]
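The two naming patterns described for base_eval_name can be sketched as follows. execution_display_name is a hypothetical helper; the timestamp format and the shortened UUID are assumptions, since the description gives only the "{base_eval_name}-{timestamp}" and "eval-{model_name}-{uuid}" templates.

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def execution_display_name(base_eval_name: Optional[str], model_name: str,
                           now: Optional[datetime] = None) -> str:
    """Build a display name following the two documented templates."""
    if base_eval_name:
        now = now or datetime.now(timezone.utc)
        # Assumed timestamp format: compact year-to-second digits.
        return f"{base_eval_name}-{now.strftime('%Y%m%d%H%M%S')}"
    # Assumed UUID rendering: first eight hex characters.
    return f"eval-{model_name}-{uuid.uuid4().hex[:8]}"


print(execution_display_name("nightly-eval", "my-model", datetime(2024, 1, 2, 3, 4, 5)))
print(execution_display_name(None, "my-model"))
```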

s3_output_path#

S3 location for evaluation outputs. Required.

Type:

str

mlflow_resource_arn#

MLflow resource ARN for experiment tracking. Optional. If not provided, the system will attempt to resolve it using the default MLflow app experience (checks domain match, account default, or creates a new app). Supported formats:

  • MLflow tracking server: arn:aws:sagemaker:region:account:mlflow-tracking-server/name

  • MLflow app: arn:aws:sagemaker:region:account:mlflow-app/app-id

Type:

Optional[str]
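A caller can tell the two supported ARN formats apart with a regular expression before passing the value in. This is a sketch: classify_mlflow_arn and the pattern are hypothetical, and the 12-digit account check is an assumption about ARN shape rather than SDK validation logic.

```python
import re
from typing import Tuple

# Matches the two documented resource types; partition may be aws, aws-cn, etc.
_MLFLOW_ARN = re.compile(
    r"^arn:aws[\w-]*:sagemaker:(?P<region>[a-z0-9-]+):(?P<account>\d{12}):"
    r"(?P<kind>mlflow-tracking-server|mlflow-app)/(?P<name>.+)$"
)


def classify_mlflow_arn(arn: str) -> Tuple[str, str]:
    """Return (resource kind, resource name) for a supported MLflow ARN."""
    match = _MLFLOW_ARN.match(arn)
    if match is None:
        raise ValueError(f"not a supported MLflow resource ARN: {arn}")
    return match.group("kind"), match.group("name")


print(classify_mlflow_arn(
    "arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"))
print(classify_mlflow_arn(
    "arn:aws:sagemaker:us-west-2:123456789012:mlflow-app/app-1234"))
```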

mlflow_experiment_name#

Optional MLflow experiment name for tracking evaluation runs.

Type:

Optional[str]

mlflow_run_name#

Optional MLflow run name for tracking individual evaluation executions.

Type:

Optional[str]

networking#

VPC configuration for evaluation jobs. Accepts a sagemaker_core.shapes.VpcConfig object with security_group_ids and subnets attributes. When provided, evaluation jobs will run within the specified VPC for enhanced security and access to private resources.

Type:

Optional[VpcConfig]

kms_key_id#

AWS KMS key ID for encrypting output data. When provided, evaluation job outputs will be encrypted using this KMS key for enhanced data security.

Type:

Optional[str]

model_package_group#

Model package group. Accepts:

  1. ARN string (e.g., ‘arn:aws:sagemaker:region:account:model-package-group/name’)

  2. ModelPackageGroup object (ARN will be extracted from model_package_group_arn attribute)

  3. Model package group name string (will fetch the object and extract ARN)

Required when model is a JumpStart model ID. Optional when model is a ModelPackage ARN/object (will be inferred automatically).

Type:

Optional[Union[str, ModelPackageGroup]]

class Config[source]#

Bases: object

arbitrary_types_allowed = True#
base_eval_name: str | None#
evaluate() Any[source]#

Create and start an evaluation execution.

This method must be implemented by subclasses to define the specific evaluation logic for different evaluation types (benchmark, custom scorer, LLM-as-judge, etc.).

Returns:

The created evaluation execution object.

Return type:

EvaluationPipelineExecution

Raises:

NotImplementedError – This is an abstract method that must be implemented by subclasses.

Example

>>> # In a subclass implementation
>>> class CustomEvaluator(BaseEvaluator):
...     def evaluate(self):
...         # Create pipeline definition
...         pipeline_definition = self._build_pipeline()
...         # Start execution
...         return EvaluationPipelineExecution.start(...)
kms_key_id: str | None#
mlflow_experiment_name: str | None#
mlflow_resource_arn: str | None#
mlflow_run_name: str | None#
model: str | BaseTrainer | AgentRFTJob | ModelPackage#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_package_group: str | ModelPackageGroup | None#
networking: VpcConfig | None#
region: str | None#
role: str | None#
s3_output_path: str#
sagemaker_session: Any | None#
class sagemaker.train.evaluate.BenchMarkEvaluator(*, region: str | None = None, role: str | None = None, sagemaker_session: Any | None = None, model: str | BaseTrainer | AgentRFTJob | ModelPackage, base_eval_name: str | None = None, s3_output_path: str, mlflow_resource_arn: str | None = None, mlflow_experiment_name: str | None = None, mlflow_run_name: str | None = None, networking: VpcConfig | None = None, kms_key_id: str | None = None, model_package_group: str | ModelPackageGroup | None = None, benchmark: _Benchmark, subtasks: str | List[str] | None = None, evaluate_base_model: bool = False)[source]#

Bases: BaseEvaluator

Benchmark evaluator for standard model evaluation tasks.

This evaluator accepts a benchmark enum and automatically deduces the appropriate metrics, strategy, and subtask availability based on the benchmark configuration. Supports various standard benchmarks like MMLU, BBH, MATH, MMMU, and others.

benchmark#

Benchmark type from the Benchmark enum obtained via get_benchmarks(). Required. Use get_benchmarks() to access available benchmark types.

Type:

_Benchmark

subtasks#

Benchmark subtask(s) to evaluate. Defaults to ‘ALL’ for benchmarks that support subtasks. Can be a single subtask string, a list of subtasks, or ‘ALL’ to run all subtasks. For benchmarks without subtask support, must be None.

Type:

Optional[Union[str, list[str]]]
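The subtask rules above (default to ‘ALL’ when subtasks are supported, require None otherwise, and allow an override at evaluation time) can be sketched as one resolution function. resolve_subtasks is a hypothetical helper, not the evaluator's internal logic, and normalizing the result to a list is a choice made for this example.

```python
from typing import List, Optional, Union

Subtasks = Optional[Union[str, List[str]]]


def resolve_subtasks(supports_subtasks: bool, configured: Subtasks = None,
                     override: Subtasks = None) -> Optional[List[str]]:
    """Pick the effective subtasks; an override wins over the configured value."""
    chosen = override if override is not None else configured
    if not supports_subtasks:
        # Benchmarks without subtask support accept only None.
        if chosen is not None:
            raise ValueError("this benchmark has no subtasks; subtasks must be None")
        return None
    if chosen is None:
        chosen = "ALL"
    # Normalize a single string to a one-element list.
    return [chosen] if isinstance(chosen, str) else list(chosen)


print(resolve_subtasks(True))
print(resolve_subtasks(True, configured=["anatomy", "astronomy"]))
print(resolve_subtasks(True, configured="ALL", override="abstract_algebra"))
print(resolve_subtasks(False))
```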

mlflow_resource_arn#

ARN of the MLflow tracking server for experiment tracking. Optional. If not provided, the system will attempt to resolve it using the default MLflow app experience (checks domain match, account default, or creates a new app). Format: arn:aws:sagemaker:region:account:mlflow-tracking-server/name

Type:

Optional[str]

evaluate_base_model#

Whether to evaluate the base model in addition to the custom model. Set to True to evaluate both models. Defaults to False (only the custom model is evaluated).

Type:

bool

region#

AWS region. Inherited from BaseEvaluator.

Type:

Optional[str]

sagemaker_session#

SageMaker session object. Inherited from BaseEvaluator.

Type:

Optional[Any]

model#

Model for evaluation. Inherited from BaseEvaluator.

Type:

Union[str, Any]

base_eval_name#

Base name for evaluation jobs. Inherited from BaseEvaluator.

Type:

Optional[str]

s3_output_path#

S3 location for evaluation outputs. Inherited from BaseEvaluator.

Type:

str

mlflow_experiment_name#

MLflow experiment name. Inherited from BaseEvaluator.

Type:

Optional[str]

mlflow_run_name#

MLflow run name. Inherited from BaseEvaluator.

Type:

Optional[str]

networking#

VPC configuration. Inherited from BaseEvaluator.

Type:

Optional[VpcConfig]

kms_key_id#

KMS key ID for encryption. Inherited from BaseEvaluator.

Type:

Optional[str]

model_package_group#

Model package group. Inherited from BaseEvaluator.

Type:

Optional[Union[str, ModelPackageGroup]]

Example

# Get available benchmarks
Benchmark = get_benchmarks()

# Create evaluator with benchmark and subtasks
evaluator = BenchMarkEvaluator(
    benchmark=Benchmark.MMLU,
    subtasks=["abstract_algebra", "anatomy", "astronomy"],
    model="llama3-2-1b-instruct",
    s3_output_path="s3://bucket/outputs/",
    mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
)

# Run evaluation with configured subtasks
execution = evaluator.evaluate()
execution.wait()

# Or override subtasks at evaluation time
execution = evaluator.evaluate(subtask="abstract_algebra")
base_eval_name: str | None#
benchmark: _Benchmark#
evaluate(subtask: str | List[str] | None = None) EvaluationPipelineExecution[source]#

Create and start a benchmark evaluation job.

Parameters:

subtask (Optional[Union[str, list[str]]]) – Optional subtask(s) to evaluate. If not provided, uses the subtasks from constructor. Can be a single subtask string, a list of subtasks, or ‘ALL’ to run all subtasks.

Returns:

The created benchmark evaluation execution.

Return type:

EvaluationPipelineExecution

Example

Benchmark = get_benchmarks()
evaluator = BenchMarkEvaluator(
    benchmark=Benchmark.MMLU,
    subtasks="ALL",
    model="llama3-2-1b-instruct",
    s3_output_path="s3://bucket/outputs/"
)

# Evaluate single subtask
execution = evaluator.evaluate(subtask="abstract_algebra")

# Evaluate multiple subtasks
execution = evaluator.evaluate(subtask=["abstract_algebra", "anatomy"])

# Evaluate all subtasks (uses constructor default)
execution = evaluator.evaluate()
evaluate_base_model: bool#
classmethod get_all(session: Any | None = None, region: str | None = None) Iterator[EvaluationPipelineExecution][source]#

Get all benchmark evaluation executions.

Uses EvaluationPipelineExecution.get_all() to retrieve all benchmark evaluation executions as an iterator.

Parameters:
  • session (Optional[Any]) – Optional boto3 session. If not provided, will be inferred.

  • region (Optional[str]) – Optional AWS region. If not provided, will be inferred.

Yields:

EvaluationPipelineExecution – Benchmark evaluation execution instances.

Example

# Get all benchmark evaluations as iterator
eval_iter = BenchMarkEvaluator.get_all()
all_executions = list(eval_iter)

# Or iterate directly
for execution in BenchMarkEvaluator.get_all():
    print(f"{execution.name}: {execution.status.overall_status}")

# With specific session/region
eval_iter = BenchMarkEvaluator.get_all(session=my_session, region='us-west-2')
all_executions = list(eval_iter)
property hyperparameters#

Get evaluation hyperparameters as a FineTuningOptions object.

This property provides access to evaluation hyperparameters with validation, type checking, and user-friendly information display. Hyperparameters are lazily loaded from the JumpStart Hub when first accessed.

Returns:

Dynamic object with evaluation hyperparameters

Return type:

FineTuningOptions

Raises:

ValueError – If base model name is not available or if hyperparameters cannot be loaded

Example

evaluator = BenchMarkEvaluator(...)

# Access current values
print(evaluator.hyperparameters.temperature)

# Modify values (with validation)
evaluator.hyperparameters.temperature = 0.5

# Get as dictionary
params = evaluator.hyperparameters.to_dict()

# Display parameter information
evaluator.hyperparameters.get_info()
evaluator.hyperparameters.get_info('temperature')
kms_key_id: str | None#
mlflow_experiment_name: str | None#
mlflow_resource_arn: str | None#
mlflow_run_name: str | None#
model: str | BaseTrainer | AgentRFTJob | ModelPackage#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_package_group: str | ModelPackageGroup | None#
model_post_init(context: Any, /) None#

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

networking: VpcConfig | None#
region: str | None#
role: str | None#
s3_output_path: str#
sagemaker_session: Any | None#
subtasks: str | List[str] | None#
class sagemaker.train.evaluate.CustomScorerEvaluator(*, region: str | None = None, role: str | None = None, sagemaker_session: Any | None = None, model: str | BaseTrainer | AgentRFTJob | ModelPackage, base_eval_name: str | None = None, s3_output_path: str, mlflow_resource_arn: str | None = None, mlflow_experiment_name: str | None = None, mlflow_run_name: str | None = None, networking: VpcConfig | None = None, kms_key_id: str | None = None, model_package_group: str | ModelPackageGroup | None = None, evaluator: str | Any, dataset: Any, evaluate_base_model: bool = False)[source]#

Bases: BaseEvaluator

Custom scorer evaluation job for preset or custom evaluator metrics.

This evaluator supports both preset metrics (via built-in metrics enum) and custom evaluator implementations for specialized evaluation needs.

evaluator#

Built-in metric enum value, Evaluator object, or Evaluator ARN string. Required. Use get_builtin_metrics() for available preset metrics.

Type:

Union[str, Any]

dataset#

Dataset for evaluation. Required. Accepts S3 URI, Dataset ARN, or DataSet object.

Type:

Any

mlflow_resource_arn#

ARN of the MLflow tracking server for experiment tracking. Optional. If not provided, the system will attempt to resolve it using the default MLflow app experience (checks domain match, account default, or creates a new app). Inherited from BaseEvaluator.

Type:

Optional[str]

evaluate_base_model#

Whether to evaluate the base model in addition to the custom model. Set to True to evaluate both models. Defaults to False (only the custom model is evaluated).

Type:

bool

region#

AWS region. Inherited from BaseEvaluator.

Type:

Optional[str]

sagemaker_session#

SageMaker session object. Inherited from BaseEvaluator.

Type:

Optional[Any]

model#

Model for evaluation. Inherited from BaseEvaluator.

Type:

Union[str, Any]

base_eval_name#

Base name for evaluation jobs. Inherited from BaseEvaluator.

Type:

Optional[str]

s3_output_path#

S3 location for evaluation outputs. Inherited from BaseEvaluator.

Type:

str

mlflow_experiment_name#

MLflow experiment name. Inherited from BaseEvaluator.

Type:

Optional[str]

mlflow_run_name#

MLflow run name. Inherited from BaseEvaluator.

Type:

Optional[str]

networking#

VPC configuration. Inherited from BaseEvaluator.

Type:

Optional[VpcConfig]

kms_key_id#

KMS key ID for encryption. Inherited from BaseEvaluator.

Type:

Optional[str]

model_package_group#

Model package group. Inherited from BaseEvaluator.

Type:

Optional[Union[str, ModelPackageGroup]]

Example

from sagemaker.train.evaluate.custom_scorer_evaluator import (
    CustomScorerEvaluator,
    get_builtin_metrics
)
from sagemaker.ai_registry.evaluator import Evaluator

# Using preset metric
BuiltInMetric = get_builtin_metrics()
evaluator = CustomScorerEvaluator(
    evaluator=BuiltInMetric.PRIME_MATH,
    dataset=my_dataset,
    model="my-model",
    s3_output_path="s3://bucket/output",
    mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
)

# Using custom evaluator
my_evaluator = Evaluator.create(
    name="my-custom-evaluator",
    function_source="/path/to/evaluator.py",
    sub_type="AWS/Evaluator"
)
evaluator = CustomScorerEvaluator(
    evaluator=my_evaluator,
    dataset=my_dataset,
    model="my-model",
    s3_output_path="s3://bucket/output",
    mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
)

# Using evaluator ARN string
evaluator = CustomScorerEvaluator(
    evaluator="arn:aws:sagemaker:us-west-2:123456789012:hub-content/AIRegistry/Evaluator/my-evaluator/1",
    dataset=my_dataset,
    model="my-model",
    s3_output_path="s3://bucket/output",
    mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server"
)

job = evaluator.evaluate()
base_eval_name: str | None#
dataset: Any#
evaluate() EvaluationPipelineExecution[source]#

Create and start a custom scorer evaluation job.

Returns:

The created custom scorer evaluation execution

Return type:

EvaluationPipelineExecution

Example

evaluator = CustomScorerEvaluator(
    evaluator=BuiltInMetric.CODE_EXECUTIONS,
    dataset=my_dataset,
    model="my-model",
    s3_output_path="s3://bucket/output",
    mlflow_resource_arn="arn:..."
)
execution = evaluator.evaluate()
execution.wait()
evaluate_base_model: bool#
evaluator: str | Any#
classmethod get_all(session: Any | None = None, region: str | None = None)[source]#

Get all custom scorer evaluation executions.

Uses EvaluationPipelineExecution.get_all() to retrieve all custom scorer evaluation executions as an iterator.

Parameters:
  • session (Optional[Any]) – Optional boto3 session. If not provided, will be inferred.

  • region (Optional[str]) – Optional AWS region. If not provided, will be inferred.

Yields:

EvaluationPipelineExecution – Custom scorer evaluation execution instances

Example

# Get all custom scorer evaluations as iterator
evaluations = CustomScorerEvaluator.get_all()
all_executions = list(evaluations)

# Or iterate directly
for execution in CustomScorerEvaluator.get_all():
    print(f"{execution.name}: {execution.status.overall_status}")

# With specific session/region
evaluations = CustomScorerEvaluator.get_all(session=my_session, region='us-west-2')
all_executions = list(evaluations)
property hyperparameters#

Get evaluation hyperparameters as a FineTuningOptions object.

This property provides access to evaluation hyperparameters with validation, type checking, and user-friendly information display. Hyperparameters are lazily loaded from the JumpStart Hub when first accessed.

Returns:

Dynamic object with evaluation hyperparameters

Return type:

FineTuningOptions

Raises:

ValueError – If base model name is not available or if hyperparameters cannot be loaded

Example

evaluator = CustomScorerEvaluator(...)

# Access current values
print(evaluator.hyperparameters.temperature)

# Modify values (with validation)
evaluator.hyperparameters.temperature = 0.5

# Get as dictionary
params = evaluator.hyperparameters.to_dict()

# Display parameter information
evaluator.hyperparameters.get_info()
evaluator.hyperparameters.get_info('temperature')
kms_key_id: str | None#
mlflow_experiment_name: str | None#
mlflow_resource_arn: str | None#
mlflow_run_name: str | None#
model: str | BaseTrainer | AgentRFTJob | ModelPackage#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_package_group: str | ModelPackageGroup | None#
model_post_init(context: Any, /) None#

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

networking: VpcConfig | None#
region: str | None#
role: str | None#
s3_output_path: str#
sagemaker_session: Any | None#
class sagemaker.train.evaluate.EvaluationPipelineExecution(*, arn: str | None = None, name: str, status: ~sagemaker.train.evaluate.execution.PipelineExecutionStatus = <factory>, last_modified_time: ~datetime.datetime | None = None, eval_type: ~sagemaker.train.evaluate.constants.EvalType | None = None, s3_output_path: str | None = None, steps: ~typing.List[~typing.Dict[str, ~typing.Any]] = <factory>)[source]#

Bases: BaseModel

Manages SageMaker pipeline-based evaluation execution lifecycle.

This class wraps SageMaker Pipeline execution to provide a simplified interface for running, monitoring, and managing evaluation jobs. Users typically don’t instantiate this class directly, but receive instances from evaluator classes.

Example

from sagemaker.train.evaluate import BenchMarkEvaluator
from sagemaker.train.evaluate.execution import EvaluationPipelineExecution

# Start evaluation through evaluator
evaluator = BenchmarkEvaluator(...)
execution = evaluator.evaluate()

# Monitor execution
print(f"Status: {execution.status.overall_status}")
print(f"Steps: {len(execution.status.step_details)}")

# Wait for completion
execution.wait()

# Display results
execution.show_results()

# Retrieve past executions
all_executions = list(EvaluationPipelineExecution.get_all())
specific_execution = EvaluationPipelineExecution.get(arn="arn:...")
Parameters:
  • arn (Optional[str]) – ARN of the pipeline execution.

  • name (str) – Name of the evaluation execution.

  • status (PipelineExecutionStatus) – Combined status with step details and failure reason.

  • last_modified_time (Optional[datetime]) – Last modification timestamp.

  • eval_type (Optional[EvalType]) – Type of evaluation (BENCHMARK, CUSTOM_SCORER, LLM_AS_JUDGE).

  • s3_output_path (Optional[str]) – S3 location where evaluation results are stored.

  • steps (List[Dict[str, Any]]) – Raw step information from SageMaker.

class Config[source]#

Bases: object

arbitrary_types_allowed = True#
arn: str | None#
eval_type: EvalType | None#
classmethod get(arn: str, session: Session | None = None, region: str | None = None) EvaluationPipelineExecution[source]#

Get a SageMaker pipeline execution instance by ARN.

Parameters:
  • arn (str) – ARN of the pipeline execution.

  • session (Optional[Session]) – Boto3 session. Will be inferred if not provided.

  • region (Optional[str]) – AWS region. Will be inferred if not provided.

Returns:

Retrieved pipeline execution instance.

Return type:

EvaluationPipelineExecution

Raises:

ClientError – If AWS service call fails.

Example

# Get execution by ARN
arn = "arn:aws:sagemaker:us-west-2:123456789012:pipeline/eval-pipeline/execution/abc123"
execution = EvaluationPipelineExecution.get(arn=arn)
print(execution.status.overall_status)
classmethod get_all(eval_type: EvalType | None = None, session: Session | None = None, region: str | None = None)[source]#

Get all pipeline executions, optionally filtered by evaluation type.

Searches for existing pipelines using prefix and tag validation, then retrieves executions from those pipelines.

Parameters:
  • eval_type (Optional[EvalType]) – Evaluation type to filter by (e.g., EvalType.BENCHMARK). If None, returns executions from all evaluation pipelines.

  • session (Optional[Session]) – Boto3 session. Will be inferred if not provided.

  • region (Optional[str]) – AWS region. Will be inferred if not provided.

Yields:

EvaluationPipelineExecution – Pipeline execution instances.

Example

from sagemaker.train.evaluate.constants import EvalType

# Get all evaluation executions as an iterator
executions = EvaluationPipelineExecution.get_all()
all_executions = list(executions)

# Get only benchmark evaluations
benchmark_iter = EvaluationPipelineExecution.get_all(eval_type=EvalType.BENCHMARK)
benchmark_executions = list(benchmark_iter)
last_modified_time: datetime | None#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str#
refresh() None[source]#

Describe the pipeline execution and update the job status.

s3_output_path: str | None#
classmethod start(eval_type: EvalType, name: str, pipeline_definition: str, role_arn: str, s3_output_path: str | None = None, session: Session | None = None, region: str | None = None, tags: List[Dict[str, str | PipelineVariable]] | None = []) EvaluationPipelineExecution[source]#

Create a SageMaker pipeline execution. Optionally creates the pipeline.

Parameters:
  • eval_type (EvalType) – Type of evaluation (BENCHMARK, CUSTOM_SCORER, LLM_AS_JUDGE).

  • name (str) – Name for the evaluation execution.

  • pipeline_definition (str) – Complete rendered pipeline definition as JSON string.

  • role_arn (str) – IAM role ARN for pipeline execution.

  • s3_output_path (Optional[str]) – S3 location where evaluation results are stored.

  • session (Optional[Session]) – Boto3 session for API calls.

  • region (Optional[str]) – AWS region for the pipeline.

  • tags (Optional[List[TagsDict]]) – List of tags to include in the pipeline.

Returns:

Started pipeline execution instance.

Return type:

EvaluationPipelineExecution

Raises:
  • ValueError – If pipeline_definition is not valid JSON.

  • ClientError – If AWS service call fails.

status: PipelineExecutionStatus#
steps: List[Dict[str, Any]]#
stop() None[source]#

Stop the pipeline execution.

wait(target_status: Literal['Executing', 'Stopping', 'Stopped', 'Failed', 'Succeeded'] = 'Succeeded', poll: int = 5, timeout: int | None = None) None[source]#

Wait for a pipeline execution to reach a certain status.

This method provides a hybrid implementation that works in both Jupyter notebooks and terminal environments, with appropriate visual feedback for each.

Parameters:
  • target_status – The status to wait for

  • poll – The number of seconds to wait between each poll

  • timeout – The maximum number of seconds to wait before timing out
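The roles of target_status, poll, and timeout can be pictured with a minimal polling sketch. It is not the SDK implementation: wait_for_status and the get_status callable are hypothetical names, and returning early on any terminal status (Stopped, Failed, Succeeded) is an assumption made here so the loop cannot spin forever.

```python
import time
from typing import Callable, Optional

# Statuses after which a pipeline execution no longer changes (assumed terminal here).
TERMINAL_STATUSES = {"Stopped", "Failed", "Succeeded"}


def wait_for_status(
    get_status: Callable[[], str],
    target_status: str = "Succeeded",
    poll: float = 5,
    timeout: Optional[float] = None,
) -> str:
    """Poll get_status() until target_status or another terminal status is seen."""
    start = time.monotonic()
    while True:
        status = get_status()
        if status == target_status or status in TERMINAL_STATUSES:
            return status
        # Give up once the optional timeout (in seconds) has elapsed.
        if timeout is not None and time.monotonic() - start >= timeout:
            raise TimeoutError(f"Still {status!r} after {timeout} seconds")
        time.sleep(poll)


# Stand-in status source: two polls report "Executing", the third "Succeeded".
statuses = iter(["Executing", "Executing", "Succeeded"])
final = wait_for_status(lambda: next(statuses), poll=0.01, timeout=5)
print(final)
```

With a real execution object, the status source would be something that refreshes the execution and reads its overall status.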

class sagemaker.train.evaluate.LLMAsJudgeEvaluator(*, region: str | None = None, role: str | None = None, sagemaker_session: Any | None = None, model: str | BaseTrainer | AgentRFTJob | ModelPackage, base_eval_name: str | None = None, s3_output_path: str, mlflow_resource_arn: str | None = None, mlflow_experiment_name: str | None = None, mlflow_run_name: str | None = None, networking: VpcConfig | None = None, kms_key_id: str | None = None, model_package_group: str | ModelPackageGroup | None = None, evaluator_model: str, dataset: str | Any, builtin_metrics: List[str] | None = None, custom_metrics: str | None = None, evaluate_base_model: bool = False)[source]#

Bases: BaseEvaluator

LLM-as-judge evaluation job.

This evaluator uses foundation models to evaluate LLM responses based on various quality and responsible AI metrics.

This feature is powered by Amazon Bedrock Evaluations. Your use of this feature is subject to pricing of Amazon Bedrock Evaluations, the Service Terms applicable to Amazon Bedrock, and the terms that apply to your usage of third-party models. Amazon Bedrock Evaluations may securely transmit data across AWS Regions within your geography for processing. For more information, access Amazon Bedrock Evaluations documentation.

Documentation: https://docs.aws.amazon.com/bedrock/latest/userguide/evaluation-judge.html

evaluator_model#

AWS Bedrock foundation model identifier to use as the judge. Required. For supported models, see: https://docs.aws.amazon.com/bedrock/latest/userguide/evaluation-judge.html#evaluation-judge-supported

Type:

str

dataset#

Evaluation dataset. Required. Accepts:

  • S3 URI (str): e.g., ‘s3://bucket/path/dataset.jsonl’

  • Dataset ARN (str): e.g., ‘arn:aws:sagemaker:…:hub-content/AIRegistry/DataSet/…’

  • DataSet object: sagemaker.ai_registry.dataset.DataSet instance (ARN inferred automatically)

Type:

Union[str, Any]

builtin_metrics#

List of built-in evaluation metric names to compute. The ‘Builtin.’ prefix from Bedrock documentation is optional and will be automatically removed if present. Examples: [‘Correctness’, ‘Faithfulness’] or [‘Builtin.Correctness’, ‘Builtin.Faithfulness’]. Optional.

Type:

Optional[List[str]]
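As a rough illustration of the prefix handling described for builtin_metrics, the sketch below strips an optional 'Builtin.' prefix so both spellings resolve to the same name. normalize_metric_names is a hypothetical helper written for this example; it is not an SDK function, and the SDK's own normalization may differ in detail.

```python
from typing import List

BUILTIN_PREFIX = "Builtin."  # prefix used in the Bedrock documentation


def normalize_metric_names(names: List[str]) -> List[str]:
    """Strip an optional 'Builtin.' prefix from each metric name (illustrative only)."""
    return [
        name[len(BUILTIN_PREFIX):] if name.startswith(BUILTIN_PREFIX) else name
        for name in names
    ]


print(normalize_metric_names(["Builtin.Correctness", "Faithfulness"]))
# ['Correctness', 'Faithfulness']
```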

custom_metrics#

JSON string containing array of custom metric definitions. Optional. For format details, see: https://docs.aws.amazon.com/bedrock/latest/userguide/model-evaluation-custom-metrics-prompt-formats.html

Type:

Optional[str]

mlflow_resource_arn#

ARN of the MLflow tracking server for experiment tracking. Optional. If not provided, the system will attempt to resolve it using the default MLflow app experience (checks domain match, account default, or creates a new app). Inherited from BaseEvaluator.

Type:

Optional[str]

evaluate_base_model#

Whether to evaluate the base model in addition to the custom model. Set to True to evaluate both models; when False, base model evaluation is skipped and only the custom model is evaluated. Defaults to False.

Type:

bool

region#

AWS region. Inherited from BaseEvaluator.

Type:

Optional[str]

sagemaker_session#

SageMaker session object. Inherited from BaseEvaluator.

Type:

Optional[Any]

model#

Model for evaluation. Inherited from BaseEvaluator.

Type:

Union[str, Any]

base_eval_name#

Base name for evaluation jobs. Inherited from BaseEvaluator.

Type:

Optional[str]

s3_output_path#

S3 location for evaluation outputs. Inherited from BaseEvaluator.

Type:

str

mlflow_experiment_name#

MLflow experiment name. Inherited from BaseEvaluator.

Type:

Optional[str]

mlflow_run_name#

MLflow run name. Inherited from BaseEvaluator.

Type:

Optional[str]

networking#

VPC configuration. Inherited from BaseEvaluator.

Type:

Optional[VpcConfig]

kms_key_id#

KMS key ID for encryption. Inherited from BaseEvaluator.

Type:

Optional[str]

model_package_group#

Model package group. Inherited from BaseEvaluator.

Type:

Optional[Union[str, ModelPackageGroup]]

Example

from sagemaker.train.evaluate import LLMAsJudgeEvaluator

# Example with built-in metrics (prefix optional)
# Both formats work - with or without 'Builtin.' prefix
evaluator = LLMAsJudgeEvaluator(
    model="llama-3-3-70b-instruct",
    evaluator_model="anthropic.claude-3-5-sonnet-20240620-v1:0",
    dataset="s3://my-bucket/my-dataset.jsonl",
    builtin_metrics=["Correctness", "Helpfulness"],  # Prefix optional
    mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server",
    s3_output_path="s3://my-bucket/output"
)
execution = evaluator.evaluate()

# Example with custom metrics (custom_metrics expects a JSON string)
import json

custom_metrics = [
    {
        "customMetricDefinition": {
            "name": "PositiveSentiment",
            "instructions": "Assess if the response has positive sentiment. Prompt: {{prompt}}\nResponse: {{prediction}}",
            "ratingScale": [
                {"definition": "Good", "value": {"floatValue": 1.0}},
                {"definition": "Poor", "value": {"floatValue": 0.0}}
            ]
        }
    }
]

evaluator = LLMAsJudgeEvaluator(
    model="llama-3-3-70b-instruct",
    evaluator_model="anthropic.claude-3-haiku-20240307-v1:0",
    dataset="s3://my-bucket/dataset.jsonl",
    custom_metrics=json.dumps(custom_metrics),  # serialize the list to a JSON string
    s3_output_path="s3://my-bucket/output"
)
execution = evaluator.evaluate()

# Example evaluating only custom model (skip base model)
evaluator = LLMAsJudgeEvaluator(
    model="llama-3-3-70b-instruct",
    evaluator_model="anthropic.claude-3-5-sonnet-20240620-v1:0",
    dataset="s3://my-bucket/my-dataset.jsonl",
    builtin_metrics=["Correctness"],  # Prefix optional
    evaluate_base_model=False,
    s3_output_path="s3://my-bucket/output"
)
execution = evaluator.evaluate()
base_eval_name: str | None#
builtin_metrics: List[str] | None#
custom_metrics: str | None#
dataset: str | Any#
evaluate()[source]#

Create and start an LLM-as-judge evaluation job.

This method initiates a 2-phase evaluation job:

  1. Phase 1: Generate inference responses from base and custom models

  2. Phase 2: Use judge model to evaluate responses with built-in and custom metrics

Returns:

The created LLM-as-judge evaluation execution

Return type:

EvaluationPipelineExecution

Raises:

ValueError – If invalid model, dataset, or metric configurations are provided

Example

evaluator = LLMAsJudgeEvaluator(
    model="llama-3-3-70b-instruct",
    evaluator_model="anthropic.claude-3-5-sonnet-20240620-v1:0",
    dataset="s3://my-bucket/my-dataset.jsonl",
    builtin_metrics=["Correctness", "Helpfulness"],  # Prefix optional
    s3_output_path="s3://my-bucket/output"
)
execution = evaluator.evaluate()
execution.wait()

evaluate_base_model: bool#
evaluator_model: str#
classmethod get_all(session: Any | None = None, region: str | None = None)[source]#

Get all LLM-as-judge evaluation executions.

Uses EvaluationPipelineExecution.get_all() to retrieve all LLM-as-judge evaluation executions as an iterator.

Parameters:
  • session (Optional[Any]) – Optional boto3 session. If not provided, will be inferred.

  • region (Optional[str]) – Optional AWS region. If not provided, will be inferred.

Yields:

EvaluationPipelineExecution – LLM-as-judge evaluation execution instances

Example

# Get all LLM-as-judge evaluations as iterator
evaluations = LLMAsJudgeEvaluator.get_all()
all_executions = list(evaluations)

# Or iterate directly
for execution in LLMAsJudgeEvaluator.get_all():
    print(f"{execution.name}: {execution.status.overall_status}")

# With specific session/region
evaluations = LLMAsJudgeEvaluator.get_all(session=my_session, region='us-west-2')
all_executions = list(evaluations)
kms_key_id: str | None#
mlflow_experiment_name: str | None#
mlflow_resource_arn: str | None#
mlflow_run_name: str | None#
model: str | BaseTrainer | AgentRFTJob | ModelPackage#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_package_group: str | ModelPackageGroup | None#
networking: VpcConfig | None#
region: str | None#
role: str | None#
s3_output_path: str#
sagemaker_session: Any | None#
class sagemaker.train.evaluate.MultiTurnRLEvaluator(*, region: str | None = None, role: str | None = None, sagemaker_session: Any | None = None, model: str | BaseTrainer | AgentRFTJob | ModelPackage, base_eval_name: str | None = None, s3_output_path: str, mlflow_resource_arn: str | None = None, mlflow_experiment_name: str | None = None, mlflow_run_name: str | None = None, networking: VpcConfig | None = None, kms_key_id: str | None = None, model_package_group: str | ModelPackageGroup | None = None, dataset: Any, agent_config: Any | None = None, agent_qualifier: str | None = None, accept_eula: bool = True, evaluate_base_model: bool = False, stopping_condition: int = 86400, tags: List[Dict[str, str]] | None = None)[source]#

Bases: BaseEvaluator

Evaluate a multi-turn RL agent model against a held-out prompt dataset.

The evaluator runs rollouts of the agent against an environment (Bedrock AgentCore runtime or a Lambda-wrapped agent) and computes aggregate metrics (pass@k, mean reward, etc.). Execution routes through SageMaker Pipelines using the new AgentRFT Job step type (JobCategory="AgentRFTEvaluation").

The evaluator supports three evaluation shapes, selected automatically based on the provided inputs:

  • Base-model only — pass a base model (JumpStart ID or ModelPackage) with an explicit agent_config.

  • Fine-tuned only — pass a MultiTurnRLTrainer or a fine-tuned ModelPackage; the evaluator extracts the source model package ARN and evaluates it only.

  • Base + fine-tuned comparison — pass evaluate_base_model=True along with a fine-tuned trainer / ModelPackage; both runs land in the same MLflow experiment for side-by-side comparison.
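The three shapes listed above can be summarized as a small decision function. This is only a sketch of the documented behavior: select_evaluation_shape and the returned labels are hypothetical, and the real evaluator infers whether a fine-tuned model is present from the model argument rather than from a boolean.

```python
def select_evaluation_shape(has_fine_tuned_model: bool, evaluate_base_model: bool) -> str:
    """Pick one of the three documented evaluation shapes (illustrative, not SDK code)."""
    if not has_fine_tuned_model:
        # Only a base model was supplied, so there is nothing to compare against.
        return "base_only"
    if evaluate_base_model:
        # Fine-tuned model plus an explicit request to also evaluate the base model.
        return "base_and_fine_tuned"
    return "fine_tuned_only"


print(select_evaluation_shape(False, False))  # base_only
print(select_evaluation_shape(True, False))   # fine_tuned_only
print(select_evaluation_shape(True, True))    # base_and_fine_tuned
```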

dataset#

Prompt dataset — S3 URI, hub-content DataSet ARN, or object exposing an .arn attribute. Required.

Type:

Union[str, Any]
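To make the three accepted dataset forms concrete, here is a minimal sketch that reduces any of them to a single string reference. resolve_dataset_reference is a hypothetical helper, the stand-in object is built with SimpleNamespace, and the ARN shown is a made-up placeholder; the SDK's own validation may be stricter.

```python
from types import SimpleNamespace
from typing import Any


def resolve_dataset_reference(dataset: Any) -> str:
    """Return the S3 URI or ARN string behind a dataset argument (illustrative only)."""
    if isinstance(dataset, str):
        if dataset.startswith(("s3://", "arn:")):
            return dataset
        raise ValueError(f"Expected an S3 URI or ARN, got {dataset!r}")
    # Any object exposing a string .arn attribute is accepted.
    arn = getattr(dataset, "arn", None)
    if isinstance(arn, str):
        return arn
    raise ValueError("Dataset object does not expose an .arn attribute")


# Placeholder object standing in for a registered DataSet.
registered = SimpleNamespace(
    arn="arn:aws:sagemaker:us-west-2:123456789012:hub-content/AIRegistry/DataSet/my-dataset/1"
)
print(resolve_dataset_reference("s3://my-bucket/eval-prompts.jsonl"))
print(resolve_dataset_reference(registered))
```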

agent_config#

Agent environment — Bedrock AgentCore ARN or Lambda ARN. Auto-resolved from a MultiTurnRLTrainer when provided as model.

Type:

Optional[Union[str, Any]]

agent_qualifier#

Bedrock AgentCore qualifier (e.g. "PROD"). Ignored when agent_config is a Lambda.

Type:

Optional[str]
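The interplay between agent_config and agent_qualifier can be sketched by inspecting the service field of the ARN. classify_agent_arn is a hypothetical helper, the example ARNs are placeholders, and the service names checked ("lambda" and "bedrock-agentcore") are assumptions about how the two ARN kinds are told apart; the SDK may use a different rule.

```python
from typing import Optional, Tuple


def classify_agent_arn(arn: str, qualifier: Optional[str] = None) -> Tuple[str, Optional[str]]:
    """Return (environment kind, effective qualifier) for an agent ARN (illustrative only)."""
    # ARN layout: arn:partition:service:region:account-id:resource
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"Not a valid ARN: {arn!r}")
    service = parts[2]
    if service == "lambda":
        # The qualifier is ignored when the agent is a Lambda function.
        return "lambda", None
    if service == "bedrock-agentcore":
        return "agentcore", qualifier
    raise ValueError(f"Unsupported agent service: {service!r}")


print(classify_agent_arn("arn:aws:lambda:us-west-2:123456789012:function:my-agent", "PROD"))
print(classify_agent_arn("arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/my-agent", "PROD"))
```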

accept_eula#

Forwarded to JobConfigDocument.EvaluationConfig.AcceptEula. Defaults to True (templates emit true unconditionally; flag kept for future backend schemas).

Type:

bool

evaluate_base_model#

When True and a fine-tuned model is present, render the comparison template (both base and fine-tuned are evaluated). Defaults to False — fine-tuned only.

Type:

bool

stopping_condition#

Maximum job duration in seconds. Default 86400 (24 hours); must be in (0, 259200].

Type:

int
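The interval (0, 259200] excludes zero and includes the upper bound of 259200 seconds (72 hours). A minimal range check along those lines might look as follows; check_stopping_condition is a hypothetical helper for illustration, not the validator the SDK uses.

```python
MAX_STOPPING_CONDITION = 259200  # 72 hours, the documented upper bound


def check_stopping_condition(seconds: int) -> int:
    """Return seconds unchanged if it lies in (0, 259200], else raise ValueError."""
    if not 0 < seconds <= MAX_STOPPING_CONDITION:
        raise ValueError(
            f"stopping_condition must be in (0, {MAX_STOPPING_CONDITION}], got {seconds}"
        )
    return seconds


print(check_stopping_condition(86400))   # default: 24 hours
print(check_stopping_condition(259200))  # the upper bound is inclusive
```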

tags#

Customer tags propagated to the Tags lists of both the pipeline and the step.

Type:

Optional[List[Dict[str, str]]]

See BaseEvaluator for inherited fields (model, s3_output_path, mlflow_resource_arn, mlflow_experiment_name, networking, kms_key_id, model_package_group, base_eval_name, region, role, sagemaker_session).

Example

from sagemaker.train.evaluate import MultiTurnRLEvaluator

# Evaluate a fine-tuned MTRL trainer output
evaluator = MultiTurnRLEvaluator(
    model=completed_mtrl_trainer,
    dataset='s3://my-bucket/eval-prompts.jsonl',
    s3_output_path='s3://my-bucket/mtrl-eval-output/',
)

execution = evaluator.evaluate()
execution.wait()
execution.show_results()
accept_eula: bool#
agent_config: Any | None#
agent_qualifier: str | None#
base_eval_name: str | None#
dataset: Any#
evaluate() MTRLEvaluationExecution[source]#

Render the MTRL pipeline and start a non-blocking execution.

Returns:

The started pipeline execution. Call .wait() to block until completion and .show_results() to render the aggregate report.

Return type:

MTRLEvaluationExecution

Example

execution = evaluator.evaluate()
execution.wait()
execution.show_results()
evaluate_base_model: bool#
classmethod get_all(session=None, region=None)[source]#

List all MTRL evaluation executions in the account / region.

Parameters:
  • session – Optional boto3 session.

  • region – Optional AWS region.

Yields:

EvaluationPipelineExecution – MTRL evaluation execution instances.

property hyperparameters#

Lazy-load evaluation hyperparameters from the JumpStart hub.

Returns a FineTuningOptions object exposing to_dict(), get_info(), and attribute-style read/write access with hub-sourced validation (type + range).

Supported parameters (sourced from the AgentRFT evaluation recipe): eval_group_size, sampling_temperature, top_p, max_tokens, pass_k_values, success_threshold.

Raises:

ValueError – If the base model name is not available or the hub does not expose an AgentRFTEvaluation override spec for the model.
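To give a feel for how parameters such as eval_group_size, success_threshold, and pass_k_values could combine into the aggregate metrics, the sketch below computes mean reward and pass@k for one prompt's group of rollouts. It rests on several assumptions that the documentation does not confirm: a rollout counts as a success when its reward is at least success_threshold, the group size plays the role of n, and pass@k uses the widely used unbiased estimator 1 - C(n-c, k) / C(n, k). The function names are hypothetical.

```python
from math import comb
from typing import Dict, List


def pass_at_k(n: int, c: int, k: int) -> float:
    """Unbiased pass@k estimate from n samples of which c succeeded."""
    if not 0 < k <= n:
        raise ValueError("k must satisfy 0 < k <= n")
    # comb(n - c, k) is 0 when fewer than k samples failed, giving pass@k = 1.
    return 1.0 - comb(n - c, k) / comb(n, k)


def summarize_group(
    rewards: List[float], success_threshold: float, pass_k_values: List[int]
) -> Dict[str, float]:
    """Aggregate one prompt's rollouts into mean reward and pass@k values."""
    n = len(rewards)
    successes = sum(1 for reward in rewards if reward >= success_threshold)
    summary = {"mean_reward": sum(rewards) / n}
    for k in pass_k_values:
        summary[f"pass@{k}"] = pass_at_k(n, successes, k)
    return summary


# Four rollouts for one prompt (group size 4), two of which succeed.
summary = summarize_group([1.0, 0.0, 0.0, 1.0], success_threshold=0.5, pass_k_values=[1, 2])
print(summary)
```

Here pass@1 comes out as 0.5 and pass@2 as 5/6, since only one of the six possible pairs of rollouts contains no success.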

kms_key_id: str | None#
static list_bedrock_agentcore_runtimes(session=None) list[source]#

List Bedrock AgentCore runtimes.

Parameters:

session – Optional boto3 session.

Returns:

List of dicts, each with keys name, runtime_id, arn, and status.

static list_supported_models(session=None) list[source]#

Return the list of models that support MTRL evaluation.

Queries SageMakerPublicHub to discover all models with MTRL recipes in their RecipeCollection.

Parameters:

session – Optional boto3 session.

Returns:

List of hub content model names supporting MTRL evaluation.

mlflow_experiment_name: str | None#
mlflow_resource_arn: str | None#
mlflow_run_name: str | None#
model: str | BaseTrainer | AgentRFTJob | ModelPackage#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_package_group: str | ModelPackageGroup | None#
model_post_init(context: Any, /) None#

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

networking: VpcConfig | None#
region: str | None#
role: str | None#
s3_output_path: str#
sagemaker_session: Any | None#
stopping_condition: int#
tags: List[Dict[str, str]] | None#
class sagemaker.train.evaluate.PipelineExecutionStatus(*, overall_status: str, step_details: ~typing.List[~sagemaker.train.evaluate.execution.StepDetail] = <factory>, failure_reason: str | None = None)[source]#

Bases: BaseModel

Combined pipeline execution status with step details and failure reason.

Aggregates the overall execution status along with detailed information about individual pipeline steps and any failure reasons.

Parameters:
  • overall_status (str) – Overall execution status (Starting, Executing, Completed, Failed, etc.).

  • step_details (List[StepDetail]) – List of individual pipeline step details.

  • failure_reason (Optional[str]) – Detailed reason if the execution failed.
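One way to use the combined status is to pull out the steps that failed along with their reasons. The sketch below uses plain dataclasses as stand-ins that mirror a subset of the documented fields (the real classes are pydantic models), and failed_steps is a hypothetical helper; the step names and failure text are invented for the example.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class StepDetailSketch:
    """Stand-in mirroring a subset of StepDetail's documented fields."""
    name: str
    status: str
    failure_reason: Optional[str] = None


@dataclass
class StatusSketch:
    """Stand-in mirroring PipelineExecutionStatus's documented fields."""
    overall_status: str
    step_details: List[StepDetailSketch] = field(default_factory=list)
    failure_reason: Optional[str] = None


def failed_steps(status: StatusSketch) -> List[Tuple[str, Optional[str]]]:
    """Return (step name, failure reason) for every step whose status is 'Failed'."""
    return [(s.name, s.failure_reason) for s in status.step_details if s.status == "Failed"]


status = StatusSketch(
    overall_status="Failed",
    step_details=[
        StepDetailSketch(name="Inference", status="Completed"),
        StepDetailSketch(name="Scoring", status="Failed", failure_reason="Example failure"),
    ],
    failure_reason="Step Scoring failed",
)
print(failed_steps(status))
```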

failure_reason: str | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

overall_status: str#
step_details: List[StepDetail]#
class sagemaker.train.evaluate.StepDetail(*, name: str, status: str, start_time: str | None = None, end_time: str | None = None, display_name: str | None = None, failure_reason: str | None = None, job_arn: str | None = None)[source]#

Bases: BaseModel

Pipeline step details for tracking execution progress.

Represents the status and timing information for a single step in a SageMaker pipeline execution.

Parameters:
  • name (str) – Name of the pipeline step.

  • status (str) – Status of the step (Completed, Executing, Waiting, Failed).

  • start_time (Optional[str]) – ISO format timestamp when step started.

  • end_time (Optional[str]) – ISO format timestamp when step ended.

  • display_name (Optional[str]) – Human-readable display name for the step.

  • failure_reason (Optional[str]) – Detailed reason if the step failed.

display_name: str | None#
end_time: str | None#
failure_reason: str | None#
job_arn: str | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str#
start_time: str | None#
status: str#
sagemaker.train.evaluate.get_benchmark_properties(benchmark: _Benchmark) Dict[str, Any][source]#

Get properties for a specific benchmark.

This utility method returns the properties associated with a given benchmark as a dictionary, including information about modality, metrics, strategy, and available subtasks.

Parameters:

benchmark (_Benchmark) – The benchmark to get properties for (from get_benchmarks()).

Returns:

Dictionary containing benchmark properties with keys:

  • modality (str): The modality type (e.g., “Text”, “Multi-Modal”)

  • description (str): Description of the benchmark

  • metrics (list[str]): List of supported metrics

  • strategy (str): The evaluation strategy used

  • subtask_available (bool): Whether subtasks are supported

  • subtasks (Optional[list[str]]): List of available subtasks, if applicable

Return type:

Dict[str, Any]

Raises:

ValueError – If the provided benchmark is not found in the configuration.

Example

Benchmark = get_benchmarks()
props = get_benchmark_properties(Benchmark.MMLU)
print(props['description'])
# 'Multi-task Language Understanding – Tests knowledge across 57 subjects.'
print(props['subtasks'][:3])
# ['abstract_algebra', 'anatomy', 'astronomy']

Note

In the future, this will be extended to dynamically fetch benchmark properties from a backend API call instead of using the internal static configuration.

sagemaker.train.evaluate.get_benchmarks() Type[_Benchmark][source]#

Get the Benchmark enum for selecting available benchmarks.

This utility method provides access to the internal Benchmark enum, allowing users to reference available benchmarks without directly accessing internal implementation details.

Returns:

The Benchmark enum class containing all available benchmarks.

Return type:

Type[_Benchmark]

Example

Benchmark = get_benchmarks()
evaluator = BenchMarkEvaluator(
    benchmark=Benchmark.MMLU,
    sagemaker_session=session,
    s3_output_path="s3://bucket/output"
)

Note

In the future, this will be extended to dynamically generate the enum from a backend API call to fetch the latest available benchmarks.

sagemaker.train.evaluate.get_builtin_metrics() Type[_BuiltInMetric][source]#

Get the built-in metrics enum for custom scorer evaluation.

This utility function provides access to preset metrics for custom scorer evaluation.

Returns:

The built-in metric enum class

Return type:

Type[_BuiltInMetric]

Example

from sagemaker.train.evaluate import get_builtin_metrics

BuiltInMetric = get_builtin_metrics()
evaluator = CustomScorerEvaluator(
    evaluator=BuiltInMetric.PRIME_MATH,
    dataset=my_dataset,
    model="my-model",
    s3_output_path="s3://bucket/output",
    mlflow_resource_arn="arn:..."
)

Multi-Turn RL Evaluation#

MultiTurnRLEvaluator — evaluate MTRL agents on held-out prompts.

This module implements MultiTurnRLEvaluator, the SDK surface for evaluating Multi-Turn Reinforcement Learning (MTRL) agent models via the AgentRFT CreateJob pipeline step. It mirrors the architecture of sagemaker.train.evaluate.BenchMarkEvaluator, adding MTRL-specific fields, validators, and the three-template rendering surface defined in sagemaker.train.evaluate.mtrl_pipeline_templates.

class sagemaker.train.evaluate.multi_turn_rl_evaluator.MultiTurnRLEvaluator(*, region: str | None = None, role: str | None = None, sagemaker_session: Any | None = None, model: str | BaseTrainer | AgentRFTJob | ModelPackage, base_eval_name: str | None = None, s3_output_path: str, mlflow_resource_arn: str | None = None, mlflow_experiment_name: str | None = None, mlflow_run_name: str | None = None, networking: VpcConfig | None = None, kms_key_id: str | None = None, model_package_group: str | ModelPackageGroup | None = None, dataset: Any, agent_config: Any | None = None, agent_qualifier: str | None = None, accept_eula: bool = True, evaluate_base_model: bool = False, stopping_condition: int = 86400, tags: List[Dict[str, str]] | None = None)[source]#

Bases: BaseEvaluator

Evaluate a multi-turn RL agent model against a held-out prompt dataset.

The evaluator runs rollouts of the agent against an environment (Bedrock AgentCore runtime or a Lambda-wrapped agent) and computes aggregate metrics (pass@k, mean reward, etc.). Execution routes through SageMaker Pipelines using the new AgentRFT Job step type (JobCategory="AgentRFTEvaluation").

The evaluator supports three evaluation shapes, selected automatically based on the provided inputs:

  • Base-model only — pass a base model (JumpStart ID or ModelPackage) with an explicit agent_config.

  • Fine-tuned only — pass a MultiTurnRLTrainer or a fine-tuned ModelPackage; the evaluator extracts the source model package ARN and evaluates it only.

  • Base + fine-tuned comparison — pass evaluate_base_model=True along with a fine-tuned trainer / ModelPackage; both runs land in the same MLflow experiment for side-by-side comparison.

dataset#

Prompt dataset — S3 URI, hub-content DataSet ARN, or object exposing an .arn attribute. Required.

Type:

Union[str, Any]

agent_config#

Agent environment — Bedrock AgentCore ARN or Lambda ARN. Auto-resolved from a MultiTurnRLTrainer when provided as model.

Type:

Optional[Union[str, Any]]

agent_qualifier#

Bedrock AgentCore qualifier (e.g. "PROD"). Ignored when agent_config is a Lambda.

Type:

Optional[str]
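Because agent_qualifier only applies to Bedrock AgentCore environments, it can help to check which kind of ARN is being passed as agent_config. The sketch below classifies an ARN by its service field. The helper names and sample ARNs are hypothetical, and the service name bedrock-agentcore is an assumption about the ARN format rather than something this reference states.

```python
from typing import Optional


def agent_environment_kind(agent_arn: str) -> str:
    """Classify an agent ARN as 'lambda' or 'agentcore' from its service field."""
    parts = agent_arn.split(":", 5)  # arn:partition:service:region:account:resource
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"Not a well-formed ARN: {agent_arn!r}")
    service = parts[2]
    if service == "lambda":
        return "lambda"
    if service == "bedrock-agentcore":  # assumed service name
        return "agentcore"
    raise ValueError(f"Unsupported agent service: {service!r}")


def effective_qualifier(agent_arn: str, agent_qualifier: Optional[str]) -> Optional[str]:
    """Keep the qualifier for AgentCore ARNs; drop it for Lambda ARNs."""
    if agent_environment_kind(agent_arn) == "agentcore":
        return agent_qualifier
    return None


# Hypothetical ARNs for illustration only.
lambda_arn = "arn:aws:lambda:us-east-1:123456789012:function:my-agent"
agentcore_arn = "arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/my-agent"
```
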

accept_eula#

Forwarded to JobConfigDocument.EvaluationConfig.AcceptEula. Defaults to True. The current templates emit true unconditionally; the flag is kept for future backend schemas.

Type:

bool

evaluate_base_model#

When True and a fine-tuned model is present, render the comparison template (both base and fine-tuned are evaluated). Defaults to False — fine-tuned only.

Type:

bool

stopping_condition#

Maximum job duration in seconds. Defaults to 86400 (24 hours); must be in the range (0, 259200], that is, greater than zero and at most 72 hours.

Type:

int
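The documented bound (0, 259200] is open at zero and closed at 259200 seconds (72 hours). A minimal client-side check, assuming you want to fail fast before constructing the evaluator, could look like the following. The function and constant names are hypothetical and are not part of the SDK.

```python
MAX_STOPPING_CONDITION_SECONDS = 259_200  # 72 hours, the documented upper bound


def validate_stopping_condition(seconds: int = 86_400) -> int:
    """Return seconds unchanged if it lies in (0, 259200]; raise otherwise."""
    if not 0 < seconds <= MAX_STOPPING_CONDITION_SECONDS:
        raise ValueError(
            f"stopping_condition must be in (0, {MAX_STOPPING_CONDITION_SECONDS}], "
            f"got {seconds}"
        )
    return seconds


default_limit = validate_stopping_condition()       # 24 hours
two_day_limit = validate_stopping_condition(172_800)  # 48 hours
```
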

tags#

Customer-supplied tags, propagated to the Tags lists of both the pipeline and the step.

Type:

Optional[List[Dict[str, str]]]
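The type above only says that tags is a list of string-to-string dicts. Assuming the usual AWS tag shape of one {"Key": ..., "Value": ...} dict per tag (an assumption, since this reference does not spell out the keys), a plain mapping can be converted with a small helper. The helper name and sample labels are hypothetical.

```python
from typing import Dict, List


def to_tag_list(labels: Dict[str, str]) -> List[Dict[str, str]]:
    """Convert a plain mapping into a list of Key/Value tag dicts, sorted by key."""
    return [{"Key": key, "Value": value} for key, value in sorted(labels.items())]


tags = to_tag_list({"team": "ml-platform", "project": "mtrl-eval"})
```
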

See BaseEvaluator for inherited fields (model, s3_output_path, mlflow_resource_arn, mlflow_experiment_name, networking, kms_key_id, model_package_group, base_eval_name, region, role, sagemaker_session).

Example

from sagemaker.train.evaluate import MultiTurnRLEvaluator

# Evaluate the output of a fine-tuned MTRL trainer.
# completed_mtrl_trainer is assumed to be a MultiTurnRLTrainer that has finished training.
evaluator = MultiTurnRLEvaluator(
    model=completed_mtrl_trainer,
    dataset='s3://my-bucket/eval-prompts.jsonl',
    s3_output_path='s3://my-bucket/mtrl-eval-output/',
)

execution = evaluator.evaluate()
execution.wait()
execution.show_results()
accept_eula: bool#
agent_config: Any | None#
agent_qualifier: str | None#
base_eval_name: str | None#
dataset: Any#
evaluate() → MTRLEvaluationExecution[source]#

Render the MTRL pipeline and start a non-blocking execution.

Returns:

The started pipeline execution. Call .wait() to block until completion and .show_results() to render the aggregate report.

Return type:

MTRLEvaluationExecution

Example

execution = evaluator.evaluate()
execution.wait()
execution.show_results()
evaluate_base_model: bool#
classmethod get_all(session=None, region=None)[source]#

List all MTRL evaluation executions in the account / region.

Parameters:
  • session – Optional boto3 session.

  • region – Optional AWS region.

Yields:

EvaluationPipelineExecution – MTRL evaluation execution instances.

property hyperparameters#

Lazy-load evaluation hyperparameters from the JumpStart hub.

Returns a FineTuningOptions object exposing to_dict(), get_info(), and attribute-style read/write access with hub-sourced validation (type + range).

Supported parameters (sourced from the AgentRFT evaluation recipe): eval_group_size, sampling_temperature, top_p, max_tokens, pass_k_values, success_threshold.

Raises:

ValueError – If the base model name is not available or the hub does not expose an AgentRFTEvaluation override spec for the model.

kms_key_id: str | None#
static list_bedrock_agentcore_runtimes(session=None) → list[source]#

List Bedrock AgentCore runtimes.

Parameters:

session – Optional boto3 session.

Returns:

List of dicts, each with keys name, runtime_id, arn, and status.
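Since each entry is a dict with the keys name, runtime_id, arn, and status, the result can be filtered with ordinary list operations. The sketch below works on hand-written sample data rather than a live call. The status value "READY", the sample entries, and the helper name are assumptions for illustration.

```python
from typing import Dict, List


def runtime_arns_with_status(runtimes: List[Dict[str, str]], status: str) -> List[str]:
    """Return the ARNs of runtimes whose status matches, ordered by name."""
    matching = [rt for rt in runtimes if rt["status"] == status]
    return [rt["arn"] for rt in sorted(matching, key=lambda rt: rt["name"])]


# Hypothetical entries shaped like the documented return value.
sample_runtimes = [
    {"name": "shop-agent", "runtime_id": "rt-2", "arn": "arn:example:shop", "status": "READY"},
    {"name": "chat-agent", "runtime_id": "rt-1", "arn": "arn:example:chat", "status": "READY"},
    {"name": "old-agent", "runtime_id": "rt-0", "arn": "arn:example:old", "status": "DELETING"},
]
ready_arns = runtime_arns_with_status(sample_runtimes, "READY")
```

One of the selected ARNs could then be passed as agent_config when constructing the evaluator.
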

static list_supported_models(session=None) → list[source]#

Return the list of models that support MTRL evaluation.

Queries SageMakerPublicHub to discover all models with MTRL recipes in their RecipeCollection.

Parameters:

session – Optional boto3 session.

Returns:

List of hub content model names supporting MTRL evaluation.

mlflow_experiment_name: str | None#
mlflow_resource_arn: str | None#
mlflow_run_name: str | None#
model: str | BaseTrainer | AgentRFTJob | ModelPackage#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_package_group: str | ModelPackageGroup | None#
model_post_init(context: Any, /) → None#

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

networking: VpcConfig | None#
region: str | None#
role: str | None#
s3_output_path: str#
sagemaker_session: Any | None#
stopping_condition: int#
tags: List[Dict[str, str]] | None#