Amazon SageMaker Feature Store

Create Feature Groups

This guide will show you how to create and use Amazon SageMaker Feature Store. The example code in this guide covers using the SageMaker Python SDK. The underlying APIs are available for developers using other languages.

Features

Prior to using a feature store, you will typically load your dataset, run transformations, and set up your features for ingestion. This step has a lot of variation and is highly dependent on your data. The example code in the following code blocks often refers to an example notebook, Fraud Detection with Amazon SageMaker Feature Store. We recommend that you run this notebook in SageMaker Studio and use the code from there, because the code in this guide is conceptual and not fully functional if copied.

Feature store data types and schema

Feature Store supports the String, Fractional, and Integral types. The default type is String. This means that if a column in your dataset is not a float or long type, it defaults to String in your feature store.
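
For example, before ingesting a data frame you may want to make sure non-numeric columns are actual strings rather than mixed-type object columns. The following is a minimal sketch, assuming a pandas DataFrame named df.

import pandas as pd

def cast_object_to_string(df: pd.DataFrame) -> pd.DataFrame:
    # Cast object-typed columns to string so they map to the String feature type
    for label in df.columns:
        if df.dtypes[label] == object:
            df[label] = df[label].astype("string")
    return df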

You may use a schema to describe your data’s columns and data types. You pass this schema into FeatureDefinitions, a required parameter for a FeatureGroup. However, for Python developers, the SageMaker Python SDK has automatic data type detection when you use the load_feature_definitions function.
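
For illustration, a manually defined schema might look like the following sketch. The feature names here are placeholders, and the resulting list is what you pass as the FeatureDefinitions parameter when creating the feature group.

from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum

# Hypothetical schema for three columns; adjust the names and types to your data.
feature_definitions = [
    FeatureDefinition(feature_name="TransactionID", feature_type=FeatureTypeEnum.INTEGRAL),
    FeatureDefinition(feature_name="TransactionAmt", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="ProductCD", feature_type=FeatureTypeEnum.STRING),
]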

Feature store setup

To start using Feature Store, create a SageMaker session, a boto3 session, and a Feature Store session. Also, set up the bucket you will use for your features; this is your offline store. The following code uses the SageMaker default bucket and adds a custom prefix to it.

Note

The role that you use requires these managed policies: AmazonSageMakerFullAccess and AmazonSageMakerFeatureStoreAccess.
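
If you need to attach these policies to a role programmatically, a sketch using the boto3 IAM client might look like the following; the role name is a placeholder for your own execution role.

import boto3

iam = boto3.client('iam')
role_name = 'MySageMakerExecutionRole'  # hypothetical role name

for policy_arn in [
    'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess',
    'arn:aws:iam::aws:policy/AmazonSageMakerFeatureStoreAccess',
]:
    iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)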

import boto3
import sagemaker
from sagemaker.session import Session

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()

boto_session = boto3.Session(region_name=region)
role = sagemaker.get_execution_role()
prefix = 'sagemaker-featurestore'
offline_feature_store_bucket = 's3://*{}*/*{}*'.format(default_bucket, prefix)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

Load datasets and partition data into feature groups

Load your data into data frames for each of your feature groups. You will use these data frames after you set up the feature group. In the fraud detection example, you can see these steps in the following code.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import io

# S3 client used to download the example CSV files
s3_client = boto_session.client('s3')

fraud_detection_bucket_name = 'sagemaker-featurestore-fraud-detection'
identity_file_key = 'sampled_identity.csv'
transaction_file_key = 'sampled_transactions.csv'

identity_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=identity_file_key)
transaction_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=transaction_file_key)

identity_data = pd.read_csv(io.BytesIO(identity_data_object['Body'].read()))
transaction_data = pd.read_csv(io.BytesIO(transaction_data_object['Body'].read()))

identity_data = identity_data.round(5)
transaction_data = transaction_data.round(5)

identity_data = identity_data.fillna(0)
transaction_data = transaction_data.fillna(0)

# Feature transformations for this dataset are applied before ingestion into FeatureStore.
# One hot encode card4, card6
encoded_card_bank = pd.get_dummies(transaction_data['card4'], prefix = 'card_bank')
encoded_card_type = pd.get_dummies(transaction_data['card6'], prefix = 'card_type')

transformed_transaction_data = pd.concat([transaction_data, encoded_card_type, encoded_card_bank], axis=1)
transformed_transaction_data = transformed_transaction_data.rename(columns={"card_bank_american express": "card_bank_american_express"})

Feature group setup

Give each feature group a unique name, then set up each feature group with the FeatureGroup class.

from sagemaker.feature_store.feature_group import FeatureGroup
feature_group_name = "some string for a name"
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)

In the fraud detection example, the two feature groups are “identity” and “transaction”. In the following code, you can see how the names are customized with a timestamp, and how each group is set up by passing in the name and the session.

import time
from time import gmtime, strftime, sleep
from sagemaker.feature_store.feature_group import FeatureGroup

identity_feature_group_name = 'identity-feature-group-' + strftime('%d-%H-%M-%S', gmtime())
transaction_feature_group_name = 'transaction-feature-group-' + strftime('%d-%H-%M-%S', gmtime())

identity_feature_group = FeatureGroup(name=identity_feature_group_name, sagemaker_session=feature_store_session)
transaction_feature_group = FeatureGroup(name=transaction_feature_group_name, sagemaker_session=feature_store_session)

Record identifier and event time feature

Next, you need a record identifier name and an event time feature name. These match the columns of the corresponding features in your data. For example, in the fraud detection example, the column of interest is “TransactionID”. An “EventTime” feature can be appended to your data when no timestamp is available. In the following code, you can see how these variables are set, and then EventTime is appended to both data frames.

record_identifier_name = "TransactionID"
event_time_feature_name = "EventTime"
current_time_sec = int(round(time.time()))
identity_data[event_time_feature_name] = pd.Series([current_time_sec]*len(identity_data), dtype="float64")
transformed_transaction_data[event_time_feature_name] = pd.Series([current_time_sec]*len(transaction_data), dtype="float64")

Feature definitions

You can now load the feature definitions by passing a data frame containing the feature data. In the following code from the fraud detection example, the identity features and transaction features are each loaded by using load_feature_definitions, and this function automatically detects the data type of each column of data. For developers using a schema rather than automatic detection, refer to the Creating Feature Groups with Data Wrangler example for code that shows loading the schema, mapping it, and adding it as a FeatureDefinition that is used when you create the FeatureGroup. That example also covers a boto3 implementation instead of the SageMaker Python SDK.

identity_feature_group.load_feature_definitions(data_frame=identity_data); # output is suppressed
transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data); # output is suppressed

Create a feature group

The last step for creating the feature group is to use the create function. The following code shows all of the available parameters. The online store is not created by default, so set enable_online_store to True if you want to enable it. The s3_uri parameter is the S3 location of your offline store.

# create a FeatureGroup
feature_group.create(
    description = "Some info about the feature group",
    feature_group_name = feature_group_name,
    record_identifier_name = record_identifier_name,
    event_time_feature_name = event_time_feature_name,
    feature_definitions = feature_definitions,
    role_arn = role,
    s3_uri = offline_feature_store_bucket,
    enable_online_store = True,
    ttl_duration = None,
    online_store_kms_key_id = None,
    offline_store_kms_key_id = None,
    disable_glue_table_creation = False,
    data_catalog_config = None,
    tags = ["tag1","tag2"])

The following code from the fraud detection example shows a minimal create call for each of the two feature groups being created.

identity_feature_group.create(
    s3_uri=offline_feature_store_bucket,
    record_identifier_name=record_identifier_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

transaction_feature_group.create(
    s3_uri=offline_feature_store_bucket,
    record_identifier_name=record_identifier_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

Creating a feature group takes time as the data is loaded. You need to wait until it is created before you can use it. You can check the status using the following method. Note that it can take approximately 10-15 minutes to provision an online FeatureGroup with the InMemory StorageType.

status = feature_group.describe().get("FeatureGroupStatus")

While the feature group is being created, the returned status is Creating. When creation has finished successfully, the status is Created. The other possible statuses are CreateFailed, Deleting, or DeleteFailed.
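
For example, a simple polling helper along the lines of the fraud detection notebook waits until the status is no longer Creating.

import time

def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for feature group creation...")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

wait_for_feature_group_creation_complete(identity_feature_group)
wait_for_feature_group_creation_complete(transaction_feature_group)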

Describe a feature group

You can retrieve information about your feature group with the describe function.

feature_group.describe()

List feature groups

You can list all of your feature groups with the list_feature_groups function.

sagemaker_client.list_feature_groups()
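
The underlying boto3 call also accepts optional filters. For example, you can list only feature groups whose names contain a given substring.

# List feature groups whose names contain "transaction"
sagemaker_client.list_feature_groups(NameContains='transaction')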

Put records in a feature group

You can use the ingest function to load your feature data. You pass in a data frame of feature data, set the number of workers, and choose whether to wait for it to return. The following example demonstrates using the ingest function.

feature_group.ingest(
    data_frame=feature_data, max_workers=3, wait=True
)

For each feature group you have, run the ingest function on the feature data you want to load.
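
In the fraud detection example, ingesting the two data frames prepared earlier looks like the following.

identity_feature_group.ingest(data_frame=identity_data, max_workers=3, wait=True)
transaction_feature_group.ingest(data_frame=transformed_transaction_data, max_workers=5, wait=True)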

Get records from a feature group

You can use the get_record function to retrieve the data for a specific record by its record identifier value. The following example uses an example identifier to retrieve the record.

record_identifier_value = str(2990130)
featurestore_runtime.get_record(FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)

You can use the batch_get_record function to retrieve multiple records simultaneously from your feature store. The following example uses this API to retrieve a batch of records.

record_identifier_values = ["573291", "109382", "828400", "124013"]
featurestore_runtime.batch_get_record(Identifiers=[{"FeatureGroupName": transaction_feature_group_name, "RecordIdentifiersValueAsString": record_identifier_values}])

An example response from the fraud detection example:

...
'Record': [{'FeatureName': 'TransactionID', 'ValueAsString': '2990130'},
  {'FeatureName': 'isFraud', 'ValueAsString': '0'},
  {'FeatureName': 'TransactionDT', 'ValueAsString': '152647'},
  {'FeatureName': 'TransactionAmt', 'ValueAsString': '75.0'},
  {'FeatureName': 'ProductCD', 'ValueAsString': 'H'},
  {'FeatureName': 'card1', 'ValueAsString': '4577'},
...

Hive DDL commands

The SageMaker Python SDK’s FeatureGroup class also provides the functionality to generate Hive DDL commands. The schema of the table is generated based on the feature definitions. Columns are named after the feature names, and data types are inferred from the feature types.

print(feature_group.as_hive_ddl())

An example output:

CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.identity-feature-group-27-19-33-00 (
  TransactionID INT
  id_01 FLOAT
  id_02 FLOAT
  id_03 FLOAT
  id_04 FLOAT
 ...

Build a Training Dataset

Feature Store automatically builds an AWS Glue Data Catalog when feature groups are created; this can optionally be turned off. The following shows how to create a single training dataset with feature values from both the identity and transaction feature groups created above, and how to run an Amazon Athena query to join the data stored in the offline store for both feature groups.

To start, create an Athena query using athena_query() for both the identity and transaction feature groups. The table_name is the AWS Glue table that is auto-generated by Feature Store.

identity_query = identity_feature_group.athena_query()
transaction_query = transaction_feature_group.athena_query()

identity_table = identity_query.table_name
transaction_table = transaction_query.table_name

Writing and Executing your Athena Query

Write your query using SQL on these feature groups, then execute the query with the .run() command, specifying the S3 bucket location where the dataset will be saved.

# Athena query
query_string = 'SELECT * FROM "'+transaction_table+'" LEFT JOIN "'+identity_table+'" ON "'+transaction_table+'".transactionid = "'+identity_table+'".transactionid'

# run Athena query. The output is loaded to a Pandas dataframe.
dataset = pd.DataFrame()
identity_query.run(query_string=query_string, output_location='s3://'+default_bucket+'/query_results/')
identity_query.wait()
dataset = identity_query.as_dataframe()

From here you can train a model using this data set and then perform inference.
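
As a minimal sketch, assuming the joined dataset keeps the isFraud label column from the fraud detection example, you might split the result into training and test sets and write them to CSV for a training job.

from sklearn.model_selection import train_test_split

# Hypothetical train/test split of the joined dataset; adjust for your own data and label column.
train, test = train_test_split(dataset, test_size=0.2, random_state=42)
train.to_csv("train.csv", index=False)
test.to_csv("test.csv", index=False)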

Using the Offline Store SDK: Getting Started

The Feature Store offline SDK provides the ability to quickly and easily build ML-ready datasets for use in model training or pre-processing. The SDK makes it easy to build datasets from SQL joins, point-in-time accurate joins, and event time ranges, all without the need to write any SQL code. This functionality is accessed via the DatasetBuilder class, which is the primary entry point for the SDK functionality.

from sagemaker.feature_store.feature_store import FeatureStore

feature_store = FeatureStore(sagemaker_session=feature_store_session)
base_feature_group = identity_feature_group
target_feature_group = transaction_feature_group

You can create a dataset using the create_dataset function of the FeatureStore API. The base can be either a feature group or a pandas DataFrame.

result_df, query = feature_store.create_dataset(
   base=base_feature_group,
   output_path=f"s3://{s3_bucket_name}"
).to_dataframe()

If you want to join another feature group, you can specify the extra feature group using the with_feature_group method.

dataset_builder = feature_store.create_dataset(
   base=base_feature_group,
   output_path=f"s3://{s3_bucket_name}"
).with_feature_group(target_feature_group, record_identifier_name)

result_df, query = dataset_builder.to_dataframe()

Using the Offline Store SDK: Configuring the DatasetBuilder

How the DatasetBuilder produces the resulting dataframe can be configured in various ways.

By default, the Python SDK excludes all deleted and duplicate records. However, if you need either of them in the returned dataset, you can call include_duplicated_records or include_deleted_records when creating the dataset builder.

dataset_builder.include_duplicated_records()
dataset_builder.include_deleted_records()

The DatasetBuilder provides with_number_of_records_from_query_results and with_number_of_recent_records_by_record_identifier methods to limit the number of records returned for the offline snapshot.

with_number_of_records_from_query_results limits the number of records in the output. For example, when N = 100, only 100 records are returned in either the CSV file or the dataframe.

dataset_builder.with_number_of_records_from_query_results(number_of_records=N)

On the other hand, with_number_of_recent_records_by_record_identifier is used to deal with records that share the same record identifier. These records are sorted by event time, and at most the N most recent records per identifier are returned in the output.

dataset_builder.with_number_of_recent_records_by_record_identifier(number_of_recent_records=N)

Since these functions return the dataset builder, the calls can be chained.

result_df, query = (
    dataset_builder
    .with_number_of_records_from_query_results(number_of_records=N)
    .include_duplicated_records()
    .with_number_of_recent_records_by_record_identifier(number_of_recent_records=N)
    .to_dataframe()
)

There are additional configurations that can be made for various use cases, such as time travel and point-in-time join. These are outlined in the Feature Store DatasetBuilder API Reference.
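
For example, a sketch of a point-in-time accurate join combined with time travel, assuming the point_in_time_accurate_join and as_of methods described in the DatasetBuilder API reference, might look like the following.

from datetime import datetime, timezone

dataset_builder = (
    feature_store.create_dataset(
        base=base_feature_group,
        output_path=f"s3://{s3_bucket_name}",
    )
    .with_feature_group(target_feature_group, record_identifier_name)
    .point_in_time_accurate_join()       # drop feature values written after the base record's event time
    .as_of(datetime.now(timezone.utc))   # time travel: snapshot of the data as of this timestamp
)
result_df, query = dataset_builder.to_dataframe()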

Delete a feature group

You can delete a feature group with the delete function. Note that it can take approximately 10-15 minutes to delete an online FeatureGroup with the InMemory StorageType.

feature_group.delete()

The following code example is from the fraud detection example.

identity_feature_group.delete()
transaction_feature_group.delete()