Amazon SageMaker Operators in Apache Airflow

Apache Airflow

Apache Airflow is a platform that enables you to programmatically author, schedule, and monitor workflows. Using Airflow, you can build a workflow for SageMaker training, hyperparameter tuning, batch transform, and endpoint deployment. You can use any SageMaker deep learning framework or Amazon SageMaker built-in algorithm to perform these operations in Airflow.

There are two ways to build a SageMaker workflow: using Airflow SageMaker operators or using the Airflow PythonOperator.

1. SageMaker Operators: In Airflow 1.10.1, the SageMaker team contributed special operators for SageMaker operations. Each operator takes a configuration dictionary that defines the corresponding operation. We provide APIs to generate the configuration dictionary in the SageMaker Python SDK. Currently, the following SageMaker operators are supported:

  • SageMakerTrainingOperator

  • SageMakerTuningOperator

  • SageMakerModelOperator

  • SageMakerTransformOperator

  • SageMakerEndpointConfigOperator

  • SageMakerEndpointOperator

2. PythonOperator: An Airflow built-in operator that executes Python callables. You can use the PythonOperator to call the SageMaker Python SDK directly and create a SageMaker workflow.

Using Airflow on AWS

Turbine is an open-source AWS CloudFormation template that enables you to create an Airflow resource stack on AWS. You can get it here: https://github.com/villasv/aws-airflow-stack
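
For example, once the Turbine template is available to you, you could launch the stack programmatically with boto3 (a minimal sketch, assuming the template has been uploaded to S3; the template URL, stack name, and parameter names below are placeholders, and the actual parameters are defined in the Turbine repository):

import boto3

cfn = boto3.client('cloudformation')

# Launch the Turbine CloudFormation stack. TemplateURL and Parameters are
# illustrative placeholders; see the Turbine repository for the actual
# template location and the parameters it supports.
cfn.create_stack(
    StackName='airflow',
    TemplateURL='https://s3.amazonaws.com/your-bucket/turbine-master.template',
    Capabilities=['CAPABILITY_NAMED_IAM'],
    Parameters=[
        {'ParameterKey': 'KeyPair', 'ParameterValue': 'your-ec2-key-pair'}
    ])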

Using Airflow SageMaker Operators

Starting with Airflow 1.10.1, you can use SageMaker operators in Airflow. All SageMaker operators take a configuration dictionary that can be generated by the SageMaker Python SDK. For example:

import sagemaker
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.airflow import training_config, transform_config_from_estimator

estimator = TensorFlow(entry_point='tf_train.py',
                       role='sagemaker-role',
                       framework_version='1.11.0',
                       training_steps=1000,
                       evaluation_steps=100,
                       train_instance_count=2,
                       train_instance_type='ml.p2.xlarge')

# train_config specifies SageMaker training configuration
train_config = training_config(estimator=estimator,
                               inputs=your_training_data_s3_uri)

# trans_config specifies SageMaker batch transform configuration
# task_id specifies which operator the training job is associated with; task_type specifies whether the
# operator is a training operator or a tuning operator
trans_config = transform_config_from_estimator(estimator=estimator,
                                               task_id='tf_training',
                                               task_type='training',
                                               instance_count=1,
                                               instance_type='ml.m4.xlarge',
                                               data=your_transform_data_s3_uri,
                                               content_type='text/csv')
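
The SageMaker Python SDK provides similar helpers for the other operators. For instance, a hyperparameter tuning configuration can be generated with tuning_config (a minimal sketch; the objective metric, regex, and hyperparameter range below are placeholders you would adapt to your training script):

from sagemaker.tuner import HyperparameterTuner, ContinuousParameter
from sagemaker.workflow.airflow import tuning_config

# a tuner wrapping the estimator defined above; the objective metric and
# hyperparameter range are placeholders for whatever your training script emits
tuner = HyperparameterTuner(estimator=estimator,
                            objective_metric_name='loss',
                            objective_type='Minimize',
                            metric_definitions=[{'Name': 'loss', 'Regex': 'loss = ([0-9\\.]+)'}],
                            hyperparameter_ranges={'learning_rate': ContinuousParameter(0.0001, 0.1)},
                            max_jobs=4,
                            max_parallel_jobs=2)

# tune_config specifies SageMaker hyperparameter tuning configuration
tune_config = tuning_config(tuner=tuner,
                            inputs=your_training_data_s3_uri)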

Now you can pass these configurations to the corresponding SageMaker operators and create the workflow:

import airflow
from airflow import DAG
from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG('tensorflow_example', default_args=default_args,
          schedule_interval='@once')

train_op = SageMakerTrainingOperator(
    task_id='tf_training',
    config=train_config,
    wait_for_completion=True,
    dag=dag)

transform_op = SageMakerTransformOperator(
    task_id='tf_transform',
    config=trans_config,
    wait_for_completion=True,
    dag=dag)

transform_op.set_upstream(train_op)
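
The other SageMaker operators follow the same pattern. For example, a tuning step could be added to the DAG with the tune_config sketched above (note that if the transform step should consume the tuned model instead, transform_config_from_estimator would be given task_id='tf_tuning' and task_type='tuning'):

from airflow.contrib.operators.sagemaker_tuning_operator import SageMakerTuningOperator

tune_op = SageMakerTuningOperator(
    task_id='tf_tuning',
    config=tune_config,
    wait_for_completion=True,
    dag=dag)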

Using the Airflow PythonOperator

The Airflow PythonOperator is a built-in operator that can execute any Python callable. If you want to build your SageMaker workflow in a more flexible way, write your own Python callables for SageMaker operations using the SageMaker Python SDK.

from sagemaker.tensorflow import TensorFlow

# callable for SageMaker training in TensorFlow
def train(data, **context):
    estimator = TensorFlow(entry_point='tf_train.py',
                           role='sagemaker-role',
                           framework_version='1.11.0',
                           training_steps=1000,
                           evaluation_steps=100,
                           train_instance_count=2,
                           train_instance_type='ml.p2.xlarge')
    estimator.fit(data)
    return estimator.latest_training_job.job_name

# callable for SageMaker batch transform
def transform(data, **context):
    training_job = context['ti'].xcom_pull(task_ids='training')
    estimator = TensorFlow.attach(training_job)
    transformer = estimator.transformer(instance_count=1, instance_type='ml.c4.xlarge')
    transformer.transform(data, content_type='text/csv')

Then build your workflow by using the PythonOperator with the Python callables defined above:

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG('tensorflow_example', default_args=default_args,
          schedule_interval='@once')

train_op = PythonOperator(
    task_id='training',
    python_callable=train,
    op_args=[training_data_s3_uri],
    provide_context=True,
    dag=dag)

transform_op = PythonOperator(
    task_id='transform',
    python_callable=transform,
    op_args=[transform_data_s3_uri],
    provide_context=True,
    dag=dag)

transform_op.set_upstream(train_op)

This completes a workflow that runs a SageMaker training job followed by a batch transform job. You can customize your Python callables with the SageMaker Python SDK according to your needs to build more flexible and powerful workflows.
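
For example, you could extend the workflow with an endpoint deployment step (a minimal sketch that reuses the training job name pushed to XCom by the 'training' task; the instance type is a placeholder):

from sagemaker.tensorflow import TensorFlow

# callable for deploying the trained model to a SageMaker endpoint
def deploy(**context):
    # pull the training job name published by the 'training' task via XCom
    training_job = context['ti'].xcom_pull(task_ids='training')
    estimator = TensorFlow.attach(training_job)
    estimator.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')

deploy_op = PythonOperator(
    task_id='deploy',
    python_callable=deploy,
    provide_context=True,
    dag=dag)

deploy_op.set_upstream(train_op)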