# SageMaker V3 Local Training Example
This notebook demonstrates how to use the SageMaker V3 ModelTrainer in Local Container mode to test training jobs in Docker containers on your own machine.
import os
import subprocess
import tempfile
import shutil
import numpy as np
from sagemaker.train.model_trainer import ModelTrainer, Mode
from sagemaker.train.configs import SourceCode, Compute, InputData
from sagemaker.core.helper.session_helper import Session
# NOTE: Local mode requires Docker to be installed and running.
# Prepend /usr/local/bin and the Docker Desktop (macOS) bundle directory to
# PATH — presumably so the `docker` CLI can be located from the notebook kernel.
import os

os.environ["PATH"] = (
    f"/usr/local/bin:/Applications/Docker.app/Contents/Resources/bin:{os.environ['PATH']}"
)
## Step 1: Set Up the Session and Test Data Directories
Initialize the SageMaker session, look up the training image for your region, and create the temporary directories that will hold the training script and test data.
# Open a SageMaker session and read the AWS region it is bound to.
sagemaker_session = Session()
region = sagemaker_session.boto_region_name

# Get the correct ECR image for your region: the PyTorch 2.0.0 / py310
# training image, resolved for an ml.m5.xlarge instance type.
from sagemaker.core import image_uris

DEFAULT_CPU_IMAGE = image_uris.retrieve(
    framework="pytorch",
    version="2.0.0",
    py_version="py310",
    image_scope="training",
    instance_type="ml.m5.xlarge",
    region=region,
)
# Set Docker platform for Apple Silicon compatibility: when the host reports
# an arm64 machine type, make linux/amd64 Docker's default platform.
import platform

host_arch = platform.machine()
if host_arch == "arm64":
    os.environ["DOCKER_DEFAULT_PLATFORM"] = "linux/amd64"
# Create temporary directories
# Layout: <temp_dir>/source holds the scripts, <temp_dir>/data/train and
# <temp_dir>/data/test hold the two dataset splits.
temp_dir = tempfile.mkdtemp()
source_dir = os.path.join(temp_dir, "source")
data_dir = os.path.join(temp_dir, "data")
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

# makedirs also creates the intermediate data/ directory.
for workspace_dir in (source_dir, train_dir, test_dir):
    os.makedirs(workspace_dir, exist_ok=True)

print(f"Created temporary directories in: {temp_dir}")
## Step 2: Create Training Data and Scripts
Generate the test data and the training scripts needed for local container training.
# Create test data
# Seed NumPy first so the arrays are reproducible; the four draws stay in the
# order x_train, y_train, x_test, y_test because each one advances the RNG.
np.random.seed(42)
x_train = np.random.randn(100, 4).astype(np.float32)
y_train = np.random.randn(100).astype(np.float32)
x_test = np.random.randn(20, 4).astype(np.float32)
y_test = np.random.randn(20).astype(np.float32)

# Persist every array as <name>.npy inside the directory of its split.
for channel_dir, array_name, array in (
    (train_dir, "x_train", x_train),
    (train_dir, "y_train", y_train),
    (test_dir, "x_test", x_test),
    (test_dir, "y_test", y_test),
):
    np.save(os.path.join(channel_dir, f"{array_name}.npy"), array)

print(f"Created training data: {x_train.shape}, {y_train.shape}")
print(f"Created test data: {x_test.shape}, {y_test.shape}")
# Create pytorch model definition
# The text below is written verbatim to <source_dir>/pytorch_model_def.py and
# later imported by the training script (`from pytorch_model_def import
# get_model`), so it has to be valid, correctly indented Python on its own.
pytorch_model_def = '''
import torch
import torch.nn as nn


def get_model():
    """Return a small feed-forward net: Linear(4, 10) -> ReLU -> Linear(10, 1)."""
    return nn.Sequential(
        nn.Linear(4, 10),
        nn.ReLU(),
        nn.Linear(10, 1),
    )
'''

# Explicit encoding keeps the written file independent of the host locale.
with open(os.path.join(source_dir, "pytorch_model_def.py"), "w", encoding="utf-8") as f:
    f.write(pytorch_model_def)
print("Created pytorch_model_def.py")
# Create training script
# The text below is written verbatim to <source_dir>/local_training_script.py,
# which is the entry script handed to the trainer. It must therefore be valid,
# correctly indented Python on its own.
#
# Fixes relative to the previous script text:
#   * every batch and the test tensors are moved to the same device as the
#     model (previously only the model was moved, which fails under CUDA);
#   * the test MSE is brought back to the CPU before converting to NumPy;
#   * the files copied into <model_dir>/code/ are resolved against the
#     script's own directory instead of the current working directory.
training_script = '''
import argparse
import numpy as np
import os
import sys
import logging
import json
import shutil

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

from pytorch_model_def import get_model

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

current_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = "/opt/ml/input/data"


def get_train_data(train_dir):
    """Load x_train.npy and y_train.npy from train_dir and return them as tensors."""
    x_train = np.load(os.path.join(train_dir, "x_train.npy"))
    y_train = np.load(os.path.join(train_dir, "y_train.npy"))
    logger.info(f"x train: {x_train.shape}, y train: {y_train.shape}")
    return torch.from_numpy(x_train), torch.from_numpy(y_train)


def get_test_data(test_dir):
    """Load x_test.npy and y_test.npy from test_dir and return them as tensors."""
    x_test = np.load(os.path.join(test_dir, "x_test.npy"))
    y_test = np.load(os.path.join(test_dir, "y_test.npy"))
    logger.info(f"x test: {x_test.shape}, y test: {y_test.shape}")
    return torch.from_numpy(x_test), torch.from_numpy(y_test)


def train():
    """Train the model, print the test MSE, and save the weights plus source files."""
    train_dir = os.path.join(data_dir, "train")
    test_dir = os.path.join(data_dir, "test")
    model_dir = os.environ.get("SM_MODEL_DIR", os.path.join(current_dir, "data/model"))

    x_train, y_train = get_train_data(train_dir)
    x_test, y_test = get_test_data(test_dir)
    train_ds = TensorDataset(x_train, y_train)

    batch_size = 64
    epochs = 1
    learning_rate = 0.1
    logger.info(f"batch_size = {batch_size}, epochs = {epochs}, learning rate = {learning_rate}")

    train_dl = DataLoader(train_ds, batch_size, shuffle=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = get_model().to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    # Epochs are numbered from 1 in the log output.
    for epoch in range(1, epochs + 1):
        for x_train_batch, y_train_batch in train_dl:
            # Inputs and targets must live on the same device as the model.
            x_train_batch = x_train_batch.float().to(device)
            y_train_batch = y_train_batch.float().to(device)
            y = model(x_train_batch)
            loss = criterion(y.flatten(), y_train_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        logger.info(f"epoch: {epoch} -> loss: {loss}")

    with torch.no_grad():
        y = model(x_test.float().to(device)).flatten()
        mse = ((y - y_test.to(device)) ** 2).sum() / y_test.shape[0]
    # .cpu() is a no-op on CPU and required before .numpy() on a CUDA tensor.
    print("Test MSE:", mse.cpu().numpy())

    os.makedirs(model_dir, exist_ok=True)
    torch.save(model.state_dict(), model_dir + "/model.pth")

    inference_code_path = model_dir + "/code/"
    if not os.path.exists(inference_code_path):
        os.mkdir(inference_code_path)
        logger.info(f"Created a folder at {inference_code_path}!")

    # Resolve the sources next to this script so the copy does not depend on
    # the process's working directory.
    shutil.copy(os.path.join(current_dir, "local_training_script.py"), inference_code_path)
    shutil.copy(os.path.join(current_dir, "pytorch_model_def.py"), inference_code_path)
    logger.info(f"Saving models files to {inference_code_path}")


if __name__ == "__main__":
    print("Running the training job ...")
    train()
'''

# Explicit encoding keeps the written file independent of the host locale.
with open(os.path.join(source_dir, "local_training_script.py"), "w", encoding="utf-8") as f:
    f.write(training_script)
print("Created local_training_script.py")
## Step 3: Configure Local Container Training
Define the source code, compute, and input data configurations that ModelTrainer will use in LOCAL_CONTAINER mode.
# Entry point: local_training_script.py inside source_dir.
source_code = SourceCode(
    entry_script="local_training_script.py",
    source_dir=source_dir,
)

# A single "local_cpu" instance.
compute = Compute(instance_count=1, instance_type="local_cpu")

# One input channel per dataset split, each backed by a local directory.
train_data, test_data = (
    InputData(channel_name=channel, data_source=local_dir)
    for channel, local_dir in (("train", train_dir), ("test", test_dir))
)

print("Configuration complete")
## Step 4: Create the ModelTrainer
Initialize ModelTrainer with the local container configuration.
# Assemble the trainer from the pieces configured above. The training mode is
# Mode.LOCAL_CONTAINER and both data channels are passed as input.
model_trainer = ModelTrainer(
    training_mode=Mode.LOCAL_CONTAINER,
    training_image=DEFAULT_CPU_IMAGE,
    base_job_name="local_mode_single_container_local_data",
    source_code=source_code,
    compute=compute,
    input_data_config=[train_data, test_data],
    sagemaker_session=sagemaker_session,
)

print("ModelTrainer created with LOCAL_CONTAINER mode!")
## Step 5: Run Local Container Training
Start the training job in a local Docker container.
# Run the job and record the outcome in `operation_successful`, which the
# results check below reads. Failures are reported rather than raised so the
# remaining cells can still run.
print("Starting local container training...")
try:
    model_trainer.train()
except Exception as err:
    print(f"Training failed with error: {err}")
    operation_successful = False
else:
    print("Local container training completed successfully!")
    operation_successful = True
## Step 6: Check Training Results
Examine the training artifacts created by local container training.
# Report which of the expected output directories exist in the current
# working directory, listing the contents of those that are directories.
if not operation_successful:
    print("Training was not successful.")
else:
    current_dir = os.getcwd()
    directories_to_check = [
        "compressed_artifacts",
        "artifacts",
        "model",
        "output",
    ]

    print("Training Results:")
    for directory in directories_to_check:
        path = os.path.join(current_dir, directory)
        if not os.path.exists(path):
            print(f"✗ {directory}: Not found")
            continue

        print(f"✓ {directory}: Found")
        if os.path.isdir(path):
            files = os.listdir(path)
            print(f" Contents: {files}")

    print("Local container training completed successfully!")
## Step 7: Clean Up
Stop any remaining Docker containers and remove the local artifacts and temporary files.
# Stop containers via `docker compose down -v`. This stays best-effort: a
# missing Docker CLI or a failing command is reported, never raised, and
# success is only announced when the command actually exited with code 0.
try:
    compose_down = subprocess.run(["docker", "compose", "down", "-v"], check=False)
except (OSError, subprocess.SubprocessError) as e:
    # OSError covers the docker executable being absent or not executable.
    print(f"Could not run 'docker compose down': {e}")
else:
    if compose_down.returncode == 0:
        print("Docker containers stopped")
    else:
        print(f"'docker compose down' exited with code {compose_down.returncode}")

# Clean up temporary files
try:
    shutil.rmtree(temp_dir)
    print(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
    # Broad on purpose: cleanup must not abort the notebook.
    print(f"Could not clean up temp directory: {e}")

# Clean up training artifacts
# These directories are looked up in the current working directory and are
# removed recursively when present.
current_dir = os.getcwd()
directories = ["compressed_artifacts", "artifacts", "model", "output"]

for directory in directories:
    path = os.path.join(current_dir, directory)
    if os.path.exists(path):
        try:
            shutil.rmtree(path)
            print(f"Cleaned up: {directory}")
        except Exception as e:
            print(f"Could not clean up {directory}: {e}")

print("Cleanup completed!")
## Summary
This notebook demonstrated:
- Local container training: running training in Docker containers locally
- Data preparation: creating test data and training scripts
- Artifact management: checking and cleaning up training artifacts
- Docker integration: proper container lifecycle management

Local container training is a convenient way to test training jobs locally before running them on SageMaker cloud instances.