feast.infra.offline_stores package
Subpackages
- feast.infra.offline_stores.contrib package
- Subpackages
- feast.infra.offline_stores.contrib.postgres_offline_store package
- feast.infra.offline_stores.contrib.spark_offline_store package
- feast.infra.offline_stores.contrib.trino_offline_store package
- Submodules
- feast.infra.offline_stores.contrib.trino_offline_store.trino module
- feast.infra.offline_stores.contrib.trino_offline_store.trino_queries module
- feast.infra.offline_stores.contrib.trino_offline_store.trino_source module
- feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map module
- Module contents
- Submodules
- feast.infra.offline_stores.contrib.contrib_repo_configuration module
- feast.infra.offline_stores.contrib.postgres_repo_configuration module
- Module contents
Submodules
feast.infra.offline_stores.bigquery module
- class feast.infra.offline_stores.bigquery.BigQueryOfflineStore[source]
Bases:
feast.infra.offline_stores.offline_store.OfflineStore
- static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob [source]
- static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Returns a RetrievalJob containing all rows of the given join key columns, feature name columns, and event timestamp column whose timestamps fall between the start_date and end_date.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
- static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Pulls the latest data from the offline store for materialization into the online store. This method is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method); the FeatureStore class then writes the returned data into the online store.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
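The materialization pull described above boils down to a windowed group-by-latest. A plain-Python sketch of those semantics (illustrative only — not the BigQuery implementation, and inclusive boundary handling is an assumption here):

```python
from datetime import datetime

def pull_latest(rows, join_key_columns, timestamp_field,
                created_timestamp_column, start_date, end_date):
    """For each distinct combination of join key values, keep the single row
    with the greatest timestamp_field (ties broken by created_timestamp_column,
    when present) whose timestamp lies in [start_date, end_date].
    Illustrative sketch only; boundary inclusivity is an assumption."""
    latest = {}
    for row in rows:
        ts = row[timestamp_field]
        if not (start_date <= ts <= end_date):
            continue  # outside the materialization window
        key = tuple(row[c] for c in join_key_columns)
        created = datetime.min
        if created_timestamp_column:
            created = row.get(created_timestamp_column) or datetime.min
        sort_key = (ts, created)
        if key not in latest or sort_key > latest[key][0]:
            latest[key] = (sort_key, row)
    return [row for _, row in latest.values()]
```

The real store pushes this logic down into SQL (a window function partitioned by the join keys), but the row-level outcome is the same.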
- static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]
Write logged features to a specified destination (taken from logging_config) in the offline store. Data can be appended to an existing table (destination), or a new one will be created automatically if it doesn’t exist. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters
config – Repo configuration object
data – Arrow table or path to parquet directory that contains logs dataset.
source – Logging source that provides schema and some additional metadata.
logging_config – used to determine destination
registry – Feast registry
This is an optional method that may be supported by only some stores.
- class feast.infra.offline_stores.bigquery.BigQueryOfflineStoreConfig(*, type: Literal['bigquery'] = 'bigquery', dataset: pydantic.types.StrictStr = 'feast', project_id: pydantic.types.StrictStr = None, location: pydantic.types.StrictStr = None)[source]
Bases:
feast.repo_config.FeastConfigBaseModel
Offline store config for GCP BigQuery
- dataset: pydantic.types.StrictStr
(optional) BigQuery Dataset name for temporary tables
- location: Optional[pydantic.types.StrictStr]
(optional) GCP location name used for the BigQuery offline store. Examples of location names include US, EU, us-central1, and us-west4. If a location is not specified, it defaults to the US multi-regional location. For more information on BigQuery data locations, see: https://cloud.google.com/bigquery/docs/locations
- project_id: Optional[pydantic.types.StrictStr]
(optional) GCP project name used for the BigQuery offline store
- type: Literal['bigquery']
Offline store type selector
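In practice, this config is usually supplied through the repo's feature_store.yaml rather than constructed directly. A minimal example might look like the following (all project, bucket, and dataset names are illustrative):

```yaml
project: my_feature_repo
registry: gs://my-bucket/registry.db
provider: gcp
offline_store:
  type: bigquery
  dataset: feast            # dataset used for temporary tables
  project_id: my-gcp-project
  location: US
```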
- class feast.infra.offline_stores.bigquery.BigQueryRetrievalJob(query: Union[str, Callable[[], AbstractContextManager[str]]], client: google.cloud.bigquery.client.Client, config: feast.repo_config.RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]
Bases:
feast.infra.offline_stores.offline_store.RetrievalJob
- property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]
Return metadata information about retrieval. Should be available even before materializing the dataset itself.
- property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
- persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]
Run the retrieval and persist the results in the same offline store used for read.
- to_bigquery(job_config: Optional[google.cloud.bigquery.job.query.QueryJobConfig] = None, timeout: int = 1800, retry_cadence: int = 10) Optional[str] [source]
Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table. Runs for a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).
- Parameters
job_config – An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
timeout – An optional number of seconds for setting the time limit of the QueryJob.
retry_cadence – An optional number of seconds specifying how often the job should be checked for completion.
- Returns
Returns the destination table name, or None if job_config.dry_run is True.
- feast.infra.offline_stores.bigquery.arrow_schema_to_bq_schema(arrow_schema: pyarrow.lib.Schema) List[google.cloud.bigquery.schema.SchemaField] [source]
- feast.infra.offline_stores.bigquery.block_until_done(client: google.cloud.bigquery.client.Client, bq_job: Union[google.cloud.bigquery.job.query.QueryJob, google.cloud.bigquery.job.load.LoadJob], timeout: int = 1800, retry_cadence: float = 1)[source]
Waits for bq_job to finish running, up to a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).
- Parameters
client – A bigquery.client.Client to monitor the bq_job.
bq_job – The bigquery job (QueryJob or LoadJob) to block on until it has finished running.
timeout – An optional number of seconds for setting the time limit of the job.
retry_cadence – An optional number of seconds specifying how often the job should be checked for completion.
- Raises
BigQueryJobStillRunning – If the job has blocked for longer than the timeout.
BigQueryJobCancelled – If the job has been cancelled (e.g. due to a timeout or a KeyboardInterrupt).
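The blocking behavior amounts to a poll-with-deadline loop. A self-contained sketch of that pattern (illustrative, not the actual implementation; clock and sleep are injectable here only to make the sketch testable):

```python
import time

def wait_until_done(is_done, timeout=1800, retry_cadence=1.0,
                    clock=time.monotonic, sleep=time.sleep):
    """Poll is_done() every retry_cadence seconds until it returns True,
    giving up once timeout seconds have elapsed.
    Returns True if the job finished, False on timeout."""
    deadline = clock() + timeout
    while clock() < deadline:
        if is_done():
            return True
        sleep(retry_cadence)
    return False
```

The real function additionally cancels the BigQuery job and raises BigQueryJobCancelled / BigQueryJobStillRunning instead of returning a boolean.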
feast.infra.offline_stores.bigquery_source module
- class feast.infra.offline_stores.bigquery_source.BigQueryLoggingDestination(*, table_ref)[source]
Bases:
feast.feature_logging.LoggingDestination
- classmethod from_proto(config_proto: feast.core.FeatureService_pb2.LoggingConfig) feast.feature_logging.LoggingDestination [source]
- to_data_source() feast.data_source.DataSource [source]
Convert this object into a data source to read logs from an offline store.
- class feast.infra.offline_stores.bigquery_source.BigQueryOptions(table: Optional[str], query: Optional[str])[source]
Bases:
object
Configuration options for a BigQuery data source.
- classmethod from_proto(bigquery_options_proto: feast.core.DataSource_pb2.BigQueryOptions)[source]
Creates a BigQueryOptions from a protobuf representation of a BigQuery option
- Parameters
bigquery_options_proto – A protobuf representation of BigQueryOptions
- Returns
Returns a BigQueryOptions object based on the bigquery_options protobuf
- class feast.infra.offline_stores.bigquery_source.BigQuerySource(*, event_timestamp_column: Optional[str] = '', table: Optional[str] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, query: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', timestamp_field: Optional[str] = None)[source]
Bases:
feast.data_source.DataSource
- static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]
Converts data source config in protobuf spec to a DataSource class object.
- Parameters
data_source – A protobuf representation of a DataSource.
- Returns
A DataSource class object.
- Raises
ValueError – The type of DataSource could not be identified.
- get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]] [source]
Returns the list of column names and raw column types.
- Parameters
config – Configuration object used to configure a feature store.
- get_table_query_string() str [source]
Returns a string that can directly be used to reference this table in SQL
- property query
- static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType] [source]
Returns the callable method that returns Feast type given the raw column type.
- property table
- to_proto() feast.core.DataSource_pb2.DataSource [source]
Converts this DataSource object to its protobuf representation.
- validate(config: feast.repo_config.RepoConfig)[source]
Validates the underlying data source.
- Parameters
config – Configuration object used to configure a feature store.
- class feast.infra.offline_stores.bigquery_source.SavedDatasetBigQueryStorage(table: str)[source]
Bases:
feast.saved_dataset.SavedDatasetStorage
- bigquery_options: feast.infra.offline_stores.bigquery_source.BigQueryOptions
- static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage [source]
- to_data_source() feast.data_source.DataSource [source]
feast.infra.offline_stores.file module
- class feast.infra.offline_stores.file.FileOfflineStore[source]
Bases:
feast.infra.offline_stores.offline_store.OfflineStore
- static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob [source]
- static offline_write_batch(config: feast.repo_config.RepoConfig, feature_view: feast.feature_view.FeatureView, data: pyarrow.lib.Table, progress: Optional[Callable[[int], Any]])[source]
Write features to a specified destination in the offline store. Data can be appended to an existing table (destination), or a new one will be created automatically if it doesn’t exist. Hence, this function can be called repeatedly with the same destination config to write features.
- Parameters
config – Repo configuration object
feature_view – FeatureView to write the data to.
data – pyarrow table containing feature data and timestamp column for historical feature retrieval
progress – Optional function to be called once every mini-batch of rows is written to the offline store; can be used to display progress.
- static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Returns a RetrievalJob containing all rows of the given join key columns, feature name columns, and event timestamp column whose timestamps fall between the start_date and end_date.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
- static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Pulls the latest data from the offline store for materialization into the online store. This method is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method); the FeatureStore class then writes the returned data into the online store.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
- static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]
Write logged features to a specified destination (taken from logging_config) in the offline store. Data can be appended to an existing table (destination), or a new one will be created automatically if it doesn’t exist. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters
config – Repo configuration object
data – Arrow table or path to parquet directory that contains logs dataset.
source – Logging source that provides schema and some additional metadata.
logging_config – used to determine destination
registry – Feast registry
This is an optional method that may be supported by only some stores.
- class feast.infra.offline_stores.file.FileOfflineStoreConfig(*, type: Literal['file'] = 'file')[source]
Bases:
feast.repo_config.FeastConfigBaseModel
Offline store config for local (file-based) store
- type: Literal['file']
Offline store type selector
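As with the other stores, this config normally comes from feature_store.yaml. A minimal local setup might look like this (paths and names are illustrative):

```yaml
project: my_feature_repo
registry: data/registry.db
provider: local
offline_store:
  type: file
```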
- class feast.infra.offline_stores.file.FileRetrievalJob(evaluation_function: Callable, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]
Bases:
feast.infra.offline_stores.offline_store.RetrievalJob
- property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]
Return metadata information about retrieval. Should be available even before materializing the dataset itself.
- property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
- persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]
Run the retrieval and persist the results in the same offline store used for read.
feast.infra.offline_stores.file_source module
- class feast.infra.offline_stores.file_source.FileLoggingDestination(*, path: str, s3_endpoint_override='', partition_by: Optional[List[str]] = None)[source]
Bases:
feast.feature_logging.LoggingDestination
- classmethod from_proto(config_proto: feast.core.FeatureService_pb2.LoggingConfig) feast.feature_logging.LoggingDestination [source]
- to_data_source() feast.data_source.DataSource [source]
Convert this object into a data source to read logs from an offline store.
- class feast.infra.offline_stores.file_source.FileOptions(file_format: Optional[feast.data_format.FileFormat], s3_endpoint_override: Optional[str], uri: Optional[str])[source]
Bases:
object
Configuration options for a file data source.
- classmethod from_proto(file_options_proto: feast.core.DataSource_pb2.FileOptions)[source]
Creates a FileOptions from a protobuf representation of a file option
- Parameters
file_options_proto – A protobuf representation of FileOptions
- Returns
Returns a FileOptions object based on the file_options protobuf
- class feast.infra.offline_stores.file_source.FileSource(*args, path: Optional[str] = None, event_timestamp_column: Optional[str] = '', file_format: Optional[feast.data_format.FileFormat] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = '', s3_endpoint_override: Optional[str] = None, name: Optional[str] = '', description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', timestamp_field: Optional[str] = '')[source]
Bases:
feast.data_source.DataSource
- static create_filesystem_and_path(path: str, s3_endpoint_override: str) Tuple[Optional[pyarrow._fs.FileSystem], str] [source]
- static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]
Converts data source config in protobuf spec to a DataSource class object.
- Parameters
data_source – A protobuf representation of a DataSource.
- Returns
A DataSource class object.
- Raises
ValueError – The type of DataSource could not be identified.
- get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]] [source]
Returns the list of column names and raw column types.
- Parameters
config – Configuration object used to configure a feature store.
- get_table_query_string() str [source]
Returns a string that can directly be used to reference this table in SQL.
- property path
Returns the path of this file data source.
- static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType] [source]
Returns the callable method that returns Feast type given the raw column type.
- to_proto() feast.core.DataSource_pb2.DataSource [source]
Converts this DataSource object to its protobuf representation.
- validate(config: feast.repo_config.RepoConfig)[source]
Validates the underlying data source.
- Parameters
config – Configuration object used to configure a feature store.
- class feast.infra.offline_stores.file_source.SavedDatasetFileStorage(path: str, file_format: feast.data_format.FileFormat = <feast.data_format.ParquetFormat object>, s3_endpoint_override: Optional[str] = None)[source]
Bases:
feast.saved_dataset.SavedDatasetStorage
- file_options: feast.infra.offline_stores.file_source.FileOptions
- static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage [source]
- to_data_source() feast.data_source.DataSource [source]
feast.infra.offline_stores.offline_store module
- class feast.infra.offline_stores.offline_store.OfflineStore[source]
Bases:
abc.ABC
OfflineStore is an object used for all interaction between Feast and the service used for offline storage of features.
- abstract static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob [source]
- static offline_write_batch(config: feast.repo_config.RepoConfig, feature_view: feast.feature_view.FeatureView, data: pyarrow.lib.Table, progress: Optional[Callable[[int], Any]])[source]
Write features to a specified destination in the offline store. Data can be appended to an existing table (destination), or a new one will be created automatically if it doesn’t exist. Hence, this function can be called repeatedly with the same destination config to write features.
- Parameters
config – Repo configuration object
feature_view – FeatureView to write the data to.
data – pyarrow table containing feature data and timestamp column for historical feature retrieval
progress – Optional function to be called once every mini-batch of rows is written to the offline store; can be used to display progress.
- abstract static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Returns a RetrievalJob containing all rows of the given join key columns, feature name columns, and event timestamp column whose timestamps fall between the start_date and end_date.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
- abstract static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Pulls the latest data from the offline store for materialization into the online store. This method is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method); the FeatureStore class then writes the returned data into the online store.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
- static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]
Write logged features to a specified destination (taken from logging_config) in the offline store. Data can be appended to an existing table (destination), or a new one will be created automatically if it doesn’t exist. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters
config – Repo configuration object
data – Arrow table or path to parquet directory that contains logs dataset.
source – Logging source that provides schema and some additional metadata.
logging_config – used to determine destination
registry – Feast registry
This is an optional method that may be supported by only some stores.
- class feast.infra.offline_stores.offline_store.RetrievalJob[source]
Bases:
abc.ABC
RetrievalJob is used to manage the execution of a historical feature retrieval.
- abstract property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]
Return metadata information about retrieval. Should be available even before materializing the dataset itself.
- abstract property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
- abstract persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]
Run the retrieval and persist the results in the same offline store used for read.
- class feast.infra.offline_stores.offline_store.RetrievalMetadata(features: List[str], keys: List[str], min_event_timestamp: Optional[datetime.datetime] = None, max_event_timestamp: Optional[datetime.datetime] = None)[source]
Bases:
object
- max_event_timestamp: Optional[datetime.datetime]
- min_event_timestamp: Optional[datetime.datetime]
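The two timestamp bounds above can be computed with a simple scan over the retrieved rows. A plain-Python sketch (illustrative only, not Feast code):

```python
from datetime import datetime
from typing import Optional, Tuple

def event_timestamp_bounds(rows, event_timestamp_col: str
                           ) -> Tuple[Optional[datetime], Optional[datetime]]:
    """Return (min_event_timestamp, max_event_timestamp) over rows,
    or (None, None) if no row carries a timestamp."""
    stamps = [r[event_timestamp_col] for r in rows
              if r.get(event_timestamp_col) is not None]
    if not stamps:
        return None, None
    return min(stamps), max(stamps)
```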
feast.infra.offline_stores.offline_utils module
- class feast.infra.offline_stores.offline_utils.FeatureViewQueryContext(name: str, ttl: int, entities: List[str], features: List[str], field_mapping: Dict[str, str], timestamp_field: str, created_timestamp_column: Optional[str], table_subquery: str, entity_selections: List[str], min_event_timestamp: Optional[str], max_event_timestamp: str)[source]
Bases:
object
Context object used to template a BigQuery and Redshift point-in-time SQL query
- feast.infra.offline_stores.offline_utils.assert_expected_columns_in_entity_df(entity_schema: Dict[str, numpy.dtype], join_keys: Set[str], entity_df_event_timestamp_col: str)[source]
- feast.infra.offline_stores.offline_utils.build_point_in_time_query(feature_view_query_contexts: List[feast.infra.offline_stores.offline_utils.FeatureViewQueryContext], left_table_query_string: str, entity_df_event_timestamp_col: str, entity_df_columns: KeysView[str], query_template: str, full_feature_names: bool = False) str [source]
Build a point-in-time query between each feature view table and the entity dataframe for BigQuery and Redshift.
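The templating step can be pictured as filling a SQL skeleton from a FeatureViewQueryContext-like dict. This is a deliberately simplified sketch; the real templates are considerably more involved, and all names below are illustrative:

```python
SQL_SKELETON = (
    "SELECT e.*, {feature_cols}\n"
    "FROM {left_table} e\n"
    "LEFT JOIN {table_subquery} f\n"
    "  ON {join_cond}\n"
    "  AND f.{timestamp_field} <= e.{entity_ts_col}"
)

def render_point_in_time_query(ctx: dict) -> str:
    """Fill the skeleton from a context dict whose keys loosely mirror
    FeatureViewQueryContext (entities, features, table_subquery, ...)."""
    join_cond = " AND ".join(f"f.{k} = e.{k}" for k in ctx["entities"])
    feature_cols = ", ".join(f"f.{f}" for f in ctx["features"])
    return SQL_SKELETON.format(
        feature_cols=feature_cols,
        left_table=ctx["left_table_query_string"],
        table_subquery=ctx["table_subquery"],
        join_cond=join_cond,
        timestamp_field=ctx["timestamp_field"],
        entity_ts_col=ctx["entity_df_event_timestamp_col"],
    )
```

The production query additionally handles ttl windows, created-timestamp tie-breaking, field mappings, and full feature names, which this sketch omits.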
- feast.infra.offline_stores.offline_utils.get_entity_df_timestamp_bounds(entity_df: pandas.core.frame.DataFrame, event_timestamp_col: str) Tuple[pandas._libs.tslibs.timestamps.Timestamp, pandas._libs.tslibs.timestamps.Timestamp] [source]
- feast.infra.offline_stores.offline_utils.get_expected_join_keys(project: str, feature_views: List[feast.feature_view.FeatureView], registry: feast.registry.BaseRegistry) Set[str] [source]
- feast.infra.offline_stores.offline_utils.get_feature_view_query_context(feature_refs: List[str], feature_views: List[feast.feature_view.FeatureView], registry: feast.registry.BaseRegistry, project: str, entity_df_timestamp_range: Tuple[datetime.datetime, datetime.datetime]) List[feast.infra.offline_stores.offline_utils.FeatureViewQueryContext] [source]
Build a query context containing all information required to template a BigQuery or Redshift point-in-time SQL query.
- feast.infra.offline_stores.offline_utils.get_offline_store_from_config(offline_store_config: Any) feast.infra.offline_stores.offline_store.OfflineStore [source]
Creates an offline store corresponding to the given offline store config.
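Resolving a store class from a config's type string is, at heart, a dotted-path import. A generic sketch of that pattern (not Feast's exact logic):

```python
import importlib

def load_class(dotted_path: str):
    """Resolve a 'package.module.ClassName' string to the class object."""
    module_path, class_name = dotted_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

# e.g. load_class("feast.infra.offline_stores.file.FileOfflineStore")
# would resolve to FileOfflineStore, assuming feast is installed.
```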
feast.infra.offline_stores.redshift module
- class feast.infra.offline_stores.redshift.RedshiftOfflineStore[source]
Bases:
feast.infra.offline_stores.offline_store.OfflineStore
- static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob [source]
- static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Returns a RetrievalJob containing all rows of the given join key columns, feature name columns, and event timestamp column whose timestamps fall between the start_date and end_date.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
- static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Pulls the latest data from the offline store for materialization into the online store. This method is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method); the FeatureStore class then writes the returned data into the online store.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
- static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]
Write logged features to a specified destination (taken from logging_config) in the offline store. Data can be appended to an existing table (destination), or a new one will be created automatically if it doesn’t exist. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters
config – Repo configuration object
data – Arrow table or path to parquet directory that contains logs dataset.
source – Logging source that provides schema and some additional metadata.
logging_config – Logging configuration that determines the destination
registry – Feast registry
This is an optional method that may be supported by only some stores.
feast.infra.offline_stores.redshift module
- class feast.infra.offline_stores.redshift.RedshiftOfflineStoreConfig(*, type: Literal['redshift'] = 'redshift', cluster_id: pydantic.types.StrictStr, region: pydantic.types.StrictStr, user: pydantic.types.StrictStr, database: pydantic.types.StrictStr, s3_staging_location: pydantic.types.StrictStr, iam_role: pydantic.types.StrictStr)[source]
Bases:
feast.repo_config.FeastConfigBaseModel
Offline store config for AWS Redshift
- cluster_id: pydantic.types.StrictStr
Redshift cluster identifier
- database: pydantic.types.StrictStr
Redshift database name
- iam_role: pydantic.types.StrictStr
IAM Role for Redshift, granting it access to S3
- region: pydantic.types.StrictStr
Redshift cluster’s AWS region
- s3_staging_location: pydantic.types.StrictStr
S3 path for importing & exporting data to Redshift
- type: Literal['redshift']
Offline store type selector
- user: pydantic.types.StrictStr
Redshift user name
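These options correspond one-to-one to the offline_store section of feature_store.yaml. A sketch with placeholder values (the cluster, bucket, and IAM role names below are hypothetical):

```yaml
offline_store:
  type: redshift
  cluster_id: my_redshift_cluster
  region: us-west-2
  user: admin
  database: dev
  s3_staging_location: s3://my-feast-bucket/staging
  iam_role: arn:aws:iam::123456789012:role/redshift_s3_access
```

The iam_role must allow Redshift to read from and write to the s3_staging_location, since Feast stages data in S3 when moving it in and out of Redshift.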
- class feast.infra.offline_stores.redshift.RedshiftRetrievalJob(query: Union[str, Callable[[], AbstractContextManager[str]]], redshift_client, s3_resource, config: feast.repo_config.RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]
Bases:
feast.infra.offline_stores.offline_store.RetrievalJob
- property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]
Return metadata information about retrieval. Should be available even before materializing the dataset itself.
- property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
- persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]
Run the retrieval and persist the results in the same offline store used for read.
feast.infra.offline_stores.redshift_source module
- class feast.infra.offline_stores.redshift_source.RedshiftLoggingDestination(*, table_name: str)[source]
Bases:
feast.feature_logging.LoggingDestination
- classmethod from_proto(config_proto: feast.core.FeatureService_pb2.LoggingConfig) feast.feature_logging.LoggingDestination [source]
- to_data_source() feast.data_source.DataSource [source]
Convert this object into a data source to read logs from an offline store.
- class feast.infra.offline_stores.redshift_source.RedshiftOptions(table: Optional[str], schema: Optional[str], query: Optional[str], database: Optional[str])[source]
Bases:
object
Configuration options for a Redshift data source.
- classmethod from_proto(redshift_options_proto: feast.core.DataSource_pb2.RedshiftOptions)[source]
Creates a RedshiftOptions from a protobuf representation of Redshift options.
- Parameters
redshift_options_proto – A protobuf representation of a DataSource
- Returns
A RedshiftOptions object based on the redshift_options protobuf.
- class feast.infra.offline_stores.redshift_source.RedshiftSource(*, event_timestamp_column: Optional[str] = '', table: Optional[str] = None, schema: Optional[str] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, query: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', database: Optional[str] = '', timestamp_field: Optional[str] = '')[source]
Bases:
feast.data_source.DataSource
- property database
Returns the Redshift database of this Redshift source.
- static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]
Creates a RedshiftSource from a protobuf representation of a RedshiftSource.
- Parameters
data_source – A protobuf representation of a RedshiftSource
- Returns
A RedshiftSource object based on the data_source protobuf.
- get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]] [source]
Returns the pairs of column names and raw column types for this Redshift source.
- Parameters
config – A RepoConfig describing the feature repo
- get_table_query_string() str [source]
Returns a string that can directly be used to reference this table in SQL.
- property query
Returns the Redshift query of this Redshift source.
- property schema
Returns the schema of this Redshift source.
- static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType] [source]
Returns the callable method that returns Feast type given the raw column type.
- property table
Returns the table of this Redshift source.
- to_proto() feast.core.DataSource_pb2.DataSource [source]
Converts a RedshiftSource object to its protobuf representation.
- Returns
A DataSourceProto object.
- validate(config: feast.repo_config.RepoConfig)[source]
Validates the underlying data source.
- Parameters
config – Configuration object used to configure a feature store.
- class feast.infra.offline_stores.redshift_source.SavedDatasetRedshiftStorage(table_ref: str)[source]
Bases:
feast.saved_dataset.SavedDatasetStorage
- static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage [source]
- redshift_options: feast.infra.offline_stores.redshift_source.RedshiftOptions
- to_data_source() feast.data_source.DataSource [source]
feast.infra.offline_stores.snowflake module
- class feast.infra.offline_stores.snowflake.SnowflakeOfflineStore[source]
Bases:
feast.infra.offline_stores.offline_store.OfflineStore
- static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob [source]
- static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
Returns a Retrieval Job for all join key columns, feature name columns, and the event timestamp columns that occur between the start_date and end_date.
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
start_date – Starting date of query
end_date – Ending date of query
- static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob [source]
This method pulls data from the offline store, and the FeatureStore class is used to write this data into the online store. It is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).
Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.
- Parameters
config – Repo configuration object
data_source – Data source to pull all of the columns from
join_key_columns – Columns of the join keys
feature_name_columns – Columns of the feature names needed
timestamp_field – Timestamp column
created_timestamp_column – Timestamp column indicating when the row was created, used to break ties between rows with the same event timestamp
start_date – Starting date of query
end_date – Ending date of query
- static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]
Write logged features to the destination specified in logging_config in the offline store. Data can be appended to an existing table (destination), or a new table will be created automatically if it doesn't exist.
Hence, this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters
config – Repo configuration object
data – Arrow table or path to parquet directory that contains logs dataset.
source – Logging source that provides schema and some additional metadata.
logging_config – Logging configuration that determines the destination
registry – Feast registry
This is an optional method that may be supported by only some stores.
- class feast.infra.offline_stores.snowflake.SnowflakeOfflineStoreConfig(*, type: Literal['snowflake.offline'] = 'snowflake.offline', config_path: str = '~/.snowsql/config', account: str = None, user: str = None, password: str = None, role: str = None, warehouse: str = None, database: str = None, schema: str = None)[source]
Bases:
feast.repo_config.FeastConfigBaseModel
Offline store config for Snowflake
- type: Literal['snowflake.offline']
Offline store type selector
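The fields above populate the offline_store section of feature_store.yaml; connection details not set here fall back to the SnowSQL config file at config_path. A sketch with placeholder credentials (the account, warehouse, and database names below are hypothetical):

```yaml
offline_store:
  type: snowflake.offline
  account: my_snowflake_account
  user: my_user
  password: my_password
  role: ANALYST
  warehouse: COMPUTE_WH
  database: FEAST
  schema: PUBLIC
```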
- class feast.infra.offline_stores.snowflake.SnowflakeRetrievalJob(query: Union[str, Callable[[], AbstractContextManager[str]]], snowflake_conn: snowflake.connector.connection.SnowflakeConnection, config: feast.repo_config.RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]
Bases:
feast.infra.offline_stores.offline_store.RetrievalJob
- property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]
Return metadata information about retrieval. Should be available even before materializing the dataset itself.
- property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
- persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]
Run the retrieval and persist the results in the same offline store used for read.
feast.infra.offline_stores.snowflake_source module
- class feast.infra.offline_stores.snowflake_source.SavedDatasetSnowflakeStorage(table_ref: str)[source]
Bases:
feast.saved_dataset.SavedDatasetStorage
- static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage [source]
- snowflake_options: feast.infra.offline_stores.snowflake_source.SnowflakeOptions
- to_data_source() feast.data_source.DataSource [source]
- class feast.infra.offline_stores.snowflake_source.SnowflakeLoggingDestination(*, table_name: str)[source]
Bases:
feast.feature_logging.LoggingDestination
- classmethod from_proto(config_proto: feast.core.FeatureService_pb2.LoggingConfig) feast.feature_logging.LoggingDestination [source]
- to_data_source() feast.data_source.DataSource [source]
Convert this object into a data source to read logs from an offline store.
- class feast.infra.offline_stores.snowflake_source.SnowflakeOptions(database: Optional[str], schema: Optional[str], table: Optional[str], query: Optional[str], warehouse: Optional[str])[source]
Bases:
object
Configuration options for a Snowflake data source.
- classmethod from_proto(snowflake_options_proto: feast.core.DataSource_pb2.SnowflakeOptions)[source]
Creates a SnowflakeOptions from a protobuf representation of Snowflake options.
- Parameters
snowflake_options_proto – A protobuf representation of a DataSource
- Returns
A SnowflakeOptions object based on the snowflake_options protobuf.
- class feast.infra.offline_stores.snowflake_source.SnowflakeSource(*, database: Optional[str] = None, warehouse: Optional[str] = None, schema: Optional[str] = None, table: Optional[str] = None, query: Optional[str] = None, event_timestamp_column: Optional[str] = '', date_partition_column: Optional[str] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, name: Optional[str] = None, description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', timestamp_field: Optional[str] = '')[source]
Bases:
feast.data_source.DataSource
- property database
Returns the database of this snowflake source.
- static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]
Creates a SnowflakeSource from a protobuf representation of a SnowflakeSource.
- Parameters
data_source – A protobuf representation of a SnowflakeSource
- Returns
A SnowflakeSource object based on the data_source protobuf.
- get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]] [source]
Returns the pairs of column names and raw column types for this snowflake source.
- Parameters
config – A RepoConfig describing the feature repo
- get_table_query_string() str [source]
Returns a string that can directly be used to reference this table in SQL.
- property query
Returns the query of this snowflake source.
- property schema
Returns the schema of this snowflake source.
- static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType] [source]
Returns the callable method that returns Feast type given the raw column type.
- property table
Returns the table of this snowflake source.
- to_proto() feast.core.DataSource_pb2.DataSource [source]
Converts a SnowflakeSource object to its protobuf representation.
- Returns
A DataSourceProto object.
- validate(config: feast.repo_config.RepoConfig)[source]
Validates the underlying data source.
- Parameters
config – Configuration object used to configure a feature store.
- property warehouse
Returns the warehouse of this snowflake source.