SageMaker V3 Distributed Local Training Example#
This notebook demonstrates how to run distributed training locally using SageMaker V3 ModelTrainer with multiple Docker containers.
import os
import subprocess
import tempfile
import shutil
import numpy as np
from sagemaker.train.model_trainer import ModelTrainer, Mode
from sagemaker.train.configs import SourceCode, Compute, InputData
from sagemaker.train.distributed import Torchrun
from sagemaker.core.helper.session_helper import Session
# NOTE: Local mode requires Docker to be installed and running.
import os
os.environ['PATH'] = '/usr/local/bin:/Applications/Docker.app/Contents/Resources/bin:' + os.environ['PATH']
Step 1: Setup Session and Create Test Data#
Initialize the SageMaker session and create the necessary test data and training script.
sagemaker_session = Session()
DEFAULT_CPU_IMAGE = "763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.0.0-cpu-py310"
# Create temporary directories
temp_dir = tempfile.mkdtemp()
source_dir = os.path.join(temp_dir, "source")
data_dir = os.path.join(temp_dir, "data")
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")
os.makedirs(source_dir, exist_ok=True)
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)
print(f"Created temporary directories in: {temp_dir}")
print("Note: This will use multiple Docker containers locally for distributed training!")
Step 2: Create Training Data and Scripts#
Generate the test data and training scripts needed for distributed local training.
# Create test data
np.random.seed(42)
x_train = np.random.randn(100, 4).astype(np.float32)
y_train = np.random.randn(100).astype(np.float32)
x_test = np.random.randn(20, 4).astype(np.float32)
y_test = np.random.randn(20).astype(np.float32)
np.save(os.path.join(train_dir, "x_train.npy"), x_train)
np.save(os.path.join(train_dir, "y_train.npy"), y_train)
np.save(os.path.join(test_dir, "x_test.npy"), x_test)
np.save(os.path.join(test_dir, "y_test.npy"), y_test)
print(f"Created training data: {x_train.shape}, {y_train.shape}")
print(f"Created test data: {x_test.shape}, {y_test.shape}")
# Create pytorch model definition
pytorch_model_def = '''
import torch
import torch.nn as nn
def get_model():
return nn.Sequential(
nn.Linear(4, 10),
nn.ReLU(),
nn.Linear(10, 1)
)
'''
with open(os.path.join(source_dir, "pytorch_model_def.py"), 'w') as f:
f.write(pytorch_model_def)
print("Created pytorch_model_def.py")
# Create training script (same as single container for simplicity)
training_script = '''
import argparse
import numpy as np
import os
import sys
import logging
import json
import shutil
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from pytorch_model_def import get_model
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
current_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = "/opt/ml/input/data"
def get_train_data(train_dir):
x_train = np.load(os.path.join(train_dir, "x_train.npy"))
y_train = np.load(os.path.join(train_dir, "y_train.npy"))
logger.info(f"x train: {x_train.shape}, y train: {y_train.shape}")
return torch.from_numpy(x_train), torch.from_numpy(y_train)
def get_test_data(test_dir):
x_test = np.load(os.path.join(test_dir, "x_test.npy"))
y_test = np.load(os.path.join(test_dir, "y_test.npy"))
logger.info(f"x test: {x_test.shape}, y test: {y_test.shape}")
return torch.from_numpy(x_test), torch.from_numpy(y_test)
def train():
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")
model_dir = os.environ.get("SM_MODEL_DIR", os.path.join(current_dir, "data/model"))
x_train, y_train = get_train_data(train_dir)
x_test, y_test = get_test_data(test_dir)
train_ds = TensorDataset(x_train, y_train)
batch_size = 64
epochs = 1
learning_rate = 0.1
logger.info(f"batch_size = {batch_size}, epochs = {epochs}, learning rate = {learning_rate}")
train_dl = DataLoader(train_ds, batch_size, shuffle=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = get_model().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
for epoch in range(epochs):
for x_train_batch, y_train_batch in train_dl:
y = model(x_train_batch.float())
loss = criterion(y.flatten(), y_train_batch.float())
optimizer.zero_grad()
loss.backward()
optimizer.step()
epoch += 1
logger.info(f"epoch: {epoch} -> loss: {loss}")
with torch.no_grad():
y = model(x_test.float()).flatten()
mse = ((y - y_test) ** 2).sum() / y_test.shape[0]
print("Test MSE:", mse.numpy())
os.makedirs(model_dir, exist_ok=True)
torch.save(model.state_dict(), model_dir + "/model.pth")
inference_code_path = model_dir + "/code/"
if not os.path.exists(inference_code_path):
os.mkdir(inference_code_path)
logger.info(f"Created a folder at {inference_code_path}!")
shutil.copy("local_training_script.py", inference_code_path)
shutil.copy("pytorch_model_def.py", inference_code_path)
logger.info(f"Saving models files to {inference_code_path}")
if __name__ == "__main__":
print("Running the training job ...")
train()
'''
with open(os.path.join(source_dir, "local_training_script.py"), 'w') as f:
f.write(training_script)
print("Created local_training_script.py")
Step 3: Configure Distributed Local Training#
Set up ModelTrainer for distributed training in local container mode.
source_code = SourceCode(
source_dir=source_dir,
entry_script="local_training_script.py",
)
distributed = Torchrun(
process_count_per_node=1,
)
compute = Compute(
instance_type="local_cpu",
instance_count=2,
)
train_data = InputData(
channel_name="train",
data_source=train_dir,
)
test_data = InputData(
channel_name="test",
data_source=test_dir,
)
print("Distributed Local Training Configuration:")
print(f" Containers: {compute.instance_count}")
print(f" Processes per container: {distributed.process_count_per_node}")
print(f" Total processes: {compute.instance_count * distributed.process_count_per_node}")
print(f" Distributed framework: Torchrun")
Step 4: Create Distributed ModelTrainer#
Initialize ModelTrainer for distributed local container training.
model_trainer = ModelTrainer(
training_image=DEFAULT_CPU_IMAGE,
sagemaker_session=sagemaker_session,
source_code=source_code,
distributed=distributed,
compute=compute,
input_data_config=[train_data, test_data],
base_job_name="local_mode_multi_container",
training_mode=Mode.LOCAL_CONTAINER,
)
print("Distributed ModelTrainer created successfully!")
print(f"Training mode: {Mode.LOCAL_CONTAINER}")
print(f"Distributed config: {distributed}")
Step 5: Run Distributed Local Training#
Start the distributed training job using multiple local containers.
print("Starting distributed local container training...")
print("This will launch 2 Docker containers with 1 training process each.")
try:
model_trainer.train()
print("Distributed local container training completed successfully!")
operation_successful = True
except Exception as e:
print(f"Training failed with error: {e}")
operation_successful = False
Step 6: Check Training Results#
Examine the results from distributed training.
if operation_successful:
current_dir = os.getcwd()
print("Distributed Training Results:")
print("=" * 35)
# Check that certain directories don't exist (cleanup verification)
assert not os.path.exists(os.path.join(current_dir, "shared"))
assert not os.path.exists(os.path.join(current_dir, "input"))
assert not os.path.exists(os.path.join(current_dir, "algo-1"))
assert not os.path.exists(os.path.join(current_dir, "algo-2"))
print("✓ Temporary directories properly cleaned up")
# Check for expected artifacts
directories_to_check = [
"compressed_artifacts",
"artifacts",
"model",
"output",
]
for directory in directories_to_check:
path = os.path.join(current_dir, directory)
if os.path.exists(path):
print(f"✓ {directory}: Found")
else:
print(f"✗ {directory}: Not found")
print("\nDistributed Training Configuration:")
print(f" Training Image: {DEFAULT_CPU_IMAGE}")
print(f" Container Count: {compute.instance_count}")
print(f" Processes per Container: {distributed.process_count_per_node}")
print(f" Total Training Processes: {compute.instance_count * distributed.process_count_per_node}")
else:
print("Training was not successful.")
Step 7: Clean Up#
Clean up local artifacts and temporary files.
try:
subprocess.run(["docker", "compose", "down", "-v"], check=False)
print("Docker containers stopped")
except Exception:
pass
# Clean up temporary files
try:
shutil.rmtree(temp_dir)
print(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
print(f"Could not clean up temp directory: {e}")
# Clean up training artifacts
current_dir = os.getcwd()
directories = ["compressed_artifacts", "artifacts", "model", "output"]
for directory in directories:
path = os.path.join(current_dir, directory)
if os.path.exists(path):
try:
shutil.rmtree(path)
print(f"Cleaned up: {directory}")
except Exception as e:
print(f"Could not clean up {directory}: {e}")
# Final assertion
assert operation_successful
print("\n✓ Distributed local training completed successfully!")
print("Cleanup completed - all artifacts removed.")
Summary#
This notebook demonstrated:
Multi-container distributed training: Running training across multiple Docker containers locally
Torchrun integration: Using SageMaker’s Torchrun distributed driver
Local development workflow: Testing distributed training before cloud deployment
Proper cleanup: Following cleanup patterns for local artifacts
Distributed local training provides a great way to test distributed training patterns locally before deploying to SageMaker cloud instances, with no AWS costs and realistic container-based execution.