SageMaker MLOps#
MLOps capabilities including pipelines, workflows, and model management.
Pipeline Management#
SageMaker MLOps package for workflow orchestration and model building.
This package provides high-level orchestration capabilities for SageMaker workflows, including pipeline definitions, step implementations, and model building utilities.
The MLOps package sits at the top of the dependency hierarchy and can import from:
- sagemaker.core (foundation primitives)
- sagemaker.train (training functionality)
- sagemaker.serve (serving functionality)
Key components:
- workflow: Pipeline and step orchestration
- model_builder: Model building and orchestration
Example usage:
from sagemaker.mlops import ModelBuilder
from sagemaker.mlops.workflow import Pipeline, TrainingStep
- class sagemaker.mlops.ModelBuilder(model: object | str | ~sagemaker.train.model_trainer.ModelTrainer | ~sagemaker.train.base_trainer.BaseTrainer | ~sagemaker.core.resources.TrainingJob | ~sagemaker.core.resources.ModelPackage | ~typing.List[~sagemaker.core.resources.Model] | None = None, model_path: str | None = <factory>, inference_spec: ~sagemaker.serve.spec.inference_spec.InferenceSpec | None = None, schema_builder: ~sagemaker.serve.builder.schema_builder.SchemaBuilder | None = None, modelbuilder_list: ~typing.List[~sagemaker.serve.model_builder.ModelBuilder] | None = None, role_arn: str | None = None, sagemaker_session: ~sagemaker.core.helper.session_helper.Session | None = None, image_uri: str | ~sagemaker.core.helper.pipeline_variable.PipelineVariable | None = None, s3_model_data_url: str | ~sagemaker.core.helper.pipeline_variable.PipelineVariable | ~typing.Dict[str, ~typing.Any] | None = None, source_code: ~sagemaker.core.training.configs.SourceCode | None = None, env_vars: ~typing.Dict[str, str | ~sagemaker.core.helper.pipeline_variable.PipelineVariable] | None = <factory>, model_server: ~sagemaker.serve.utils.types.ModelServer | None = None, model_metadata: ~typing.Dict[str, ~typing.Any] | None = None, log_level: int | None = 10, content_type: str | None = None, accept_type: str | None = None, compute: ~sagemaker.core.training.configs.Compute | None = None, network: ~sagemaker.core.training.configs.Networking | None = None, instance_type: str | None = None, mode: ~sagemaker.serve.mode.function_pointers.Mode | None = Mode.SAGEMAKER_ENDPOINT, shared_libs: ~typing.List[str] = <factory>, dependencies: ~typing.Dict[str, ~typing.Any] | None = <factory>, image_config: ~typing.Dict[str, str | ~sagemaker.core.helper.pipeline_variable.PipelineVariable] | None = None)[source]#
Bases: _InferenceRecommenderMixin, _ModelBuilderServers, _ModelBuilderUtils
Unified interface for building and deploying machine learning models.
ModelBuilder provides a streamlined workflow for preparing and deploying ML models to Amazon SageMaker. It supports multiple frameworks (PyTorch, TensorFlow, HuggingFace, etc.), model servers (TorchServe, TGI, Triton, etc.), and deployment modes (SageMaker endpoints, local containers, in-process).
The typical workflow involves three steps:
1. Initialize ModelBuilder with your model and configuration
2. Call build() to create a deployable Model resource
3. Call deploy() to create an Endpoint resource for inference
Example
>>> from sagemaker.serve.model_builder import ModelBuilder
>>> from sagemaker.serve.mode.function_pointers import Mode
>>>
>>> # Initialize with a trained model
>>> model_builder = ModelBuilder(
...     model=my_pytorch_model,
...     role_arn="arn:aws:iam::123456789012:role/SageMakerRole",
...     instance_type="ml.m5.xlarge"
... )
>>>
>>> # Build the model (creates SageMaker Model resource)
>>> model = model_builder.build()
>>>
>>> # Deploy to endpoint (creates SageMaker Endpoint resource)
>>> endpoint = model_builder.deploy(endpoint_name="my-endpoint")
>>>
>>> # Make predictions
>>> result = endpoint.invoke(data=input_data)
- Parameters:
model – The model to deploy. Can be a trained model object, ModelTrainer, TrainingJob, ModelPackage, or JumpStart model ID string. Either model or inference_spec is required.
model_path – Local directory path where model artifacts are stored or will be downloaded.
inference_spec – Custom inference specification with load() and invoke() functions.
schema_builder – Defines input/output schema for serialization and deserialization.
modelbuilder_list – List of ModelBuilder objects for multi-model deployments.
pipeline_models – List of Model objects for creating inference pipelines.
role_arn – IAM role ARN for SageMaker to assume.
sagemaker_session – Session object for managing SageMaker API interactions.
image_uri – Container image URI. Auto-selected if not specified.
s3_model_data_url – S3 URI where model artifacts are stored or will be uploaded.
source_code – Source code configuration for custom inference code.
env_vars – Environment variables to set in the container.
model_server – Model server to use (TORCHSERVE, TGI, TRITON, etc.).
model_metadata – Dictionary to override model metadata (HF_TASK, MLFLOW_MODEL_PATH, etc.).
log_level – Logging level for ModelBuilder operations (default: logging.DEBUG).
content_type – MIME type of input data. Auto-derived from schema_builder if provided.
accept_type – MIME type of output data. Auto-derived from schema_builder if provided.
compute – Compute configuration specifying instance type and count.
network – Network configuration including VPC settings and network isolation.
instance_type – EC2 instance type for deployment (e.g., ‘ml.m5.large’).
mode – Deployment mode (SAGEMAKER_ENDPOINT, LOCAL_CONTAINER, or IN_PROCESS).
Note
ModelBuilder returns sagemaker.core.resources.Model and sagemaker.core.resources.Endpoint objects, not the deprecated PySDK Model and Predictor classes. Use endpoint.invoke() instead of predictor.predict() for inference.
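The log_level default of 10 in the signature is the numeric value of logging.DEBUG in Python's standard library. A minimal check that needs only the standard library, not the SageMaker SDK:

```python
import logging

# The signature default log_level=10 is the numeric value of logging.DEBUG.
default_log_level = 10
level_name = logging.getLevelName(default_log_level)
print(level_name)

# A less verbose level is just another integer from the logging module.
quieter_level = logging.WARNING
print(quieter_level)
```

Because log_level is typed as an int, any of the standard logging constants should be acceptable values.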
- accept_type: str | None = None#
- build(model_name: str | None = None, mode: Mode | None = None, role_arn: str | None = None, sagemaker_session: Session | None = None, region: str | None = None) Model | ModelBuilder | None[source]#
Build a deployable Model instance with ModelBuilder.
Creates a SageMaker Model resource with the appropriate container image, model artifacts, and configuration. This method prepares the model for deployment but does not deploy it to an endpoint. Use the deploy() method to create an endpoint.
Note: This returns a sagemaker.core.resources.Model object, not the deprecated PySDK Model class.
- Parameters:
model_name (str, optional) – The name for the SageMaker model. If not specified, a unique name will be generated. (Default: None).
mode (Mode, optional) – The mode of operation. Options are SAGEMAKER_ENDPOINT, LOCAL_CONTAINER, or IN_PROCESS. (Default: None, uses mode from initialization).
role_arn (str, optional) – The IAM role ARN for SageMaker to assume when creating the model and endpoint. (Default: None).
sagemaker_session (Session, optional) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, uses the session from initialization or creates one using the default AWS configuration chain. (Default: None).
region (str, optional) – The AWS region for deployment. If specified and different from the current region, a new session will be created. (Default: None).
- Returns:
A sagemaker.core.resources.Model resource that represents the created SageMaker model, or a ModelBuilder instance for multi-model scenarios.
- Return type:
Union[Model, ModelBuilder, None]
Example
>>> model_builder = ModelBuilder(model=my_model, role_arn=role)
>>> model = model_builder.build()  # Creates Model resource
>>> endpoint = model_builder.deploy()  # Creates Endpoint resource
>>> result = endpoint.invoke(data=input_data)
- compute: Compute | None = None#
- configure_for_torchserve(shared_libs: List[str] | None = None, dependencies: Dict[str, Any] | None = None, image_config: Dict[str, str | PipelineVariable] | None = None) ModelBuilder[source]#
Configure ModelBuilder for TorchServe deployment.
- content_type: str | None = None#
- dependencies: Dict[str, Any] | None#
- deploy(endpoint_name: str = None, initial_instance_count: int | None = 1, instance_type: str | None = None, wait: bool = True, update_endpoint: bool | None = False, container_timeout_in_seconds: int = 300, inference_config: ServerlessInferenceConfig | AsyncInferenceConfig | BatchTransformInferenceConfig | ResourceRequirements | None = None, custom_orchestrator_instance_type: str = None, custom_orchestrator_initial_instance_count: int = None, **kwargs) Endpoint | LocalEndpoint | Transformer[source]#
Deploy the built model to an Endpoint.
Creates a SageMaker EndpointConfig and deploys an Endpoint resource from the model created by build(). The model must be built before calling deploy().
Note: This returns a sagemaker.core.resources.Endpoint object, not the deprecated PySDK Predictor class. Use endpoint.invoke() to make predictions.
- Parameters:
endpoint_name (str) – The name of the endpoint to create. If not specified, a unique endpoint name will be created. (Default: None).
initial_instance_count (int, optional) – The initial number of instances to run in the endpoint. Required for instance-based endpoints. (Default: 1).
instance_type (str, optional) – The EC2 instance type to deploy this model to. For example, ‘ml.p2.xlarge’. Required for instance-based endpoints unless using serverless inference. (Default: None).
wait (bool) – Whether the call should wait until the deployment completes. (Default: True).
update_endpoint (bool) – Flag to update the model in an existing Amazon SageMaker endpoint. If True, deploys a new EndpointConfig to an existing endpoint and deletes resources from the previous EndpointConfig. (Default: False).
container_timeout_in_seconds (int) – The timeout value, in seconds, for the container to respond to requests. (Default: 300).
inference_config (Union[ServerlessInferenceConfig, AsyncInferenceConfig, BatchTransformInferenceConfig, ResourceRequirements], optional) – Unified inference configuration parameter. Can be used instead of individual config parameters. (Default: None).
custom_orchestrator_instance_type (str, optional) – Instance type for custom orchestrator deployment. (Default: None).
custom_orchestrator_initial_instance_count (int, optional) – Initial instance count for custom orchestrator deployment. (Default: None).
- Returns:
A sagemaker.core.resources.Endpoint resource representing the deployed endpoint, a LocalEndpoint for local mode, or a Transformer for batch transform inference.
- Return type:
Union[Endpoint, LocalEndpoint, Transformer]
Example
>>> model_builder = ModelBuilder(model=my_model, role_arn=role, instance_type="ml.m5.xlarge")
>>> model = model_builder.build()  # Creates Model resource
>>> endpoint = model_builder.deploy(endpoint_name="my-endpoint")  # Creates Endpoint resource
>>> result = endpoint.invoke(data=input_data)  # Make predictions
- deploy_local(endpoint_name: str = 'endpoint', container_timeout_in_seconds: int = 300, **kwargs) LocalEndpoint[source]#
Deploy the built model to local mode for testing.
Deploys the model locally using either LOCAL_CONTAINER mode (runs in a Docker container) or IN_PROCESS mode (runs in the current Python process). This is useful for testing and development before deploying to SageMaker endpoints. The model must be built with mode=Mode.LOCAL_CONTAINER or mode=Mode.IN_PROCESS before calling this method.
Note: This returns a LocalEndpoint object for local inference, not a SageMaker Endpoint resource. Use local_endpoint.invoke() to make predictions.
- Parameters:
endpoint_name (str) – The name for the local endpoint. (Default: “endpoint”).
container_timeout_in_seconds (int) – The timeout value, in seconds, for the container to respond to requests. (Default: 300).
- Returns:
A LocalEndpoint object for making local predictions.
- Return type:
LocalEndpoint
- Raises:
ValueError – If the model was not built with LOCAL_CONTAINER or IN_PROCESS mode.
Example
>>> model_builder = ModelBuilder(
...     model=my_model,
...     role_arn=role,
...     mode=Mode.LOCAL_CONTAINER
... )
>>> model = model_builder.build()
>>> local_endpoint = model_builder.deploy_local()
>>> result = local_endpoint.invoke(data=input_data)
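The mode requirement behind the ValueError above can be sketched without the SDK. The Mode enum below is a stand-in, not the real sagemaker.serve.mode.function_pointers.Mode. Only SAGEMAKER_ENDPOINT = 3 is implied by this page (the default is Mode.SAGEMAKER_ENDPOINT and the mode attribute is listed as 3); the other two numeric values and the check_local_mode helper are assumptions made for illustration.

```python
from enum import Enum


class Mode(Enum):
    """Stand-in for the SDK's Mode enum (member values partly assumed)."""

    IN_PROCESS = 1  # assumed value
    LOCAL_CONTAINER = 2  # assumed value
    SAGEMAKER_ENDPOINT = 3  # the documented default, listed as 3


LOCAL_MODES = {Mode.LOCAL_CONTAINER, Mode.IN_PROCESS}


def check_local_mode(mode: Mode) -> None:
    """Hypothetical guard: raise ValueError unless the build mode is local."""
    if mode not in LOCAL_MODES:
        raise ValueError(
            f"deploy_local() requires LOCAL_CONTAINER or IN_PROCESS, got {mode.name}"
        )


check_local_mode(Mode.LOCAL_CONTAINER)  # passes silently

try:
    check_local_mode(Mode.SAGEMAKER_ENDPOINT)
except ValueError as err:
    message = str(err)
    print(message)
```

The real method may perform additional checks; this sketch covers only the documented condition.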
- enable_network_isolation()[source]#
Whether to enable network isolation when creating this Model
- Returns:
If network isolation should be enabled or not.
- Return type:
bool
- env_vars: Dict[str, str | PipelineVariable] | None#
- fetch_endpoint_names_for_base_model() Set[str][source]#
Fetches endpoint names for the base model.
- Returns:
Set of endpoint names for the base model.
- classmethod from_jumpstart_config(jumpstart_config: JumpStartConfig, role_arn: str | None = None, compute: Compute | None = None, network: Networking | None = None, image_uri: str | None = None, env_vars: Dict[str, str] | None = None, model_kms_key: str | None = None, resource_requirements: ResourceRequirements | None = None, tolerate_vulnerable_model: bool | None = None, tolerate_deprecated_model: bool | None = None, sagemaker_session: Session | None = None, schema_builder: SchemaBuilder | None = None) ModelBuilder[source]#
Create a ModelBuilder instance from a JumpStart configuration.
This class method provides a convenient way to create a ModelBuilder for deploying pre-trained models from Amazon SageMaker JumpStart. It automatically retrieves the appropriate model artifacts, container images, and default configurations for the specified JumpStart model.
- Parameters:
jumpstart_config (JumpStartConfig) – Configuration object specifying the JumpStart model to use. Must include model_id and optionally model_version and inference_config_name.
role_arn (str, optional) – The IAM role ARN for SageMaker to assume when creating the model and endpoint. If not specified, attempts to use the default SageMaker execution role. (Default: None).
compute (Compute, optional) – Compute configuration specifying instance type and instance count for deployment. For example, Compute(instance_type='ml.g5.xlarge', instance_count=1). (Default: None).
network (Networking, optional) – Network configuration including VPC settings and network isolation. For example, Networking(vpc_config={'Subnets': […], 'SecurityGroupIds': […]}, enable_network_isolation=False). (Default: None).
image_uri (str, optional) – Custom container image URI. If not specified, uses the default JumpStart container image for the model. (Default: None).
env_vars (Dict[str, str], optional) – Environment variables to set in the container. These will be merged with default JumpStart environment variables. (Default: None).
model_kms_key (str, optional) – KMS key ARN used to encrypt model artifacts when uploading to S3. (Default: None).
resource_requirements (ResourceRequirements, optional) – The compute resource requirements for deploying the model to an inference component based endpoint. (Default: None).
tolerate_vulnerable_model (bool, optional) – If True, allows deployment of models with known security vulnerabilities. Use with caution. (Default: None).
tolerate_deprecated_model (bool, optional) – If True, allows deployment of deprecated JumpStart models. (Default: None).
sagemaker_session (Session, optional) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, creates one using the default AWS configuration chain. (Default: None).
schema_builder (SchemaBuilder, optional) – Schema builder for defining input/output schemas. If not specified, uses default schemas for the JumpStart model. (Default: None).
- Returns:
A configured ModelBuilder instance ready to build and deploy the specified JumpStart model.
- Return type:
ModelBuilder
Example
>>> from sagemaker.core.jumpstart.configs import JumpStartConfig
>>> from sagemaker.serve.model_builder import ModelBuilder
>>>
>>> js_config = JumpStartConfig(
...     model_id="huggingface-llm-mistral-7b",
...     model_version="*"
... )
>>>
>>> from sagemaker.core.training.configs import Compute
>>>
>>> model_builder = ModelBuilder.from_jumpstart_config(
...     jumpstart_config=js_config,
...     compute=Compute(instance_type="ml.g5.2xlarge", instance_count=1)
... )
>>>
>>> model = model_builder.build()  # Creates Model resource
>>> endpoint = model_builder.deploy()  # Creates Endpoint resource
>>> result = endpoint.invoke(data=input_data)  # Make predictions
- get_deployment_config() Dict[str, Any] | None[source]#
Gets the deployment config to apply to the model.
- image_config: Dict[str, str | PipelineVariable] | None = None#
- image_uri: str | PipelineVariable | None = None#
- inference_spec: InferenceSpec | None = None#
- instance_type: str | None = None#
- is_repack() bool[source]#
Whether the source code needs to be repacked before uploading to S3.
- Returns:
Whether the source code needs to be repacked.
- Return type:
bool
- list_deployment_configs() List[Dict[str, Any]][source]#
List deployment configs for the model in the current region.
- log_level: int | None = 10#
- mode: Mode | None = 3#
- model: object | str | ModelTrainer | BaseTrainer | TrainingJob | ModelPackage | List[Model] | None = None#
- model_metadata: Dict[str, Any] | None = None#
- model_path: str | None#
- model_server: ModelServer | None = None#
- modelbuilder_list: List[ModelBuilder] | None = None#
- network: Networking | None = None#
- optimize(model_name: str | None = 'optimize_model', output_path: str | None = None, instance_type: str | None = None, role_arn: str | None = None, sagemaker_session: Session | None = None, region: str | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, job_name: str | None = None, accept_eula: bool | None = None, quantization_config: Dict | None = None, compilation_config: Dict | None = None, speculative_decoding_config: Dict | None = None, sharding_config: Dict | None = None, env_vars: Dict | None = None, vpc_config: Dict | None = None, kms_key: str | None = None, image_uri: str | None = None, max_runtime_in_sec: int | None = 36000) Model[source]#
Create an optimized deployable Model instance with ModelBuilder.
Runs a SageMaker model optimization job to quantize, compile, or shard the model for improved inference performance. Returns a Model resource that can be deployed using the deploy() method.
Note: This returns a sagemaker.core.resources.Model object.
- Parameters:
output_path (str, optional) – S3 URI where the optimized model artifacts will be stored. If not specified, uses the default output path. (Default: None).
instance_type (str, optional) – Target deployment instance type that the model is optimized for. For example, ‘ml.p4d.24xlarge’. (Default: None).
role_arn (str, optional) – IAM execution role ARN for the optimization job. If not specified, uses the role from initialization. (Default: None).
sagemaker_session (Session, optional) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, uses the session from initialization or creates one using the default AWS configuration chain. (Default: None).
region (str, optional) – The AWS region for the optimization job. If specified and different from the current region, a new session will be created. (Default: None).
model_name (str, optional) – The name for the optimized SageMaker model. (Default: 'optimize_model').
tags (Tags, optional) – Tags for labeling the model optimization job. (Default: None).
job_name (str, optional) – The name of the model optimization job. If not specified, a unique name will be generated. (Default: None).
accept_eula (bool, optional) – For models that require a Model Access Config, specify True or False to indicate whether model terms of use have been accepted. The accept_eula value must be explicitly defined as True in order to accept the end-user license agreement (EULA) that some models require. (Default: None).
quantization_config (Dict, optional) – Quantization configuration specifying the quantization method and parameters. For example: {'OverrideEnvironment': {'OPTION_QUANTIZE': 'awq'}}. (Default: None).
compilation_config (Dict, optional) – Compilation configuration for optimizing the model for specific hardware. (Default: None).
speculative_decoding_config (Dict, optional) – Speculative decoding configuration for improving inference latency of large language models. (Default: None).
sharding_config (Dict, optional) – Model sharding configuration for distributing large models across multiple devices. (Default: None).
env_vars (Dict, optional) – Additional environment variables to pass to the optimization container. (Default: None).
vpc_config (Dict, optional) – VPC configuration for the optimization job. Should contain ‘Subnets’ and ‘SecurityGroupIds’ keys. (Default: None).
kms_key (str, optional) – KMS key ARN used to encrypt the optimized model artifacts when uploading to S3. (Default: None).
image_uri (str, optional) – Custom container image URI for the optimization job. If not specified, uses the default optimization container. (Default: None).
max_runtime_in_sec (int) – Maximum job execution time in seconds. The optimization job will be stopped if it exceeds this time. (Default: 36000).
- Returns:
A sagemaker.core.resources.Model resource containing the optimized model artifacts, ready for deployment.
- Return type:
Model
Example
>>> model_builder = ModelBuilder(model=my_model, role_arn=role)
>>> optimized_model = model_builder.optimize(
...     instance_type="ml.g5.xlarge",
...     quantization_config={'OverrideEnvironment': {'OPTION_QUANTIZE': 'awq'}}
... )
>>> endpoint = model_builder.deploy()  # Deploy the optimized model
>>> result = endpoint.invoke(data=input_data)
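The dictionary-shaped arguments to optimize() can be assembled and checked before the call. The sketch below uses only the standard library. The missing_vpc_keys helper and the subnet and security group IDs are hypothetical placeholders; only the key names 'Subnets' and 'SecurityGroupIds' and the quantization example come from the parameter descriptions above.

```python
from typing import Any, Dict, List

# Keys that the vpc_config description says the dictionary should contain.
REQUIRED_VPC_KEYS = ("Subnets", "SecurityGroupIds")


def missing_vpc_keys(vpc_config: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list the required keys absent from vpc_config."""
    return [key for key in REQUIRED_VPC_KEYS if key not in vpc_config]


# Quantization settings in the shape shown in the parameter description.
quantization_config = {"OverrideEnvironment": {"OPTION_QUANTIZE": "awq"}}

# Placeholder IDs for illustration only.
vpc_config = {
    "Subnets": ["subnet-0123456789abcdef0"],
    "SecurityGroupIds": ["sg-0123456789abcdef0"],
}

print(missing_vpc_keys(vpc_config))
print(missing_vpc_keys({"Subnets": []}))
```

A check like this catches a malformed vpc_config locally, before an optimization job is submitted.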
- register(model_package_name: str | PipelineVariable | None = None, model_package_group_name: str | PipelineVariable | None = None, content_types: List[str | PipelineVariable] = None, response_types: List[str | PipelineVariable] = None, inference_instances: List[str | PipelineVariable] | None = None, transform_instances: List[str | PipelineVariable] | None = None, model_metrics: ModelMetrics | None = None, metadata_properties: MetadataProperties | None = None, marketplace_cert: bool = False, approval_status: str | PipelineVariable | None = None, description: str | None = None, drift_check_baselines: DriftCheckBaselines | None = None, customer_metadata_properties: Dict[str, str | PipelineVariable] | None = None, validation_specification: str | PipelineVariable | None = None, domain: str | PipelineVariable | None = None, task: str | PipelineVariable | None = None, sample_payload_url: str | PipelineVariable | None = None, nearest_model_name: str | PipelineVariable | None = None, data_input_configuration: str | PipelineVariable | None = None, skip_model_validation: str | PipelineVariable | None = None, source_uri: str | PipelineVariable | None = None, model_card: ModelPackageModelCard | ModelCard | None = None, model_life_cycle: ModelLifeCycle | None = None, accept_eula: bool | None = None, model_type: JumpStartModelType | None = None) ModelPackage | ModelPackageGroup[source]#
Creates a model package for creating SageMaker models or listing on Marketplace.
- Parameters:
content_types (list[str] or list[PipelineVariable]) – The supported MIME types for the input data.
response_types (list[str] or list[PipelineVariable]) – The supported MIME types for the output data.
inference_instances (list[str] or list[PipelineVariable]) – A list of the instance types that are used to generate inferences in real-time (default: None).
transform_instances (list[str] or list[PipelineVariable]) – A list of the instance types on which a transformation job can be run or on which an endpoint can be deployed (default: None).
model_package_name (str or PipelineVariable) – Model Package name, exclusive to model_package_group_name, using model_package_name makes the Model Package un-versioned (default: None).
model_package_group_name (str or PipelineVariable) – Model Package Group name, exclusive to model_package_name, using model_package_group_name makes the Model Package versioned (default: None).
model_metrics (ModelMetrics) – ModelMetrics object (default: None).
metadata_properties (MetadataProperties) – MetadataProperties object (default: None).
marketplace_cert (bool) – A boolean value indicating if the Model Package is certified for AWS Marketplace (default: False).
approval_status (str or PipelineVariable) – Model Approval Status, values can be “Approved”, “Rejected”, or “PendingManualApproval” (default: “PendingManualApproval”).
description (str) – Model Package description (default: None).
drift_check_baselines (DriftCheckBaselines) – DriftCheckBaselines object (default: None).
customer_metadata_properties (dict[str, str] or dict[str, PipelineVariable]) – A dictionary of key-value paired metadata properties (default: None).
domain (str or PipelineVariable) – Domain values can be “COMPUTER_VISION”, “NATURAL_LANGUAGE_PROCESSING”, “MACHINE_LEARNING” (default: None).
task (str or PipelineVariable) – Task values which are supported by Inference Recommender are “FILL_MASK”, “IMAGE_CLASSIFICATION”, “OBJECT_DETECTION”, “TEXT_GENERATION”, “IMAGE_SEGMENTATION”, “CLASSIFICATION”, “REGRESSION”, “OTHER” (default: None).
sample_payload_url (str or PipelineVariable) – The S3 path where the sample payload is stored (default: None).
nearest_model_name (str or PipelineVariable) – Name of a pre-trained machine learning model benchmarked by Amazon SageMaker Inference Recommender (default: None).
data_input_configuration (str or PipelineVariable) – Input object for the model (default: None).
skip_model_validation (str or PipelineVariable) – Indicates if you want to skip model validation. Values can be “All” or “None” (default: None).
source_uri (str or PipelineVariable) – The URI of the source for the model package (default: None).
model_card (ModelCard or ModelPackageModelCard) – A document that contains qualitative and quantitative information about a model (default: None).
model_life_cycle (ModelLifeCycle) – ModelLifeCycle object (default: None).
accept_eula (bool) – For models that require a Model Access Config, specify True or False to indicate whether model terms of use have been accepted (default: None).
model_type (JumpStartModelType) – Type of JumpStart model (default: None).
- Returns:
A sagemaker.model.ModelPackage instance or pipeline step arguments in case the Model instance is built with PipelineSession.
Note
The following parameters are inherited from ModelBuilder.__init__ and do not need to be passed to register():
- image_uri: Use self.image_uri (defined in __init__)
- framework: Use self.framework (defined in __init__)
- framework_version: Use self.framework_version (defined in __init__)
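Several register() parameters accept only specific values or exclude one another. A minimal pre-flight check, assuming plain string arguments (PipelineVariable values are not handled) and using a hypothetical helper named check_register_args, might look like this:

```python
from typing import Optional

# Allowed values taken from the parameter descriptions above.
APPROVAL_STATUSES = {"Approved", "Rejected", "PendingManualApproval"}
SKIP_VALIDATION_VALUES = {"All", "None"}


def check_register_args(
    model_package_name: Optional[str] = None,
    model_package_group_name: Optional[str] = None,
    approval_status: Optional[str] = None,
    skip_model_validation: Optional[str] = None,
) -> bool:
    """Hypothetical check; returns True when the package would be versioned."""
    if model_package_name and model_package_group_name:
        raise ValueError(
            "model_package_name and model_package_group_name are mutually exclusive"
        )
    if approval_status is not None and approval_status not in APPROVAL_STATUSES:
        raise ValueError(f"unsupported approval_status: {approval_status!r}")
    if (
        skip_model_validation is not None
        and skip_model_validation not in SKIP_VALIDATION_VALUES
    ):
        raise ValueError(f"unsupported skip_model_validation: {skip_model_validation!r}")
    # A group name produces a versioned package; a bare package name does not.
    return model_package_group_name is not None


print(check_register_args(model_package_group_name="my-group", approval_status="Approved"))
print(check_register_args(model_package_name="my-package"))
```

The group name and package name used here are placeholders. The SDK and the service may enforce further constraints that this sketch does not cover.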
- role_arn: str | None = None#
- s3_model_data_url: str | PipelineVariable | Dict[str, Any] | None = None#
- sagemaker_session: Session | None = None#
- schema_builder: SchemaBuilder | None = None#
- set_deployment_config(config_name: str, instance_type: str) None[source]#
Sets the deployment config to apply to the model.
- source_code: SourceCode | None = None#
- to_string(obj: object)[source]#
Convert an object to a string.
This helper function also handles converting a PipelineVariable object to a string.
- Parameters:
obj (object) – The object to be converted
- transformer(instance_count, instance_type, strategy=None, assemble_with=None, output_path=None, output_kms_key=None, accept=None, env=None, max_concurrent_transforms=None, max_payload=None, tags=None, volume_kms_key=None)[source]#
Return a Transformer that uses this Model.
- Parameters:
instance_count (int) – Number of EC2 instances to use.
instance_type (str) – Type of EC2 instance to use, for example, ‘ml.c4.xlarge’.
strategy (str) – The strategy used to decide how to batch records in a single request (default: None). Valid values: ‘MultiRecord’ and ‘SingleRecord’.
assemble_with (str) – How the output is assembled (default: None). Valid values: ‘Line’ or ‘None’.
output_path (str) – S3 location for saving the transform result. If not specified, results are stored to a default bucket.
output_kms_key (str) – Optional. KMS key ID for encrypting the transform output (default: None).
accept (str) – The accept header passed by the client to the inference endpoint. If it is supported by the endpoint, it will be the format of the batch transform output.
env (dict) – Environment variables to be set for use during the transform job (default: None).
max_concurrent_transforms (int) – The maximum number of HTTP requests to be made to each individual transform container at one time.
max_payload (int) – Maximum size of the payload in a single HTTP request to the container in MB.
tags (Optional[Tags]) – Tags for labeling a transform job. If none specified, then the tags used for the training job are used for the transform job.
volume_kms_key (str) – Optional. KMS key ID for encrypting the volume attached to the ML compute instance (default: None).
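Note that the valid values for assemble_with include the string 'None', which is distinct from leaving the argument unset with Python's None. A small sketch of that distinction follows. The helper check_batch_options is hypothetical, and treating the values as case-sensitive is an assumption.

```python
from typing import Optional

VALID_STRATEGIES = ("MultiRecord", "SingleRecord")
VALID_ASSEMBLE_WITH = ("Line", "None")  # "None" here is a string value


def check_batch_options(
    strategy: Optional[str] = None, assemble_with: Optional[str] = None
) -> None:
    """Hypothetical check of the documented values; Python None means unset."""
    if strategy is not None and strategy not in VALID_STRATEGIES:
        raise ValueError(f"strategy must be one of {VALID_STRATEGIES}, got {strategy!r}")
    if assemble_with is not None and assemble_with not in VALID_ASSEMBLE_WITH:
        raise ValueError(
            f"assemble_with must be one of {VALID_ASSEMBLE_WITH}, got {assemble_with!r}"
        )


check_batch_options(strategy="MultiRecord", assemble_with="Line")
check_batch_options(assemble_with="None")  # the string, explicitly chosen
check_batch_options()  # both left unset

try:
    check_batch_options(strategy="multirecord")
except ValueError as err:
    error_text = str(err)
    print(error_text)
```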
Workflow Management#
SageMaker workflow orchestration module.
This module provides pipeline and step orchestration capabilities for SageMaker workflows. It contains the high-level classes that orchestrate training, processing, and serving components from the train and serve packages.
Key components: - Pipeline: Main workflow orchestration class - Steps: Various step implementations (TrainingStep, ProcessingStep, etc.) - Configuration: Pipeline configuration classes - Utilities: Helper functions for workflow management
Note: This module imports from sagemaker.core.workflow for primitives (entities, parameters, functions, conditions, properties) and can import from sagemaker.train and sagemaker.serve for orchestration purposes.
- class sagemaker.mlops.workflow.AutoMLStep(name: str, step_args: _JobStepArguments, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#
Bases: ConfigurableRetryStep
AutoMLStep for SageMaker Pipelines Workflows.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dictionary that is used to call create_auto_ml_job.
NOTE: The CreateAutoMLJob request is not quite the args list that the workflow needs. The ModelDeployConfig and GenerateCandidateDefinitionsOnly attributes cannot be included.
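To illustrate the difference between a CreateAutoMLJob request and the step arguments, the sketch below removes the two excluded attributes from a request-shaped dictionary. The helper to_step_arguments is hypothetical and the values are placeholders. This page does not say whether the SDK silently drops these attributes or rejects them, so dropping them here is an assumption.

```python
from typing import Any, Dict

# Attributes the note says cannot appear in the step arguments.
EXCLUDED_KEYS = ("ModelDeployConfig", "GenerateCandidateDefinitionsOnly")


def to_step_arguments(request: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: copy a request dict without the excluded keys."""
    return {key: value for key, value in request.items() if key not in EXCLUDED_KEYS}


request = {
    "AutoMLJobName": "example-job",  # placeholder values for illustration
    "GenerateCandidateDefinitionsOnly": False,
    "ModelDeployConfig": {"AutoGenerateEndpointName": True},
}
step_arguments = to_step_arguments(request)
print(sorted(step_arguments))
```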
- get_best_auto_ml_model_builder(role, sagemaker_session=None)[source]#
Get the best candidate model artifacts, image uri and env variables for the best model.
- Parameters:
role (str) – An AWS IAM role (either name or full ARN). The Amazon SageMaker AutoML jobs and APIs that create Amazon SageMaker endpoints use this role to access training data and model artifacts.
sagemaker_session (sagemaker.core.helper.session.Session) –
A SageMaker Session object, used for SageMaker interactions. If the best model will be used as part of a ModelStep, then sagemaker_session should be a sagemaker.workflow.pipeline_context.PipelineSession. Example:
model = Model(sagemaker_session=PipelineSession())
model_step = ModelStep(step_args=model.register())
- property properties#
A Properties object representing the DescribeAutoMLJobResponse data model.
- class sagemaker.mlops.workflow.CacheConfig(enable_caching: bool = False, expire_after=None)[source]#
Bases: object
Configuration class to enable caching in SageMaker Pipelines Workflows.
If caching is enabled, the pipeline attempts to find a previous execution of a Step that was called with the same arguments. Step caching only considers successful execution. If a successful previous execution is found, the pipeline propagates the values from the previous execution rather than recomputing the Step. When multiple successful executions exist within the timeout period, it uses the result for the most recent successful execution.
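The lookup rule described above can be sketched as a plain function. This is an illustration of the described behavior, not the service's implementation. The Execution record and find_cache_hit helper are hypothetical names, and comparing arguments with dictionary equality is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class Execution:
    """Hypothetical record of one past step execution."""

    arguments: dict
    succeeded: bool
    finished_at: datetime
    result: str


def find_cache_hit(
    history: List[Execution], arguments: dict, now: datetime, expire_after: timedelta
) -> Optional[Execution]:
    """Return the most recent successful, unexpired execution with equal arguments."""
    candidates = [
        e
        for e in history
        if e.succeeded and e.arguments == arguments and now - e.finished_at <= expire_after
    ]
    return max(candidates, key=lambda e: e.finished_at, default=None)


now = datetime(2024, 1, 10)
history = [
    Execution({"lr": 0.1}, True, datetime(2024, 1, 1), "old"),
    Execution({"lr": 0.1}, True, datetime(2024, 1, 8), "recent"),
    Execution({"lr": 0.1}, False, datetime(2024, 1, 9), "failed"),
    Execution({"lr": 0.2}, True, datetime(2024, 1, 9), "other-args"),
]
hit = find_cache_hit(history, {"lr": 0.1}, now, timedelta(days=30))
print(hit.result)
```

With a one-day window instead of thirty days, no execution qualifies and the step would be recomputed.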
- enable_caching#
To enable Step caching. Defaults to False.
- Type:
bool
- expire_after#
If Step caching is enabled, a timeout also needs to be defined. It defines how old a previous execution can be to be considered for reuse. Value should be an ISO 8601 duration string. Defaults to None.
Examples:
'p30d' # 30 days
'P4DT12H' # 4 days and 12 hours
'T12H' # 12 hours
- Type:
str
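To make the duration examples concrete, the following sketch converts such strings to a timedelta. It is a deliberately lenient parser written for this illustration (strict ISO 8601 would write P30D and PT12H); the parse_expire_after name is hypothetical, and how the service itself validates the value is not covered here.

```python
import re
from datetime import timedelta

# Lenient pattern: optional leading "P", case-insensitive designators,
# covering only days, hours, minutes and seconds.
_DURATION = re.compile(
    r"^P?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$",
    re.IGNORECASE,
)


def parse_expire_after(value: str) -> timedelta:
    """Convert a duration string such as 'P4DT12H' to a timedelta."""
    match = _DURATION.match(value.strip())
    if not match or not any(match.groupdict().values()):
        raise ValueError(f"not a supported duration: {value!r}")
    parts = {name: int(number) for name, number in match.groupdict().items() if number}
    return timedelta(**parts)


thirty_days = parse_expire_after("p30d")
four_and_a_half_days = parse_expire_after("P4DT12H")
twelve_hours = parse_expire_after("T12H")
```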
- property config#
Configures Step caching for SageMaker Pipelines Workflows.
- enable_caching: bool#
- class sagemaker.mlops.workflow.CallbackOutput(output_name: str | None = None, output_type: CallbackOutputTypeEnum = CallbackOutputTypeEnum.String)[source]#
Bases:
object
Output for a callback step.
- output_name#
The output name
- Type:
str
- output_type#
The output type
- Type:
CallbackOutputTypeEnum
- output_name: str#
- output_type: CallbackOutputTypeEnum#
- class sagemaker.mlops.workflow.CallbackStep(name: str, sqs_queue_url: str, inputs: dict, outputs: List[CallbackOutput], display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#
Bases:
Step
Callback step for workflow.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dict that is used to define the callback step.
- property properties#
A Properties object representing the output parameters of the callback step.
- class sagemaker.mlops.workflow.CheckJobConfig(role, instance_count=1, instance_type='ml.m5.xlarge', volume_size_in_gb=30, volume_kms_key=None, output_kms_key=None, max_runtime_in_seconds=None, base_job_name=None, sagemaker_session=None, env=None, tags=None, network_config=None)[source]#
Bases:
object
Check job config for QualityCheckStep and ClarifyCheckStep.
- class sagemaker.mlops.workflow.ClarifyCheckStep(name: str, clarify_check_config: ClarifyCheckConfig, check_job_config: CheckJobConfig, skip_check: bool | PipelineVariable = False, fail_on_violation: bool | PipelineVariable = True, register_new_baseline: bool | PipelineVariable = False, model_package_group_name: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#
Bases:
Step
ClarifyCheckStep step for workflow.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dict that is used to define the ClarifyCheck step.
- property properties#
A Properties object representing the output parameters of the ClarifyCheck step.
- class sagemaker.mlops.workflow.ConditionStep(name: str, depends_on: List[str | Step] | None = None, display_name: str | None = None, description: str | None = None, conditions: List[Condition] | None = None, if_steps: List[Step] | None = None, else_steps: List[Step] | None = None)[source]#
Bases:
Step
Conditional step for pipelines to support conditional branching in the execution of steps.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dict that is used to define the conditional branching in the pipeline.
- property properties#
A simple Properties object with Outcome as the only property
- property step_only_arguments#
Argument dict pertaining to the step only, and not the if_steps or else_steps.
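The branching behavior can be pictured with a small stand-alone sketch. It assumes that the if_steps branch runs only when every condition holds and that else_steps runs otherwise; choose_branch and the step names are hypothetical and do not use the SDK's Condition classes.

```python
from typing import Callable, List, Sequence, Tuple


def choose_branch(
    conditions: Sequence[Callable[[], bool]],
    if_steps: Sequence[str],
    else_steps: Sequence[str],
) -> Tuple[bool, List[str]]:
    """Evaluate the conditions and return (outcome, steps of the chosen branch)."""
    outcome = all(check() for check in conditions)  # assumption: conditions are ANDed
    return outcome, list(if_steps if outcome else else_steps)


metrics = {"accuracy": 0.93}
outcome, branch = choose_branch(
    conditions=[lambda: metrics["accuracy"] >= 0.9],
    if_steps=["register-model"],
    else_steps=["fail"],
)
```

The returned outcome plays the role of the Outcome property mentioned above.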
- class sagemaker.mlops.workflow.ConfigurableRetryStep(name: str, step_type: StepTypeEnum, display_name: str | None = None, description: str | None = None, depends_on: List[str | Step | StepCollection] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#
Bases:
Step
ConfigurableRetryStep for SageMaker Pipelines Workflows.
- add_retry_policy(retry_policy: RetryPolicy)[source]#
Add a policy to the current ConfigurableRetryStep retry policies list.
- class sagemaker.mlops.workflow.EMRStep(name: str, display_name: str, description: str, cluster_id: str, step_config: EMRStepConfig, depends_on: List[str | Step] | None = None, cache_config: CacheConfig | None = None, cluster_config: Dict[str, Any] | None = None, execution_role_arn: str | None = None)[source]#
Bases:
Step
EMR step for workflow.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dict that is used to call AddJobFlowSteps.
NOTE: The AddJobFlowSteps request is not quite the args list that workflow needs. The Name attribute in AddJobFlowSteps cannot be passed; it will be set during runtime. In addition, the EMR job inputs and output config must also be included.
- property properties: Dict[str, Any] | List[Dict[str, Any]]#
A Properties object representing the EMR DescribeStepResponse model
- class sagemaker.mlops.workflow.EMRStepConfig(jar, args: List[str] | None = None, main_class: str | None = None, properties: List[dict] | None = None)[source]#
Bases:
object
Config for a Hadoop Jar step.
- class sagemaker.mlops.workflow.FailStep(name: str, error_message: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, depends_on: List[str | Step] | None = None)[source]#
Bases:
Step
FailStep for SageMaker Pipelines Workflows.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dictionary that is used to define the FailStep.
- property properties#
A Properties object is not available for the FailStep.
Executing a FailStep will terminate the pipeline. FailStep properties should not be referenced.
- class sagemaker.mlops.workflow.LambdaOutput(output_name: str | None = None, output_type: LambdaOutputTypeEnum = LambdaOutputTypeEnum.String)[source]#
Bases:
object
Output for a Lambda step.
- output_name#
The output name
- Type:
str
- output_type#
The output type
- Type:
LambdaOutputTypeEnum
- output_name: str#
- output_type: LambdaOutputTypeEnum#
- class sagemaker.mlops.workflow.LambdaStep(name: str, lambda_func: Lambda, display_name: str | None = None, description: str | None = None, inputs: dict | None = None, outputs: List[LambdaOutput] | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step | StepCollection] | None = None)[source]#
Bases:
Step
Lambda step for workflow.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dict that is used to define the lambda step.
- property properties#
A Properties object representing the output parameters of the lambda step.
- class sagemaker.mlops.workflow.ModelStep(name: str, step_args: _ModelStepArguments | Dict, depends_on: List[str | Step | StepCollection] | None = None, retry_policies: List[RetryPolicy] | Dict[str, List[RetryPolicy]] | None = None, display_name: str | None = None, description: str | None = None, repack_model_step_settings: Dict[str, any] | None = None)[source]#
Bases:
ConfigurableRetryStep
ModelStep for SageMaker Pipelines Workflows.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dict that are used to call the appropriate SageMaker API.
- property properties#
A Properties object representing the appropriate SageMaker response data model.
- class sagemaker.mlops.workflow.MonitorBatchTransformStep(name: str, transform_step_args: _JobStepArguments, monitor_configuration: QualityCheckConfig | ClarifyCheckConfig, check_job_configuration: CheckJobConfig, monitor_before_transform: bool = False, fail_on_violation: bool | PipelineVariable = True, supplied_baseline_statistics: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None)[source]#
Bases:
StepCollection
Creates a Transformer step with a Quality or Clarify check step.
Used to monitor the inputs and outputs of the batch transform job.
- class sagemaker.mlops.workflow.NotebookJobStep(input_notebook: str, image_uri: str, kernel_name: str, name: str | None = None, display_name: str | None = None, description: str | None = None, notebook_job_name: str | None = None, role: str | None = None, s3_root_uri: str | None = None, parameters: Dict[str, str | PipelineVariable] | None = None, environment_variables: Dict[str, str | PipelineVariable] | None = None, initialization_script: str | None = None, s3_kms_key: str | PipelineVariable | None = None, instance_type: str | PipelineVariable | None = 'ml.m5.large', volume_size: int | PipelineVariable = 30, volume_kms_key: str | PipelineVariable | None = None, encrypt_inter_container_traffic: bool | PipelineVariable | None = True, security_group_ids: List[str | PipelineVariable] | None = None, subnets: List[str | PipelineVariable] | None = None, max_retry_attempts: int = 1, max_runtime_in_seconds: int = 172800, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, additional_dependencies: List[str] | None = None, retry_policies: List[RetryPolicy] | None = None, depends_on: List[Step | StepOutput] | None = None)[source]#
Bases:
ConfigurableRetryStep
NotebookJobStep for SageMaker Pipelines Workflows.
For more details about SageMaker Notebook Jobs, see SageMaker Notebook Jobs.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
Generates the arguments dictionary that is used to create the job.
- property depends_on: List[str | Step | StepCollection | StepOutput] | None#
The list of steps the current Step depends on.
- property properties#
A Properties object representing the notebook job step output
- class sagemaker.mlops.workflow.ParallelismConfiguration(max_parallel_execution_steps: int)[source]#
Bases:
object
Parallelism config for SageMaker pipeline.
- class sagemaker.mlops.workflow.Pipeline(name: str = '', parameters: ~typing.Sequence[~sagemaker.core.workflow.parameters.Parameter] | None = None, pipeline_experiment_config: ~sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig | None = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfig object>, mlflow_config: ~sagemaker.core.shapes.shapes.MlflowConfig | None = None, steps: ~typing.Sequence[~sagemaker.mlops.workflow.steps.Step | ~sagemaker.core.workflow.step_outputs.StepOutput] | None = None, sagemaker_session: ~sagemaker.core.helper.session_helper.Session | None = None, pipeline_definition_config: ~sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig | None = <sagemaker.core.workflow.pipeline_definition_config.PipelineDefinitionConfig object>)[source]#
Bases:
object
Pipeline for workflow.
- build_parameters_from_execution(pipeline_execution_arn: str, parameter_value_overrides: Dict[str, str | bool | int | float] | None = None) Dict[str, str | bool | int | float][source]#
Gets the parameters from an execution and updates them with optional parameter value overrides.
- Parameters:
pipeline_execution_arn (str) – The arn of the reference pipeline execution.
parameter_value_overrides (Dict[str, Union[str, bool, int, float]]) – Parameter dict to be updated with the parameters from the referenced execution.
- Returns:
A parameter dict built from an execution and provided parameter value overrides.
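The merge described above amounts to a dictionary update. The sketch below assumes the reference execution's parameters arrive as a list of Name/Value records (the shape used by the boto3 list_pipeline_parameters_for_execution response); merge_parameters is a hypothetical helper, and whether the SDK rejects override names that are absent from the reference execution is not stated here, so this version simply accepts them.

```python
from typing import Dict, List, Optional, Union

ParamValue = Union[str, bool, int, float]


def merge_parameters(
    reference: List[Dict[str, ParamValue]],
    overrides: Optional[Dict[str, ParamValue]] = None,
) -> Dict[str, ParamValue]:
    """Start from the reference execution's parameters, then apply overrides."""
    merged = {item["Name"]: item["Value"] for item in reference}
    merged.update(overrides or {})  # override values win over reference values
    return merged


reference_parameters = [
    {"Name": "InstanceType", "Value": "ml.m5.xlarge"},
    {"Name": "Epochs", "Value": "10"},
]
parameters = merge_parameters(reference_parameters, {"Epochs": 20})
```

The resulting dict has the shape that start() accepts for its parameters argument.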
- create(role_arn: str = None, description: str = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration = None) Dict[str, Any][source]#
Creates a Pipeline in the Pipelines service.
- Parameters:
role_arn (str) – The role arn that is assumed by the pipeline to create step artifacts.
description (str) – A description of the pipeline.
tags (Optional[Tags]) – Tags to be passed to the pipeline.
parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.
- Returns:
A response dict from the service.
- definition() str[source]#
Converts a request structure to string representation for workflow service calls.
- Returns:
A JSON formatted string of pipeline definition.
- delete() Dict[str, Any][source]#
Deletes a Pipeline in the Workflow service.
- Returns:
A response dict from the service.
- delete_triggers(trigger_names: List[str])[source]#
Delete Triggers for a parent SageMaker Pipeline if they exist.
- Parameters:
trigger_names (List[str]) – List of trigger names to be deleted. Currently, these can only be EventBridge schedule names.
- describe(pipeline_version_id: int | None = None) Dict[str, Any][source]#
Describes a Pipeline in the Workflow service.
- Parameters:
pipeline_version_id (Optional[int]) – Version ID of the pipeline to describe.
- Returns:
Response dict from the service. See boto3 client documentation
- describe_trigger(trigger_name: str) Dict[str, Any][source]#
Describe Trigger for a parent SageMaker Pipeline.
- Parameters:
trigger_name (str) – Trigger name to be described. Currently, this can only be an EventBridge schedule name.
- Returns:
Trigger describe responses from EventBridge.
- Return type:
Dict[str, str]
- property latest_pipeline_version_id#
Retrieves the latest version id of this pipeline
- list_executions(sort_by: str | None = None, sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) Dict[str, Any][source]#
Lists a pipeline’s executions.
- Parameters:
sort_by (str) – The field by which to sort results (CreationTime/PipelineExecutionArn).
sort_order (str) – The sort order for results (Ascending/Descending).
max_results (int) – The maximum number of pipeline executions to return in the response.
next_token (str) – If the result of the previous ListPipelineExecutions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline executions, use the token in the next request.
- Returns:
List of Pipeline Execution Summaries. See boto3 client list_pipeline_executions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_pipeline_executions
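Following next_token across pages can be sketched as a generator. The example runs against a stand-in function rather than a real pipeline, and it assumes the response uses the PipelineExecutionSummaries and NextToken keys of the boto3 call referenced above; iter_executions and fake_list_executions are hypothetical names.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_executions(
    list_page: Callable[..., Dict[str, Any]], page_size: int = 10
) -> Iterator[Dict[str, Any]]:
    """Yield every execution summary, requesting further pages as needed."""
    next_token: Optional[str] = None
    while True:
        kwargs: Dict[str, Any] = {"max_results": page_size}
        if next_token:
            kwargs["next_token"] = next_token
        response = list_page(**kwargs)
        yield from response.get("PipelineExecutionSummaries", [])
        next_token = response.get("NextToken")
        if not next_token:  # no token means the listing was not truncated
            break


def fake_list_executions(max_results: int, next_token: Optional[str] = None) -> Dict[str, Any]:
    """Stand-in for a paginated listing call, used only for this example."""
    data = [{"PipelineExecutionArn": f"arn:example:{i}"} for i in range(5)]
    start = int(next_token or 0)
    page: Dict[str, Any] = {"PipelineExecutionSummaries": data[start:start + max_results]}
    if start + max_results < len(data):
        page["NextToken"] = str(start + max_results)
    return page


arns = [e["PipelineExecutionArn"] for e in iter_executions(fake_list_executions, page_size=2)]
```

With a real pipeline object, the bound list_executions method would presumably be passed in place of the stand-in.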
- list_pipeline_versions(sort_order: str | None = None, max_results: int | None = None, next_token: str | None = None) str[source]#
Lists a pipeline’s versions.
- Parameters:
sort_order (str) – The sort order for results (Ascending/Descending).
max_results (int) – The maximum number of pipeline versions to return in the response.
next_token (str) – If the result of the previous ListPipelineVersions request was truncated, the response includes a NextToken. To retrieve the next set of pipeline versions, use the token in the next request.
- Returns:
List of Pipeline Version Summaries. See boto3 client list_pipeline_versions https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/list_pipeline_versions.html#
- put_triggers(triggers: List[Trigger], role_arn: str | None = None) List[str][source]#
Attach triggers to a parent SageMaker Pipeline.
- Parameters:
triggers (List[Trigger]) – List of supported triggers. Currently, this can only be of type PipelineSchedule.
role_arn (str) – The role arn that is assumed by EventBridge service.
- Returns:
Successfully created trigger Arn(s). Currently, the Python SDK only supports PipelineSchedule triggers; thus, this is a list of EventBridge Schedule Arn(s) that were created/upserted.
- Return type:
List[str]
- start(parameters: Dict[str, str | bool | int | float] = None, execution_display_name: str = None, execution_description: str = None, parallelism_config: ParallelismConfiguration = None, selective_execution_config: SelectiveExecutionConfig = None, mlflow_experiment_name: str = None, pipeline_version_id: int = None)[source]#
Starts a Pipeline execution in the Workflow service.
- Parameters:
parameters (Dict[str, Union[str, bool, int, float]]) – values to override pipeline parameters.
execution_display_name (str) – The display name of the pipeline execution.
execution_description (str) – A description of the execution.
parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.
selective_execution_config (Optional[SelectiveExecutionConfig]) – The configuration for selective step execution.
mlflow_experiment_name (str) – Optional MLflow experiment name to override the experiment name specified in the pipeline’s mlflow_config. If provided, this will override the experiment name for this specific pipeline execution only, without modifying the pipeline definition.
pipeline_version_id (Optional[int]) – Version ID of the pipeline to start the execution from. If not specified, uses the latest version ID.
- Returns:
A _PipelineExecution instance, if successful.
- update(role_arn: str | None = None, description: str | None = None, parallelism_config: ParallelismConfiguration | None = None) Dict[str, Any][source]#
Updates a Pipeline in the Workflow service.
- Parameters:
role_arn (str) – The role arn that is assumed by pipelines to create step artifacts.
description (str) – A description of the pipeline.
parallelism_config (Optional[ParallelismConfiguration]) – Parallelism configuration that is applied to each of the executions of the pipeline. It takes precedence over the parallelism configuration of the parent pipeline.
- Returns:
A response dict from the service.
- upsert(role_arn: str | None = None, description: str | None = None, tags: List[Dict[str, str | PipelineVariable]] | Dict[str, str | PipelineVariable] | None = None, parallelism_config: ParallelismConfiguration | None = None) Dict[str, Any][source]#
Creates a pipeline or updates it, if it already exists.
- Parameters:
role_arn (str) – The role arn that is assumed by workflow to create step artifacts.
description (str) – A description of the pipeline.
tags (Optional[Tags]) – Tags to be passed.
parallelism_config (Optional[ParallelismConfiguration]) – Config for parallel steps. Parallelism configuration that is applied to each of the executions of the pipeline.
- Returns:
A response dict from the service.
- class sagemaker.mlops.workflow.PipelineExperimentConfig(experiment_name: str | Parameter | ExecutionVariable | PipelineVariable, trial_name: str | Parameter | ExecutionVariable | PipelineVariable)[source]#
Bases:
Entity
Experiment config for SageMaker pipeline.
- class sagemaker.mlops.workflow.PipelineExperimentConfigProperties[source]#
Bases:
object
Enum-like class for all pipeline experiment config property references.
- EXPERIMENT_NAME = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfigProperty object>#
- TRIAL_NAME = <sagemaker.mlops.workflow.pipeline_experiment_config.PipelineExperimentConfigProperty object>#
- class sagemaker.mlops.workflow.PipelineExperimentConfigProperty(name: str)[source]#
Bases:
PipelineVariable
Reference to pipeline experiment config property.
- property expr: Dict[str, Any] | List[Dict[str, Any]]#
The ‘Get’ expression dict for a pipeline experiment config property.
- class sagemaker.mlops.workflow.PipelineGraph(steps: Sequence[Step])[source]#
Bases:
object
Helper class representing the Pipeline Directed Acyclic Graph (DAG).
- classmethod from_pipeline(pipeline: Pipeline)[source]#
Create a PipelineGraph object from the Pipeline object.
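To illustrate what treating a pipeline as a DAG implies, the sketch below orders steps by their depends_on relationships and rejects cycles using the standard-library graphlib module (Python 3.9+). It works on plain step names and is not the PipelineGraph implementation; execution_order and the step names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def execution_order(depends_on: Dict[str, Set[str]]) -> List[str]:
    """Return step names so that every step follows the steps it depends on.

    Raises graphlib.CycleError if the dependencies contain a cycle.
    """
    return list(TopologicalSorter(depends_on).static_order())


# Each key is a step; the value is the set of steps it depends on.
order = execution_order({
    "process": set(),
    "train": {"process"},
    "evaluate": {"train"},
    "register": {"evaluate"},
})
```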
- class sagemaker.mlops.workflow.PipelineSchedule(name: str | None = None, enabled: bool | None = True, start_date: datetime | None = None, at: datetime | None = None, rate: tuple | None = None, cron: str | None = None)[source]#
Bases:
Trigger
Pipeline Schedule trigger type used to create EventBridge Schedules for SageMaker Pipelines.
To create a pipeline schedule, specify a single type using the at, rate, or cron parameters. For more information about EventBridge syntax, see Schedule types on EventBridge Scheduler.
- Parameters:
start_date (datetime) – The start date of the schedule. Default is time.now().
at (datetime) – An “At” EventBridge expression. Defaults to UTC timezone. Note that if you use datetime.now(), the result is a snapshot of your current local time. EventBridge requires a time in UTC format. You can convert the result of datetime.now() to UTC by using datetime.utcnow() or datetime.now(tz=pytz.utc). For example, you can create a time two minutes from now with the expression datetime.now(tz=pytz.utc) + timedelta(0, 120).
rate (tuple) – A “Rate” EventBridge expression. Format is (value, unit).
cron (str) – A “Cron” EventBridge expression. Format is “minutes hours day-of-month month day-of-week year”.
name (str) – The schedule name. Default is None.
enabled (boolean) – If the schedule is enabled. Defaults to True.
- at: datetime | None#
- cron: str | None#
- rate: tuple | None#
- resolve_schedule_expression() str[source]#
Resolve schedule expression
Format schedule expression for an EventBridge client call from the specified at, rate, or cron parameter. After resolution, if there are any other errors in the syntax, this will throw an expected ValidationException from EventBridge.
- Returns:
Correctly string formatted schedule expression based on type.
- Return type:
schedule_expression
- start_date: datetime | None#
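As a rough picture of what resolving a schedule expression involves, the sketch below formats exactly one of at, rate, or cron into the EventBridge Scheduler forms at(...), rate(...), and cron(...). It is a stand-alone approximation, not the SDK's resolve_schedule_expression; format_schedule_expression is a hypothetical helper, and it insists on a timezone-aware datetime to sidestep the local-time pitfall noted above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def format_schedule_expression(
    at: Optional[datetime] = None,
    rate: Optional[Tuple[int, str]] = None,
    cron: Optional[str] = None,
) -> str:
    """Format exactly one of at, rate, or cron as a schedule expression."""
    chosen = [value for value in (at, rate, cron) if value is not None]
    if len(chosen) != 1:
        raise ValueError("specify exactly one of at, rate, or cron")
    if at is not None:
        if at.tzinfo is None:
            raise ValueError("at must be timezone-aware so it can be converted to UTC")
        utc = at.astimezone(timezone.utc)
        return f"at({utc.strftime('%Y-%m-%dT%H:%M:%S')})"
    if rate is not None:
        value, unit = rate
        return f"rate({value} {unit})"
    return f"cron({cron})"


# A time two minutes from now, already expressed in UTC.
soon = format_schedule_expression(at=datetime.now(timezone.utc) + timedelta(minutes=2))
every_five = format_schedule_expression(rate=(5, "minutes"))
daily_noon = format_schedule_expression(cron="0 12 * * ? *")
```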
- class sagemaker.mlops.workflow.ProcessingStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, property_files: List[PropertyFile] | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#
Bases:
ConfigurableRetryStep
ProcessingStep for SageMaker Pipelines Workflows.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dictionary that is used to call create_processing_job.
NOTE: The CreateProcessingJob request is not quite the args list that workflow needs. ExperimentConfig cannot be included in the arguments.
- property properties#
A Properties object representing the DescribeProcessingJobResponse data model.
- class sagemaker.mlops.workflow.QualityCheckConfig(baseline_dataset: str | PipelineVariable, dataset_format: dict, *, output_s3_uri: str | PipelineVariable | None = None, post_analytics_processor_script: str | None = None)[source]#
Bases:
ABC
Quality Check Config.
- baseline_dataset#
The path to the baseline_dataset file. This can be a local path or an S3 uri string
- Type:
str or PipelineVariable
- dataset_format#
The format of the baseline_dataset.
- Type:
dict
- output_s3_uri#
Desired S3 destination of the constraint_violations and statistics json files (default: None). If not specified an auto generated path will be used: “s3://<default_session_bucket>/model-monitor/baselining/<job_name>/results”
- Type:
str or PipelineVariable
- post_analytics_processor_script#
The path to the record post-analytics processor script (default: None). This can be a local path or an S3 uri string but CANNOT be any type of the PipelineVariable.
- Type:
str
- baseline_dataset: str | PipelineVariable#
- dataset_format: dict#
- output_s3_uri: str | PipelineVariable#
- post_analytics_processor_script: str#
- class sagemaker.mlops.workflow.QualityCheckStep(name: str, quality_check_config: QualityCheckConfig, check_job_config: CheckJobConfig, skip_check: bool | PipelineVariable = False, fail_on_violation: bool | PipelineVariable = True, register_new_baseline: bool | PipelineVariable = False, model_package_group_name: str | PipelineVariable | None = None, supplied_baseline_statistics: str | PipelineVariable | None = None, supplied_baseline_constraints: str | PipelineVariable | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None)[source]#
Bases:
Step
QualityCheck step for workflow.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dict that is used to define the QualityCheck step.
- property properties#
A Properties object representing the output parameters of the QualityCheck step.
- class sagemaker.mlops.workflow.RetryPolicy(backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#
Bases:
Entity
RetryPolicy base class.
- backoff_rate#
The multiplier by which the retry interval increases during each attempt (default: 2.0)
- Type:
float
- interval_seconds#
An integer that represents the number of seconds before the first retry attempt (default: 1)
- Type:
int
- max_attempts#
A positive integer that represents the maximum number of retry attempts. (default: None)
- Type:
int
- expire_after_mins#
A positive integer that represents the maximum number of minutes after which no further retry attempts are made (default: None)
- Type:
int
- backoff_rate: float#
- expire_after_mins: int#
- interval_seconds: int#
- max_attempts: int#
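The interaction of these four attributes can be illustrated with a small calculation. The sketch assumes the wait before retry n is interval_seconds * backoff_rate ** n and that expire_after_mins caps the total waiting time; the exact formula used by the service is not stated here, and retry_delays is a hypothetical helper.

```python
from typing import List, Optional


def retry_delays(
    backoff_rate: float = 2.0,
    interval_seconds: int = 1,
    max_attempts: Optional[int] = None,
    expire_after_mins: Optional[int] = None,
) -> List[float]:
    """Return the wait (in seconds) before each retry under the assumed formula."""
    if max_attempts is None:
        if expire_after_mins is None:
            raise ValueError("set max_attempts or expire_after_mins to bound the retries")
        if interval_seconds <= 0 or backoff_rate < 1.0:
            raise ValueError("without max_attempts the delays must not shrink")
    delays: List[float] = []
    elapsed = 0.0
    attempt = 0
    while max_attempts is None or attempt < max_attempts:
        delay = interval_seconds * backoff_rate ** attempt
        # Stop once the next wait would push past the expiry window.
        if expire_after_mins is not None and elapsed + delay > expire_after_mins * 60:
            break
        delays.append(delay)
        elapsed += delay
        attempt += 1
    return delays


by_attempts = retry_delays(max_attempts=4)
by_expiry = retry_delays(interval_seconds=30, expire_after_mins=5)
```

With the defaults and four attempts the waits double from one second; with a 30-second interval and a 5-minute expiry, the fourth wait (240 seconds) would exceed the window, so only three retries fit.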
- class sagemaker.mlops.workflow.SageMakerJobExceptionTypeEnum(*args, value=<object object>, **kwargs)[source]#
Bases:
Enum
SageMaker Job ExceptionType enum.
- CAPACITY_ERROR = 'SageMaker.CAPACITY_ERROR'#
- INTERNAL_ERROR = 'SageMaker.JOB_INTERNAL_ERROR'#
- RESOURCE_LIMIT = 'SageMaker.RESOURCE_LIMIT'#
- class sagemaker.mlops.workflow.SageMakerJobStepRetryPolicy(exception_types: List[SageMakerJobExceptionTypeEnum] | None = None, failure_reason_types: List[SageMakerJobExceptionTypeEnum] | None = None, backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#
Bases:
RetryPolicy
RetryPolicy for exceptions thrown by a SageMaker Job.
- exception_types#
The SageMaker exceptions to match for this policy. The SageMaker exceptions captured here are the exceptions thrown by synchronously creating the job, for instance the resource limit exception.
- Type:
List[SageMakerJobExceptionTypeEnum]
- failure_reason_types#
The SageMaker failure reason types to match for this policy. The failure reason type is presented in the FailureReason field of the Describe response; it indicates the runtime failure reason for a job.
- Type:
List[SageMakerJobExceptionTypeEnum]
- backoff_rate#
The multiplier by which the retry interval increases during each attempt (default: 2.0)
- Type:
float
- interval_seconds#
An integer that represents the number of seconds before the first retry attempt (default: 1)
- Type:
int
- max_attempts#
A positive integer that represents the maximum number of retry attempts. (default: None)
- Type:
int
- expire_after_mins#
A positive integer that represents the maximum number of minutes after which no further retry attempts are made (default: None)
- Type:
int
- class sagemaker.mlops.workflow.SelectiveExecutionConfig(selected_steps: List[str], reference_latest_execution: bool = True, source_pipeline_execution_arn: str | None = None)[source]#
Bases:
object
The selective execution configuration, which defines a subset of pipeline steps to run in another SageMaker pipeline run.
- class sagemaker.mlops.workflow.Step(name: str, display_name: str | None = None, description: str | None = None, step_type: StepTypeEnum = None, depends_on: List[str | Step | StepCollection | StepOutput] | None = None)[source]#
Bases:
Entity
Pipeline Step for SageMaker Pipelines Workflows.
- add_depends_on(step_names: List[str | Step | StepCollection | StepOutput])[source]#
Add Step names or Step instances to the current Step depends on list.
- abstract property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments to the particular Step service call.
- property depends_on: List[str | Step | StepCollection | StepOutput] | None#
The list of steps the current Step depends on.
- abstract property properties#
The properties of the particular Step.
- property ref: Dict[str, str]#
Gets a reference dictionary for Step instances.
- property step_only_arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments to this Step only.
Compound Steps such as the ConditionStep will have to override this method to return arguments pertaining to only that step.
- class sagemaker.mlops.workflow.StepCollection(name: str, steps: List[Step] = NOTHING, depends_on: List[str | Step | StepCollection | StepOutput] = None)[source]#
Bases:
object
A wrapper of pipeline steps for workflow.
- name#
The name of the StepCollection.
- Type:
str
- depends_on#
The list of Step/StepCollection names or Step/StepCollection/StepOutput instances that the current Step depends on.
- Type:
List[Union[str, Step, StepCollection, StepOutput]]
- depends_on: List[str | Step | StepCollection | StepOutput]#
- name: str#
- property properties#
The properties of the particular StepCollection.
- class sagemaker.mlops.workflow.StepExceptionTypeEnum(*args, value=<object object>, **kwargs)[source]#
Bases:
Enum
Step ExceptionType enum.
- SERVICE_FAULT = 'Step.SERVICE_FAULT'#
- THROTTLING = 'Step.THROTTLING'#
- class sagemaker.mlops.workflow.StepRetryPolicy(exception_types: List[StepExceptionTypeEnum], backoff_rate: float = 2.0, interval_seconds: int = 1, max_attempts: int | None = None, expire_after_mins: int | None = None)[source]#
Bases:
RetryPolicy
RetryPolicy for a retryable step. By default, the pipeline service will retry sagemaker.workflow.retry.StepRetryExceptionTypeEnum.SERVICE_FAULT and sagemaker.workflow.retry.StepRetryExceptionTypeEnum.THROTTLING regardless of pipeline step type. However, for a step defined as retryable, you can override them by specifying a StepRetryPolicy.
- exception_types#
the exception types to match for this policy
- Type:
List[StepExceptionTypeEnum]
- backoff_rate#
The multiplier by which the retry interval increases during each attempt (default: 2.0)
- Type:
float
- interval_seconds#
An integer that represents the number of seconds before the first retry attempt (default: 1)
- Type:
int
- max_attempts#
A positive integer that represents the maximum number of retry attempts. (default: None)
- Type:
int
- expire_after_mins#
A positive integer that represents the maximum number of minutes after which no further retry attempts are made (default: None)
- Type:
int
- class sagemaker.mlops.workflow.StepTypeEnum(value)[source]#
Bases:
Enum
Enum of Step types.
- AUTOML = 'AutoML'#
- CALLBACK = 'Callback'#
- CLARIFY_CHECK = 'ClarifyCheck'#
- CONDITION = 'Condition'#
- CREATE_MODEL = 'Model'#
- EMR = 'EMR'#
- FAIL = 'Fail'#
- LAMBDA = 'Lambda'#
- PROCESSING = 'Processing'#
- QUALITY_CHECK = 'QualityCheck'#
- REGISTER_MODEL = 'RegisterModel'#
- TRAINING = 'Training'#
- TRANSFORM = 'Transform'#
- TUNING = 'Tuning'#
- class sagemaker.mlops.workflow.TrainingStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#
Bases:
ConfigurableRetryStep
TrainingStep for SageMaker Pipelines Workflows.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dictionary that is used to call create_training_job.
NOTE: The CreateTrainingJob request is not quite the args list that workflow needs.
- property properties#
A Properties object representing the DescribeTrainingJobResponse data model.
- class sagemaker.mlops.workflow.TransformStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#
Bases:
ConfigurableRetryStep
TransformStep for SageMaker Pipelines Workflows.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dictionary that is used to call create_transform_job.
NOTE: The CreateTransformJob request is not quite the args list that workflow needs. ExperimentConfig cannot be included in the arguments.
- property properties#
A Properties object representing the DescribeTransformJobResponse data model.
- class sagemaker.mlops.workflow.Trigger(name: str | None = None, enabled: bool | None = True)[source]#
Bases:
object
Abstract class representing a Pipeline Trigger.
- name#
The name of the trigger, default to pipeline_name.
- Type:
str
- enabled#
The state of the schedule, default True resolves to ‘ENABLED’.
- Type:
boolean
- enabled: bool | None#
- name: str | None#
- class sagemaker.mlops.workflow.TuningStep(name: str, step_args: _JobStepArguments | None = None, display_name: str | None = None, description: str | None = None, cache_config: CacheConfig | None = None, depends_on: List[str | Step] | None = None, retry_policies: List[RetryPolicy] | None = None)[source]#
Bases: ConfigurableRetryStep
TuningStep for SageMaker Pipelines Workflows.
- property arguments: Dict[str, Any] | List[Dict[str, Any]]#
The arguments dictionary that is used to call create_hyper_parameter_tuning_job.
NOTE: The CreateHyperParameterTuningJob request is not quite the args list that workflow needs.
- get_top_model_s3_uri(top_k: int, s3_bucket: str, prefix: str = '') → Join[source]#
Get the model artifact S3 URI from the top performing training jobs.
- Parameters:
top_k (int) – The index of the top performing training job. The tuning step stores up to 50 top performing training jobs, so a valid top_k value is from 0 to 49. The best training job model is at index 0.
s3_bucket (str) – The S3 bucket to store the training job output artifact.
prefix (str) – The S3 key prefix to store the training job output artifact.
- property properties#
A Properties object representing the DescribeHyperParameterTuningJobResponse and ListTrainingJobsForHyperParameterTuningJobResponse data models.
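The top_k bounds for get_top_model_s3_uri can be sketched in plain Python. Both helpers below are hypothetical. The URI layout (bucket, optional prefix, training job name, then output/model.tar.gz) is an assumption, and the real method returns a Join that is resolved at pipeline execution time rather than a plain string.

```python
MAX_TOP_JOBS = 50  # documented: up to 50 top performing training jobs are stored


def check_top_k(top_k: int) -> int:
    """Validate that top_k is within the documented range 0..49."""
    if not 0 <= top_k < MAX_TOP_JOBS:
        raise ValueError(f"top_k must be between 0 and {MAX_TOP_JOBS - 1}, got {top_k}")
    return top_k


def sketch_model_uri(s3_bucket: str, prefix: str, training_job_name: str) -> str:
    """Build an illustrative artifact URI from its parts.

    Assumption: artifacts live under <bucket>/<prefix>/<job name>/output/model.tar.gz.
    """
    parts = ["s3:/", s3_bucket]
    if prefix:  # an empty prefix is skipped so no double slash appears
        parts.append(prefix)
    parts += [training_job_name, "output/model.tar.gz"]
    return "/".join(parts)


best_index = check_top_k(0)  # index 0 is the best training job
uri = sketch_model_uri("my-bucket", "tuning", "job-0")
```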
Local Development#
Local pipeline execution for SageMaker MLOps.
- class sagemaker.mlops.local.LocalPipelineSession(*args, **kwargs)[source]#
Bases: LocalSession
Extends LocalSession with pipeline execution capabilities.
This class provides local pipeline execution functionality that was previously in LocalSession. It’s now in the MLOps package since pipeline orchestration is an MLOps concern.
- Usage:
from sagemaker.mlops.local import LocalPipelineSession
from sagemaker.mlops.workflow import Pipeline

session = LocalPipelineSession()
session.create_pipeline(pipeline, "My pipeline")
- create_pipeline(pipeline, pipeline_description, **kwargs)[source]#
Create a local pipeline.
- Parameters:
pipeline (Pipeline) – Pipeline object
pipeline_description (str) – Description of the pipeline
- Returns:
Pipeline metadata (PipelineArn)
- delete_pipeline(PipelineName)[source]#
Delete the local pipeline.
- Parameters:
PipelineName (str) – Name of the pipeline
- Returns:
Pipeline metadata (PipelineArn)
- describe_pipeline(PipelineName)[source]#
Describe the pipeline.
- Parameters:
PipelineName (str) – Name of the pipeline
- Returns:
Pipeline metadata (PipelineArn, PipelineDefinition, LastModifiedTime, etc)
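The three methods above form a create, describe, delete lifecycle. The sketch below walks through that call order using only the documented signatures. It runs against FakeSession, a hypothetical in-memory stand-in, so that it works without the SDK installed. The pipeline.name attribute, the PipelineDescription key, and the returned values are assumptions for illustration.

```python
from types import SimpleNamespace


def run_local_lifecycle(session, pipeline, description):
    """Create, describe, then delete a pipeline through a session-like object."""
    created = session.create_pipeline(pipeline, description)
    described = session.describe_pipeline(pipeline.name)  # assumes pipeline.name exists
    deleted = session.delete_pipeline(pipeline.name)
    return created, described, deleted


class FakeSession:
    """Hypothetical stand-in with the documented method names and parameters."""

    def __init__(self):
        self._pipelines = {}

    def create_pipeline(self, pipeline, pipeline_description, **kwargs):
        self._pipelines[pipeline.name] = pipeline_description
        return {"PipelineArn": pipeline.name}

    def describe_pipeline(self, PipelineName):
        return {
            "PipelineArn": PipelineName,
            "PipelineDescription": self._pipelines[PipelineName],
        }

    def delete_pipeline(self, PipelineName):
        del self._pipelines[PipelineName]
        return {"PipelineArn": PipelineName}


fake_pipeline = SimpleNamespace(name="my-pipeline")
created, described, deleted = run_local_lifecycle(FakeSession(), fake_pipeline, "My pipeline")
```

With the real package, a LocalPipelineSession instance and a Pipeline object would presumably take the place of FakeSession and fake_pipeline.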