feast.infra.offline_stores package

Submodules

feast.infra.offline_stores.bigquery module

class feast.infra.offline_stores.bigquery.BigQueryOfflineStore[source]

Bases: feast.infra.offline_stores.offline_store.OfflineStore

static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob[source]
static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

Returns a RetrievalJob for all join key columns, feature name columns, and the event timestamp column, for rows that occur between the start_date and end_date.

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

This method pulls data from the offline store; the FeatureStore class then writes this data into the online store. It is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method). A minimal usage sketch follows the parameter list below.

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query
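
Example – a minimal sketch of triggering materialization from Python (the repository path and date range are illustrative; this assumes a feature repository with a configured offline store):

    from datetime import datetime, timedelta

    from feast import FeatureStore

    store = FeatureStore(repo_path=".")

    # FeatureStore.materialize() internally calls
    # pull_latest_from_table_or_query() on the configured offline store and
    # writes the resulting rows to the online store.
    store.materialize(
        start_date=datetime.utcnow() - timedelta(days=1),
        end_date=datetime.utcnow(),
    )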

static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]

Write logged features to a specified destination (taken from logging_config) in the offline store. Data is appended to an existing table (destination); if the table does not exist, it is created automatically. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.

Parameters
  • config – Repo configuration object

  • data – Arrow table or path to parquet directory that contains logs dataset.

  • source – Logging source that provides schema and some additional metadata.

  • logging_config – used to determine destination

  • registry – Feast registry

This is an optional method that may be supported by only some stores.

class feast.infra.offline_stores.bigquery.BigQueryOfflineStoreConfig(*, type: Literal['bigquery'] = 'bigquery', dataset: pydantic.types.StrictStr = 'feast', project_id: pydantic.types.StrictStr = None, location: pydantic.types.StrictStr = None)[source]

Bases: feast.repo_config.FeastConfigBaseModel

Offline store config for GCP BigQuery

dataset: pydantic.types.StrictStr

(optional) BigQuery Dataset name for temporary tables

location: Optional[pydantic.types.StrictStr]

(optional) GCP location name used for the BigQuery offline store. Examples of location names include US, EU, us-central1, us-west4. If a location is not specified, the location defaults to the US multi-regional location. For more information on BigQuery data locations see: https://cloud.google.com/bigquery/docs/locations

project_id: Optional[pydantic.types.StrictStr]

(optional) GCP project name used for the BigQuery offline store

type: Literal['bigquery']

Offline store type selector
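
Example – a minimal sketch of constructing this config in Python (in practice the same values usually live under offline_store in feature_store.yaml; the project and dataset names below are illustrative):

    from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig

    config = BigQueryOfflineStoreConfig(
        project_id="my-gcp-project",  # hypothetical GCP project
        dataset="feast",              # dataset used for temporary tables
        location="US",                # multi-regional BigQuery location
    )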

class feast.infra.offline_stores.bigquery.BigQueryRetrievalJob(query: Union[str, Callable[[], AbstractContextManager[str]]], client: google.cloud.bigquery.client.Client, config: feast.repo_config.RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]

Bases: feast.infra.offline_stores.offline_store.RetrievalJob

property full_feature_names: bool
property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]

Return metadata information about retrieval. Should be available even before materializing the dataset itself.

property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]

Run the retrieval and persist the results in the same offline store used for read.

to_bigquery(job_config: Optional[google.cloud.bigquery.job.query.QueryJobConfig] = None, timeout: int = 1800, retry_cadence: int = 10) Optional[str][source]

Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table. Runs for a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).

Parameters
  • job_config – An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.

  • timeout – An optional number of seconds for setting the time limit of the QueryJob.

  • retry_cadence – An optional number of seconds for setting how often the job should be checked for completion.

Returns

The destination table name, or None if job_config.dry_run is True.

to_sql() str[source]

Returns the SQL query that will be executed in BigQuery to build the historical feature table.
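
Example – a minimal sketch of obtaining a BigQueryRetrievalJob via FeatureStore.get_historical_features() and exporting it (entity, feature, and table names are illustrative; assumes a repository whose offline store is BigQuery):

    import pandas as pd

    from feast import FeatureStore

    store = FeatureStore(repo_path=".")

    entity_df = pd.DataFrame(
        {
            "driver_id": [1001, 1002],
            "event_timestamp": pd.to_datetime(["2022-05-01", "2022-05-02"]),
        }
    )

    job = store.get_historical_features(
        entity_df=entity_df,
        features=["driver_hourly_stats:conv_rate"],
    )

    print(job.to_sql())             # inspect the generated point-in-time query
    table_name = job.to_bigquery()  # export the results to a BigQuery table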

feast.infra.offline_stores.bigquery.arrow_schema_to_bq_schema(arrow_schema: pyarrow.lib.Schema) List[google.cloud.bigquery.schema.SchemaField][source]
feast.infra.offline_stores.bigquery.block_until_done(client: google.cloud.bigquery.client.Client, bq_job: Union[google.cloud.bigquery.job.query.QueryJob, google.cloud.bigquery.job.load.LoadJob], timeout: int = 1800, retry_cadence: float = 1)[source]

Waits for bq_job to finish running, up to a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).

Parameters
  • client – A bigquery.client.Client to monitor the bq_job.

  • bq_job – The BigQuery job (QueryJob or LoadJob) to wait on until it is done running.

  • timeout – An optional number of seconds for setting the time limit of the job.

  • retry_cadence – An optional number of seconds for setting how often the job should be checked for completion.

Raises
  • BigQueryJobStillRunning exception if the function has blocked for longer than the timeout (30 minutes by default).

  • BigQueryJobCancelled exception to signify that the job has been cancelled (e.g. due to a timeout or KeyboardInterrupt)

feast.infra.offline_stores.bigquery_source module

class feast.infra.offline_stores.bigquery_source.BigQueryLoggingDestination(*, table_ref)[source]

Bases: feast.feature_logging.LoggingDestination

classmethod from_proto(config_proto: feast.core.FeatureService_pb2.LoggingConfig) feast.feature_logging.LoggingDestination[source]
table: str
to_data_source() feast.data_source.DataSource[source]

Convert this object into a data source to read logs from an offline store.

to_proto() feast.core.FeatureService_pb2.LoggingConfig[source]
class feast.infra.offline_stores.bigquery_source.BigQueryOptions(table: Optional[str], query: Optional[str])[source]

Bases: object

Configuration options for a BigQuery data source.

classmethod from_proto(bigquery_options_proto: feast.core.DataSource_pb2.BigQueryOptions)[source]

Creates a BigQueryOptions from a protobuf representation of a BigQuery option

Parameters

bigquery_options_proto – A protobuf representation of a BigQueryOptions object

Returns

Returns a BigQueryOptions object based on the bigquery_options protobuf

to_proto() feast.core.DataSource_pb2.BigQueryOptions[source]

Converts a BigQueryOptions object to its protobuf representation.

Returns

BigQueryOptionsProto protobuf

class feast.infra.offline_stores.bigquery_source.BigQuerySource(*, event_timestamp_column: Optional[str] = '', table: Optional[str] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, query: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', timestamp_field: Optional[str] = None)[source]

Bases: feast.data_source.DataSource

created_timestamp_column: str
date_partition_column: str
description: str
field_mapping: Dict[str, str]
static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]

Converts data source config in protobuf spec to a DataSource class object.

Parameters

data_source – A protobuf representation of a DataSource.

Returns

A DataSource class object.

Raises

ValueError – The type of DataSource could not be identified.

get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]][source]

Returns the list of column names and raw column types.

Parameters

config – Configuration object used to configure a feature store.

get_table_query_string() str[source]

Returns a string that can directly be used to reference this table in SQL.

name: str
owner: str
property query
static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType][source]

Returns the callable method that returns the Feast value type given the raw column type.

property table
tags: Dict[str, str]
timestamp_field: str
to_proto() feast.core.DataSource_pb2.DataSource[source]

Converts a BigQuerySource object to its protobuf representation.

validate(config: feast.repo_config.RepoConfig)[source]

Validates the underlying data source.

Parameters

config – Configuration object used to configure a feature store.
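
Example – a minimal sketch of declaring a BigQuery data source (table and column names are illustrative):

    from feast import BigQuerySource

    driver_stats_source = BigQuerySource(
        name="driver_hourly_stats_source",
        table="my-gcp-project.feast.driver_hourly_stats",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    )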

class feast.infra.offline_stores.bigquery_source.SavedDatasetBigQueryStorage(table: str)[source]

Bases: feast.saved_dataset.SavedDatasetStorage

bigquery_options: feast.infra.offline_stores.bigquery_source.BigQueryOptions
static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage[source]
to_data_source() feast.data_source.DataSource[source]
to_proto() feast.core.SavedDataset_pb2.SavedDatasetStorage[source]

feast.infra.offline_stores.file module

class feast.infra.offline_stores.file.FileOfflineStore[source]

Bases: feast.infra.offline_stores.offline_store.OfflineStore

static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob[source]
static offline_write_batch(config: feast.repo_config.RepoConfig, feature_view: feast.feature_view.FeatureView, data: pyarrow.lib.Table, progress: Optional[Callable[[int], Any]])[source]

Write features to a specified destination in the offline store. Data is appended to an existing table (destination); if the table does not exist, it is created automatically. Hence, this function can be called repeatedly with the same destination config to write features.

Parameters
  • config – Repo configuration object

  • feature_view – FeatureView to write the data to.

  • data – pyarrow table containing feature data and timestamp column for historical feature retrieval

  • progress – Optional function to be called once every mini-batch of rows is written to the online store. Can be used to display progress.

static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

Returns a RetrievalJob for all join key columns, feature name columns, and the event timestamp column, for rows that occur between the start_date and end_date.

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

This method pulls data from the offline store; the FeatureStore class then writes this data into the online store. It is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]

Write logged features to a specified destination (taken from logging_config) in the offline store. Data is appended to an existing table (destination); if the table does not exist, it is created automatically. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.

Parameters
  • config – Repo configuration object

  • data – Arrow table or path to parquet directory that contains logs dataset.

  • source – Logging source that provides schema and some additional metadata.

  • logging_config – used to determine destination

  • registry – Feast registry

This is an optional method that may be supported by only some stores.

class feast.infra.offline_stores.file.FileOfflineStoreConfig(*, type: Literal['file'] = 'file')[source]

Bases: feast.repo_config.FeastConfigBaseModel

Offline store config for local (file-based) store

type: Literal['file']

Offline store type selector

class feast.infra.offline_stores.file.FileRetrievalJob(evaluation_function: Callable, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]

Bases: feast.infra.offline_stores.offline_store.RetrievalJob

property full_feature_names: bool
property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]

Return metadata information about retrieval. Should be available even before materializing the dataset itself.

property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]

Run the retrieval and persist the results in the same offline store used for read.
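
Example – a minimal sketch of persisting a retrieval result as a saved dataset in the file offline store (entity, feature, and path names are illustrative; FeatureStore.create_saved_dataset() calls persist() on the job and registers the result):

    import pandas as pd

    from feast import FeatureStore
    from feast.infra.offline_stores.file_source import SavedDatasetFileStorage

    store = FeatureStore(repo_path=".")

    entity_df = pd.DataFrame(
        {
            "driver_id": [1001],
            "event_timestamp": pd.to_datetime(["2022-05-01"]),
        }
    )

    job = store.get_historical_features(
        entity_df=entity_df,
        features=["driver_hourly_stats:conv_rate"],
    )

    store.create_saved_dataset(
        from_=job,
        name="my_training_dataset",
        storage=SavedDatasetFileStorage(path="data/my_training_dataset.parquet"),
    )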

feast.infra.offline_stores.file_source module

class feast.infra.offline_stores.file_source.FileLoggingDestination(*, path: str, s3_endpoint_override='', partition_by: Optional[List[str]] = None)[source]

Bases: feast.feature_logging.LoggingDestination

classmethod from_proto(config_proto: feast.core.FeatureService_pb2.LoggingConfig) feast.feature_logging.LoggingDestination[source]
partition_by: Optional[List[str]]
path: str
s3_endpoint_override: str
to_data_source() feast.data_source.DataSource[source]

Convert this object into a data source to read logs from an offline store.

to_proto() feast.core.FeatureService_pb2.LoggingConfig[source]
class feast.infra.offline_stores.file_source.FileOptions(file_format: Optional[feast.data_format.FileFormat], s3_endpoint_override: Optional[str], uri: Optional[str])[source]

Bases: object

Configuration options for a file data source.

classmethod from_proto(file_options_proto: feast.core.DataSource_pb2.FileOptions)[source]

Creates a FileOptions from a protobuf representation of a file option

Parameters

file_options_proto – A protobuf representation of a FileOptions object

Returns

Returns a FileOptions object based on the file_options protobuf

to_proto() feast.core.DataSource_pb2.FileOptions[source]

Converts a FileOptions object to its protobuf representation.

Returns

FileOptionsProto protobuf

class feast.infra.offline_stores.file_source.FileSource(*args, path: Optional[str] = None, event_timestamp_column: Optional[str] = '', file_format: Optional[feast.data_format.FileFormat] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = '', s3_endpoint_override: Optional[str] = None, name: Optional[str] = '', description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', timestamp_field: Optional[str] = '')[source]

Bases: feast.data_source.DataSource

static create_filesystem_and_path(path: str, s3_endpoint_override: str) Tuple[Optional[pyarrow._fs.FileSystem], str][source]
created_timestamp_column: str
date_partition_column: str
description: str
field_mapping: Dict[str, str]
static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]

Converts data source config in protobuf spec to a DataSource class object.

Parameters

data_source – A protobuf representation of a DataSource.

Returns

A DataSource class object.

Raises

ValueError – The type of DataSource could not be identified.

get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]][source]

Returns the list of column names and raw column types.

Parameters

config – Configuration object used to configure a feature store.

get_table_query_string() str[source]

Returns a string that can directly be used to reference this table in SQL.

name: str
owner: str
property path

Returns the path of this file data source.

static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType][source]

Returns the callable method that returns the Feast value type given the raw column type.

tags: Dict[str, str]
timestamp_field: str
to_proto() feast.core.DataSource_pb2.DataSource[source]

Converts a FileSource object to its protobuf representation.

validate(config: feast.repo_config.RepoConfig)[source]

Validates the underlying data source.

Parameters

config – Configuration object used to configure a feature store.
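
Example – a minimal sketch of declaring a Parquet-backed file source (path and column names are illustrative):

    from feast import FileSource

    driver_stats_source = FileSource(
        name="driver_hourly_stats_source",
        path="data/driver_stats.parquet",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    )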

class feast.infra.offline_stores.file_source.SavedDatasetFileStorage(path: str, file_format: feast.data_format.FileFormat = <feast.data_format.ParquetFormat object>, s3_endpoint_override: Optional[str] = None)[source]

Bases: feast.saved_dataset.SavedDatasetStorage

file_options: feast.infra.offline_stores.file_source.FileOptions
static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage[source]
to_data_source() feast.data_source.DataSource[source]
to_proto() feast.core.SavedDataset_pb2.SavedDatasetStorage[source]

feast.infra.offline_stores.offline_store module

class feast.infra.offline_stores.offline_store.OfflineStore[source]

Bases: abc.ABC

OfflineStore is an object used for all interaction between Feast and the service used for offline storage of features.

abstract static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob[source]
static offline_write_batch(config: feast.repo_config.RepoConfig, feature_view: feast.feature_view.FeatureView, data: pyarrow.lib.Table, progress: Optional[Callable[[int], Any]])[source]

Write features to a specified destination in the offline store. Data is appended to an existing table (destination); if the table does not exist, it is created automatically. Hence, this function can be called repeatedly with the same destination config to write features.

Parameters
  • config – Repo configuration object

  • feature_view – FeatureView to write the data to.

  • data – pyarrow table containing feature data and timestamp column for historical feature retrieval

  • progress – Optional function to be called once every mini-batch of rows is written to the online store. Can be used to display progress.

abstract static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

Returns a RetrievalJob for all join key columns, feature name columns, and the event timestamp column, for rows that occur between the start_date and end_date.

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

abstract static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

This method pulls data from the offline store; the FeatureStore class then writes this data into the online store. It is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]

Write logged features to a specified destination (taken from logging_config) in the offline store. Data is appended to an existing table (destination); if the table does not exist, it is created automatically. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.

Parameters
  • config – Repo configuration object

  • data – Arrow table or path to parquet directory that contains logs dataset.

  • source – Logging source that provides schema and some additional metadata.

  • logging_config – used to determine destination

  • registry – Feast registry

This is an optional method that may be supported by only some stores.

class feast.infra.offline_stores.offline_store.RetrievalJob[source]

Bases: abc.ABC

RetrievalJob is used to manage the execution of a historical feature retrieval

abstract property full_feature_names: bool
abstract property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]

Return metadata information about retrieval. Should be available even before materializing the dataset itself.

abstract property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
abstract persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]

Run the retrieval and persist the results in the same offline store used for read.

to_arrow(validation_reference: Optional[ValidationReference] = None) pyarrow.lib.Table[source]

Return dataset as a pyarrow Table synchronously.

Parameters

validation_reference – If provided, the resulting dataset will be validated against this reference profile.

to_df(validation_reference: Optional[ValidationReference] = None) pandas.core.frame.DataFrame[source]

Return dataset as a Pandas DataFrame synchronously, including on-demand transforms.

Parameters

validation_reference – If provided, the resulting dataset will be validated against this reference profile.
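
Example – a minimal sketch of consuming a RetrievalJob synchronously (entity and feature names are illustrative):

    import pandas as pd

    from feast import FeatureStore

    store = FeatureStore(repo_path=".")

    entity_df = pd.DataFrame(
        {
            "driver_id": [1001, 1002],
            "event_timestamp": pd.to_datetime(["2022-05-01", "2022-05-02"]),
        }
    )

    job = store.get_historical_features(
        entity_df=entity_df,
        features=["driver_hourly_stats:conv_rate"],
    )

    training_df = job.to_df()     # pandas DataFrame, on-demand transforms applied
    arrow_table = job.to_arrow()  # pyarrow Table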

class feast.infra.offline_stores.offline_store.RetrievalMetadata(features: List[str], keys: List[str], min_event_timestamp: Optional[datetime.datetime] = None, max_event_timestamp: Optional[datetime.datetime] = None)[source]

Bases: object

features: List[str]
keys: List[str]
max_event_timestamp: Optional[datetime.datetime]
min_event_timestamp: Optional[datetime.datetime]

feast.infra.offline_stores.offline_utils module

class feast.infra.offline_stores.offline_utils.FeatureViewQueryContext(name: str, ttl: int, entities: List[str], features: List[str], field_mapping: Dict[str, str], timestamp_field: str, created_timestamp_column: Optional[str], table_subquery: str, entity_selections: List[str], min_event_timestamp: Optional[str], max_event_timestamp: str)[source]

Bases: object

Context object used to template a BigQuery and Redshift point-in-time SQL query

created_timestamp_column: Optional[str]
entities: List[str]
entity_selections: List[str]
features: List[str]
field_mapping: Dict[str, str]
max_event_timestamp: str
min_event_timestamp: Optional[str]
name: str
table_subquery: str
timestamp_field: str
ttl: int
feast.infra.offline_stores.offline_utils.assert_expected_columns_in_entity_df(entity_schema: Dict[str, numpy.dtype], join_keys: Set[str], entity_df_event_timestamp_col: str)[source]
feast.infra.offline_stores.offline_utils.build_point_in_time_query(feature_view_query_contexts: List[feast.infra.offline_stores.offline_utils.FeatureViewQueryContext], left_table_query_string: str, entity_df_event_timestamp_col: str, entity_df_columns: KeysView[str], query_template: str, full_feature_names: bool = False) str[source]

Build a point-in-time query between each feature view table and the entity dataframe for BigQuery and Redshift

feast.infra.offline_stores.offline_utils.get_entity_df_timestamp_bounds(entity_df: pandas.core.frame.DataFrame, event_timestamp_col: str) Tuple[pandas._libs.tslibs.timestamps.Timestamp, pandas._libs.tslibs.timestamps.Timestamp][source]
feast.infra.offline_stores.offline_utils.get_expected_join_keys(project: str, feature_views: List[feast.feature_view.FeatureView], registry: feast.registry.BaseRegistry) Set[str][source]
feast.infra.offline_stores.offline_utils.get_feature_view_query_context(feature_refs: List[str], feature_views: List[feast.feature_view.FeatureView], registry: feast.registry.BaseRegistry, project: str, entity_df_timestamp_range: Tuple[datetime.datetime, datetime.datetime]) List[feast.infra.offline_stores.offline_utils.FeatureViewQueryContext][source]

Build a query context containing all information required to template a BigQuery and Redshift point-in-time SQL query

feast.infra.offline_stores.offline_utils.get_offline_store_from_config(offline_store_config: Any) feast.infra.offline_stores.offline_store.OfflineStore[source]

Creates an offline store corresponding to the given offline store config.

feast.infra.offline_stores.offline_utils.get_temp_entity_table_name() str[source]

Returns a random table name for uploading the entity dataframe

feast.infra.offline_stores.offline_utils.infer_event_timestamp_from_entity_df(entity_schema: Dict[str, numpy.dtype]) str[source]

feast.infra.offline_stores.redshift module

class feast.infra.offline_stores.redshift.RedshiftOfflineStore[source]

Bases: feast.infra.offline_stores.offline_store.OfflineStore

static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob[source]
static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

Returns a RetrievalJob for all join key columns, feature name columns, and the event timestamp column, for rows that occur between the start_date and end_date.

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

This method pulls data from the offline store; the FeatureStore class then writes this data into the online store. It is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]

Write logged features to a specified destination (taken from logging_config) in the offline store. Data is appended to an existing table (destination); if the table does not exist, it is created automatically. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.

Parameters
  • config – Repo configuration object

  • data – Arrow table or path to parquet directory that contains logs dataset.

  • source – Logging source that provides schema and some additional metadata.

  • logging_config – used to determine destination

  • registry – Feast registry

This is an optional method that may be supported by only some stores.

class feast.infra.offline_stores.redshift.RedshiftOfflineStoreConfig(*, type: Literal['redshift'] = 'redshift', cluster_id: pydantic.types.StrictStr, region: pydantic.types.StrictStr, user: pydantic.types.StrictStr, database: pydantic.types.StrictStr, s3_staging_location: pydantic.types.StrictStr, iam_role: pydantic.types.StrictStr)[source]

Bases: feast.repo_config.FeastConfigBaseModel

Offline store config for AWS Redshift

cluster_id: pydantic.types.StrictStr

Redshift cluster identifier

database: pydantic.types.StrictStr

Redshift database name

iam_role: pydantic.types.StrictStr

IAM Role for Redshift, granting it access to S3

region: pydantic.types.StrictStr

Redshift cluster’s AWS region

s3_staging_location: pydantic.types.StrictStr

S3 path for importing & exporting data to Redshift

type: Literal['redshift']

Offline store type selector

user: pydantic.types.StrictStr

Redshift user name
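
Example – a minimal sketch of constructing this config in Python (the same values usually live under offline_store in feature_store.yaml; all identifiers below are illustrative):

    from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig

    config = RedshiftOfflineStoreConfig(
        cluster_id="my-redshift-cluster",
        region="us-west-2",
        user="admin",
        database="feast",
        s3_staging_location="s3://my-bucket/feast-staging",
        iam_role="arn:aws:iam::123456789012:role/feast-redshift-s3",
    )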

class feast.infra.offline_stores.redshift.RedshiftRetrievalJob(query: Union[str, Callable[[], AbstractContextManager[str]]], redshift_client, s3_resource, config: feast.repo_config.RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]

Bases: feast.infra.offline_stores.offline_store.RetrievalJob

property full_feature_names: bool
property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]

Return metadata information about retrieval. Should be available even before materializing the dataset itself.

property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]

Run the retrieval and persist the results in the same offline store used for read.

to_redshift(table_name: str) None[source]

Save dataset as a new Redshift table

to_s3() str[source]

Export dataset to S3 in Parquet format and return path
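
Example – a minimal sketch of exporting a Redshift-backed retrieval result (entity, feature, and table names are illustrative; assumes a repository whose offline store is Redshift):

    import pandas as pd

    from feast import FeatureStore

    store = FeatureStore(repo_path=".")

    entity_df = pd.DataFrame(
        {
            "driver_id": [1001],
            "event_timestamp": pd.to_datetime(["2022-05-01"]),
        }
    )

    job = store.get_historical_features(
        entity_df=entity_df,
        features=["driver_hourly_stats:conv_rate"],
    )

    job.to_redshift("feast_training_data")  # save as a new Redshift table
    s3_path = job.to_s3()                   # export Parquet files to the staging S3 location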

feast.infra.offline_stores.redshift_source module

class feast.infra.offline_stores.redshift_source.RedshiftLoggingDestination(*, table_name: str)[source]

Bases: feast.feature_logging.LoggingDestination

classmethod from_proto(config_proto: feast.core.FeatureService_pb2.LoggingConfig) feast.feature_logging.LoggingDestination[source]
table_name: str
to_data_source() feast.data_source.DataSource[source]

Convert this object into a data source to read logs from an offline store.

to_proto() feast.core.FeatureService_pb2.LoggingConfig[source]
class feast.infra.offline_stores.redshift_source.RedshiftOptions(table: Optional[str], schema: Optional[str], query: Optional[str], database: Optional[str])[source]

Bases: object

Configuration options for a Redshift data source.

classmethod from_proto(redshift_options_proto: feast.core.DataSource_pb2.RedshiftOptions)[source]

Creates a RedshiftOptions from a protobuf representation of a Redshift option.

Parameters

redshift_options_proto – A protobuf representation of a RedshiftOptions object

Returns

A RedshiftOptions object based on the redshift_options protobuf.

to_proto() feast.core.DataSource_pb2.RedshiftOptions[source]

Converts a RedshiftOptions object to its protobuf representation.

Returns

A RedshiftOptionsProto protobuf.

class feast.infra.offline_stores.redshift_source.RedshiftSource(*, event_timestamp_column: Optional[str] = '', table: Optional[str] = None, schema: Optional[str] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, query: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', database: Optional[str] = '', timestamp_field: Optional[str] = '')[source]

Bases: feast.data_source.DataSource

created_timestamp_column: str
property database

Returns the Redshift database of this Redshift source.

date_partition_column: str
description: str
field_mapping: Dict[str, str]
static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]

Creates a RedshiftSource from a protobuf representation of a RedshiftSource.

Parameters

data_source – A protobuf representation of a RedshiftSource

Returns

A RedshiftSource object based on the data_source protobuf.

get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]][source]

Returns a mapping of column names to types for this Redshift source.

Parameters

config – A RepoConfig describing the feature repo

get_table_query_string() str[source]

Returns a string that can directly be used to reference this table in SQL.

name: str
owner: str
property query

Returns the Redshift query of this Redshift source.

property schema

Returns the schema of this Redshift source.

static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType][source]

Returns the callable method that returns the Feast value type given the raw column type.

property table

Returns the table of this Redshift source.

tags: Dict[str, str]
timestamp_field: str
to_proto() feast.core.DataSource_pb2.DataSource[source]

Converts a RedshiftSource object to its protobuf representation.

Returns

A DataSourceProto object.

validate(config: feast.repo_config.RepoConfig)[source]

Validates the underlying data source.

Parameters

config – Configuration object used to configure a feature store.
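
Example – a minimal sketch of declaring a Redshift data source (schema, table, and column names are illustrative):

    from feast import RedshiftSource

    driver_stats_source = RedshiftSource(
        name="driver_hourly_stats_source",
        schema="public",
        table="driver_hourly_stats",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    )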

class feast.infra.offline_stores.redshift_source.SavedDatasetRedshiftStorage(table_ref: str)[source]

Bases: feast.saved_dataset.SavedDatasetStorage

static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage[source]
redshift_options: feast.infra.offline_stores.redshift_source.RedshiftOptions
to_data_source() feast.data_source.DataSource[source]
to_proto() feast.core.SavedDataset_pb2.SavedDatasetStorage[source]

feast.infra.offline_stores.snowflake module

class feast.infra.offline_stores.snowflake.SnowflakeOfflineStore[source]

Bases: feast.infra.offline_stores.offline_store.OfflineStore

static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.BaseRegistry, project: str, full_feature_names: bool = False) feast.infra.offline_stores.offline_store.RetrievalJob[source]
static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

Returns a RetrievalJob for all join key columns, feature name columns, and the event timestamp column, for rows that occur between the start_date and end_date.

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) feast.infra.offline_stores.offline_store.RetrievalJob[source]

This method pulls data from the offline store; the FeatureStore class then writes this data into the online store. It is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static write_logged_features(config: feast.repo_config.RepoConfig, data: Union[pyarrow.lib.Table, pathlib.Path], source: feast.feature_logging.LoggingSource, logging_config: feast.feature_logging.LoggingConfig, registry: feast.registry.BaseRegistry)[source]

Write logged features to a specified destination (taken from logging_config) in the offline store. Data is appended to an existing table (destination); if the table does not exist, it is created automatically. Hence, this function can be called repeatedly with the same destination to flush logs in chunks.

Parameters
  • config – Repo configuration object

  • data – Arrow table or path to parquet directory that contains logs dataset.

  • source – Logging source that provides schema and some additional metadata.

  • logging_config – used to determine destination

  • registry – Feast registry

This is an optional method that may be supported by only some stores.

class feast.infra.offline_stores.snowflake.SnowflakeOfflineStoreConfig(*, type: Literal['snowflake.offline'] = 'snowflake.offline', config_path: str = '/home/docs/.snowsql/config', account: str = None, user: str = None, password: str = None, role: str = None, warehouse: str = None, database: str = None, schema: str = None)[source]

Bases: feast.repo_config.FeastConfigBaseModel

Offline store config for Snowflake

class Config[source]

Bases: object

allow_population_by_field_name = True
account: Optional[str]

Snowflake deployment identifier – drop .snowflakecomputing.com

config_path: Optional[str]

Snowflake config path – an absolute path is required (cannot use ~)

database: Optional[str]

Snowflake database name

password: Optional[str]

Snowflake password

role: Optional[str]

Snowflake role name

schema_: Optional[str]

Snowflake schema name

type: Literal['snowflake.offline']

Offline store type selector

user: Optional[str]

Snowflake user name

warehouse: Optional[str]

Snowflake warehouse name
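
Example – a minimal sketch of constructing this config in Python (the same values usually live under offline_store in feature_store.yaml; all identifiers below are illustrative):

    from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig

    config = SnowflakeOfflineStoreConfig(
        account="my_account",   # deployment identifier without .snowflakecomputing.com
        user="feast_user",
        password="********",
        role="FEAST_ROLE",
        warehouse="FEAST_WH",
        database="FEAST_DB",
        schema="PUBLIC",        # stored on the schema_ attribute via field aliasing
    )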

class feast.infra.offline_stores.snowflake.SnowflakeRetrievalJob(query: Union[str, Callable[[], AbstractContextManager[str]]], snowflake_conn: snowflake.connector.connection.SnowflakeConnection, config: feast.repo_config.RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]

Bases: feast.infra.offline_stores.offline_store.RetrievalJob

property full_feature_names: bool
property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]

Return metadata information about retrieval. Should be available even before materializing the dataset itself.

property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]

Run the retrieval and persist the results in the same offline store used for read.

to_arrow_chunks(arrow_options: Optional[Dict] = None) Optional[List][source]
to_snowflake(table_name: str) None[source]

Save dataset as a new Snowflake table

to_sql() str[source]

Returns the SQL query that will be executed in Snowflake to build the historical feature table.

feast.infra.offline_stores.snowflake_source module

class feast.infra.offline_stores.snowflake_source.SavedDatasetSnowflakeStorage(table_ref: str)[source]

Bases: feast.saved_dataset.SavedDatasetStorage

static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage[source]
snowflake_options: feast.infra.offline_stores.snowflake_source.SnowflakeOptions
to_data_source() feast.data_source.DataSource[source]
to_proto() feast.core.SavedDataset_pb2.SavedDatasetStorage[source]
class feast.infra.offline_stores.snowflake_source.SnowflakeLoggingDestination(*, table_name: str)[source]

Bases: feast.feature_logging.LoggingDestination

classmethod from_proto(config_proto: feast.core.FeatureService_pb2.LoggingConfig) feast.feature_logging.LoggingDestination[source]
table_name: str
to_data_source() feast.data_source.DataSource[source]

Convert this object into a data source to read logs from an offline store.

to_proto() feast.core.FeatureService_pb2.LoggingConfig[source]
class feast.infra.offline_stores.snowflake_source.SnowflakeOptions(database: Optional[str], schema: Optional[str], table: Optional[str], query: Optional[str], warehouse: Optional[str])[source]

Bases: object

Configuration options for a Snowflake data source.

classmethod from_proto(snowflake_options_proto: feast.core.DataSource_pb2.SnowflakeOptions)[source]

Creates a SnowflakeOptions from a protobuf representation of a snowflake option.

Parameters

snowflake_options_proto – A protobuf representation of a SnowflakeOptions object

Returns

A SnowflakeOptions object based on the snowflake_options protobuf.

to_proto() feast.core.DataSource_pb2.SnowflakeOptions[source]

Converts a SnowflakeOptions object to its protobuf representation.

Returns

A SnowflakeOptionsProto protobuf.

class feast.infra.offline_stores.snowflake_source.SnowflakeSource(*, database: Optional[str] = None, warehouse: Optional[str] = None, schema: Optional[str] = None, table: Optional[str] = None, query: Optional[str] = None, event_timestamp_column: Optional[str] = '', date_partition_column: Optional[str] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, name: Optional[str] = None, description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', timestamp_field: Optional[str] = '')[source]

Bases: feast.data_source.DataSource

created_timestamp_column: str
property database

Returns the database of this snowflake source.

date_partition_column: str
description: str
field_mapping: Dict[str, str]
static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]

Creates a SnowflakeSource from a protobuf representation of a SnowflakeSource.

Parameters

data_source – A protobuf representation of a SnowflakeSource

Returns

A SnowflakeSource object based on the data_source protobuf.

get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]][source]

Returns a mapping of column names to types for this snowflake source.

Parameters

config – A RepoConfig describing the feature repo

get_table_query_string() str[source]

Returns a string that can directly be used to reference this table in SQL.

name: str
owner: str
property query

Returns the query of this snowflake source.

property schema

Returns the schema of this snowflake source.

static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType][source]

Returns the callable method that returns the Feast value type given the raw column type.

property table

Returns the table of this snowflake source.

tags: Dict[str, str]
timestamp_field: str
to_proto() feast.core.DataSource_pb2.DataSource[source]

Converts a SnowflakeSource object to its protobuf representation.

Returns

A DataSourceProto object.

validate(config: feast.repo_config.RepoConfig)[source]

Validates the underlying data source.

Parameters

config – Configuration object used to configure a feature store.

property warehouse

Returns the warehouse of this snowflake source.
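
Example – a minimal sketch of declaring a Snowflake data source (database, schema, table, and column names are illustrative):

    from feast import SnowflakeSource

    driver_stats_source = SnowflakeSource(
        name="driver_hourly_stats_source",
        database="FEAST_DB",
        schema="PUBLIC",
        table="DRIVER_HOURLY_STATS",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    )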

Module contents