Feature Store APIs¶
Feature Group¶
- class sagemaker.feature_store.feature_group.FeatureGroup(name=_Nothing.NOTHING, sagemaker_session=_Nothing.NOTHING, feature_definitions=_Nothing.NOTHING)¶
Bases:
object
FeatureGroup definition.
This class instantiates a FeatureGroup object that comprises of a name for the FeatureGroup, session instance, and a list of feature definition objects i.e., FeatureDefinition.
- Parameters
name (str) –
sagemaker_session (Session) –
feature_definitions (Sequence[FeatureDefinition]) –
- sagemaker_session¶
session instance to perform boto calls. If None, a new Session will be created.
- Type
- feature_definitions¶
list of FeatureDefinitions.
- Type
Sequence[FeatureDefinition]
Method generated by attrs for class FeatureGroup.
- create(s3_uri, record_identifier_name, event_time_feature_name, role_arn=None, online_store_kms_key_id=None, enable_online_store=False, ttl_duration=None, offline_store_kms_key_id=None, disable_glue_table_creation=False, data_catalog_config=None, description=None, tags=None, table_format=None, online_store_storage_type=None, throughput_config=None)¶
Create a SageMaker FeatureStore FeatureGroup.
- Parameters
s3_uri (Union[str, bool]) – S3 URI of the offline store, set to
False
to disable offline store.record_identifier_name (str) – name of the record identifier feature.
event_time_feature_name (str) – name of the event time feature.
role_arn (str) – ARN of the role used to call CreateFeatureGroup.
online_store_kms_key_id (str) – KMS key ARN for online store (default: None).
ttl_duration (TtlDuration) – Default time to live duration for records (default: None).
enable_online_store (bool) – whether to enable online store or not (default: False).
offline_store_kms_key_id (str) – KMS key ARN for offline store (default: None). If a KMS encryption key is not specified, SageMaker encrypts all data at rest using the default AWS KMS key. By defining your bucket-level key for SSE, you can reduce the cost of AWS KMS requests. For more information, see Bucket Key in the Amazon S3 User Guide.
disable_glue_table_creation (bool) – whether to turn off Glue table creation or not (default: False).
data_catalog_config (DataCatalogConfig) – configuration for Metadata store (default: None).
description (str) – description of the FeatureGroup (default: None).
tags (Optional[Tags]) – Tags for labeling a FeatureGroup (default: None).
table_format (TableFormatEnum) – format of the offline store table (default: None).
online_store_storage_type (OnlineStoreStorageTypeEnum) – storage type for the online store (default: None).
throughput_config (ThroughputConfig) – throughput configuration of the feature group (default: None).
- Returns
Response dict from service.
- Return type
- delete()¶
Delete a FeatureGroup.
- describe(next_token=None)¶
Describe a FeatureGroup.
- update(feature_additions=None, online_store_config=None, throughput_config=None)¶
Update a FeatureGroup and add new features from the given feature definitions.
- Parameters
feature_additions (Sequence[Dict[str, str]) – list of feature definitions to be updated.
online_store_config (OnlineStoreConfigUpdate) – online store config to be updated.
throughput_config (ThroughputConfigUpdate) – target throughput configuration
- Returns
Response dict from service.
- Return type
- update_feature_metadata(feature_name, description=None, parameter_additions=None, parameter_removals=None)¶
Update a feature metadata and add/remove metadata.
- Parameters
- Returns
Response dict from service.
- Return type
- describe_feature_metadata(feature_name)¶
Describe feature metadata by feature name.
- list_tags()¶
List all tags for a feature group.
- list_parameters_for_feature_metadata(feature_name)¶
List all parameters for a feature metadata.
- load_feature_definitions(data_frame, online_storage_type=None)¶
Load feature definitions from a Pandas DataFrame.
Column name is used as feature name. Feature type is inferred from the dtype of the column. Dtype
int_
, int8, int16, int32, int64, uint8, uint16, uint32 and uint64 are mapped to Integral feature type. Dtypefloat_
, float16, float32 and float64 are mapped to Fractional feature type. string dtype is mapped to String feature type.No feature definitions will be loaded if the given data_frame contains unsupported dtypes.
For IN_MEMORY online_storage_type all collection type columns within DataFrame will be inferred as a List, instead of a String. Due to performance limitations, only first 1,000 values of the column will be sampled, when inferring collection Type. Customers can manually update the inferred collection type as needed.
- Parameters
data_frame (DataFrame) – A Pandas DataFrame containing features.
online_storage_type (OnlineStoreStorageTypeEnum) – Optional. Online storage type for the feature group. The value can be either STANDARD or IN_MEMORY If not specified,STANDARD will be used by default. If specified as IN_MEMORY, we will infer any collection type column within DataFrame as a List instead of a String. All, collection types (List, Set and Vector) will be inferred as List. We will only sample the first 1,000 values of the column when inferring collection Type.
- Returns
list of FeatureDefinition
- Return type
- get_record(record_identifier_value_as_string, feature_names=None)¶
Get a single record in a FeatureGroup
- put_record(record, target_stores=None, ttl_duration=None)¶
Put a single record in the FeatureGroup.
- Parameters
record (Sequence[FeatureValue]) – a list contains feature values.
target_stores (Sequence[str]) – a list of target stores.
ttl_duration (TtlDuration) – customer specified ttl duration.
- delete_record(record_identifier_value_as_string, event_time, deletion_mode=DeletionModeEnum.SOFT_DELETE)¶
Delete a single record from a FeatureGroup.
- Parameters
record_identifier_value_as_string (String) – a String representing the value of the record identifier.
event_time (String) – a timestamp format String indicating when the deletion event occurred.
deletion_mode (DeletionModeEnum) – deletion mode for deleting record. (default: DetectionModeEnum.SOFT_DELETE)
- ingest(data_frame, target_stores=None, max_workers=1, max_processes=1, wait=True, timeout=None, profile_name=None)¶
Ingest the content of a pandas DataFrame to feature store.
max_worker
the number of threads created to work on different partitions of thedata_frame
in parallel.max_processes
the number of processes will be created to work on different partitions of thedata_frame
in parallel, each withmax_worker
threads.The ingest function attempts to ingest all records in the data frame. SageMaker Feature Store throws an exception if it fails to ingest any records.
If
wait
isTrue
, Feature Store runs theingest
function synchronously. You receive anIngestionError
if there are any records that can’t be ingested. Ifwait
isFalse
, Feature Store runs theingest
function asynchronously.Instead of setting
wait
toTrue
in theingest
function, you can invoke thewait
function on the returned instance ofIngestionManagerPandas
to run theingest
function synchronously.To access the rows that failed to ingest, set
wait
toFalse
. TheIngestionError.failed_rows
object saves all the rows that failed to ingest.profile_name argument is an optional one. It will use the default credential if None is passed. This profile_name is used in the sagemaker_featurestore_runtime client only. See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for more about the default credential.
- Parameters
data_frame (DataFrame) – data_frame to be ingested to feature store.
target_stores (Sequence[TargetStoreEnum]) – target stores to be used for ingestion. (default: None).
max_workers (int) – number of threads to be created.
max_processes (int) – number of processes to be created. Each process spawns
max_worker
number of threads.wait (bool) – whether to wait for the ingestion to finish or not.
timeout (Union[int, float]) –
concurrent.futures.TimeoutError
will be raised if timeout is reached.profile_name (str) – the profile credential should be used for
PutRecord
(default: None).
- Returns
An instance of IngestionManagerPandas.
- Return type
- athena_query()¶
Create an AthenaQuery instance.
- Returns
An instance of AthenaQuery initialized with data catalog configurations.
- Return type
- as_hive_ddl(database='sagemaker_featurestore', table_name=None)¶
Generate Hive DDL commands to define or change structure of tables or databases in Hive.
Schema of the table is generated based on the feature definitions. Columns are named after feature name and data-type are inferred based on feature type. Integral feature type is mapped to INT data-type. Fractional feature type is mapped to FLOAT data-type. String feature type is mapped to STRING data-type.
- class sagemaker.feature_store.feature_group.AthenaQuery(catalog, database, table_name, sagemaker_session)¶
Bases:
object
Class to manage querying of feature store data with AWS Athena.
This class instantiates a AthenaQuery object that is used to retrieve data from feature store via standard SQL queries.
Method generated by attrs for class AthenaQuery.
- run(query_string, output_location, kms_key=None, workgroup=None)¶
Execute a SQL query given a query string, output location and kms key.
This method executes the SQL query using Athena and outputs the results to output_location and returns the execution id of the query.
- Parameters
- Returns
Execution id of the query.
- Return type
- wait()¶
Wait for the current query to finish.
- get_query_execution()¶
Get execution status of the current query.
- as_dataframe(**kwargs)¶
Download the result of the current query and load it into a DataFrame.
- Parameters
**kwargs (object) – key arguments used for the method pandas.read_csv to be able to have a better tuning on data. For more info read: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
- Returns
A pandas DataFrame contains the query result.
- Return type
DataFrame
- class sagemaker.feature_store.feature_group.IngestionManagerPandas(feature_group_name, feature_definitions, sagemaker_fs_runtime_client_config=None, sagemaker_session=None, max_workers=1, max_processes=1, profile_name=None, async_result=None, processing_pool=None, failed_indices=_Nothing.NOTHING)¶
Bases:
object
Class to manage the multi-threaded data ingestion process.
This class will manage the data ingestion process which is multi-threaded.
- Parameters
- feature_definitions¶
dictionary of feature definitions. where the key is the feature name and the value is the FeatureDefinition. The FeatureDefinition contains the data type of the feature.
- Type
Dict[str, Dict[Any, Any]]
- data_frame¶
pandas DataFrame to be ingested to the given feature group.
- Type
DataFrame
Method generated by attrs for class IngestionManagerPandas.
- property failed_rows: List[int]¶
Get rows that failed to ingest.
- Returns
List of row indices that failed to be ingested.
- wait(timeout=None)¶
Wait for the ingestion process to finish.
- run(data_frame, target_stores=None, wait=True, timeout=None)¶
Start the ingestion process.
- Parameters
data_frame (DataFrame) – source DataFrame to be ingested.
target_stores (Sequence[TargetStoreEnum]) – list of target stores to be used for the ingestion. If None, the default target store is used.
wait (bool) – whether to wait for the ingestion to finish or not.
timeout (Union[int, float]) –
concurrent.futures.TimeoutError
will be raised if timeout is reached.
Feature Definition¶
- class sagemaker.feature_store.feature_definition.FeatureDefinition(feature_name, feature_type, collection_type=None)¶
Bases:
Config
Feature definition.
This instantiates a Feature Definition object where FeatureDefinition is a subclass of Config.
- Parameters
feature_name (str) –
feature_type (FeatureTypeEnum) –
collection_type (CollectionType) –
- feature_type¶
The type of the feature
- Type
- collection_type¶
The type of collection for the feature
- Type
Method generated by attrs for class FeatureDefinition.
- class sagemaker.feature_store.feature_definition.FractionalFeatureDefinition(feature_name, collection_type=None)¶
Bases:
FeatureDefinition
Fractional feature definition.
This class instantiates a FractionalFeatureDefinition object, a subclass of FeatureDefinition where the data type of the feature being defined is a Fractional.
- feature_type¶
A FeatureTypeEnum.FRACTIONAL type
- Type
- collection_type¶
The type of collection for the feature
- Type
Construct an instance of FractionalFeatureDefinition.
- Parameters
feature_name (str) – the name of the feature.
collection_type (CollectionType) –
- class sagemaker.feature_store.feature_definition.IntegralFeatureDefinition(feature_name, collection_type=None)¶
Bases:
FeatureDefinition
Fractional feature definition.
This class instantiates a IntegralFeatureDefinition object, a subclass of FeatureDefinition where the data type of the feature being defined is a Integral.
- feature_type¶
a FeatureTypeEnum.INTEGRAL type.
- Type
- collection_type¶
The type of collection for the feature.
- Type
Construct an instance of IntegralFeatureDefinition.
- Parameters
feature_name (str) – the name of the feature.
collection_type (CollectionType) –
- class sagemaker.feature_store.feature_definition.StringFeatureDefinition(feature_name, collection_type=None)¶
Bases:
FeatureDefinition
Fractional feature definition.
This class instantiates a StringFeatureDefinition object, a subclass of FeatureDefinition where the data type of the feature being defined is a String.
- feature_type¶
a FeatureTypeEnum.STRING type.
- Type
- collection_type¶
The type of collection for the feature.
- Type
Construct an instance of StringFeatureDefinition.
- Parameters
feature_name (str) – the name of the feature.
collection_type (CollectionType) –
- class sagemaker.feature_store.feature_definition.FeatureTypeEnum(value)¶
Bases:
Enum
Enum of feature types.
The data type of a feature can be Fractional, Integral or String.
- class sagemaker.feature_store.feature_definition.CollectionTypeEnum(value)¶
Bases:
Enum
Enum of collection types.
The collection type of a feature can be List, Set or Vector.
- class sagemaker.feature_store.feature_definition.CollectionType(collection_type, collection_config)¶
Bases:
Config
Collection type and its configuration.
This initiates a collectiontype object where CollectionType is a subclass of Config.
- Parameters
collection_type (CollectionTypeEnum) –
- collection_type¶
The type of the collection
- Type
Method generated by attrs for class CollectionType.
- class sagemaker.feature_store.feature_definition.ListCollectionType¶
Bases:
CollectionType
List collection type
This class instantiates a ListCollectionType object, as subclass of CollectionType where the collection type is defined as List.
Construct an instance of ListCollectionType.
- class sagemaker.feature_store.feature_definition.SetCollectionType¶
Bases:
CollectionType
Set collection type
This class instantiates a SetCollectionType object, as subclass of CollectionType where the collection type is defined as Set.
Construct an instance of SetCollectionType.
- class sagemaker.feature_store.feature_definition.VectorCollectionType(dimension)¶
Bases:
CollectionType
Vector collection type
This class instantiates a VectorCollectionType object, as subclass of CollectionType where the collection type is defined as Vector.
- Parameters
dimension (int) –
Construct an instance of VectorCollectionType.
Attributes: dimension (int): The dimension size for the Vector.
Inputs¶
- class sagemaker.feature_store.inputs.Config¶
Bases:
ABC
Base config object for FeatureStore.
Configs must implement the to_dict method.
- abstract to_dict()¶
Get the dictionary from attributes.
- class sagemaker.feature_store.inputs.DataCatalogConfig(table_name=_Nothing.NOTHING, catalog=_Nothing.NOTHING, database=_Nothing.NOTHING)¶
Bases:
Config
DataCatalogConfig for FeatureStore.
Method generated by attrs for class DataCatalogConfig.
- class sagemaker.feature_store.inputs.OfflineStoreConfig(s3_storage_config, disable_glue_table_creation=False, data_catalog_config=None, table_format=None)¶
Bases:
Config
OfflineStoreConfig for FeatureStore.
- Parameters
s3_storage_config (S3StorageConfig) –
disable_glue_table_creation (bool) –
data_catalog_config (DataCatalogConfig) –
table_format (TableFormatEnum) –
- s3_storage_config¶
configuration of S3 storage.
- Type
- data_catalog_config¶
configuration of the data catalog.
- Type
- table_format¶
format of the offline store table.
- Type
Method generated by attrs for class OfflineStoreConfig.
- class sagemaker.feature_store.inputs.ThroughputConfig(mode=None, provisioned_read_capacity_units=None, provisioned_write_capacity_units=None)¶
Bases:
Config
Throughput configuration of the feature group.
Throughput configuration can be ON_DEMAND, or PROVISIONED with valid values for read and write capacity units. ON_DEMAND works best for less predictable traffic, while PROVISIONED works best for consistent and predictable traffic.
- Parameters
mode (ThroughputModeEnum) –
provisioned_read_capacity_units (int) –
provisioned_write_capacity_units (int) –
- mode¶
Throughput mode
- Type
- provisioned_read_capacity_units¶
For provisioned feature groups, this indicates the read throughput you are billed for and can consume without throttling.
- Type
- provisioned_write_capacity_units¶
For provisioned feature groups, this indicates the write throughput you are billed for and can consume without throttling.
- Type
Method generated by attrs for class ThroughputConfig.
- class sagemaker.feature_store.inputs.ThroughputConfigUpdate(mode=None, provisioned_read_capacity_units=None, provisioned_write_capacity_units=None)¶
Bases:
Config
Target throughput configuration for the feature group.
Target throughput configuration can be ON_DEMAND, or PROVISIONED with valid values for read and write capacity units. ON_DEMAND works best for less predictable traffic, while PROVISIONED works best for consistent and predictable traffic.
- Parameters
mode (ThroughputModeEnum) –
provisioned_read_capacity_units (int) –
provisioned_write_capacity_units (int) –
- mode¶
Target throughput mode
- Type
- provisioned_read_capacity_units¶
For provisioned feature groups, this indicates the read throughput you are billed for and can consume without throttling.
- Type
- provisioned_write_capacity_units¶
For provisioned feature groups, this indicates the write throughput you are billed for and can consume without throttling.
- Type
Method generated by attrs for class ThroughputConfigUpdate.
- class sagemaker.feature_store.inputs.OnlineStoreConfig(enable_online_store=True, online_store_security_config=None, ttl_duration=None, storage_type=None)¶
Bases:
Config
OnlineStoreConfig for FeatureStore.
- Parameters
enable_online_store (bool) –
online_store_security_config (OnlineStoreSecurityConfig) –
ttl_duration (TtlDuration) –
storage_type (OnlineStoreStorageTypeEnum) –
- online_store_security_config¶
configuration of security setting.
- ttl_duration¶
Default time to live duration for records.
- Type
Method generated by attrs for class OnlineStoreConfig.
- class sagemaker.feature_store.inputs.OnlineStoreSecurityConfig(kms_key_id=_Nothing.NOTHING)¶
Bases:
Config
OnlineStoreSecurityConfig for FeatureStore.
- Parameters
kms_key_id (str) –
Method generated by attrs for class OnlineStoreSecurityConfig.
- class sagemaker.feature_store.inputs.TtlDuration(unit, value)¶
Bases:
Config
TtlDuration for records in online FeatureStore.
Method generated by attrs for class TtlDuration.
- class sagemaker.feature_store.inputs.S3StorageConfig(s3_uri, kms_key_id=None)¶
Bases:
Config
S3StorageConfig for FeatureStore.
Method generated by attrs for class S3StorageConfig.
- class sagemaker.feature_store.inputs.FeatureValue(feature_name=None, value_as_string=None, value_as_string_list=None)¶
Bases:
Config
FeatureValue for FeatureStore.
- form used for collection type.
Method generated by attrs for class FeatureValue.
- class sagemaker.feature_store.inputs.TableFormatEnum(value)¶
Bases:
Enum
Enum of table formats.
The offline store table formats can be Glue or Iceberg.
- class sagemaker.feature_store.inputs.OnlineStoreStorageTypeEnum(value)¶
Bases:
Enum
Enum of storage types for online store.
The online store storage types can be Standard or InMemory.
- class sagemaker.feature_store.inputs.ThroughputModeEnum(value)¶
Bases:
Enum
Enum of throughput modes supported by feature group.
Throughput mode of feature group can be ON_DEMAND or PROVISIONED.
- class sagemaker.feature_store.inputs.ResourceEnum(value)¶
Bases:
Enum
Enum of resources.
The data type of resource can be
FeatureGroup
orFeatureMetadata
.
- class sagemaker.feature_store.inputs.SearchOperatorEnum(value)¶
Bases:
Enum
Enum of search operators.
The data type of search operator can be
And
orOr
.
- class sagemaker.feature_store.inputs.SortOrderEnum(value)¶
Bases:
Enum
Enum of sort orders.
The data type of sort order can be
Ascending
orDescending
.
- class sagemaker.feature_store.inputs.FilterOperatorEnum(value)¶
Bases:
Enum
Enum of filter operators.
The data type of filter operator can be
Equals
,NotEquals
,GreaterThan
,GreaterThanOrEqualTo
,LessThan
,LessThanOrEqualTo
,Contains
,Exists
,NotExists
, orIn
.
- class sagemaker.feature_store.inputs.Filter(name, value, operator=None)¶
Bases:
Config
Filter for FeatureStore search.
- Parameters
name (str) –
value (str) –
operator (FilterOperatorEnum) –
- value¶
A value used with
Name
andOperator
to determine which resources satisfy the filter’s condition.- Type
- operator¶
A Boolean binary operator that is used to evaluate the
- Type
- filter. If specify ``Value`` without ``Operator``, Amazon SageMaker uses ``Equals``
- (default
None).
Method generated by attrs for class Filter.
- class sagemaker.feature_store.inputs.Identifier(feature_group_name, record_identifiers_value_as_string, feature_names=None)¶
Bases:
Config
Identifier of batch get record API.
- Parameters
Method generated by attrs for class Identifier.
Dataset Builder¶
- class sagemaker.feature_store.dataset_builder.DatasetBuilder(sagemaker_session, base, output_path, record_identifier_feature_name=None, event_time_identifier_feature_name=None, included_feature_names=None, kms_key_id=None, event_time_identifier_feature_type=None)¶
Bases:
object
DatasetBuilder definition.
This class instantiates a DatasetBuilder object that comprises a base, a list of feature names, an output path and a KMS key ID.
- Parameters
sagemaker_session (Session) –
base (Union[FeatureGroup, DataFrame]) –
output_path (str) –
record_identifier_feature_name (str) –
event_time_identifier_feature_name (str) –
kms_key_id (str) –
event_time_identifier_feature_type (FeatureTypeEnum) –
- _base¶
A base which can be either a FeatureGroup or a pandas.DataFrame and will be used to merge other FeatureGroups and generate a Dataset.
- Type
Union[FeatureGroup, DataFrame]
- _record_identifier_feature_name¶
A string representing the record identifier feature if base is a DataFrame (default: None).
- Type
- _event_time_identifier_feature_name¶
A string representing the event time identifier feature if base is a DataFrame (default: None).
- Type
- _included_feature_names¶
A list of strings representing features to be included in the output. If not set, all features will be included in the output. (default: None).
- Type
List[str]
- _kms_key_id¶
A KMS key id. If set, will be used to encrypt the result file (default: None).
- Type
- _point_in_time_accurate_join¶
A boolean representing if point-in-time join is applied to the resulting dataframe when calling “to_dataframe”. When set to True, users can retrieve data using “row-level time travel” according to the event times provided to the DatasetBuilder. This requires that the entity dataframe with event times is submitted as the base in the constructor (default: False).
- Type
- _include_duplicated_records¶
A boolean representing whether the resulting dataframe when calling “to_dataframe” should include duplicated records (default: False).
- Type
- _include_deleted_records¶
A boolean representing whether the resulting dataframe when calling “to_dataframe” should include deleted records (default: False).
- Type
- _number_of_recent_records¶
An integer representing how many records will be returned for each record identifier (default: 1).
- Type
- _number_of_records¶
An integer representing the number of records that should be returned in the resulting dataframe when calling “to_dataframe” (default: None).
- Type
- _write_time_ending_timestamp¶
A datetime that represents the latest write time for a record to be included in the resulting dataset. Records with a newer write time will be omitted from the resulting dataset. (default: None).
- Type
- _event_time_starting_timestamp¶
A datetime that represents the earliest event time for a record to be included in the resulting dataset. Records with an older event time will be omitted from the resulting dataset. (default: None).
- Type
- _event_time_ending_timestamp¶
A datetime that represents the latest event time for a record to be included in the resulting dataset. Records with a newer event time will be omitted from the resulting dataset. (default: None).
- Type
- _feature_groups_to_be_merged¶
A list of FeatureGroupToBeMerged which will be joined to base (default: []).
- Type
List[FeatureGroupToBeMerged]
- _event_time_identifier_feature_type¶
A FeatureTypeEnum representing the type of event time identifier feature (default: None).
- Type
Method generated by attrs for class DatasetBuilder.
- with_feature_group(feature_group, target_feature_name_in_base=None, included_feature_names=None, feature_name_in_target=None, join_comparator=JoinComparatorEnum.EQUALS, join_type=JoinTypeEnum.INNER_JOIN)¶
Join FeatureGroup with base.
- Parameters
feature_group (FeatureGroup) – A target FeatureGroup which will be joined to base.
target_feature_name_in_base (str) – A string representing the feature name in base which will be used as a join key (default: None).
included_feature_names (List[str]) – A list of strings representing features to be included in the output (default: None).
feature_name_in_target (str) – A string representing the feature name in the target feature group that will be compared to the target feature in the base feature group. If None is provided, the record identifier feature will be used in the SQL join. (default: None).
join_comparator (JoinComparatorEnum) – A JoinComparatorEnum representing the comparator used when joining the target feature in the base feature group and the feature in the target feature group. (default: JoinComparatorEnum.EQUALS).
join_type (JoinTypeEnum) – A JoinTypeEnum representing the type of join between the base and target feature groups. (default: JoinTypeEnum.INNER_JOIN).
Returns – This DatasetBuilder object.
- point_in_time_accurate_join()¶
Enable point-in-time accurate join.
- Returns
This DatasetBuilder object.
- include_duplicated_records()¶
Include duplicated records in dataset.
- Returns
This DatasetBuilder object.
- include_deleted_records()¶
Include deleted records in dataset.
- Returns
This DatasetBuilder object.
- with_number_of_recent_records_by_record_identifier(number_of_recent_records)¶
Set number_of_recent_records field with provided input.
- Parameters
number_of_recent_records (int) – An int that how many recent records will be returned for each record identifier.
- Returns
This DatasetBuilder object.
- with_number_of_records_from_query_results(number_of_records)¶
Set number_of_records field with provided input.
- Parameters
number_of_records (int) – An int that how many records will be returned.
- Returns
This DatasetBuilder object.
- as_of(timestamp)¶
Set write_time_ending_timestamp field with provided input.
- Parameters
timestamp (datetime.datetime) – A datetime that all records’ write time in dataset will be before it.
- Returns
This DatasetBuilder object.
- with_event_time_range(starting_timestamp=None, ending_timestamp=None)¶
Set event_time_starting_timestamp and event_time_ending_timestamp with provided inputs.
- Parameters
starting_timestamp (datetime.datetime) – A datetime that all records’ event time in dataset will be after it (default: None).
ending_timestamp (datetime.datetime) – A datetime that all records’ event time in dataset will be before it (default: None).
- Returns
This DatasetBuilder object.
- to_csv_file()¶
Get query string and result in .csv format file
Feature Store¶
- class sagemaker.feature_store.feature_store.FeatureStore(sagemaker_session=<class 'sagemaker.session.Session'>)¶
Bases:
object
FeatureStore definition.
This class instantiates a FeatureStore object that comprises a SageMaker session instance.
- Parameters
sagemaker_session (Session) –
Method generated by attrs for class FeatureStore.
- create_dataset(base, output_path, record_identifier_feature_name=None, event_time_identifier_feature_name=None, included_feature_names=None, kms_key_id=None)¶
Create a Dataset Builder for generating a Dataset.
- Parameters
base (Union[FeatureGroup, DataFrame]) – A base which can be either a FeatureGroup or a pandas.DataFrame and will be used to merge other FeatureGroups and generate a Dataset.
output_path (str) – An S3 URI which stores the output .csv file.
record_identifier_feature_name (str) – A string representing the record identifier feature if base is a DataFrame (default: None).
event_time_identifier_feature_name (str) – A string representing the event time identifier feature if base is a DataFrame (default: None).
included_feature_names (List[str]) – A list of features to be included in the output (default: None).
kms_key_id (str) – An KMS key id. If set, will be used to encrypt the result file (default: None).
- Raises
ValueError – Base is a Pandas DataFrame but no record identifier feature name nor event time identifier feature name is provided.
- Return type
- list_feature_groups(name_contains=None, feature_group_status_equals=None, offline_store_status_equals=None, creation_time_after=None, creation_time_before=None, sort_order=None, sort_by=None, max_results=None, next_token=None)¶
List all FeatureGroups satisfying given filters.
- Parameters
name_contains (str) – A string that partially matches one or more FeatureGroups’ names. Filters FeatureGroups by name.
feature_group_status_equals (str) – A FeatureGroup status. Filters FeatureGroups by FeatureGroup status.
offline_store_status_equals (str) – An OfflineStore status. Filters FeatureGroups by OfflineStore status.
creation_time_after (datetime.datetime) – Use this parameter to search for FeatureGroups created after a specific date and time.
creation_time_before (datetime.datetime) – Use this parameter to search for FeatureGroups created before a specific date and time.
sort_order (str) – The order in which FeatureGroups are listed.
sort_by (str) – The value on which the FeatureGroup list is sorted.
max_results (int) – The maximum number of results returned by ListFeatureGroups.
next_token (str) – A token to resume pagination of ListFeatureGroups results.
- Returns
Response dict from service.
- Return type
- batch_get_record(identifiers, expiration_time_response=None)¶
Get record in batch from FeatureStore
- Parameters
identifiers (Sequence[Identifier]) – A list of identifiers to uniquely identify records in FeatureStore.
expiration_time_response (str) – the field of expiration time response to toggle returning of expiresAt.
- Returns
Response dict from service.
- Return type
- search(resource, filters=None, operator=None, sort_by=None, sort_order=None, next_token=None, max_results=None)¶
Search for FeatureGroups or FeatureMetadata satisfying given filters.
- Parameters
resource (ResourceEnum) – The name of the Amazon SageMaker resource to search for. Valid values are
FeatureGroup
orFeatureMetadata
.filters (Sequence[Filter]) – A list of filter objects (Default: None).
operator (SearchOperatorEnum) – A Boolean operator used to evaluate the filters. Valid values are
And
orOr
. The default isAnd
(Default: None).sort_by (str) – The name of the resource property used to sort the
SearchResults
. The default isLastModifiedTime
.sort_order (SortOrderEnum) – How
SearchResults
are ordered. Valid values areAscending
orDescending
. The default isDescending
.next_token (str) – If more than
MaxResults
resources match the specified filters, the response includes aNextToken
. TheNextToken
can be passed to the nextSearchRequest
to continue retrieving results (Default: None).max_results (int) – The maximum number of results to return (Default: None).
- Returns
Response dict from service.
- Return type
@feature_processor Decorator¶
- @sagemaker.feature_store.feature_processor.feature_processor(inputs, output, target_stores=None, parameters=None, enable_ingestion=True, spark_config=None)¶
Decorator to facilitate feature engineering for Feature Groups.
If the decorated function is executed without arguments then the decorated function’s arguments are automatically loaded from the input data sources. Outputs are ingested to the output Feature Group. If arguments are provided to this function, then arguments are not automatically loaded (for testing).
Decorated functions must conform to the expected signature. Parameters: one parameter of type pyspark.sql.DataFrame for each DataSource in ‘inputs’; followed by the optional parameters with names and types in [params: Dict[str, Any], spark: SparkSession]. Outputs: a single return value of type pyspark.sql.DataFrame. The function can have any name.
Example:
@feature_processor( inputs=[FeatureGroupDataSource("input-fg"), CSVDataSource("s3://bucket/prefix)], output='arn:aws:sagemaker:us-west-2:123456789012:feature-group/output-fg' ) def transform( input_feature_group: DataFrame, input_csv: DataFrame, params: Dict[str, Any], spark: SparkSession ) -> DataFrame: return ...
More concisely:
@feature_processor( inputs=[FeatureGroupDataSource("input-fg"), CSVDataSource("s3://bucket/prefix)], output='arn:aws:sagemaker:us-west-2:123456789012:feature-group/output-fg' ) def transform(input_feature_group, input_csv): return ...
- Parameters
inputs (Sequence[Union[FeatureGroupDataSource, CSVDataSource, ParquetDataSource, BaseDataSource]]) – A list of data sources.
output (str) – A Feature Group ARN to write results of this function to.
target_stores (Optional[list[str]], optional) – A list containing at least one of ‘OnlineStore’ or ‘OfflineStore’. If unspecified, data will be ingested to the enabled stores of the output feature group. Defaults to None.
parameters (Optional[Dict[str, Union[str, Dict]]], optional) – Parameters to be provided to the decorated function, available as the ‘params’ argument. Useful for parameterized functions. The params argument also contains the set of system provided parameters under the key ‘system’. E.g. ‘scheduled_time’: a timestamp representing the time that the execution was scheduled to execute at, if triggered by a Scheduler, otherwise, the current time.
enable_ingestion (bool, optional) – A boolean indicating whether the decorated function’s return value is ingested to the ‘output’ Feature Group. This flag is useful during the development phase to ensure that data is not used until the function is ready. It also useful for users that want to manage their own data ingestion. Defaults to True.
spark_config (Dict[str, str]) – A dict contains the key-value paris for Spark configurations.
- Raises
IngestionError – If any rows are not ingested successfully then a sample of the records, with failure reasons, is logged.
- Returns
The decorated function.
- Return type
Callable
Feature Processor Data Source¶
- class sagemaker.feature_store.feature_processor.FeatureGroupDataSource(name, input_start_offset=None, input_end_offset=None)¶
Bases:
object
A Feature Group data source definition for a FeatureProcessor.
- input_start_offset¶
A duration specified as a string in the format ‘<no> <unit>’ where ‘no’ is a number and ‘unit’ is a unit of time in [‘hours’, ‘days’, ‘weeks’, ‘months’, ‘years’] (plural and singular forms). Inputs contain data with event times no earlier than input_start_offset in the past. Offsets are relative to the function execution time. If the function is executed by a Schedule, then the offset is relative to the scheduled start time. Defaults to None.
- Type
Optional[str], optional
- input_end_offset¶
The ‘end’ (as opposed to start) counterpart for the ‘input_start_offset’. Inputs will contain records with event times no later than ‘input_end_offset’ in the past. Defaults to None.
- Type
Optional[str], optional
Method generated by attrs for class FeatureGroupDataSource.
- class sagemaker.feature_store.feature_processor.CSVDataSource(s3_uri, csv_header=True, csv_infer_schema=False)¶
Bases:
object
An CSV data source definition for a FeatureProcessor.
- csv_header¶
Whether to read the first line of the CSV file as column names. This option is only valid when file_format is set to csv. By default the value of this option is true, and all column types are assumed to be a string.
- Type
- infer_schema¶
Whether to infer the schema of the CSV data source. This option is only valid when file_format is set to csv. If set to true, two passes of the data is required to load and infer the schema.
- Type
Method generated by attrs for class CSVDataSource.
- class sagemaker.feature_store.feature_processor.ParquetDataSource(s3_uri)¶
Bases:
object
An parquet data source definition for a FeatureProcessor.
- Parameters
s3_uri (str) –
Method generated by attrs for class ParquetDataSource.
- class sagemaker.feature_store.feature_processor.PySparkDataSource¶
Bases:
BaseDataSource
[DataFrame
],ABC
Abstract base class for feature processor data sources.
Provides a skeleton for customization requiring the overriding of the method to read data from data source and return the Spark DataFrame.
Method generated by attrs for class PySparkDataSource.
- abstract read_data(spark, params=None)¶
Read data from data source and convert the data to Spark DataFrame.
Feature Processor Scheduler and Triggers¶
- feature_processor.to_pipeline(step, role=None, transformation_code=None, max_retries=None, tags=None, sagemaker_session=None)¶
Creates a sagemaker pipeline that takes in a callable as a training step.
To configure training step used in sagemaker pipeline, input argument step needs to be wrapped by remote decorator in module sagemaker.remote_function. If not wrapped by remote decorator, default configurations in sagemaker.remote_function.job._JobSettings will be used to create training step.
- Parameters
pipeline_name (str) – The name of the pipeline.
step (Callable) – A user provided function wrapped by feature_processor and optionally wrapped by remote_decorator.
role (Optional[str]) – The Amazon Resource Name (ARN) of the role used by the pipeline to access and create resources. If not specified, it will default to the credentials provided by the AWS configuration chain.
transformation_code (Optional[str]) – The data source for a reference to the transformation code for Lineage tracking. This code is not used for actual transformation.
max_retries (Optional[int]) – The number of times to retry sagemaker pipeline step. If not specified, sagemaker pipline step will not retry.
tags (List[Tuple[str, str]) – A list of tags attached to the pipeline and all corresponding lineage resources that support tags. If not specified, no custom tags will be attached.
sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Returns
SageMaker Pipeline ARN.
- Return type
- feature_processor.schedule(schedule_expression, role_arn=None, state='ENABLED', start_date=None, sagemaker_session=None)¶
Creates an EventBridge Schedule that schedules executions of a sagemaker pipeline.
The pipeline created will also have a pipeline parameter scheduled-time indicating when the pipeline is scheduled to run.
- Parameters
pipeline_name (str) – The SageMaker Pipeline name that will be scheduled.
schedule_expression (str) – The expression that defines when the schedule runs. It supports at expression, rate expression and cron expression. See the CreateSchedule API for more details.
state (str) – Specifies whether the schedule is enabled or disabled. Valid values are ENABLED and DISABLED. See the State request parameter for more details. If not specified, it will default to ENABLED.
start_date (Optional[datetime]) – The date, in UTC, after which the schedule can begin invoking its target. Depending on the schedule’s recurrence expression, invocations might occur on, or after, the StartDate you specify.
role_arn (Optional[str]) – The Amazon Resource Name (ARN) of the IAM role that EventBridge Scheduler will assume for this target when the schedule is invoked.
sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Returns
The EventBridge Schedule ARN.
- Return type
- feature_processor.execute(execution_time=None, sagemaker_session=None)¶
Starts an execution of a SageMaker Pipeline created by feature_processor
- Parameters
pipeline_name (str) – The SageMaker Pipeline name that will be executed.
execution_time (datetime) – The date, in UTC, will be used as a sagemaker pipeline parameter indicating the time which at which the execution is scheduled to execute. If not specified, it will default to the current timestamp.
sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Returns
The pipeline execution ARN.
- Return type
- feature_processor.delete_schedule(sagemaker_session=None)¶
Delete EventBridge Schedule corresponding to a SageMaker Pipeline if there is one.
- Parameters
pipeline_name (str) – The name of the SageMaker Pipeline that needs to be deleted
sagemaker_session (Optional[Session]) – (Optional[Session], optional): Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Return type
None
- feature_processor.describe(sagemaker_session=None)¶
Describe feature processor and other related resources.
This API will include details related to the feature processor including SageMaker Pipeline and EventBridge Schedule.
- Parameters
- Returns
Return information for resources related to feature processor.
- Return type
- feature_processor.list_pipelines()¶
Lists all SageMaker Pipelines created by Feature Processor SDK.
- Parameters
sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Returns
- Return list of SageMaker Pipeline metadata created for
feature_processor.
- Return type
List[Dict[str, Any]]
- feature_processor.put_trigger(target_pipeline, target_pipeline_parameters=None, state='ENABLED', event_pattern=None, role_arn=None, sagemaker_session=None)¶
Creates an event based trigger that triggers executions of a sagemaker pipeline.
- Parameters
source_pipeline_events (List[FeatureProcessorPipelineEvents]) – The list of FeatureProcessorPipelineEvents that will trigger the target_pipeline.
target_pipeline (str) – The name of the SageMaker Pipeline that will be triggered.
target_pipeline_parameters (Optional[Dict[str, str]]) – The list of parameters to start execution of a pipeline.
state (Optional[str]) – Indicates whether the rule is enabled or disabled. If not specified, it will default to ENABLED.
event_pattern (Optional[str]) – The EventBridge EventPattern that triggers the target_pipeline. If specified, will override source_pipeline_events. For more information, see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html in the Amazon EventBridge User Guide.
role_arn (Optional[str]) – The Amazon Resource Name (ARN) of the IAM role that EventBridge Scheduler will assume for this target when the schedule is invoked.
sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Returns
The EventBridge Rule ARN.
- Return type
- feature_processor.enable_trigger(sagemaker_session=None)¶
Enable the EventBridge Rule that is associated with the pipeline.
- Parameters
pipeline_name (str) – The SageMaker Pipeline name that will be executed.
sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Return type
None
- feature_processor.disable_trigger(sagemaker_session=None)¶
Disable the EventBridge Rule that is associated with the pipeline.
- Parameters
pipeline_name (str) – The SageMaker Pipeline name that will be executed.
sagemaker_session (Optional[Session]) – Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Return type
None
- feature_processor.delete_trigger(sagemaker_session=None)¶
Delete EventBridge Rule corresponding to a SageMaker Pipeline if there is one.
- Parameters
pipeline_name (str) – The name of the SageMaker Pipeline that needs to be deleted
sagemaker_session (Optional[Session]) – (Optional[Session], optional): Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the function creates one using the default AWS configuration chain.
- Return type
None