Run a Distributed Training Job Using the SageMaker Python SDK

Walk through the following pages to learn how to use the SageMaker model parallel library’s APIs to configure and enable distributed model parallelism through an Amazon SageMaker estimator.

Configuration Parameters for distribution

Amazon SageMaker’s TensorFlow and PyTorch estimator objects contain a distribution parameter, which you can use to enable and specify parameters for SageMaker distributed training. The SageMaker model parallel library internally uses MPI. To use model parallelism, both smdistributed and MPI must be enabled through the distribution parameter.

The following code example is a template for setting up model parallelism with a PyTorch estimator.

import sagemaker
from sagemaker.pytorch import PyTorch

smp_options = {
    "enabled": True,
    "parameters": {
        ...
    }
}

mpi_options = {
    "enabled": True,
    ...
}

smdmp_estimator = PyTorch(
    ...
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    }
)

smdmp_estimator.fit()
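
The template above leaves the estimator arguments and library parameters elided. The following sketch fills them in with illustrative values; the entry point script, IAM role, instance type, framework version, and parameter settings are assumptions to adapt to your own job, not prescribed values.

import sagemaker
from sagemaker.pytorch import PyTorch

smp_options = {
    "enabled": True,
    "parameters": {
        "pipeline_parallel_degree": 2,  # split the model across 2 GPUs
        "microbatches": 4,              # batch size must be divisible by 4
        "placement_strategy": "cluster",
        "ddp": True,                    # hybrid model/data parallelism with DistributedDataParallel
    }
}

mpi_options = {
    "enabled": True,
    "processes_per_host": 8,            # one process per GPU on an 8-GPU instance
}

smdmp_estimator = PyTorch(
    entry_point="train.py",             # hypothetical training script
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.p3.16xlarge",     # 8 GPUs per instance
    framework_version="1.12",
    py_version="py38",
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    }
)

smdmp_estimator.fit()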

Tip

This page provides a complete list of the parameters you can use when you construct a SageMaker estimator and configure it for distributed training.

To find examples of how to construct a SageMaker estimator with the distributed training parameters, see Launch a SageMaker Distributed Model Parallel Training Job in the SageMaker’s Distributed Model Parallel developer guide.

Parameters for smdistributed

You can use the following parameters to initialize the library by configuring a dictionary for modelparallel, which goes into the smdistributed option of the distribution parameter.

Note

partitions for TensorFlow and pipeline_parallel_degree for PyTorch are required parameters. All other parameters in the following table are optional.

Common Parameters

partitions (for TensorFlow, and for PyTorch with smdistributed-modelparallel<v1.6) / pipeline_parallel_degree (for PyTorch v1.8.1 with smdistributed-modelparallel>=v1.6)
    Type / valid values: int
    Default: none (required)
    Description: Required. The number of partitions to split the model into. For pipeline_parallel_degree in PyTorch, this is the number of devices over which pipeline parallelism is performed.

microbatches
    Type / valid values: int
    Default: 1
    Description: The number of microbatches to perform pipelining over. 1 means no pipelining. The batch size must be divisible by the number of microbatches.

pipeline
    Type / valid values: "interleaved" or "simple"
    Default: "interleaved"
    Description: The pipeline schedule.

optimize
    Type / valid values: "memory" or "speed"
    Default: "memory"
    Description: Determines the distribution mechanism of transformer layers. Optimizing for speed reduces communication across tensor-parallel ranks and does not distribute layer normalization, but stores duplicate activations across tensor-parallel ranks. Optimizing for memory stores no redundant activations, at the cost of more communication overhead across tensor-parallel ranks.

placement_strategy
    Type / valid values: "cluster", "spread", or a permutation of the letters D, P, and T
    Default: "cluster"
    Description: Determines the mapping of model partitions onto physical devices. When hybrid model/data parallelism is used, cluster places a single model replica on neighboring device IDs, whereas spread places a model replica as far apart as possible. For more information, see Ranking Basics without Tensor Parallelism.
    In a permutation string, D stands for reduced-data parallelism, P stands for pipeline parallelism, and T stands for tensor parallelism. spread is equivalent to "TPD", and cluster is equivalent to "DPT". For more information, see Placement Strategy with Tensor Parallelism.
    Note: For TensorFlow, tensor parallelism is not implemented, and the only available values are "spread" and "cluster".

auto_partition
    Type / valid values: bool
    Default: True
    Description: Enables auto-partitioning. If disabled, the default_partition parameter must be provided.

default_partition
    Type / valid values: int
    Default: 0
    Description: Required if auto_partition is False. The partition ID on which to place operations/modules that are not placed in any smp.partition context.
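
To make the common parameters concrete, the following is a minimal sketch of a "modelparallel" configuration that exercises them (PyTorch naming shown; the specific values, and the choice to disable auto-partitioning, are illustrative assumptions rather than recommendations).

smp_options = {
    "enabled": True,
    "parameters": {
        "pipeline_parallel_degree": 4,    # required; use "partitions" for TensorFlow
        "microbatches": 8,                # the batch size must be divisible by 8
        "pipeline": "interleaved",        # default pipeline schedule
        "optimize": "speed",              # trade extra activation memory for less communication
        "placement_strategy": "cluster",  # equivalent to "DPT"
        "auto_partition": False,          # manual partitioning via smp.partition contexts
        "default_partition": 0,           # required because auto_partition is False
    }
}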

TensorFlow-specific Parameters

contiguous
    Type / valid values: bool
    Default: True
    Description: Whether the model partitions should be contiguous. If True, each partition forms a connected component in the computational graph, unless the graph itself is not connected.

horovod
    Type / valid values: bool
    Default: False
    Description: Must be set to True if hybrid model/data parallelism is used and the data parallelism (DP) framework is Horovod.
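
A minimal sketch of a "modelparallel" configuration for a TensorFlow estimator that uses these two parameters together with the required partitions setting; the values are illustrative assumptions.

smp_options_tf = {
    "enabled": True,
    "parameters": {
        "partitions": 2,       # required for TensorFlow
        "microbatches": 4,
        "contiguous": True,    # keep each partition connected in the graph
        "horovod": True,       # data parallelism is handled by Horovod
    }
}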

PyTorch-specific Parameters

memory_weight
    Type / valid values: float in [0.0, 1.0]
    Default: 0.2 if optimize is "speed", else 0.8
    Description: The weight of memory balancing in the auto-partitioning objective, as opposed to balancing computational load. If 0.0, the library only tries to balance computation; if 1.0, it only tries to balance memory use. Any value in between interpolates between these extremes.

ddp
    Type / valid values: bool
    Default: False
    Description: Must be set to True if hybrid model/data parallelism is used with DistributedDataParallel. DistributedDataParallel is used with the NCCL backend and uses the MASTER_PORT provided by SageMaker.

active_microbatches (smdistributed-modelparallel>=v1.3)
    Type / valid values: int
    Default: partitions + 2
    Description: The maximum number of microbatches that are simultaneously in execution during pipelining. Jointly scaling the batch size and the number of microbatches can often mitigate the pipeline bubble overhead, but it can also increase memory usage if too many microbatches are in execution at the same time. In such cases, setting the number of active microbatches to a lower value helps control memory usage. By default, this is set to two plus the number of partitions of the model.

deterministic_server (smdistributed-modelparallel>=v1.3)
    Type / valid values: bool
    Default: False
    Description: Setting this to True ensures that the execution server for pipelining executes requests in the same order across all data parallel ranks.

offload_activations (smdistributed-modelparallel>=v1.6)
    Type / valid values: bool
    Default: False
    Description: Enables activation offloading. To improve GPU memory usage, use activation offloading only when (1) microbatches and active_microbatches are greater than 1, and (2) activation checkpointing is enabled for at least one module in the model.

activation_loading_horizon (smdistributed-modelparallel>=v1.6)
    Type / valid values: int
    Default: 4
    Description: The activation loading horizon, expressed in number of pipeline tasks. It determines how early offloaded activations are loaded back to the GPU. A smaller value means that activations are loaded closer in time to when they are needed for the backward pass. Setting this value too small might improve memory usage, but can cause throughput loss and GPU bottlenecks during the CPU-to-GPU data transfer.

tensor_parallel_degree (smdistributed-modelparallel>=v1.6)
    Type / valid values: int
    Default: 1
    Description: The number of devices over which the tensor parallel modules are distributed. If tensor_parallel_degree is greater than 1, then ddp must be set to True.

fp16 (smdistributed-modelparallel>=v1.10)
    Type / valid values: bool
    Default: False
    Description: To run FP16 training, add "fp16": True to the smp configuration. Other APIs remain the same between FP16 and FP32. If fp16 is enabled, when you call smp.DistributedModel the model is wrapped with FP16_Module, which converts the model to FP16 dtype and handles the forward pass in FP16. If fp16 is enabled, when you call smp.DistributedOptimizer the optimizer is wrapped with FP16_Optimizer.

fp16_params (smdistributed-modelparallel>=v1.6)
    Type / valid values: bool
    Default: False
    Description: If True, the parameters of the distributed modules are initialized in FP16.

shard_optimizer_state (smdistributed-modelparallel>=v1.6)
    Type / valid values: bool
    Default: False
    Description: If True, the library shards the optimizer state of all parameters across the data parallel processes that hold the same parameter. This optimizer state sharding happens in a balanced manner. Note that when sharding the optimizer state, saving the full optimizer state is not currently supported; save the partial optimizer state instead. For more information about saving and loading checkpoints with optimizer state sharding, see Instructions for Checkpointing with Tensor Parallelism.

prescaled_batch (smdistributed-modelparallel>=v1.6)
    Type / valid values: bool
    Default: False
    Description: If True and smp.nn.DistributedTransformerLMHead is used (typically for GPT-2 or GPT-3 models), the library assumes that the devices in the same tensor parallelism group receive the same input data. Otherwise, it assumes that they receive different examples. To learn more, see Prescaled Batch.

skip_tracing (smdistributed-modelparallel>=v1.6)
    Type / valid values: bool
    Default: False
    Description: Skips the initial tracing step. This can be useful for very large models where even model tracing on the CPU is not possible due to memory constraints.

sharded_data_parallel_degree (smdistributed-modelparallel>=v1.11)
    Type / valid values: int
    Default: 1
    Description: To run a training job using sharded data parallelism, add this parameter and specify a number greater than 1. Sharded data parallelism is a memory-saving distributed training technique that splits the training state of a model (model parameters, gradients, and optimizer states) across GPUs in a data parallel group. For more information, see Sharded Data Parallelism.

sdp_reduce_bucket_size (smdistributed-modelparallel>=v1.11)
    Type / valid values: int
    Default: 5e8
    Description: Configuration parameter for sharded data parallelism (for sharded_data_parallel_degree > 2). Specifies the size of PyTorch DDP gradient buckets in number of elements of the default dtype.

sdp_param_persistence_threshold (smdistributed-modelparallel>=v1.11)
    Type / valid values: int
    Default: 1e6
    Description: Specifies the size of a parameter tensor, in number of elements, that can persist on each GPU. Sharded data parallelism splits each parameter tensor across the GPUs of a data parallel group. If the number of elements in the parameter tensor is smaller than this threshold, the parameter tensor is not split; this helps reduce communication overhead because the parameter tensor is replicated across the data-parallel GPUs.

sdp_max_live_parameters (smdistributed-modelparallel>=v1.11)
    Type / valid values: int
    Default: 1e9
    Description: Specifies the maximum number of parameters that can simultaneously be in a recombined training state during the forward and backward pass. Parameter fetching with the AllGather operation pauses when the number of active parameters reaches the given threshold. Note that increasing this parameter increases the memory footprint.

sdp_hierarchical_allgather (smdistributed-modelparallel>=v1.11)
    Type / valid values: bool
    Default: True
    Description: If set to True, the AllGather operation runs hierarchically: it runs within each node first, and then runs across nodes. For multi-node distributed training jobs, the hierarchical AllGather operation is automatically activated.

sdp_gradient_clipping (smdistributed-modelparallel>=v1.11)
    Type / valid values: float
    Default: 1.0
    Description: Specifies a threshold for clipping the L2 norm of the gradients before propagating them backward through the model parameters. When sharded data parallelism is activated, gradient clipping is also activated. The default threshold is 1.0. Adjust this parameter if you have the exploding gradients problem.
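
The following sketches show how some of these PyTorch-specific parameters fit together in a "modelparallel" configuration. The degrees and thresholds are illustrative assumptions; check the version requirements noted above and the library documentation for valid combinations.

# Sketch 1: pipeline + tensor parallelism with FP16 and activation offloading.
tensor_parallel_options = {
    "enabled": True,
    "parameters": {
        "pipeline_parallel_degree": 2,
        "tensor_parallel_degree": 2,   # distribute transformer modules over 2 GPUs
        "ddp": True,                   # required because tensor_parallel_degree > 1
        "fp16": True,                  # wraps the model and optimizer for FP16 training
        "microbatches": 4,             # must be > 1 for activation offloading
        "active_microbatches": 2,      # must be > 1 for activation offloading
        "offload_activations": True,   # also requires activation checkpointing in the script
    }
}

# Sketch 2: sharded data parallelism across 8 GPUs.
sharded_data_parallel_options = {
    "enabled": True,
    "parameters": {
        "ddp": True,                               # hybrid model/data parallelism with DDP
        "sharded_data_parallel_degree": 8,         # shard parameters, gradients, optimizer states
        "sdp_reduce_bucket_size": int(5e8),        # default gradient bucket size
        "sdp_param_persistence_threshold": int(1e6),
        "sdp_gradient_clipping": 1.0,
    }
}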

Parameters for mpi

For the "mpi" key, a dict must be passed which contains:

  • "enabled": Set to True to launch the training job with MPI.

  • "processes_per_host": Specifies the number of processes MPI should launch on each host. In SageMaker a host is a single Amazon EC2 ml instance. The SageMaker distributed model parallel library maintains a one-to-one mapping between processes and GPUs across model and data parallelism. This means that SageMaker schedules each process on a single, separate GPU and no GPU contains more than one process. If you are using PyTorch, you must restrict each process to its own device using torch.cuda.set_device(smp.local_rank()). To learn more, see Modify a PyTorch Training Script.

    Important

    processes_per_host must be less than or equal to the number of GPUs per instance, and is typically equal to the number of GPUs per instance.

    For example, if you use one instance with 4-way model parallelism and 2-way data parallelism, then processes_per_host should be 2 x 4 = 8. Therefore, you must choose an instance that has at least 8 GPUs, such as an ml.p3.16xlarge. A short sketch of this arithmetic follows this list.

    The following image illustrates how 2-way data parallelism and 4-way model parallelism is distributed across 8 GPUs: the model is partitioned across 4 GPUs, and each partition is added to 2 GPUs.

    2-way data parallelism and 4-way model parallelism distributed across 8 GPUs
  • "custom_mpi_options": Use this key to pass any custom MPI options you might need. To avoid Docker warnings from contaminating your training logs, we recommend the following flag. `--mca btl_vader_single_copy_mechanism none`

Ranking Basics without Tensor Parallelism

The library maintains a one-to-one mapping between processes and available GPUs: for each GPU, there is a corresponding CPU process. Each CPU process maintains a “rank” assigned by MPI, which is a 0-based unique index for the process. For instance, if a training job is launched with 4 p3dn.24xlarge instances using all of their GPUs, there are 32 processes across all instances, and the ranks of these processes range from 0 to 31.

The local_rank of a process is the rank of the process among the processes on the same instance. This ranges from 0 up to the number of GPUs in the instance minus one, or less if fewer processes than GPUs are launched on the instance. For example, in the preceding scenario, the local_ranks of the processes range from 0 to 7, because there are 8 GPUs in a p3dn.24xlarge instance.

When model parallelism is used together with data parallelism (Horovod for TensorFlow and DDP for PyTorch), the library partitions the set of processes into disjoint mp_groups. An mp_group is a subset of all processes that together hold a single, partitioned model replica.

For instance, if a single node job is launched with 8 local processes with partitions=2 (meaning the model will be split into 2), there are four mp_groups. The specific sets of processes that form the mp_groups can be adjusted by the placement_strategy option.

  • If placement_strategy is spread, then the four mp_groups are [0, 4], [1, 5], [2, 6], [3, 7]. The mp_rank is the rank of a process within each mp_group. For example, the mp_rank is 0 for the processes 0, 1, 2, and 3, and the mp_rank is 1 for the processes 4, 5, 6, and 7.

    Analogously, the library defines dp_groups as sets of processes that all hold the same model partition, and perform data parallelism among each other. If placement_strategy is spread, there are two dp_groups: [0, 1, 2, 3] and [4, 5, 6, 7].

    Each process within a dp_group holds the same partition of the model, and the processes in a dp_group make allreduce calls among themselves. Allreduce for data parallelism does not take place across dp_groups. dp_rank is defined as the rank of a process within its dp_group. In the preceding example, the dp_rank of process 6 is 2.

  • If placement_strategy is cluster, the four mp_groups become [0, 1], [2, 3], [4, 5], [6, 7], and the two dp_groups become [0, 2, 4, 6] and [1, 3, 5, 7]. A short sketch of these groupings follows this list.
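
The following sketch reproduces these groupings for 8 processes and partitions=2. It is plain illustration code, not part of the library API.

num_processes = 8
partitions = 2                              # model split into 2
dp_degree = num_processes // partitions     # 4 model replicas

# spread: each model replica is placed on ranks that are as far apart as possible.
spread_mp_groups = [[r, r + dp_degree] for r in range(dp_degree)]
# -> [[0, 4], [1, 5], [2, 6], [3, 7]]
spread_dp_groups = [list(range(dp_degree)), list(range(dp_degree, num_processes))]
# -> [[0, 1, 2, 3], [4, 5, 6, 7]]

# cluster: each model replica is placed on neighboring ranks.
cluster_mp_groups = [[2 * r, 2 * r + 1] for r in range(dp_degree)]
# -> [[0, 1], [2, 3], [4, 5], [6, 7]]
cluster_dp_groups = [list(range(0, num_processes, 2)), list(range(1, num_processes, 2))]
# -> [[0, 2, 4, 6], [1, 3, 5, 7]]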

Placement Strategy with Tensor Parallelism

In addition to the two placement strategies introduced in the previous section, the library provides additional placement strategies for extended tensor parallelism features for PyTorch. The additional placement strategies (parallelism types) are denoted as follows:

  • D stands for (reduced) data parallelism.

  • P stands for pipeline parallelism.

  • T stands for tensor parallelism.

Given a permutation of the three letters, the library performs the parallelism type represented by the right-most letter over global ranks that are closest to each other, in ascending order. Conversely, the parallelism type represented by the left-most letter is performed over ranks that are as distant as possible.

  • Example: Given 8 devices with tp_size() == 2, pp_size() == 2, rdp_size() == 2

    • placement_strategy: "DPT" gives

      rank  rdp_rank  pp_rank  tp_rank
      0     0         0        0
      1     0         0        1
      2     0         1        0
      3     0         1        1
      4     1         0        0
      5     1         0        1
      6     1         1        0
      7     1         1        1

    • placement_strategy: "PTD" gives

      rank  rdp_rank  pp_rank  tp_rank
      0     0         0        0
      1     1         0        0
      2     0         0        1
      3     1         0        1
      4     0         1        0
      5     1         1        0
      6     0         1        1
      7     1         1        1

Neighboring ranks are placed on the same instance and connected with high-bandwidth NVLink, so it is recommended to place the parallelism type with the highest bandwidth requirements for your model in the right-most position of the placement_strategy string. Because tensor parallelism often requires frequent communication, placing T in the right-most position is recommended (as in the default "cluster" strategy). For many large models, keeping the default "cluster" strategy results in the best performance.
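
The tables above can be reproduced with a short sketch that decomposes a global rank according to the placement string, with the right-most letter varying fastest. This is illustration code under the stated sizes, not library code.

def decompose_rank(rank, placement, sizes):
    # placement is a permutation of "D", "P", "T"; sizes maps each letter to its degree.
    # The parallelism type for the right-most letter varies fastest over ascending ranks.
    result = {}
    for letter in reversed(placement):
        result[letter] = rank % sizes[letter]
        rank //= sizes[letter]
    return result["D"], result["P"], result["T"]   # (rdp_rank, pp_rank, tp_rank)

sizes = {"D": 2, "P": 2, "T": 2}   # rdp_size, pp_size, tp_size
for rank in range(8):
    print(rank, decompose_rank(rank, "DPT", sizes))
# Reproduces the "DPT" table: rank 1 -> (0, 0, 1), rank 2 -> (0, 1, 0), rank 4 -> (1, 0, 0), ...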

Prescaled Batch

prescaled_batch is a configuration parameter that can be useful for DistributedTransformerLMHead, which is used for GPT-2 and GPT-3.

When a module is distributed with tensor parallelism, the inputs to the distributed module held by different tp_ranks are exchanged so that each tp_rank ends up with a tensor that is sliced along the hidden dimension and scaled along the batch dimension. For example, if the tensor parallel degree is 8, the inputs to DistributedTransformer (tensors with shape [B, S, H], where B is the batch size, S the sequence length, and H the hidden width) held by different tp_ranks are communicated around, and their shapes become [8B, S, H/8]. Each tp_rank gets the batches from all of its peer tp_ranks, but only the slice that interacts with its local partition of the module.

By default, the library assumes that each tp_rank is assigned a different batch, and performs the communication described above. If prescaled_batch is true, the library assumes that the input batch is already scaled (and is the same across the tp_ranks), and only does the slicing. In the example above, the library assumes that the input tensor has shape [8B, S, H], and only converts it into [8B, S, H/8]. So if prescaled_batch is true, it is the user’s responsibility to feed the same batch to the tp_ranks in the same TP_GROUP. This can be done by sharding the data based on smp.rdp_size() and smp.rdp_rank(), instead of smp.dp_size() and smp.dp_rank(). When prescaled_batch is true, the global batch size is smp.rdp_size() multiplied by the per-MP_GROUP batch size. When prescaled_batch is false, the global batch size is smp.dp_size() multiplied by the per-PP_GROUP batch size.

If you use pipeline parallelism degree 1, you can keep prescaled_batch false (the default). If you use a pipeline parallelism degree greater than 1, it is recommended to set prescaled_batch to true so that you can increase the per-MP_GROUP batch size for efficient pipelining without running into out-of-memory issues.
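
The sharding choice described above can be sketched as follows. This assumes smp.init() has already been called in a SageMaker training job, uses a placeholder dataset, and relies only on the smp rank/size queries referenced in this section.

import smdistributed.modelparallel.torch as smp
import torch
from torch.utils.data.distributed import DistributedSampler

prescaled_batch = True       # must match "prescaled_batch" in the smp configuration
per_group_batch_size = 16    # per-MP_GROUP (or per-PP_GROUP) batch size
dataset = torch.utils.data.TensorDataset(torch.randn(1024, 512))  # placeholder data

if prescaled_batch:
    # Every tp_rank in a TP_GROUP must receive the same batch: shard over rdp ranks.
    sampler = DistributedSampler(dataset, num_replicas=smp.rdp_size(), rank=smp.rdp_rank())
    global_batch_size = smp.rdp_size() * per_group_batch_size
else:
    # Each data parallel rank receives a different batch: shard over dp ranks.
    sampler = DistributedSampler(dataset, num_replicas=smp.dp_size(), rank=smp.dp_rank())
    global_batch_size = smp.dp_size() * per_group_batch_size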