Feature Store APIs

Feature Group

class sagemaker.feature_store.feature_group.FeatureGroup(name=NOTHING, sagemaker_session=NOTHING, feature_definitions=NOTHING)

Bases: object

FeatureGroup definition.

This class instantiates a FeatureGroup object that comprises a name for the FeatureGroup, a session instance, and a list of feature definition objects, i.e., FeatureDefinition.

Return type

None

name

name of the FeatureGroup instance.

Type

str

sagemaker_session

session instance to perform boto calls. If None, a new Session will be created.

Type

Session

feature_definitions

list of FeatureDefinitions.

Type

Sequence[FeatureDefinition]

Method generated by attrs for class FeatureGroup.

create(s3_uri, record_identifier_name, event_time_feature_name, role_arn=None, online_store_kms_key_id=None, enable_online_store=False, ttl_duration=None, offline_store_kms_key_id=None, disable_glue_table_creation=False, data_catalog_config=None, description=None, tags=None, table_format=None, online_store_storage_type=None)

Create a SageMaker FeatureStore FeatureGroup.

Parameters
  • s3_uri (Union[str, bool]) – S3 URI of the offline store, set to False to disable offline store.

  • record_identifier_name (str) – name of the record identifier feature.

  • event_time_feature_name (str) – name of the event time feature.

  • role_arn (str) – ARN of the role used to call CreateFeatureGroup.

  • online_store_kms_key_id (str) – KMS key ARN for online store (default: None).

  • ttl_duration (TtlDuration) – Default time to live duration for records (default: None).

  • enable_online_store (bool) – whether to enable online store or not (default: False).

  • offline_store_kms_key_id (str) – KMS key ARN for offline store (default: None). If a KMS encryption key is not specified, SageMaker encrypts all data at rest using the default AWS KMS key. By defining your bucket-level key for SSE, you can reduce the cost of AWS KMS requests. For more information, see Bucket Key in the Amazon S3 User Guide.

  • disable_glue_table_creation (bool) – whether to turn off Glue table creation or not (default: False).

  • data_catalog_config (DataCatalogConfig) – configuration for Metadata store (default: None).

  • description (str) – description of the FeatureGroup (default: None).

  • tags (List[Dict[str, str]]) – list of tags for labeling a FeatureGroup (default: None).

  • table_format (TableFormatEnum) – format of the offline store table (default: None).

  • online_store_storage_type (OnlineStoreStorageTypeEnum) – storage type for the online store (default: None).

Returns

Response dict from service.

Return type

Dict[str, Any]
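As a minimal sketch of how these arguments fit together, the helpers below assemble keyword arguments for create and forward them to a FeatureGroup supplied by the caller. The helper names, bucket URI, feature names and role ARN are placeholders chosen for illustration; they are not part of the SDK.

```python
from typing import Any, Dict, Optional


def build_create_kwargs(
    s3_uri: str,
    record_identifier_name: str,
    event_time_feature_name: str,
    role_arn: str,
    enable_online_store: bool = False,
    description: Optional[str] = None,
) -> Dict[str, Any]:
    """Collect arguments for FeatureGroup.create, omitting an unset description."""
    kwargs: Dict[str, Any] = {
        "s3_uri": s3_uri,
        "record_identifier_name": record_identifier_name,
        "event_time_feature_name": event_time_feature_name,
        "role_arn": role_arn,
        "enable_online_store": enable_online_store,
    }
    if description is not None:
        kwargs["description"] = description
    return kwargs


def create_feature_group(feature_group: Any, **kwargs: Any) -> Dict[str, Any]:
    """Call create() on an existing FeatureGroup and return the service response."""
    return feature_group.create(**kwargs)


# Placeholder values; replace them with your own bucket, feature names and role.
example_kwargs = build_create_kwargs(
    s3_uri="s3://example-bucket/feature-store",
    record_identifier_name="customer_id",
    event_time_feature_name="event_time",
    role_arn="arn:aws:iam::123456789012:role/ExampleRole",
    enable_online_store=True,
)
```

With a real FeatureGroup whose feature definitions are already loaded, the call would be create_feature_group(feature_group, **example_kwargs).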

delete()

Delete a FeatureGroup.

describe(next_token=None)

Describe a FeatureGroup.

Parameters

next_token (str) – next_token to get next page of features.

Returns

Response dict from the service.

Return type

Dict[str, Any]
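The next_token parameter suggests a paging loop. The sketch below shows one way to collect every page; the response keys "FeatureDefinitions" and "NextToken" are assumptions based on the shape of the DescribeFeatureGroup service response, and describe_all_features is a hypothetical helper name.

```python
from typing import Any, Dict, List


def describe_all_features(feature_group: Any) -> List[Dict[str, Any]]:
    """Follow next_token until every page of feature definitions is collected."""
    features: List[Dict[str, Any]] = []
    token = None
    while True:
        response = feature_group.describe(next_token=token)
        # Key names are assumed from the DescribeFeatureGroup response shape.
        features.extend(response.get("FeatureDefinitions", []))
        token = response.get("NextToken")
        if not token:
            return features
```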

update(feature_additions=None, online_store_config=None)

Update a FeatureGroup and add new features from the given feature definitions.

Parameters
  • feature_additions (Sequence[Dict[str, str]]) – list of feature definitions to be added.

  • online_store_config (OnlineStoreConfigUpdate) – online store config to be updated.

Returns

Response dict from service.

Return type

Dict[str, Any]

update_feature_metadata(feature_name, description=None, parameter_additions=None, parameter_removals=None)

Update the metadata of a feature, adding or removing parameters.

Parameters
  • feature_name (str) – name of the feature to update.

  • description (str) – description of the feature to update.

  • parameter_additions (Sequence[Dict[str, str]]) – list of feature parameters to be added.

  • parameter_removals (Sequence[str]) – list of feature parameter keys to be removed.

Returns

Response dict from service.

Return type

Dict[str, Any]

describe_feature_metadata(feature_name)

Describe feature metadata by feature name.

Parameters

feature_name (str) – name of the feature.

Returns

Response dict from service.

Return type

Dict[str, Any]

list_tags()

List all tags for a feature group.

Returns

list of key-value pairs of the tags.

Return type

Sequence[Dict[str, str]]

list_parameters_for_feature_metadata(feature_name)

List all parameters for a feature metadata.

Parameters

feature_name (str) – name of the feature.

Returns

list of key-value pairs of the parameters.

Return type

Sequence[Dict[str, str]]

load_feature_definitions(data_frame)

Load feature definitions from a Pandas DataFrame.

Column name is used as feature name. Feature type is inferred from the dtype of the column. Dtype int_, int8, int16, int32, int64, uint8, uint16, uint32 and uint64 are mapped to Integral feature type. Dtype float_, float16, float32 and float64 are mapped to Fractional feature type. string dtype is mapped to String feature type.

No feature definitions will be loaded if the given data_frame contains unsupported dtypes.

Parameters

data_frame (DataFrame) – DataFrame to load feature definitions from.

Returns

list of FeatureDefinition

Return type

Sequence[sagemaker.feature_store.feature_definition.FeatureDefinition]
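The dtype mapping described above can be sketched in plain Python. This is an illustration of the stated rules, not the SDK's implementation: the function names are hypothetical, dtypes are represented by their names as strings, and raising ValueError for an unsupported dtype is an assumption about how the failure is signalled.

```python
from typing import Dict, List, Tuple

INTEGRAL_DTYPES = {
    "int_", "int8", "int16", "int32", "int64",
    "uint8", "uint16", "uint32", "uint64",
}
FRACTIONAL_DTYPES = {"float_", "float16", "float32", "float64"}
STRING_DTYPES = {"string"}


def infer_feature_type(dtype_name: str) -> str:
    """Map a dtype name to the feature type named in the text above."""
    if dtype_name in INTEGRAL_DTYPES:
        return "Integral"
    if dtype_name in FRACTIONAL_DTYPES:
        return "Fractional"
    if dtype_name in STRING_DTYPES:
        return "String"
    # Assumption: an unsupported dtype is reported as an error.
    raise ValueError(f"unsupported dtype: {dtype_name}")


def infer_feature_definitions(column_dtypes: Dict[str, str]) -> List[Tuple[str, str]]:
    """Return (feature name, feature type) pairs, or raise before returning any."""
    return [(name, infer_feature_type(dtype)) for name, dtype in column_dtypes.items()]
```

Given a pandas DataFrame df, a comparable input could be built with {col: str(dtype) for col, dtype in df.dtypes.items()}. Under this sketch a column with dtype object counts as unsupported, so it would need to be cast to the string dtype first.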

get_record(record_identifier_value_as_string, feature_names=None)

Get a single record in a FeatureGroup.

Parameters
  • record_identifier_value_as_string (String) – a String representing the value of the record identifier.

  • feature_names (Sequence[String]) – a list of Strings representing feature names.

Return type

Sequence[Dict[str, str]]

put_record(record, ttl_duration=None)

Put a single record in the FeatureGroup.

Parameters
  • record (Sequence[FeatureValue]) – a list of feature values.

  • ttl_duration (TtlDuration) – customer specified ttl duration.

delete_record(record_identifier_value_as_string, event_time, deletion_mode=<DeletionModeEnum.SOFT_DELETE: 'SoftDelete'>)

Delete a single record from a FeatureGroup.

Parameters
  • record_identifier_value_as_string (String) – a String representing the value of the record identifier.

  • event_time (String) – a timestamp format String indicating when the deletion event occurred.

  • deletion_mode (DeletionModeEnum) – deletion mode for deleting the record (default: DeletionModeEnum.SOFT_DELETE).

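Because both arguments are strings, a caller typically converts its own values first. The sketch below assumes the event time is accepted as an ISO-8601 UTC timestamp, which the text above does not spell out; both helper names are hypothetical, and feature_group stands for an existing FeatureGroup.

```python
from datetime import datetime, timezone
from typing import Any


def to_event_time_string(moment: datetime) -> str:
    """Format a timezone-aware datetime as an ISO-8601 UTC string (assumed format)."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def soft_delete_record(feature_group: Any, record_id: Any, deleted_at: datetime) -> Any:
    """Delete one record, leaving deletion_mode at its documented default."""
    return feature_group.delete_record(
        record_identifier_value_as_string=str(record_id),
        event_time=to_event_time_string(deleted_at),
    )
```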
ingest(data_frame, max_workers=1, max_processes=1, wait=True, timeout=None, profile_name=None)

Ingest the content of a pandas DataFrame to feature store.

max_workers is the number of threads created to work on different partitions of the data_frame in parallel.

max_processes is the number of processes created to work on different partitions of the data_frame in parallel, each with max_workers threads.

The ingest function attempts to ingest all records in the data frame. SageMaker Feature Store throws an exception if it fails to ingest any records.

If wait is True, Feature Store runs the ingest function synchronously. You receive an IngestionError if there are any records that can’t be ingested. If wait is False, Feature Store runs the ingest function asynchronously.

Instead of setting wait to True in the ingest function, you can invoke the wait function on the returned instance of IngestionManagerPandas to run the ingest function synchronously.

To access the rows that failed to ingest, set wait to False. The IngestionError.failed_rows object saves all of the rows that failed to ingest.

The profile_name argument is optional. If None is passed, the default credentials are used. profile_name is used only by the sagemaker_featurestore_runtime client. See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for more about the default credentials.

Parameters
  • data_frame (DataFrame) – data_frame to be ingested to feature store.

  • max_workers (int) – number of threads to be created.

  • max_processes (int) – number of processes to be created. Each process spawns max_workers threads.

  • wait (bool) – whether to wait for the ingestion to finish or not.

  • timeout (Union[int, float]) – concurrent.futures.TimeoutError will be raised if timeout is reached.

  • profile_name (str) – the profile credential to use for PutRecord (default: None).

Returns

An instance of IngestionManagerPandas.

Return type

sagemaker.feature_store.feature_group.IngestionManagerPandas
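The asynchronous path described above can be sketched as follows: start ingestion with wait=False, block on the returned manager, then read its failed_rows property. The helper name and its default thread count are illustrative. The error is matched by the presence of a failed_rows attribute rather than by class, which is a choice made here to avoid assuming an import path for IngestionError.

```python
from typing import Any, List, Optional


def ingest_and_collect_failures(
    feature_group: Any,
    data_frame: Any,
    max_workers: int = 4,
    max_processes: int = 1,
    timeout: Optional[float] = None,
) -> List[int]:
    """Ingest asynchronously, wait, and return the indices of rows that failed."""
    manager = feature_group.ingest(
        data_frame=data_frame,
        max_workers=max_workers,
        max_processes=max_processes,
        wait=False,
    )
    try:
        manager.wait(timeout=timeout)
    except Exception as error:
        # Swallow only errors that report failed rows; re-raise anything
        # else, such as a timeout.
        if not hasattr(error, "failed_rows"):
            raise
    return list(manager.failed_rows)
```

An empty list means every row was ingested; otherwise the returned indices can be used to select the failed rows from the original DataFrame and retry them.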

athena_query()

Create an AthenaQuery instance.

Returns

An instance of AthenaQuery initialized with data catalog configurations.

Return type

sagemaker.feature_store.feature_group.AthenaQuery

as_hive_ddl(database='sagemaker_featurestore', table_name=None)

Generate Hive DDL commands to define or change structure of tables or databases in Hive.

The schema of the table is generated from the feature definitions. Columns are named after the feature names, and data types are inferred from the feature types. Integral feature type is mapped to INT data type. Fractional feature type is mapped to FLOAT data type. String feature type is mapped to STRING data type.

Parameters
  • database (str) – name of the database. If not set, “sagemaker_featurestore” will be used.

  • table_name (str) – name of the table. If not set, the name of this feature group will be used.

Returns

Generated create table DDL string.

Return type

str
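The type mapping stated above can be illustrated with a small stand-alone sketch. It only renders column specifications; it does not reproduce the DDL string that as_hive_ddl returns, and the names used here are hypothetical.

```python
from typing import List, Tuple

# Mapping taken from the description above.
HIVE_TYPE_BY_FEATURE_TYPE = {
    "Integral": "INT",
    "Fractional": "FLOAT",
    "String": "STRING",
}


def hive_column_specs(features: List[Tuple[str, str]]) -> List[str]:
    """Render one 'name TYPE' column specification per (name, feature type) pair."""
    return [f"{name} {HIVE_TYPE_BY_FEATURE_TYPE[ftype]}" for name, ftype in features]
```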

class sagemaker.feature_store.feature_group.AthenaQuery(catalog, database, table_name, sagemaker_session)

Bases: object

Class to manage querying of feature store data with AWS Athena.

This class instantiates an AthenaQuery object that is used to retrieve data from feature store via standard SQL queries.

Return type

None

catalog

name of the data catalog.

Type

str

database

name of the database.

Type

str

table_name

name of the table.

Type

str

sagemaker_session

instance of the Session class to perform boto calls.

Type

Session

Method generated by attrs for class AthenaQuery.

run(query_string, output_location, kms_key=None, workgroup=None)

Execute a SQL query given a query string, output location and kms key.

This method executes the SQL query using Athena and outputs the results to output_location and returns the execution id of the query.

Parameters
  • query_string (str) – SQL query string.

  • output_location (str) – S3 URI of the query result.

  • kms_key (str) – KMS key id. If set, will be used to encrypt the query result file.

  • workgroup (str) – The name of the workgroup in which the query is being started.

Returns

Execution id of the query.

Return type

str

wait()

Wait for the current query to finish.

get_query_execution()

Get execution status of the current query.

Returns

Response dict from Athena.

Return type

Dict[str, Any]

as_dataframe(**kwargs)

Download the result of the current query and load it into a DataFrame.

Parameters

**kwargs (object) – keyword arguments passed to pandas.read_csv for finer control over how the result is parsed. For more information, see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html

Returns

A pandas DataFrame containing the query result.

Return type

pandas.core.frame.DataFrame
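Put together, the run, wait and as_dataframe methods form a simple query workflow. The sketch below assumes feature_group is an existing FeatureGroup with an offline store; the helper name, the SQL text and the default limit are illustrative.

```python
from typing import Any


def query_offline_store(
    feature_group: Any,
    output_location: str,
    limit: int = 10,
    **read_csv_kwargs: Any,
) -> Any:
    """Run a small SELECT against the offline store and return a DataFrame."""
    query = feature_group.athena_query()
    # table_name is a documented attribute of AthenaQuery.
    sql = f'SELECT * FROM "{query.table_name}" LIMIT {int(limit)}'
    query.run(query_string=sql, output_location=output_location)
    query.wait()
    # Extra keyword arguments are forwarded to pandas.read_csv by as_dataframe.
    return query.as_dataframe(**read_csv_kwargs)
```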

class sagemaker.feature_store.feature_group.IngestionManagerPandas(feature_group_name, sagemaker_fs_runtime_client_config=None, sagemaker_session=None, max_workers=1, max_processes=1, profile_name=None, async_result=None, processing_pool=None, failed_indices=NOTHING)

Bases: object

Class to manage the multi-threaded data ingestion process.

Parameters
  • feature_group_name (str) –

  • sagemaker_fs_runtime_client_config (botocore.config.Config) –

  • sagemaker_session (sagemaker.session.Session) –

  • max_workers (int) –

  • max_processes (int) –

  • profile_name (str) –

  • async_result (multiprocessing.pool.ApplyResult) –

  • processing_pool (pathos.multiprocessing.ProcessPool) –

  • failed_indices (List[int]) –

Return type

None

feature_group_name

name of the Feature Group.

Type

str

sagemaker_fs_runtime_client_config

instance of the Config class for boto calls.

Type

Config

sagemaker_session

session instance to perform boto calls.

Type

Session

data_frame

pandas DataFrame to be ingested to the given feature group.

Type

DataFrame

max_workers

number of threads to create.

Type

int

max_processes

number of processes to create. Each process spawns max_workers threads.

Type

int

profile_name

the profile credential to use for PutRecord (default: None).

Type

str

Method generated by attrs for class IngestionManagerPandas.

property failed_rows

Get rows that failed to ingest.

Returns

List of row indices that failed to be ingested.

wait(timeout=None)

Wait for the ingestion process to finish.

Parameters

timeout (Union[int, float]) – concurrent.futures.TimeoutError will be raised if timeout is reached.

run(data_frame, wait=True, timeout=None)

Start the ingestion process.

Parameters
  • data_frame (DataFrame) – source DataFrame to be ingested.

  • wait (bool) – whether to wait for the ingestion to finish or not.

  • timeout (Union[int, float]) – concurrent.futures.TimeoutError will be raised if timeout is reached.

Feature Definition

class sagemaker.feature_store.feature_definition.FeatureDefinition(feature_name, feature_type, collection_type=None)

Bases: sagemaker.feature_store.inputs.Config

Feature definition.

This instantiates a Feature Definition object where FeatureDefinition is a subclass of Config.

Return type

None

feature_name

The name of the feature

Type

str

feature_type

The type of the feature

Type

FeatureTypeEnum

collection_type

The type of collection for the feature

Type

CollectionType

Method generated by attrs for class FeatureDefinition.

to_dict()

Construct a dictionary based on each attribute.

Return type

Dict[str, Any]

class sagemaker.feature_store.feature_definition.FractionalFeatureDefinition(feature_name, collection_type=None)

Bases: sagemaker.feature_store.feature_definition.FeatureDefinition

Fractional feature definition.

This class instantiates a FractionalFeatureDefinition object, a subclass of FeatureDefinition where the data type of the feature being defined is a Fractional.

feature_name

The name of the feature

Type

str

feature_type

A FeatureTypeEnum.FRACTIONAL type

Type

FeatureTypeEnum

collection_type

The type of collection for the feature

Type

CollectionType

Construct an instance of FractionalFeatureDefinition.

class sagemaker.feature_store.feature_definition.IntegralFeatureDefinition(feature_name, collection_type=None)

Bases: sagemaker.feature_store.feature_definition.FeatureDefinition

Integral feature definition.

This class instantiates an IntegralFeatureDefinition object, a subclass of FeatureDefinition where the data type of the feature being defined is Integral.

feature_name

the name of the feature.

Type

str

feature_type

a FeatureTypeEnum.INTEGRAL type.

Type

FeatureTypeEnum

collection_type

The type of collection for the feature.

Type

CollectionType

Construct an instance of IntegralFeatureDefinition.

class sagemaker.feature_store.feature_definition.StringFeatureDefinition(feature_name, collection_type=None)

Bases: sagemaker.feature_store.feature_definition.FeatureDefinition

String feature definition.

This class instantiates a StringFeatureDefinition object, a subclass of FeatureDefinition where the data type of the feature being defined is a String.

feature_name

the name of the feature.

Type

str

feature_type

a FeatureTypeEnum.STRING type.

Type

FeatureTypeEnum

collection_type

The type of collection for the feature.

Type

CollectionType

Construct an instance of StringFeatureDefinition.

class sagemaker.feature_store.feature_definition.FeatureTypeEnum(value)

Bases: enum.Enum

Enum of feature types.

The data type of a feature can be Fractional, Integral or String.

class sagemaker.feature_store.feature_definition.CollectionTypeEnum(value)

Bases: enum.Enum

Enum of collection types.

The collection type of a feature can be List, Set or Vector.

class sagemaker.feature_store.feature_definition.CollectionType(collection_type, collection_config)

Bases: sagemaker.feature_store.inputs.Config

Collection type and its configuration.

This instantiates a CollectionType object where CollectionType is a subclass of Config.

Return type

None

collection_type

The type of the collection

Type

CollectionTypeEnum

collection_config

The configuration for the collection.

Type

Dict[str, Any]

Method generated by attrs for class CollectionType.

to_dict()

Construct a dictionary based on each attribute.

Return type

Dict[str, Any]

class sagemaker.feature_store.feature_definition.ListCollectionType

Bases: sagemaker.feature_store.feature_definition.CollectionType

List collection type

This class instantiates a ListCollectionType object, a subclass of CollectionType where the collection type is defined as List.

Construct an instance of ListCollectionType.

class sagemaker.feature_store.feature_definition.SetCollectionType

Bases: sagemaker.feature_store.feature_definition.CollectionType

Set collection type

This class instantiates a SetCollectionType object, a subclass of CollectionType where the collection type is defined as Set.

Construct an instance of SetCollectionType.

class sagemaker.feature_store.feature_definition.VectorCollectionType(dimension)

Bases: sagemaker.feature_store.feature_definition.CollectionType

Vector collection type

This class instantiates a VectorCollectionType object, a subclass of CollectionType where the collection type is defined as Vector.

Parameters

dimension (int) –

dimension

The dimension size for the Vector.

Type

int

Construct an instance of VectorCollectionType.

Inputs

class sagemaker.feature_store.inputs.Config

Bases: abc.ABC

Base config object for FeatureStore.

Configs must implement the to_dict method.

abstract to_dict()

Get the dictionary from attributes.

Returns

dict containing the attributes.

Return type

Dict[str, Any]

classmethod construct_dict(**kwargs)

Construct the dictionary based on the args.

Parameters

kwargs – args to be used to construct the dict.

Returns

dict representing the given kwargs.

Return type

Dict[str, Any]
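The pattern described here, an abstract to_dict plus a construct_dict helper, can be sketched as below. This is not the SDK's implementation: the class names are hypothetical, and both dropping None values and expanding nested configs are assumptions about reasonable behaviour. The S3Uri and KmsKeyId keys are likewise chosen only for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class ConfigSketch(ABC):
    """Hypothetical stand-in for a Config-style base class."""

    @abstractmethod
    def to_dict(self) -> Dict[str, Any]:
        """Return the attributes as a dictionary."""

    @classmethod
    def construct_dict(cls, **kwargs: Any) -> Dict[str, Any]:
        """Build a dict from kwargs, expanding nested configs."""
        result: Dict[str, Any] = {}
        for key, value in kwargs.items():
            if value is None:
                continue  # Assumption: unset values are omitted.
            result[key] = value.to_dict() if isinstance(value, ConfigSketch) else value
        return result


class S3LocationSketch(ConfigSketch):
    """Hypothetical subclass with one required and one optional attribute."""

    def __init__(self, s3_uri: str, kms_key_id: Optional[str] = None) -> None:
        self.s3_uri = s3_uri
        self.kms_key_id = kms_key_id

    def to_dict(self) -> Dict[str, Any]:
        return ConfigSketch.construct_dict(S3Uri=self.s3_uri, KmsKeyId=self.kms_key_id)
```

Keeping the dictionary construction in one classmethod means each subclass only lists its attributes, and nested configs serialize themselves recursively.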

class sagemaker.feature_store.inputs.DataCatalogConfig(table_name=NOTHING, catalog=NOTHING, database=NOTHING)

Bases: sagemaker.feature_store.inputs.Config

DataCatalogConfig for FeatureStore.

Parameters
  • table_name (str) –

  • catalog (str) –

  • database (str) –

Return type

None

table_name

name of the table.

Type

str

catalog

name of the catalog.

Type

str

database

name of the database.

Type

str

Method generated by attrs for class DataCatalogConfig.

to_dict()

Construct a dictionary based on the attributes provided.

Returns

dict representing the attributes.

Return type

Dict[str, Any]

class sagemaker.feature_store.inputs.OfflineStoreConfig(s3_storage_config, disable_glue_table_creation=False, data_catalog_config=None, table_format=None)

Bases: sagemaker.feature_store.inputs.Config

OfflineStoreConfig for FeatureStore.

Return type

None

s3_storage_config

configuration of S3 storage.

Type

S3StorageConfig

disable_glue_table_creation

whether to disable the Glue table creation.

Type

bool

data_catalog_config

configuration of the data catalog.

Type

DataCatalogConfig

table_format

format of the offline store table.

Type

TableFormatEnum

Method generated by attrs for class OfflineStoreConfig.

to_dict()

Construct a dictionary based on the attributes.

Returns

dict representing the attributes.

Return type

Dict[str, Any]

class sagemaker.feature_store.inputs.OnlineStoreConfig(enable_online_store=True, online_store_security_config=None, ttl_duration=None, storage_type=None)

Bases: sagemaker.feature_store.inputs.Config

OnlineStoreConfig for FeatureStore.

Return type

None

enable_online_store

whether to enable the online store.

Type

bool

online_store_security_config

configuration of security setting.

Type

OnlineStoreSecurityConfig

ttl_duration

Default time to live duration for records.

Type

TtlDuration

Method generated by attrs for class OnlineStoreConfig.

to_dict()

Construct a dictionary based on the attributes.

Returns

dict representing the attributes.

Return type

Dict[str, Any]

class sagemaker.feature_store.inputs.OnlineStoreSecurityConfig(kms_key_id=NOTHING)

Bases: sagemaker.feature_store.inputs.Config

OnlineStoreSecurityConfig for FeatureStore.

Parameters

kms_key_id (str) –

Return type

None

kms_key_id

KMS key id.

Type

str

Method generated by attrs for class OnlineStoreSecurityConfig.

to_dict()

Construct a dictionary based on the attributes.

Return type

Dict[str, Any]

class sagemaker.feature_store.inputs.S3StorageConfig(s3_uri, kms_key_id=None)

Bases: sagemaker.feature_store.inputs.Config

S3StorageConfig for FeatureStore.

Parameters
  • s3_uri (str) –

  • kms_key_id (str) –

Return type

None

s3_uri

S3 URI.

Type

str

kms_key_id

KMS key id.

Type

str

Method generated by attrs for class S3StorageConfig.

to_dict()

Construct a dictionary based on the attributes provided.

Returns

dict representing the attributes.

Return type

Dict[str, Any]

class sagemaker.feature_store.inputs.FeatureValue(feature_name=None, value_as_string=None, value_as_string_list=None)

Bases: sagemaker.feature_store.inputs.Config

FeatureValue for FeatureStore.

Parameters
  • feature_name (str) –

  • value_as_string (str) –

  • value_as_string_list (List[str]) –

Return type

None

feature_name

name of the Feature.

Type

str

value_as_string

value of the Feature in string form.

Type

str

value_as_string_list

value of the Feature in string list form, used for collection types.

Type

List[str]

Method generated by attrs for class FeatureValue.

to_dict()

Construct a dictionary based on the attributes provided.

Returns

dict representing the attributes.

Return type

Dict[str, Any]

class sagemaker.feature_store.inputs.TableFormatEnum(value)

Bases: enum.Enum

Enum of table formats.

The offline store table formats can be Glue or Iceberg.

class sagemaker.feature_store.inputs.OnlineStoreStorageTypeEnum(value)

Bases: enum.Enum

Enum of storage types for online store.

The online store storage types can be Standard or InMemory.

class sagemaker.feature_store.inputs.ResourceEnum(value)

Bases: enum.Enum

Enum of resources.

The data type of resource can be FeatureGroup or FeatureMetadata.

class sagemaker.feature_store.inputs.SearchOperatorEnum(value)

Bases: enum.Enum

Enum of search operators.

The data type of search operator can be And or Or.

class sagemaker.feature_store.inputs.SortOrderEnum(value)

Bases: enum.Enum

Enum of sort orders.

The data type of sort order can be Ascending or Descending.

class sagemaker.feature_store.inputs.FilterOperatorEnum(value)

Bases: enum.Enum

Enum of filter operators.

The data type of filter operator can be Equals, NotEquals, GreaterThan, GreaterThanOrEqualTo, LessThan, LessThanOrEqualTo, Contains, Exists, NotExists, or In.

class sagemaker.feature_store.inputs.Filter(name, value, operator=None)

Bases: sagemaker.feature_store.inputs.Config

Filter for FeatureStore search.

Return type

None

name

A resource property name.

Type

str

value

A value used with Name and Operator to determine which resources satisfy the filter’s condition.

Type

str

operator

A Boolean binary operator that is used to evaluate the filter. If you specify Value without Operator, Amazon SageMaker uses Equals (default: None).

Type

FilterOperatorEnum

Method generated by attrs for class Filter.

to_dict()

Construct a dictionary based on the attributes provided.

Returns

dict representing the attributes.

Return type

Dict[str, Any]

class sagemaker.feature_store.inputs.Identifier(feature_group_name, record_identifiers_value_as_string, feature_names=None)

Bases: sagemaker.feature_store.inputs.Config

Identifier of batch get record API.

Parameters
  • feature_group_name (str) –

  • record_identifiers_value_as_string (List[str]) –

  • feature_names (List[str]) –

Return type

None

feature_group_name

name of a feature group.

Type

str

record_identifiers_value_as_string

string value of record identifier.

Type

List[str]

feature_names

list of feature names (default: None).

Type

List[str]

Method generated by attrs for class Identifier.

to_dict()

Construct a dictionary based on the attributes provided.

Returns

dict representing the attributes.

Return type

Dict[str, Any]

class sagemaker.feature_store.inputs.FeatureParameter(key=None, value=None)

Bases: sagemaker.feature_store.inputs.Config

FeatureParameter for FeatureStore.

Parameters
  • key (str) –

  • value (str) –

Return type

None

key

key of the parameter.

Type

str

value

value of the parameter.

Type

str

Method generated by attrs for class FeatureParameter.

to_dict()

Construct a dictionary based on the attributes provided.

Returns

dict representing the attributes.

Return type

Dict[str, Any]

Dataset Builder

class sagemaker.feature_store.dataset_builder.DatasetBuilder(sagemaker_session, base, output_path, record_identifier_feature_name=None, event_time_identifier_feature_name=None, included_feature_names=None, kms_key_id=None, event_time_identifier_feature_type=None)

Bases: object

DatasetBuilder definition.

This class instantiates a DatasetBuilder object that comprises a base, a list of feature names, an output path and a KMS key ID.

Return type

None

_sagemaker_session

Session instance to perform boto calls.

Type

Session

_base

A base which can be either a FeatureGroup or a pandas.DataFrame and will be used to merge other FeatureGroups and generate a Dataset.

Type

Union[FeatureGroup, DataFrame]

_output_path

An S3 URI which stores the output .csv file.

Type

str

_record_identifier_feature_name

A string representing the record identifier feature if base is a DataFrame (default: None).

Type

str

_event_time_identifier_feature_name

A string representing the event time identifier feature if base is a DataFrame (default: None).

Type

str

_included_feature_names

A list of strings representing features to be included in the output. If not set, all features will be included in the output. (default: None).

Type

List[str]

_kms_key_id

A KMS key id. If set, will be used to encrypt the result file (default: None).

Type

str

_point_in_time_accurate_join

A boolean representing if point-in-time join is applied to the resulting dataframe when calling “to_dataframe”. When set to True, users can retrieve data using “row-level time travel” according to the event times provided to the DatasetBuilder. This requires that the entity dataframe with event times is submitted as the base in the constructor (default: False).

Type

bool

_include_duplicated_records

A boolean representing whether the resulting dataframe when calling “to_dataframe” should include duplicated records (default: False).

Type

bool

_include_deleted_records

A boolean representing whether the resulting dataframe when calling “to_dataframe” should include deleted records (default: False).

Type

bool

_number_of_recent_records

An integer representing how many records will be returned for each record identifier (default: 1).

Type

int

_number_of_records

An integer representing the number of records that should be returned in the resulting dataframe when calling “to_dataframe” (default: None).

Type

int

_write_time_ending_timestamp

A datetime that represents the latest write time for a record to be included in the resulting dataset. Records with a newer write time will be omitted from the resulting dataset. (default: None).

Type

datetime.datetime

_event_time_starting_timestamp

A datetime that represents the earliest event time for a record to be included in the resulting dataset. Records with an older event time will be omitted from the resulting dataset. (default: None).

Type

datetime.datetime

_event_time_ending_timestamp

A datetime that represents the latest event time for a record to be included in the resulting dataset. Records with a newer event time will be omitted from the resulting dataset. (default: None).

Type

datetime.datetime

_feature_groups_to_be_merged

A list of FeatureGroupToBeMerged which will be joined to base (default: []).

Type

List[FeatureGroupToBeMerged]

_event_time_identifier_feature_type

A FeatureTypeEnum representing the type of event time identifier feature (default: None).

Type

FeatureTypeEnum

Method generated by attrs for class DatasetBuilder.

with_feature_group(feature_group, target_feature_name_in_base=None, included_feature_names=None, feature_name_in_target=None, join_comparator=JoinComparatorEnum(), join_type=JoinTypeEnum())

Join FeatureGroup with base.

Parameters
  • feature_group (FeatureGroup) – A target FeatureGroup which will be joined to base.

  • target_feature_name_in_base (str) – A string representing the feature name in base which will be used as a join key (default: None).

  • included_feature_names (List[str]) – A list of strings representing features to be included in the output (default: None).

  • feature_name_in_target (str) – A string representing the feature name in the target feature group that will be compared to the target feature in the base feature group. If None is provided, the record identifier feature will be used in the SQL join. (default: None).

  • join_comparator (JoinComparatorEnum) – A JoinComparatorEnum representing the comparator used when joining the target feature in the base feature group and the feature in the target feature group. (default: JoinComparatorEnum.EQUALS).

  • join_type (JoinTypeEnum) – A JoinTypeEnum representing the type of join between the base and target feature groups. (default: JoinTypeEnum.INNER_JOIN).

Returns

This DatasetBuilder object.

point_in_time_accurate_join()

Enable point-in-time accurate join.

Returns

This DatasetBuilder object.

include_duplicated_records()

Include duplicated records in dataset.

Returns

This DatasetBuilder object.

include_deleted_records()

Include deleted records in dataset.

Returns

This DatasetBuilder object.

with_number_of_recent_records_by_record_identifier(number_of_recent_records)

Set number_of_recent_records field with provided input.

Parameters

number_of_recent_records (int) – An int specifying how many recent records are returned for each record identifier.

Returns

This DatasetBuilder object.

with_number_of_records_from_query_results(number_of_records)

Set number_of_records field with provided input.

Parameters

number_of_records (int) – An int specifying how many records are returned.

Returns

This DatasetBuilder object.

as_of(timestamp)

Set write_time_ending_timestamp field with provided input.

Parameters

timestamp (datetime.datetime) – A datetime giving the latest write time for a record to be included in the dataset.

Returns

This DatasetBuilder object.

with_event_time_range(starting_timestamp=None, ending_timestamp=None)

Set event_time_starting_timestamp and event_time_ending_timestamp with provided inputs.

Parameters
  • starting_timestamp (datetime.datetime) – A datetime giving the earliest event time for a record to be included in the dataset (default: None).

  • ending_timestamp (datetime.datetime) – A datetime giving the latest event time for a record to be included in the dataset (default: None).

Returns

This DatasetBuilder object.

to_csv_file()

Get the query string and the result as a .csv file.

Returns

The S3 path of the .csv file. The query string executed.

Return type

Tuple[str, str]

to_dataframe()

Get the query string and the result as a pandas.DataFrame.

Returns

The pandas.DataFrame object. The query string executed.

Return type

Tuple[pandas.core.frame.DataFrame, str]
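Since each configuration method returns the DatasetBuilder itself, calls can be chained before the dataset is materialized. The sketch below assumes builder is an existing DatasetBuilder and other_feature_group an existing FeatureGroup; the helper name and the choice of one recent record per identifier are illustrative.

```python
from datetime import datetime
from typing import Any, Tuple


def build_recent_dataset(
    builder: Any,
    other_feature_group: Any,
    start: datetime,
    end: datetime,
) -> Tuple[Any, str]:
    """Join one more feature group, restrict event times, and load the result."""
    configured = (
        builder
        .with_feature_group(other_feature_group)
        .with_event_time_range(starting_timestamp=start, ending_timestamp=end)
        .with_number_of_recent_records_by_record_identifier(1)
    )
    # to_dataframe returns the DataFrame together with the executed query string.
    data_frame, query_string = configured.to_dataframe()
    return data_frame, query_string
```

Returning the query string alongside the DataFrame makes it easy to log or inspect the SQL that produced the dataset.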

Feature Store

class sagemaker.feature_store.feature_store.FeatureStore(sagemaker_session=<class 'sagemaker.session.Session'>)

Bases: object

FeatureStore definition.

This class instantiates a FeatureStore object that comprises a SageMaker session instance.

Parameters

sagemaker_session (sagemaker.session.Session) –

Return type

None

sagemaker_session

session instance to perform boto calls.

Type

Session

Method generated by attrs for class FeatureStore.

create_dataset(base, output_path, record_identifier_feature_name=None, event_time_identifier_feature_name=None, included_feature_names=None, kms_key_id=None)

Create a Dataset Builder for generating a Dataset.

Parameters
  • base (Union[FeatureGroup, DataFrame]) – A base which can be either a FeatureGroup or a pandas.DataFrame and will be used to merge other FeatureGroups and generate a Dataset.

  • output_path (str) – An S3 URI which stores the output .csv file.

  • record_identifier_feature_name (str) – A string representing the record identifier feature if base is a DataFrame (default: None).

  • event_time_identifier_feature_name (str) – A string representing the event time identifier feature if base is a DataFrame (default: None).

  • included_feature_names (List[str]) – A list of features to be included in the output (default: None).

  • kms_key_id (str) – A KMS key ID. If set, it is used to encrypt the result file (default: None).

Raises

ValueError – base is a pandas.DataFrame but the record identifier feature name or the event time identifier feature name is not provided.

Return type

sagemaker.feature_store.dataset_builder.DatasetBuilder
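
When base is a pandas.DataFrame, the two identifier names have to be passed explicitly. The following is a minimal sketch, assuming feature_store is a FeatureStore instance; the helper name and the column names "customer_id" and "event_time" are hypothetical placeholders.

```python
def builder_from_dataframe(feature_store, base_df, output_path: str):
    """Start a DatasetBuilder from a pandas.DataFrame base.

    The column names "customer_id" and "event_time" are hypothetical;
    substitute the columns that play those roles in your DataFrame.
    """
    return feature_store.create_dataset(
        base=base_df,
        output_path=output_path,
        # Needed because `base` is a DataFrame rather than a FeatureGroup;
        # the reference above documents a ValueError when they are missing.
        record_identifier_feature_name="customer_id",
        event_time_identifier_feature_name="event_time",
    )
```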

list_feature_groups(name_contains=None, feature_group_status_equals=None, offline_store_status_equals=None, creation_time_after=None, creation_time_before=None, sort_order=None, sort_by=None, max_results=None, next_token=None)

List all FeatureGroups satisfying given filters.

Parameters
  • name_contains (str) – A string that partially matches one or more FeatureGroups’ names. Filters FeatureGroups by name.

  • feature_group_status_equals (str) – A FeatureGroup status. Filters FeatureGroups by FeatureGroup status.

  • offline_store_status_equals (str) – An OfflineStore status. Filters FeatureGroups by OfflineStore status.

  • creation_time_after (datetime.datetime) – Use this parameter to search for FeatureGroups created after a specific date and time.

  • creation_time_before (datetime.datetime) – Use this parameter to search for FeatureGroups created before a specific date and time.

  • sort_order (str) – The order in which FeatureGroups are listed.

  • sort_by (str) – The value on which the FeatureGroup list is sorted.

  • max_results (int) – The maximum number of results returned by ListFeatureGroups.

  • next_token (str) – A token to resume pagination of ListFeatureGroups results.

Returns

Response dict from service.

Return type

Dict[str, Any]
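
Results are paginated through next_token. The following sketch walks every page; it assumes the response dict carries a "FeatureGroupSummaries" list and an optional "NextToken" entry, as in the ListFeatureGroups service response. The helper name is hypothetical.

```python
from typing import Any, Dict, Iterator


def iter_feature_group_summaries(feature_store, **filters: Any) -> Iterator[Dict[str, Any]]:
    """Yield every summary across all pages of list_feature_groups.

    Assumes each response holds "FeatureGroupSummaries" and, when more
    pages remain, "NextToken".
    """
    next_token = None
    while True:
        kwargs = dict(filters)
        if next_token is not None:
            # Resume from where the previous page stopped.
            kwargs["next_token"] = next_token
        response = feature_store.list_feature_groups(**kwargs)
        yield from response.get("FeatureGroupSummaries", [])
        next_token = response.get("NextToken")
        if not next_token:
            break
```

Filters such as name_contains or max_results can be passed as keyword arguments and are forwarded unchanged on every page request.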

batch_get_record(identifiers, expiration_time_response=None)

Get records in batch from FeatureStore.

Parameters
  • identifiers (Sequence[Identifier]) – A list of identifiers to uniquely identify records in FeatureStore.

  • expiration_time_response (str) – Controls whether ExpiresAt is returned in the response.

Returns

Response dict from service.

Return type

Dict[str, Any]
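
The response dict can be flattened into plain Python structures for downstream use. The following sketch assumes the BatchGetRecord response shape: a "Records" list whose items hold "FeatureGroupName" and a "Record" list of {"FeatureName", "ValueAsString"} entries. The helper name is hypothetical.

```python
from typing import Any, Dict, List, Tuple


def records_as_dicts(response: Dict[str, Any]) -> List[Tuple[str, Dict[str, str]]]:
    """Turn a batch_get_record response into (feature group, features) pairs.

    Assumes the BatchGetRecord response layout described in the lead-in;
    every feature value is kept as the string the service returns.
    """
    rows = []
    for item in response.get("Records", []):
        features = {
            feature["FeatureName"]: feature["ValueAsString"]
            for feature in item.get("Record", [])
        }
        rows.append((item["FeatureGroupName"], features))
    return rows
```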

search(resource, filters=None, operator=None, sort_by=None, sort_order=None, next_token=None, max_results=None)

Search for FeatureGroups or FeatureMetadata satisfying given filters.

Parameters
  • resource (ResourceEnum) – The name of the Amazon SageMaker resource to search for. Valid values are FeatureGroup or FeatureMetadata.

  • filters (Sequence[Filter]) – A list of filter objects (Default: None).

  • operator (SearchOperatorEnum) – A Boolean operator used to evaluate the filters. Valid values are And or Or. The default is And (Default: None).

  • sort_by (str) – The name of the resource property used to sort the SearchResults. The default is LastModifiedTime.

  • sort_order (SortOrderEnum) – How SearchResults are ordered. Valid values are Ascending or Descending. The default is Descending.

  • next_token (str) – If more than MaxResults resources match the specified filters, the response includes a NextToken. The NextToken can be passed to the next SearchRequest to continue retrieving results (Default: None).

  • max_results (int) – The maximum number of results to return (Default: None).

Returns

Response dict from service.

Return type

Dict[str, Any]

@feature_processor Decorator

@sagemaker.feature_store.feature_processor.feature_processor(inputs, output, target_stores=None, parameters=None, enable_ingestion=True, spark_config=None)

Decorator to facilitate feature engineering for Feature Groups.

If the decorated function is executed without arguments then the decorated function’s arguments are automatically loaded from the input data sources. Outputs are ingested to the output Feature Group. If arguments are provided to this function, then arguments are not automatically loaded (for testing).

Decorated functions must conform to the expected signature. Parameters: one parameter of type pyspark.sql.DataFrame for each DataSource in ‘inputs’; followed by the optional parameters with names and types in [params: Dict[str, Any], spark: SparkSession]. Outputs: a single return value of type pyspark.sql.DataFrame. The function can have any name.

Example:

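from typing import Any, Dict

from pyspark.sql import DataFrame, SparkSession
from sagemaker.feature_store.feature_processor import CSVDataSource, FeatureGroupDataSource, feature_processor

@feature_processor(
    inputs=[FeatureGroupDataSource("input-fg"), CSVDataSource("s3://bucket/prefix")],
    output='arn:aws:sagemaker:us-west-2:123456789012:feature-group/output-fg'
)
def transform(
    input_feature_group: DataFrame, input_csv: DataFrame, params: Dict[str, Any],
    spark: SparkSession
) -> DataFrame:
    return ...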

More concisely:

@feature_processor(
    inputs=[FeatureGroupDataSource("input-fg"), CSVDataSource("s3://bucket/prefix")],
    output='arn:aws:sagemaker:us-west-2:123456789012:feature-group/output-fg'
)
def transform(input_feature_group, input_csv):
    return ...
Parameters
  • inputs (Sequence[Union[FeatureGroupDataSource, CSVDataSource, ParquetDataSource, BaseDataSource]]) – A list of data sources.

  • output (str) – A Feature Group ARN to write results of this function to.

  • target_stores (Optional[list[str]], optional) – A list containing at least one of ‘OnlineStore’ or ‘OfflineStore’. If unspecified, data will be ingested to the enabled stores of the output feature group. Defaults to None.

  • parameters (Optional[Dict[str, Union[str, Dict]]], optional) – Parameters to be provided to the decorated function, available as the ‘params’ argument. Useful for parameterized functions. The params argument also contains the set of system provided parameters under the key ‘system’. E.g. ‘scheduled_time’: a timestamp representing the time that the execution was scheduled to execute at, if triggered by a Scheduler, otherwise, the current time.

  • enable_ingestion (bool, optional) – A boolean indicating whether the decorated function’s return value is ingested to the ‘output’ Feature Group. This flag is useful during the development phase to ensure that data is not used until the function is ready. It is also useful for users who want to manage their own data ingestion. Defaults to True.

  • spark_config (Dict[str, str]) – A dict containing the key-value pairs for Spark configurations.

Raises

IngestionError – If any rows are not ingested successfully then a sample of the records, with failure reasons, is logged.

Returns

The decorated function.

Return type

Callable
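
Inside a decorated function, the params argument mixes user-supplied entries with the system-provided ones. The following sketch separates the two; it assumes user entries sit beside the 'system' key at the top level of the dict, and the helper name split_params is hypothetical.

```python
from typing import Any, Dict, Tuple


def split_params(params: Dict[str, Any]) -> Tuple[Dict[str, Any], Any]:
    """Separate user parameters from the system-provided scheduled time.

    Assumes user-supplied entries are top-level keys next to "system".
    The scheduled time is returned exactly as provided, without parsing.
    """
    system = params.get("system", {})
    user = {key: value for key, value in params.items() if key != "system"}
    return user, system.get("scheduled_time")
```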

Feature Processor Data Source

class sagemaker.feature_store.feature_processor.FeatureGroupDataSource(name, input_start_offset=None, input_end_offset=None)

Bases: object

A Feature Group data source definition for a FeatureProcessor.

Parameters
  • name (str) –

  • input_start_offset (Optional[str]) –

  • input_end_offset (Optional[str]) –

Return type

None

name

The name or ARN of the Feature Group.

Type

str

input_start_offset

A duration specified as a string in the format ‘<no> <unit>’ where ‘no’ is a number and ‘unit’ is a unit of time in [‘hours’, ‘days’, ‘weeks’, ‘months’, ‘years’] (plural and singular forms). Inputs contain data with event times no earlier than input_start_offset in the past. Offsets are relative to the function execution time. If the function is executed by a Schedule, then the offset is relative to the scheduled start time. Defaults to None.

Type

Optional[str], optional

input_end_offset

The ‘end’ (as opposed to start) counterpart for the ‘input_start_offset’. Inputs will contain records with event times no later than ‘input_end_offset’ in the past. Defaults to None.

Type

Optional[str], optional
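
The offset format can be illustrated by resolving an offset string against an execution time. The following is a stand-in sketch of the ‘<no> <unit>’ arithmetic, not the SDK's own parser; the function name resolve_offset is hypothetical, and clamping month and year results to the last valid day of the month is an assumption.

```python
import calendar
import re
from datetime import datetime, timedelta

# '<no> <unit>' with singular or plural unit names.
_OFFSET = re.compile(r"^\s*(\d+)\s+(hour|day|week|month|year)s?\s*$")


def resolve_offset(execution_time: datetime, offset: str) -> datetime:
    """Return the instant that lies `offset` before `execution_time`."""
    match = _OFFSET.match(offset)
    if match is None:
        raise ValueError(f"unrecognized offset: {offset!r}")
    amount, unit = int(match.group(1)), match.group(2)
    if unit in ("hour", "day", "week"):
        # Fixed-length units map directly onto timedelta keywords.
        return execution_time - timedelta(**{unit + "s": amount})
    # Calendar units: step back whole months, clamping the day if needed.
    months = amount * (12 if unit == "year" else 1)
    index = execution_time.year * 12 + (execution_time.month - 1) - months
    year, month = index // 12, index % 12 + 1
    day = min(execution_time.day, calendar.monthrange(year, month)[1])
    return execution_time.replace(year=year, month=month, day=day)
```

With input_start_offset="7 days" and input_end_offset="1 day", an execution at time t would read records whose event times fall between resolve_offset(t, "7 days") and resolve_offset(t, "1 day").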

Method generated by attrs for class FeatureGroupDataSource.

class sagemaker.feature_store.feature_processor.CSVDataSource(s3_uri, csv_header=True, csv_infer_schema=False)

Bases: object

A CSV data source definition for a FeatureProcessor.

Parameters
  • s3_uri (str) –

  • csv_header (bool) –

  • csv_infer_schema (bool) –

Return type

None

s3_uri

S3 URI of the data source.

Type

str

csv_header

Whether to read the first line of the CSV file as column names. This option is only valid when file_format is set to csv. By default the value of this option is true, and all column types are assumed to be a string.

Type

bool

csv_infer_schema

Whether to infer the schema of the CSV data source. This option is only valid when file_format is set to csv. If set to true, two passes over the data are required to load the data and infer the schema.

Type

bool

Method generated by attrs for class CSVDataSource.

class sagemaker.feature_store.feature_processor.ParquetDataSource(s3_uri)

Bases: object

A Parquet data source definition for a FeatureProcessor.

Parameters

s3_uri (str) –

Return type

None

s3_uri

S3 URI of the data source.

Type

str

Method generated by attrs for class ParquetDataSource.

Feature Processor Scheduler

feature_processor.to_pipeline(pipeline_name, step, role=None, transformation_code=None, max_retries=None, tags=None, sagemaker_session=None)

Creates a SageMaker pipeline that takes in a callable as a training step.

To configure the training step used in the SageMaker pipeline, wrap the step argument with the remote decorator from the sagemaker.remote_function module. If step is not wrapped by the remote decorator, the default configurations in sagemaker.remote_function.job._JobSettings are used to create the training step.

Parameters
  • pipeline_name (str) – The name of the pipeline.

  • step (Callable) – A user provided function wrapped by feature_processor and optionally wrapped by remote_decorator.

  • role (Optional[str]) – The Amazon Resource Name (ARN) of the role used by the pipeline to access and create resources. If not specified, it will default to the credentials provided by the AWS configuration chain.

  • transformation_code (Optional[str]) – The data source for a reference to the transformation code for Lineage tracking. This code is not used for actual transformation.

  • max_retries (Optional[int]) – The number of times to retry the SageMaker pipeline step. If not specified, the SageMaker pipeline step will not retry.

  • tags (List[Tuple[str, str]]) – A list of tags attached to the pipeline and all corresponding lineage resources that support tags. If not specified, no custom tags will be attached.

  • sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.

Returns

SageMaker Pipeline ARN.

Return type

str

feature_processor.schedule(pipeline_name, schedule_expression, role_arn=None, state='ENABLED', start_date=None, sagemaker_session=None)

Creates an EventBridge Schedule that schedules executions of a sagemaker pipeline.

The pipeline created will also have a pipeline parameter scheduled-time indicating when the pipeline is scheduled to run.

Parameters
  • pipeline_name (str) – The SageMaker Pipeline name that will be scheduled.

  • schedule_expression (str) – The expression that defines when the schedule runs. It supports at, rate, and cron expressions. See the CreateSchedule API for more details.

  • state (str) – Specifies whether the schedule is enabled or disabled. Valid values are ENABLED and DISABLED. See the State request parameter for more details. If not specified, it will default to ENABLED.

  • start_date (Optional[datetime]) – The date, in UTC, after which the schedule can begin invoking its target. Depending on the schedule’s recurrence expression, invocations might occur on, or after, the StartDate you specify.

  • role_arn (Optional[str]) – The Amazon Resource Name (ARN) of the IAM role that EventBridge Scheduler will assume for this target when the schedule is invoked.

  • sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.

Returns

The EventBridge Schedule ARN.

Return type

str
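
For a one-time schedule, the at expression takes the form at(yyyy-mm-ddThh:mm:ss). The following sketch formats one from a datetime; it assumes the schedule is evaluated in UTC, and the helper name at_expression is hypothetical.

```python
from datetime import datetime, timezone


def at_expression(when: datetime) -> str:
    """Format a one-time schedule expression: at(yyyy-mm-ddThh:mm:ss).

    Naive datetimes are taken to be UTC already; timezone-aware ones are
    converted to UTC first (an assumption of this sketch).
    """
    if when.tzinfo is not None:
        when = when.astimezone(timezone.utc)
    return f"at({when:%Y-%m-%dT%H:%M:%S})"
```

The resulting string can be passed as schedule_expression; recurring schedules would use a rate or cron expression instead, such as "rate(24 hours)".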

feature_processor.execute(pipeline_name, execution_time=None, sagemaker_session=None)

Starts an execution of a SageMaker Pipeline created by feature_processor.

Parameters
  • pipeline_name (str) – The SageMaker Pipeline name that will be executed.

  • execution_time (datetime) – The date, in UTC, used as a SageMaker pipeline parameter indicating the time at which the execution is scheduled to run. If not specified, it will default to the current timestamp.

  • sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.

Returns

The pipeline execution ARN.

Return type

str
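
The three functions above are typically used in sequence: create the pipeline, schedule it, and optionally trigger a run right away. The following is a minimal sketch of that flow; fp stands for the sagemaker.feature_store.feature_processor module and is injected so the flow can be exercised with a stand-in. The helper name and the "rate(24 hours)" expression are illustrative choices.

```python
def deploy_schedule_and_run(fp, pipeline_name, step, scheduler_role_arn):
    """Create the pipeline, schedule it, and trigger one run immediately.

    `fp` is expected to expose to_pipeline, schedule, and execute as
    documented above; this wrapper itself is hypothetical.
    """
    # 1. Wrap the feature_processor-decorated callable in a pipeline.
    pipeline_arn = fp.to_pipeline(pipeline_name=pipeline_name, step=step)
    # 2. Attach an EventBridge schedule that runs it once a day.
    schedule_arn = fp.schedule(
        pipeline_name=pipeline_name,
        schedule_expression="rate(24 hours)",
        role_arn=scheduler_role_arn,
    )
    # 3. Start one execution now instead of waiting for the schedule.
    execution_arn = fp.execute(pipeline_name=pipeline_name)
    return {
        "pipeline": pipeline_arn,
        "schedule": schedule_arn,
        "execution": execution_arn,
    }
```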

feature_processor.delete_schedule(pipeline_name, sagemaker_session=None)

Delete EventBridge Schedule corresponding to a SageMaker Pipeline if there is one.

Parameters
  • pipeline_name (str) – The name of the SageMaker Pipeline whose schedule needs to be deleted.

  • sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.

Return type

None

feature_processor.describe(pipeline_name, sagemaker_session=None)

Describe feature processor and other related resources.

This API will include details related to the feature processor including SageMaker Pipeline and EventBridge Schedule.

Parameters
  • pipeline_name (str) – Name of the pipeline.

  • sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.

Returns

Information about the resources related to the feature processor.

Return type

Dict[str, Union[int, str]]

feature_processor.list_pipelines(sagemaker_session=None)

Lists all SageMaker Pipelines created by Feature Processor SDK.

Parameters

sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.

Returns

A list of metadata for the SageMaker Pipelines created for feature_processor.

Return type

List[Dict[str, Any]]