Amazon SageMaker Model Building Pipeline

Background

Amazon SageMaker Model Building Pipelines is the first purpose-built, easy-to-use continuous integration and continuous delivery (CI/CD) service for machine learning (ML). With SageMaker Pipelines, you can create, automate, and manage end-to-end ML workflows at scale.

A pipeline is a series of interconnected steps that is defined by a JSON pipeline definition. The SageMaker Python SDK offers convenient abstractions to help construct a pipeline with ease. The following pages include code examples for some of the most common SageMaker Pipelines use cases.

For a higher-level conceptual guide and more information on integration with additional AWS services and features, see Amazon SageMaker Model Building Pipelines in the Amazon SageMaker Developer Guide.
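At a high level, you define steps, assemble them into a sagemaker.workflow.pipeline.Pipeline, upsert the pipeline definition to SageMaker, and start an execution. The following is a minimal sketch, assuming a step such as the TrainingStep constructed in the examples below:

import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

# step_train is assumed to be a pipeline step defined as shown in the sections below
pipeline = Pipeline(
    name="MyPipeline",
    steps=[step_train],
    sagemaker_session=PipelineSession(),
)

# upsert() creates the pipeline, or updates it if one with the same name already exists
pipeline.upsert(role_arn=sagemaker.get_execution_role())

# start() launches a pipeline execution
execution = pipeline.start()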

Code Examples

Pipeline Session

Pipeline Session helps manage AWS service integrations during pipeline creation.

Pipeline Session is an extension of SageMaker Session, which manages the interactions between SageMaker APIs and AWS services like Amazon S3. SageMaker Session provides convenient methods for manipulating entities and resources that Amazon SageMaker uses, such as training jobs, endpoints, and S3 input datasets. AWS service calls are delegated to an underlying Boto3 session, which by default is initialized using the AWS configuration chain.

The following example shows how to construct an estimator and start a TrainingJob with SageMaker Session:

import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.pytorch import PyTorch

pytorch_estimator = PyTorch(
    sagemaker_session=sagemaker.Session(),
    role=sagemaker.get_execution_role(),
    instance_type="ml.c5.xlarge",
    instance_count=1,
    framework_version="1.8.0",
    py_version="py36",
    entry_point="./entry_point.py",
)
pytorch_estimator.fit(
    inputs=TrainingInput(s3_data="s3://my-bucket/my-data/train"),
)

The sagemaker.estimator.EstimatorBase.fit method calls the underlying SageMaker CreateTrainingJob API and starts a TrainingJob immediately. When composing a pipeline to run a training job, you need to define a sagemaker.workflow.steps.TrainingStep first, and the training job should start only when this sagemaker.workflow.steps.TrainingStep is executed during a pipeline execution. This is where the pipeline session sagemaker.workflow.pipeline_context.PipelineSession comes in.

from sagemaker.workflow.pipeline_context import PipelineSession

pytorch_estimator = PyTorch(
    sagemaker_session=PipelineSession(),
    role=sagemaker.get_execution_role(),
    instance_type="ml.c5.xlarge",
    instance_count=1,
    framework_version="1.8.0",
    py_version="py36",
    entry_point="./entry_point.py",
)

step = TrainingStep(
    name="MyTrainingStep",
    # code just like how you trigger a training job before,
    # pipeline session will take care of delaying the start
    # of the training job during pipeline execution.
    step_args=pytorch_estimator.fit(
        inputs=TrainingInput(s3_data="s3://my-bucket/my-data/train"),
    ),
    display_name="MyTrainingStepDisplayName",
    description="This is MyTrainingStep",
    cache_config=CacheConfig(...),
    retry_policies=[...],
    depends_on=[...],
)

When you use sagemaker.workflow.pipeline_context.PipelineSession rather than sagemaker.session.Session, the .fit method does not immediately start a training job. Instead, the .fit method delays the request to call CreateTrainingJob, so that you can first define your sagemaker.workflow.steps.TrainingStep.

Warning

A sagemaker.workflow.pipeline_context.PipelineSession must be used so that the job starts only during pipeline execution time. Otherwise, the training job starts immediately.

Local Pipeline Session

Like Pipeline Session, Local Pipeline Session provides a convenient way to capture input job arguments without starting the job. These input arguments can be provided in the step_args parameter to their corresponding Pipelines step type. The difference between sagemaker.workflow.pipeline_context.PipelineSession and sagemaker.workflow.pipeline_context.LocalPipelineSession is that sagemaker.workflow.pipeline_context.LocalPipelineSession is used to run SageMaker pipelines locally (in local mode) whereas using sagemaker.workflow.pipeline_context.PipelineSession runs the job on the managed service.

from sagemaker.workflow.pipeline_context import LocalPipelineSession

local_pipeline_session = LocalPipelineSession()

pytorch_estimator = PyTorch(
    sagemaker_session=local_pipeline_session,
    role=sagemaker.get_execution_role(),
    instance_type="ml.c5.xlarge",
    instance_count=1,
    framework_version="1.8.0",
    py_version="py36",
    entry_point="./entry_point.py",
)

step = TrainingStep(
    name="MyTrainingStep",
    step_args=pytorch_estimator.fit(
        inputs=TrainingInput(s3_data="s3://my-bucket/my-data/train"),
    )
)

pipeline = Pipeline(
    name="MyPipeline",
    steps=[step],
    sagemaker_session=local_pipeline_session
)

pipeline.create(
    role_arn=sagemaker.get_execution_role(),
    description="local pipeline example"
)

# pipeline will execute locally
execution = pipeline.start()

steps = execution.list_steps()

training_job_name = steps['PipelineExecutionSteps'][0]['Metadata']['TrainingJob']['Arn']

step_outputs = local_pipeline_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name)

Pipeline Parameters

You can parameterize your pipeline definition using parameters. You can reference parameters that you define throughout your pipeline definition. Parameters have a default value, which you can override by specifying parameter values when starting a pipeline execution.

Here is an example:

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean,
)
import sagemaker
from sagemaker.processing import ProcessingInput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

session = PipelineSession()

instance_count = ParameterInteger(name="InstanceCount", default_value=2)
app_managed = ParameterBoolean(name="AppManaged", default_value=False)

inputs = [
    ProcessingInput(
        source="s3://my-bucket/sourcefile",
        destination="/opt/ml/processing/inputs/",
        app_managed=app_managed
    ),
]

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker.get_execution_role(),
    instance_type="ml.m5.xlarge",
    instance_count=instance_count,
    command=["python3"],
    sagemaker_session=session,
    base_job_name="test-sklearn",
)

step_sklearn = ProcessingStep(
    name="MyProcessingStep",
    step_args=sklearn_processor.run(
        inputs=inputs, code="./my-local/script.py"
    ),
)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[instance_count, app_managed],
    steps=[step_sklearn],
    sagemaker_session=session,
)

# you can override the default parameter values
pipeline.start({
   "InstanceCount": 2,
   "AppManaged": True,
})

Step Dependencies

There are two types of step dependencies: a data dependency and a custom dependency. To create data dependencies between steps, pass the properties or the outputs of one step as the input to another step in the pipeline. This is called property reference. Alternatively, you can specify a custom dependency to make sure that a pipeline execution does not start a new step until all dependent steps are completed.

Data Dependency — Property Reference

A step property is an attribute of a step that represents the output values from a step execution. For example, TrainingStep.Properties.TrainingJobName is a property of a sagemaker.workflow.steps.TrainingStep.

For a step that references a SageMaker job (e.g. sagemaker.workflow.steps.ProcessingStep, sagemaker.workflow.steps.TrainingStep, or sagemaker.workflow.steps.TransformStep), the step properties match the attributes of that SageMaker job. For example, sagemaker.workflow.steps.TrainingStep properties match the attributes that result from calling DescribeTrainingJob. TrainingJobName is an attribute from a DescribeTrainingJob result. Therefore, TrainingJobName is a sagemaker.workflow.steps.TrainingStep property and can be referenced as TrainingStep.Properties.TrainingJobName.

You can build data dependencies from one step to another using this kind of property reference. These data dependencies are then used by SageMaker Pipelines to construct the directed acyclic graph (DAG) from the pipeline definition. These properties can be referenced as placeholder values and are resolved at runtime.

For each step type you can refer to the following properties for data dependency creation:

TrainingStep

Referable Property List:

DescribeTrainingJob

Example:

step_train = TrainingStep(...)
model = Model(
    image_uri="my-dummy-image",
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    ...
)
# assume your training job will produce a metric called "val:acc"
# and you would like to use it to determine if you want to create
# a SageMaker Model for it.
step_condition = ConditionStep(
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=step_train.properties.FinalMetricDataList['val:acc'].Value,
            right=0.95,
        ),
    ],
    if_steps=[step_model_create],
)
ProcessingStep

Referable Property List:

DescribeProcessingJob

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=PipelineSession(),
    role=sagemaker.get_execution_role(),
)

step_process = ProcessingStep(
    name="MyProcessingStep",
    ...,
    step_args = sklearn_processor.run(
        ...,
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ],
        code="./local/preprocess.py",
        arguments=["--input-data", "s3://my-input"]
    ),
)

step_args = estimator.fit(inputs=TrainingInput(
    s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
))
TransformStep

Referable Property List:

DescribeTransformJob

step_transform = TransformStep(...)
transform_output = step_transform.properties.TransformOutput.S3OutputPath
TuningStep

Referable Property List:

DescribeHyperParameterTuningJob

ListTrainingJobsForHyperParameterTuningJob

Example:

bucket = "my-bucket"
model_prefix = "my-model"

step_tune = TuningStep(...)
# tuning step can launch multiple training jobs, thus producing multiple model artifacts
# we can create a model with the best performance
best_model = Model(
    model_data=Join(
        on="/",
        values=[
            f"s3://{bucket}/{model_prefix}",
            # from DescribeHyperParameterTuningJob
            step_tune.properties.BestTrainingJob.TrainingJobName,
            "output/model.tar.gz",
        ],
    )
)
# we can also access any top-k best as we wish
second_best_model = Model(
    model_data=Join(
        on="/",
        values=[
            f"s3://{bucket}/{model_prefix}",
            # from ListTrainingJobsForHyperParameterTuningJob
            step_tune.properties.TrainingJobSummaries[1].TrainingJobName,
            "output/model.tar.gz",
        ],
    )
)

sagemaker.workflow.steps.TuningStep also has a helper function to generate any top-k model data URI easily:

model_data = step_tune.get_top_model_s3_uri(
    top_k=0, # best model
    s3_bucket=bucket,
    prefix=model_prefix
)
AutoMLStep

Referable Property List:

DescribeAutoMLJob

Example:

step_automl = AutoMLStep(...)

auto_ml_model = step_automl.get_best_auto_ml_model(role=<role>)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=step_automl.properties.BestCandidateProperties.ModelInsightsJsonReportPath,
        content_type="application/json",
    ),
    explainability=MetricsSource(
        s3_uri=step_automl.properties.BestCandidateProperties.ExplainabilityJsonReportPath,
        content_type="application/json",
    )
)

step_args_register_model = auto_ml_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name="My model package group name",
    approval_status="PendingManualApproval",
    model_metrics=model_metrics,
)

step_register_model = ModelStep(
    name="auto-ml-model-register",
    step_args=step_args_register_model,
)
CreateModelStep

Referable Property List:

DescribeModel

Example:

step_model = CreateModelStep(...)
model_data = step_model.properties.PrimaryContainer.ModelDataUrl
LambdaStep

Referable Property List:

  • OutputParameters: A list of key-value pairs (OutputParameter) that the Lambda execution returns as output.

Example:

str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name="output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name="output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name="output4", output_type=LambdaOutputTypeEnum.Float)

step_lambda = LambdaStep(
    name="MyLambdaStep",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
        session=PipelineSession(),
    ),
    inputs={"arg1": "foo", "arg2": 5},
    outputs=[
        str_outputParam, int_outputParam, bool_outputParam, float_outputParam
   ],
)
output_ref = step_lambda.properties.Outputs["output1"]

The Lambda function with ARN arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda should return output like the following:

def handler(event, context):
    ...
    return {
        "output1": "string_value",
        "output2": 1,
        "output3": True,
        "output4": 2.0,
    }

Note that the output parameters cannot be nested. Otherwise, the value is treated as a single string. For instance, if your Lambda function outputs

{
    "output1": {
        "nested_output1": "my-output"
    }
}

This is resolved as {"output1": "{"nested_output1":"my-output"}"}. If you later refer to step_lambda.properties.Outputs["output1"]["nested_output1"], a non-retryable client error is thrown.

CallbackStep

Referable Property List:

  • OutputParameters: A list of key-value pairs (OutputParameter) that the callback execution returns as output.

Example:

param = ParameterInteger(name="MyInt")
outputParam = CallbackOutput(output_name="output1", output_type=CallbackOutputTypeEnum.String)
step_callback = CallbackStep(
    name="MyCallbackStep",
    depends_on=["TestStep"],
    sqs_queue_url="https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue",
    inputs={"arg1": "foo", "arg2": 5, "arg3": param},
    outputs=[outputParam],
)
output_ref = step_callback.properties.Outputs["output1"]

The output parameters cannot be nested. If the values are nested, they will be treated as a single string value. For example, a nested output value of

{
    "output1": {
        "nested_output1": "my-output"
    }
}

is resolved as {"output1": "{"nested_output1":"my-output"}"}. If you try to refer to step_callback.properties.Outputs["output1"]["nested_output1"] this will throw a non-retryable client error.

QualityCheckStep

Referable Property List:

  • CalculatedBaselineConstraints: The baseline constraints file calculated by the underlying Model Monitor container.

  • CalculatedBaselineStatistics: The baseline statistics file calculated by the underlying Model Monitor container.

  • BaselineUsedForDriftCheckStatistics & BaselineUsedForDriftCheckConstraints: These are the two properties used to set drift_check_baseline in the Model Registry. The values set in these properties vary depending on the parameters passed to the step.
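The following is a minimal sketch of constructing a data quality check step and referencing its calculated baselines; the baseline dataset and output S3 paths are hypothetical:

import sagemaker
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep

check_job_config = CheckJobConfig(
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=PipelineSession(),
)

data_quality_check_config = DataQualityCheckConfig(
    baseline_dataset="s3://my-bucket/my-baseline-dataset.csv",
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri="s3://my-bucket/data-quality-check-output",
)

step_quality_check = QualityCheckStep(
    name="MyDataQualityCheckStep",
    quality_check_config=data_quality_check_config,
    check_job_config=check_job_config,
    skip_check=False,
    register_new_baseline=False,
)

# the calculated baselines can be referenced by downstream steps,
# for example when building drift check baselines for model registration
statistics_ref = step_quality_check.properties.CalculatedBaselineStatistics
constraints_ref = step_quality_check.properties.CalculatedBaselineConstraints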

ClarifyCheckStep

Referable Property List:

  • CalculatedBaselineConstraints: The baseline constraints file calculated by the underlying Clarify container.

  • BaselineUsedForDriftCheckConstraints: This property is used to set drift_check_baseline in the Model Registry. The values set in this property will vary depending on the parameters passed to the step.
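This property can be wired into the drift check baselines that are attached when registering a model. The following is a minimal sketch, assuming a ClarifyCheckStep already configured for a data bias check:

from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.model_metrics import MetricsSource

clarify_check_step = ClarifyCheckStep(...)  # e.g. configured with a DataBiasCheckConfig

drift_check_baselines = DriftCheckBaselines(
    bias_pre_training_constraints=MetricsSource(
        s3_uri=clarify_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
)

# the baselines can then be supplied during model registration, e.g.
# model.register(..., drift_check_baselines=drift_check_baselines)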

More examples of QualityCheckStep and ClarifyCheckStep can be found in the SageMaker Pipelines integration with Model Monitor and Clarify notebook.

EMRStep

Referable Property List:

  • ClusterId: The Id of the EMR cluster.

You can find more details in the official AWS developer guide for this step type.
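The following is a minimal sketch of running a job step on an existing EMR cluster and referencing the cluster Id in later steps; the cluster Id and job artifact paths are hypothetical:

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig

emr_step_config = EMRStepConfig(
    jar="command-runner.jar",
    args=["spark-submit", "--deploy-mode", "cluster", "s3://my-bucket/my-spark-job.py"],
)

step_emr = EMRStep(
    name="MyEMRStep",
    display_name="MyEMRStep",
    description="Run a Spark job on an existing EMR cluster",
    cluster_id="j-1ABCDEFG2HIJK",  # hypothetical Id of an existing cluster
    step_config=emr_step_config,
)

# the cluster Id can be referenced by downstream steps
cluster_id_ref = step_emr.properties.ClusterId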

Custom Dependency

To build a custom dependency, simply add the desired step or steps to another step’s depends_on attribute as follows:

step_1 = ProcessingStep(
    name="MyFirstProcessingStep",
    step_args=sklearn_processor.run(
        inputs=inputs,
        code="./my-local/my-first-script.py"
    ),
)

step_2 = ProcessingStep(
    name="MySecondProcessingStep",
    step_args=sklearn_processor.run(
        inputs=inputs,
        code="./my-local/my-second-script.py"
    ),
    depends_on=[step_1.name],
)

In this case, step_2 will start only when step_1 is done.
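Depending on your SDK version, depends_on may also accept the step object itself rather than its name; a minimal sketch of the same dependency:

step_2 = ProcessingStep(
    name="MySecondProcessingStep",
    step_args=sklearn_processor.run(
        inputs=inputs,
        code="./my-local/my-second-script.py"
    ),
    depends_on=[step_1],
)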

Property File

A sagemaker.workflow.properties.PropertyFile is designed to store information that is output from a sagemaker.workflow.steps.ProcessingStep. The sagemaker.workflow.functions.JsonGet function processes a property file. You can use JsonPath notation to query the information.

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

step_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ProcessingOutput(output_name="hyperparam", source="/opt/ml/processing/evaluation"),
    ],
    code="./local/preprocess.py",
    arguments=["--input-data", "s3://my-input"],
)

hyperparam_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    step_args=step_args,
    property_files=[hyperparam_report],
)

To retrieve a file produced by the sagemaker.workflow.steps.ProcessingStep as a property file, the ProcessingOutput.output_name and the PropertyFile.output_name values must be the same. For this example, assume that the hyperparam.json value produced by the ProcessingStep in the /opt/ml/processing/evaluation directory looks similar to the following:

{
    "hyperparam": {
        "eta": {
            "value": 0.6
        }
    }
}

Then, you can query this value using sagemaker.workflow.functions.JsonGet and use the value for any subsequent steps:

eta = JsonGet(
    step_name=step_process.name,
    property_file=hyperparam_report,
    json_path="hyperparam.eta.value",
)

Conditions

The condition step is used to evaluate step properties and determine which action to take next in the pipeline. It takes a list of conditions, a list of steps to execute if all conditions evaluate to true, and another list of steps to execute otherwise. For instance:

step_condition = ConditionStep(
    # The conditions are evaluated with operator AND
    conditions = [condition_1, condition_2, condition_3, condition_4],
    if_steps = [step_register],
    else_steps = [step_fail],
)

The following eight condition types are supported:

  • ConditionEquals

  • ConditionGreaterThan

  • ConditionGreaterThanOrEqualTo

  • ConditionLessThan

  • ConditionLessThanOrEqualTo

  • ConditionIn

  • ConditionNot

  • ConditionOr

The sagemaker.workflow.properties.PropertyFile and sagemaker.workflow.functions.JsonGet constructs introduced above are particularly handy when used together with conditions. Here is an example:

step_train = TrainingStep(...)
model = Model(
    image_uri="my-dummy-image",
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    ...
)

step_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ProcessingOutput(output_name="hyperparam", source="/opt/ml/processing/evaluation"),
    ],
    code="./local/preprocess.py",
    arguments=["--input-data", "s3://my-input"],
)

eval_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    step_args=step_args,
    property_files=[eval_report],
)

eval_score = JsonGet(
    step_name=step_process.name,
    property_file=eval_report,
    json_path="eval.accuracy",
)

# register the model if evaluation score is satisfactory
register_arg = model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name="my-model-pkg-name",
    approval_status="Approved",
)
step_register = ModelStep(
    name="MyModelCreationStep",
    step_args=register_arg,
)
# otherwise, transit to a failure step
step_fail = FailStep(name="FailStep", ...)

cond = ConditionStep(
   conditions = [ConditionGreaterThanOrEqualTo(left=eval_score, right=0.95)],
   if_steps = [step_register],
   else_steps = [step_fail],
)

Pipeline Functions

Several pipeline built-in functions are offered to help you compose your pipeline. Use pipeline functions to assign values to properties that are not available until pipeline execution time.

Join

Use the sagemaker.workflow.functions.Join function to join a list of properties. For example, you can use Join to construct an S3 URI that can only be evaluated at run time, and use that URI to construct the sagemaker.workflow.steps.TrainingStep at compile time.

bucket = ParameterString('bucket', default_value='my-bucket')

input_uri = Join(
    on="/",
    values=['s3:/', bucket, "my-input"]
)

step = TrainingStep(
    name="MyTrainingStep",
    step_args=estimator.fit(inputs=TrainingInput(s3_data=input_uri)),
)

JsonGet

Use sagemaker.workflow.functions.JsonGet to extract a JSON property from a sagemaker.workflow.properties.PropertyFile produced by a sagemaker.workflow.steps.ProcessingStep, and pass it to subsequent steps. The following example retrieves a hyperparameter value from the sagemaker.workflow.properties.PropertyFile and passes it to a subsequent sagemaker.workflow.steps.TrainingStep:

session = PipelineSession()

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

step_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ProcessingOutput(output_name="hyperparam", source="/opt/ml/processing/evaluation"),
    ],
    code="./local/preprocess.py",
    arguments=["--input-data", "s3://my-input"],
)

hyperparam_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    step_args=step_args,
    property_files=[hyperparam_report],
)

xgb_train = Estimator(
    image_uri="s3://my-image-uri",
    instance_type="ml.c5.xlarge",
    instance_count=1,
    output_path="s3://my-output-path",
    base_job_name="abalone-train",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

eta = JsonGet(
    step_name=step_process.name,
    property_file=hyperparam_report,
    json_path="hyperparam.eta.value",
)

xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=eta,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

step_args = xgb_train.fit(inputs={
    "train": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "train"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
    "validation": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "validation"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
},)

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=step_args,
)

Execution Variable

There are a number of properties for a pipeline execution that can only be resolved at run time. However, they can be accessed at compile time using execution variables.

  • sagemaker.workflow.execution_variables.ExecutionVariables.START_DATETIME: The start time of an execution.

  • sagemaker.workflow.execution_variables.ExecutionVariables.CURRENT_DATETIME: The time when the variable is being evaluated during an execution.

  • sagemaker.workflow.execution_variables.ExecutionVariables.PIPELINE_EXECUTION_ID: The Id of an execution.

  • sagemaker.workflow.execution_variables.ExecutionVariables.PIPELINE_EXECUTION_ARN: The execution ARN for an execution.

  • sagemaker.workflow.execution_variables.ExecutionVariables.PIPELINE_NAME: The name of the pipeline.

  • sagemaker.workflow.execution_variables.ExecutionVariables.PIPELINE_ARN: The ARN of the pipeline.

  • sagemaker.workflow.execution_variables.ExecutionVariables.TRAINING_JOB_NAME: The name of the training job launched by the training step.

  • sagemaker.workflow.execution_variables.ExecutionVariables.PROCESSING_JOB_NAME: The name of the processing job launched by the processing step.

You can use these execution variables as you see fit. The following example uses the START_DATETIME execution variable to construct a processing output path:

bucket = ParameterString('bucket', default_value='my-bucket')

output_path = Join(
    on="/",
    values=['s3:/', bucket, 'my-train-output', ExecutionVariables.START_DATETIME]
)

step = ProcessingStep(
    name="MyProcessingStep",
    step_args=processor.run(
        inputs=[ProcessingInput(source="s3://my-input")],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/train",
                destination=output_path,
            ),
        ],
    ),
)

Step Parallelism

When a step does not depend on any other step, it is run immediately upon pipeline execution. However, executing too many pipeline steps in parallel can quickly exhaust available resources. Control the number of concurrent steps for a pipeline execution with sagemaker.workflow.parallelism_config.ParallelismConfiguration.

The following example uses sagemaker.workflow.parallelism_config.ParallelismConfiguration to set the concurrent step limit to five.

pipeline.create(
    parallelism_config=ParallelismConfiguration(5),
)

Caching Configuration

Executing a step again without changing its configurations, inputs, or outputs is wasteful, so you can enable caching for pipeline steps. When you use step signature caching, SageMaker Pipelines tries to reuse a previous run of your current pipeline step instead of running the step again. When previous runs are considered for reuse, certain arguments from the step are evaluated to see whether any have changed. If any of these arguments have been updated, the step executes again with the new configuration.

When you turn on caching, you supply an expiration time (in ISO8601 duration string format). The expiration time indicates how old a previous execution can be to be considered for reuse.

cache_config = CacheConfig(
    enable_caching=True,
    expire_after="P30d" # 30-day
)

You can format your ISO8601 duration strings like the following examples:

  • p30d: 30 days

  • P4DT12H: 4 days and 12 hours

  • T12H: 12 hours

Caching is supported for the following step types:

  • ProcessingStep

  • TrainingStep

  • TransformStep

  • TuningStep

  • QualityCheckStep

  • ClarifyCheckStep

  • EMRStep

In order to create pipeline steps and eventually construct a SageMaker pipeline, you provide parameters within a Python script or notebook. The SageMaker Python SDK creates a pipeline definition by translating these parameters into SageMaker job attributes. Some of these attributes, when changed, cause the step to re-run (see Caching Pipeline Steps for a detailed list). Therefore, if you update an SDK parameter that is used to create such an attribute, the step reruns. See the following discussion for examples of this in processing and training steps, which are commonly used steps in Pipelines.

The following example creates a processing step:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.dataset_definition.inputs import S3Input
from sagemaker.processing import ProcessingInput, ProcessingOutput

pipeline_session = PipelineSession()

framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    role=role,
    sagemaker_session=pipeline_session
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(
            source="artifacts/data/abalone-dataset.csv",
            input_name="abalone-dataset",
            s3_input=S3Input(
                local_path="/opt/ml/processing/input",
                s3_uri="artifacts/data/abalone-dataset.csv",
                s3_data_type="S3Prefix",
                s3_input_mode="File",
                s3_data_distribution_type="FullyReplicated",
                s3_compression_type="None",
            )
        )
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="artifacts/code/process/preprocessing.py",
)

processing_step = ProcessingStep(
    name="Process",
    step_args=processor_args,
    cache_config=cache_config
)

The following parameters from the example cause additional processing step iterations when you change them:

  • framework_version: This parameter is used to construct the image_uri for the AppSpecification attribute of the processing job.

  • inputs: Any ProcessingInputs are passed through directly as job ProcessingInputs. Input source files that exist on the local filesystem are uploaded to S3 and given a new S3Uri. If the S3 path changes, a new processing job is initiated. For examples of S3 paths, see the S3 Artifact Folder Structure section.

  • code: The code parameter is also packaged as a ProcessingInput job. For local files, a unique hash is created from the file. The file is then uploaded to S3 with the hash included in the path. When a different local file is used, a new hash is created and the S3 path for that ProcessingInput changes, initiating a new step run. For examples of S3 paths, see the S3 Artifact Folder Structure section.

The following example creates a training step:

import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import TrainingStep

pipeline_session = PipelineSession()

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

hyperparameters = {
    "dataset_frequency": "H",
    "timestamp_format": "yyyy-MM-dd hh:mm:ss",
    "number_of_backtest_windows": "1",
    "role_arn": role_arn,
    "region": region,
}

sklearn_estimator = SKLearn(
    entry_point="train.py",
    role=role_arn,
    image_uri=image_uri,
    instance_type=training_instance_type,
    sagemaker_session=pipeline_session,
    base_job_name="training-job",
    hyperparameters=hyperparameters,
    enable_sagemaker_metrics=True,
)

train_args = sklearn_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

training_step = TrainingStep(
    name="Train",
    step_args=train_args,
    cache_config=cache_config
)

The following parameters from the example cause additional training step iterations when you change them:

  • image_uri: The image_uri parameter defines the image used for training, and is used directly in the AlgorithmSpecification attribute of the training job.

  • hyperparameters: All of the hyperparameters are used directly in the HyperParameters attribute for the training job.

  • entry_point: The entry point file is included in the training job’s InputDataConfig Channel array. A unique hash is created from the file (and any other dependencies), and then the file is uploaded to S3 with the hash included in the path. When a different entry point file is used, a new hash is created and the S3 path for that InputDataConfig Channel object changes, initiating a new step run. For examples of what the S3 paths look like, see the S3 Artifact Folder Structure section.

  • inputs: The inputs are also included in the training job’s InputDataConfig. Local inputs are uploaded to S3. If the S3 path changes, a new training job is initiated. For examples of S3 paths, see the S3 Artifact Folder Structure section.

S3 Artifact Folder Structure

You use the following S3 paths when uploading local input and code artifacts, and when saving output artifacts.

Processing

  • Code: s3://bucket_name/pipeline_name/code/<code_hash>/file.py. The file could also be a tar.gz of source_dir and dependencies.

  • Input Data: s3://bucket_name/pipeline_name/step_name/input/input_name/file.csv

  • Configuration: s3://bucket_name/pipeline_name/step_name/input/conf/<configuration_hash>/configuration.json

  • Output: s3://bucket_name/pipeline_name/<execution_id>/step_name/output/output_name

Training

  • Code: s3://bucket_name/code_location/pipeline_name/code/<code_hash>/code.tar.gz

  • Output: The output paths for Training jobs can vary - the default output path is the root of the s3 bucket: s3://bucket_name. For Training jobs created from a Tuning job, the default path includes the Training job name created by the Training platform, formatted as s3://bucket_name/<training_job_name>/output/model.tar.gz.

Transform

  • Output: s3://bucket_name/pipeline_name/<execution_id>/step_name

Warning

For input artifacts such as data or code files, the actual content of the artifacts is not tracked, only the S3 path. This means that if a file in S3 is updated and re-uploaded directly with an identical name and path, then the step does NOT run again.

Retry Policy

We can configure step-wise retry behavior for certain step types. During a pipeline step execution, there are two points at which you might encounter errors.

  1. You might encounter errors when trying to create or start a SageMaker job like a ProcessingJob or TrainingJob.

  2. You might encounter errors when a SageMaker job like a ProcessingJob or TrainingJob finishes with failures.

There are two types of retry policies to handle these scenarios:

  • sagemaker.workflow.retry.StepRetryPolicy

  • sagemaker.workflow.retry.SageMakerJobStepRetryPolicy

The StepRetryPolicy is used if service faults (like a network issue) or throttling are recognized when creating a SageMaker job.

StepRetryPolicy(
    exception_types=[
        StepExceptionTypeEnum.SERVICE_FAULT,
        StepExceptionTypeEnum.THROTTLING,
    ],
    expire_after_mins=5,
    interval_seconds=10,
    backoff_rate=2.0
)

Note: A pipeline step type that supports the StepRetryPolicy will attempt exponential retries with a one-second interval by default for service faults and throttling. This behavior can be overridden using the policy above.

The SageMakerJobStepRetryPolicy is used when a SageMaker job finishes with a failure reason. There are many reasons why a job can fail. The SageMakerJobStepRetryPolicy supports retry configuration for the following failures:

  • SageMaker.JOB_INTERNAL_ERROR

  • SageMaker.CAPACITY_ERROR

  • SageMaker.RESOURCE_LIMIT

The following example specifies that a SageMaker job should retry if it fails due to a resource limit exception. The job will retry exponentially, starting at an interval of 60 seconds, and will only attempt to retry this job for two hours total.

SageMakerJobStepRetryPolicy(
    exception_types=[SageMakerJobExceptionTypeEnum.RESOURCE_LIMIT],
    expire_after_mins=120,
    interval_seconds=60,
    backoff_rate=2.0
)
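Retry policies are attached to a step through its retry_policies argument. The following is a minimal sketch that combines both policy types on a training step, reusing the pytorch_estimator from the earlier examples:

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.retry import (
    SageMakerJobExceptionTypeEnum,
    SageMakerJobStepRetryPolicy,
    StepExceptionTypeEnum,
    StepRetryPolicy,
)

step_train = TrainingStep(
    name="MyTrainingStep",
    step_args=pytorch_estimator.fit(
        inputs=TrainingInput(s3_data="s3://my-bucket/my-data/train"),
    ),
    retry_policies=[
        # retry if service faults or throttling occur while creating the job
        StepRetryPolicy(
            exception_types=[
                StepExceptionTypeEnum.SERVICE_FAULT,
                StepExceptionTypeEnum.THROTTLING,
            ],
            max_attempts=3,
        ),
        # retry if the job itself fails because of a capacity error
        SageMakerJobStepRetryPolicy(
            exception_types=[SageMakerJobExceptionTypeEnum.CAPACITY_ERROR],
            max_attempts=3,
        ),
    ],
)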

For more information, see Retry Policy for Pipeline Steps in the Amazon SageMaker Developer Guide.

Model Step

After model artifacts are produced by either a TrainingJob or TuningJob, you might want to create a SageMaker Model, or register that model to SageMaker Model Registry. This is where the sagemaker.workflow.model_step.ModelStep comes in.

Follow the example below to create a SageMaker Model and register it to SageMaker Model Registry using sagemaker.workflow.model_step.ModelStep.

step_train = TrainingStep(...)
model = Model(
    image_uri=pytorch_estimator.training_image_uri(),
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

# we might also want to create a SageMaker Model
step_model_create = ModelStep(
   name="MyModelCreationStep",
   step_args=model.create(instance_type="ml.m5.xlarge"),
)

# in addition, we might also want to register a model to SageMaker Model Registry
register_model_step_args = model.register(
    content_types=["*"],
    response_types=["application/json"],
    inference_instances=["ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    description="MyModelPackage",
)

step_model_registration = ModelStep(
   name="MyModelRegistration",
   step_args=register_model_step_args,
)
...

When model repacking is needed, sagemaker.workflow.model_step.ModelStep is a collection of steps. Model repacking unpacks model data, creates a new model tarball file that includes any custom inference scripts, and uploads this tarball file to Amazon S3. Once a model is repacked, it is ready to deploy to an endpoint or be registered as a model package.

sagemaker.workflow.model_step.ModelStep uses the provided inputs to automatically detect if a repack is needed. If a repack is needed, sagemaker.workflow.steps.TrainingStep is added to the step collection for that repack. Then, either sagemaker.workflow.steps.CreateModelStep or sagemaker.workflow.step_collections.RegisterModelStep will be chained after it.
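For example, supplying a custom inference script to the model in the example above is one input that typically triggers a repack. The following is a minimal sketch; the script and directory names are hypothetical:

model = Model(
    image_uri=pytorch_estimator.training_image_uri(),
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="inference.py",       # custom inference script
    source_dir="./my-inference-code",
    sagemaker_session=pipeline_session,
    role=role,
)

# ModelStep detects that a repack is needed and inserts a repacking TrainingStep
# before the model creation or registration step.
step_model_create = ModelStep(
    name="MyModelCreationStep",
    step_args=model.create(instance_type="ml.m5.xlarge"),
)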

MonitorBatchTransform Step

MonitorBatchTransformStep is a new step type that allows customers to use SageMaker Model Monitor with batch transform jobs that are a part of their pipeline. Using this step, customers can set up the following monitors for their batch transform job: data quality, model quality, model bias, and feature attribution.

When configuring this step, customers have the flexibility to run the monitoring job before or after the transform job executes. There is an additional flag called fail_on_violation which will fail the step if set to true and there is a monitoring violation, or will continue to execute the step if set to false.

Here is an example showing you how to configure a sagemaker.workflow.monitor_batch_transform_step.MonitorBatchTransformStep with a Data Quality monitor.

from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.transformer import Transformer
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig

from sagemaker.workflow.parameters import ParameterString

pipeline_session = PipelineSession()

transform_input_param = ParameterString(
    name="transform_input",
    default_value=f"s3://my-bucket/my-prefix/my-transform-input",
)

# the resource configuration for the monitoring job
job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    ...
)

The following code sample demonstrates how to set up an on-demand batch transform data quality monitor:

# configure your transformer
transformer = Transformer(..., sagemaker_session=pipeline_session)
transform_arg = transformer.transform(
    transform_input_param,
    content_type="text/csv",
    split_type="Line",
    ...
)

data_quality_config = DataQualityCheckConfig(
    baseline_dataset=transform_input_param,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri="s3://my-report-path",
)

from sagemaker.workflow.monitor_batch_transform_step import MonitorBatchTransformStep

transform_and_monitor_step = MonitorBatchTransformStep(
    name="MyMonitorBatchTransformStep",
    transform_step_args=transform_arg,
    monitor_configuration=data_quality_config,
    check_job_configuration=job_config,
    # data quality only looks at the inputs, so there is
    # no need to wait for the transform output.
    monitor_before_transform=True,
    # if violation is detected in the monitoring, and you want to skip it
    # and continue running batch transform, you can set fail_on_violation
    # to false.
    fail_on_violation=False,
    supplied_baseline_statistics="s3://my-baseline-statistics.json",
    supplied_baseline_constraints="s3://my-baseline-constraints.json",
)
...

The same example can be extended for model quality, bias, and feature attribute monitoring.

Warning

Note that to run on-demand model quality monitoring, you need to have the ground truth data ready. When running the transform job, include the ground truth inside your transform input, and join the transform inference input and output. Then you can indicate which attribute or column name/index points to the ground truth when running the monitoring job.

transformer = Transformer(..., sagemaker_session=pipeline_session)

transform_arg = transformer.transform(
    transform_input_param,
    content_type="text/csv",
    split_type="Line",
    # Note that we need to join both the inference input and output
    # into transform outputs. The inference input needs to have the ground truth.
    # details can be found here
    # https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html
    join_source="Input",
    # We need to exclude the ground truth inside the inference input
    # before passing it to the prediction model.
    # Assume the first column of our csv file is the ground truth
    input_filter="$[1:]",
    ...
)

model_quality_config = ModelQualityCheckConfig(
    baseline_dataset=transformer.output_path,
    problem_type="BinaryClassification",
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri="s3://my-output",
    # assume the model output is at column idx 10
    inference_attribute="_c10",
    # As pointed out previously, the first column is the ground truth.
    ground_truth_attribute="_c0",
)
from sagemaker.workflow.monitor_batch_transform_step import MonitorBatchTransformStep

transform_and_monitor_step = MonitorBatchTransformStep(
    name="MyMonitorBatchTransformStep",
    transform_step_args=transform_arg,
    monitor_configuration=model_quality_config,
    check_job_configuration=job_config,
    # model quality job needs the transform outputs, therefore
    # monitor_before_transform can not be true for model quality
    monitor_before_transform=False,
    fail_on_violation=True,
    supplied_baseline_statistics="s3://my-baseline-statistics.json",
    supplied_baseline_constraints="s3://my-baseline-constraints.json",
)
...

Limitations

The SageMaker Model Building Pipeline Python SDK offers abstractions to help you construct a pipeline definition with ease. However, there are certain limitations. Read on for information about known issues. If you discover additional limitations, open an issue in the sagemaker-python-sdk repository.

Parameterization

Parameterization is crucial for pipeline composition: it allows you to assign values to properties that are not available until runtime. However, there are limitations.

Incompatibility with other SageMaker Python SDK modules

Pipeline parameterization includes pipeline parameters like sagemaker.workflow.parameters.ParameterString and sagemaker.workflow.parameters.ParameterInteger, property reference, functions like sagemaker.workflow.functions.Join and sagemaker.workflow.functions.JsonGet, and pipeline execution variables. Pipeline parameterization might not be supported with 100% compatibility when used with other SageMaker Python SDK modules.

For example, when running a training job in script mode, you cannot parameterize the entry_point value for estimators inherited from sagemaker.estimator.EstimatorBase because a SageMaker EstimatorBase expects an entry_point to point to a local Python source file.

# An example of what not to do
script_path = ParameterString(name="MyScript", default_value="s3://my-bucket/my-script.py")
xgb_script_mode_estimator = XGBoost(
    entry_point=script_path,
    framework_version="1.5-1",
    role=role,
    ...
)

Not all arguments can be parameterized

Many arguments for class constructors or methods from other modules can be parameterized, but not all of them. For example, inputs or outputs can be parameterized when calling processor.run:

instance_count = ParameterInteger(name="InstanceCount", default_value=2)
process_s3_input_url = ParameterString(name="ProcessingInputUrl")

processor = Processor(
    instance_type="ml.m5.xlarge",
    instance_count=instance_count,
    ...
)
processor.run(inputs=[ProcessingInput(source=process_s3_input_url)], ...)

However, you cannot parameterize git_config when calling processor.run. This is because the source code needs to be downloaded, packaged, and uploaded to S3 at compile time, and parameterization can only be evaluated at run time.
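For example, the following hypothetical sketch would not work, because the repository URL must be resolved while the pipeline definition is being built:

# An example of what not to do
repo_url = ParameterString(name="RepoUrl", default_value="https://github.com/my-org/my-repo.git")
processor.run(
    code="process.py",
    git_config={"repo": repo_url, "branch": "main"},
    ...
)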

Not all built-in Python operations can be applied to parameters

Another limitation of parameterization is that not all built-in Python operations can be applied to a pipeline parameter. For example, you cannot concatenate pipeline variables using Python primitives:

# An example of what not to do
my_string = "s3://{}/training".format(ParameterString(name="MyBucket", default_value=""))

# Another example of what not to do
int_param = ParameterInteger(name="MyInstanceCount", default_value=1)
int_str = str(int_param)

# Instead, if you want to convert the parameter to string type, do
int_str = int_param.to_string()

These concatenations will not work, because a pipeline parameter such as sagemaker.workflow.parameters.ParameterString can only be evaluated at run time. Instead, the same concatenation can be achieved using the sagemaker.workflow.functions.Join function:

my_string = Join(on="", values=[
    "s3://",
    ParameterString(name="MyBucket", default_value=""),
    "/training"]
)

Pipeline parameters can only be evaluated at run time. If a pipeline parameter needs to be evaluated at compile time, an exception is thrown.

Pipeline Composition Alternative

The SageMaker Python SDK provides you with tools for pipeline composition. Under the hood, it produces a pipeline definition JSON file. If you want to author the pipeline definition by hand, you can follow the SageMaker Pipeline Definition JSON Schema.
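If you want to inspect the JSON definition that the SDK generates before writing or editing one by hand, a quick way to do so (assuming a pipeline object like the ones constructed above) is:

import json

# pipeline is assumed to be a sagemaker.workflow.pipeline.Pipeline object
definition = json.loads(pipeline.definition())
print(json.dumps(definition, indent=2))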