feast.infra.offline_stores.contrib.spark_offline_store package

Subpackages

Submodules

feast.infra.offline_stores.contrib.spark_offline_store.spark module

class feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStore[source]

Bases: feast.infra.offline_stores.offline_store.OfflineStore

static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.Registry, project: str, full_feature_names: bool = False) → feast.infra.offline_stores.offline_store.RetrievalJob[source]
static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime.datetime, end_date: datetime.datetime) → feast.infra.offline_stores.offline_store.RetrievalJob[source]

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime) → feast.infra.offline_stores.offline_store.RetrievalJob[source]

This method pulls data from the offline store, and the FeatureStore class is used to write this data into the online store. It is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).

Note that join_key_columns, feature_name_columns, timestamp_field, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • timestamp_field – Timestamp column

  • created_timestamp_column – Timestamp column indicating when the row was created, used to deduplicate rows

  • start_date – Starting date of query

  • end_date – Ending date of query
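As an illustration of these semantics only — the real implementation runs as a Spark SQL query, not in Python — a simplified sketch of the "latest row per join key within the date range" deduplication that pull_latest_from_table_or_query performs during materialization might look like this (the inclusive date range and the tie-breaking on created timestamp are assumptions of the sketch):

```python
from datetime import datetime

def pull_latest(rows, join_key_columns, timestamp_field,
                created_timestamp_column, start_date, end_date):
    """Keep the newest row per join key within [start_date, end_date].

    Simplified pure-Python sketch of the deduplication performed by
    pull_latest_from_table_or_query; not the library's implementation.
    """
    latest = {}
    for row in rows:
        ts = row[timestamp_field]
        if not (start_date <= ts <= end_date):
            continue
        key = tuple(row[c] for c in join_key_columns)
        # Order by event timestamp, breaking ties on created timestamp.
        order = (ts, row.get(created_timestamp_column) or datetime.min)
        if key not in latest or order > latest[key][0]:
            latest[key] = (order, row)
    return [row for _, row in latest.values()]

rows = [
    {"driver_id": 1, "event_ts": datetime(2022, 1, 1), "created_ts": None, "rate": 0.5},
    {"driver_id": 1, "event_ts": datetime(2022, 1, 2), "created_ts": None, "rate": 0.7},
    {"driver_id": 2, "event_ts": datetime(2022, 1, 1), "created_ts": None, "rate": 0.9},
]
result = pull_latest(rows, ["driver_id"], "event_ts", "created_ts",
                     datetime(2021, 12, 31), datetime(2022, 1, 31))
```

Each join key keeps exactly one row — the one with the greatest event timestamp in range — which is what materialization then writes to the online store.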

class feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStoreConfig(*, type: pydantic.types.StrictStr = 'spark', spark_conf: Dict[str, str] = None)[source]

Bases: feast.repo_config.FeastConfigBaseModel

spark_conf: Optional[Dict[str, str]]

Configuration overlay for the spark session

type: pydantic.types.StrictStr

Offline store type selector
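For reference, a feature_store.yaml fragment selecting this offline store could look as follows; the specific spark_conf keys shown are illustrative overlay values, not defaults of the store:

```yaml
offline_store:
  type: spark
  spark_conf:
    spark.master: "local[*]"
    spark.sql.session.timeZone: "UTC"
```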

class feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkRetrievalJob(spark_session: pyspark.sql.session.SparkSession, query: str, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]

Bases: feast.infra.offline_stores.offline_store.RetrievalJob

property full_feature_names: bool
property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]

Return metadata information about retrieval. Should be available even before materializing the dataset itself.

property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]

Run the retrieval and persist the results in the same offline store used for the read. Note that the persisted data is only available within the scope of the Spark session.

to_spark_df() → pyspark.sql.dataframe.DataFrame[source]
feast.infra.offline_stores.contrib.spark_offline_store.spark.get_spark_session_or_start_new_with_repoconfig(store_config: feast.infra.offline_stores.contrib.spark_offline_store.spark.SparkOfflineStoreConfig) → pyspark.sql.session.SparkSession[source]
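get_spark_session_or_start_new_with_repoconfig reuses an active Spark session if one exists and otherwise starts a new one with the spark_conf overlay from SparkOfflineStoreConfig applied. The conf-merging step can be sketched in plain Python — pyspark's SparkConf.setAll accepts a list of key/value pairs — with the caveat that this is a simplified standalone helper, not the library's code:

```python
def build_conf_pairs(spark_conf):
    """Turn the spark_conf overlay from SparkOfflineStoreConfig into the
    list of (key, value) string pairs that SparkConf.setAll expects.

    Simplified sketch; the real helper also handles reusing an
    already-active session.
    """
    spark_conf = spark_conf or {}
    return [(k, str(v)) for k, v in spark_conf.items()]

pairs = build_conf_pairs({"spark.master": "local[*]",
                          "spark.ui.enabled": "false"})
```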

feast.infra.offline_stores.contrib.spark_offline_store.spark_source module

class feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SavedDatasetSparkStorage(table: Optional[str] = None, query: Optional[str] = None, path: Optional[str] = None, file_format: Optional[str] = None)[source]

Bases: feast.saved_dataset.SavedDatasetStorage

static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) → feast.saved_dataset.SavedDatasetStorage[source]
spark_options: feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkOptions
to_data_source() → feast.data_source.DataSource[source]
to_proto() → feast.core.SavedDataset_pb2.SavedDatasetStorage[source]
class feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkOptions(table: Optional[str], query: Optional[str], path: Optional[str], file_format: Optional[str])[source]

Bases: object

allowed_formats = ['csv', 'json', 'parquet', 'delta', 'avro']
property file_format
classmethod from_proto(spark_options_proto: feast.core.DataSource_pb2.SparkOptions)[source]

Creates a SparkOptions from a protobuf representation of a spark option.

Parameters

spark_options_proto – A protobuf representation of a datasource

Returns

Returns a SparkOptions object based on the spark_options protobuf

property path
property query
property table
to_proto() → feast.core.DataSource_pb2.SparkOptions[source]

Converts a SparkOptions object to its protobuf representation.

Returns

SparkOptionsProto protobuf
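The allowed_formats list above implies a validation step for file-based sources. A minimal standalone sketch of such a check — a hypothetical helper mirroring the class attribute, not the library's API — would be:

```python
# Mirrors SparkOptions.allowed_formats from the class above.
ALLOWED_FORMATS = ["csv", "json", "parquet", "delta", "avro"]

def check_file_format(file_format):
    """Reject file formats outside the allowed_formats list.

    Hypothetical helper illustrating the validation implied by
    SparkOptions.allowed_formats; not feast's own code.
    """
    if file_format is not None and file_format not in ALLOWED_FORMATS:
        raise ValueError(
            f"file_format '{file_format}' is not one of {ALLOWED_FORMATS}"
        )
    return file_format
```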

class feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource(*, name: Optional[str] = None, table: Optional[str] = None, query: Optional[str] = None, path: Optional[str] = None, file_format: Optional[str] = None, event_timestamp_column: Optional[str] = None, created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', timestamp_field: Optional[str] = None)[source]

Bases: feast.data_source.DataSource

created_timestamp_column: str
date_partition_column: str
description: str
field_mapping: Dict[str, str]
property file_format

Returns the file format of this feature data source.

static from_proto(data_source: feast.core.DataSource_pb2.DataSource) → Any[source]

Converts data source config in protobuf spec to a DataSource class object.

Parameters

data_source – A protobuf representation of a DataSource.

Returns

A DataSource class object.

Raises

ValueError – The type of DataSource could not be identified.

get_table_column_names_and_types(config: feast.repo_config.RepoConfig) → Iterable[Tuple[str, str]][source]

Returns the list of column names and raw column types.

Parameters

config – Configuration object used to configure a feature store.

get_table_query_string() → str[source]

Returns a string that can directly be used to reference this table in SQL.

name: str
owner: str
property path

Returns the path of the spark data source file.

property query

Returns the query of this feature data source.

static source_datatype_to_feast_value_type() → Callable[[str], feast.value_type.ValueType][source]

Returns the callable method that returns Feast type given the raw column type.

property table

Returns the table of this feature data source.

tags: Dict[str, str]
timestamp_field: str
to_proto() → feast.core.DataSource_pb2.DataSource[source]

Converts a SparkSource object to its protobuf representation.

validate(config: feast.repo_config.RepoConfig)[source]

Validates the underlying data source.

Parameters

config – Configuration object used to configure a feature store.
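As the constructor signature suggests, a SparkSource is defined by exactly one of table, query, or path (with file_format accompanying path). A standalone sketch of that mutual-exclusion check — a hypothetical helper illustrating the rule, not feast's own validation code — could look like:

```python
def validate_spark_source_args(table=None, query=None, path=None,
                               file_format=None):
    """Check the mutually exclusive table/query/path arguments of a
    SparkSource-like constructor.

    Hypothetical sketch; feast performs an equivalent check inside
    SparkSource itself.
    """
    provided = [name for name, value in
                [("table", table), ("query", query), ("path", path)]
                if value is not None]
    if len(provided) != 1:
        raise ValueError(
            f"Exactly one of table, query, or path must be set; got {provided}"
        )
    if path is not None and file_format is None:
        raise ValueError("file_format is required when path is used")
```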

class feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSourceFormat(value)[source]

Bases: enum.Enum

An enumeration.

avro = 'avro'
csv = 'csv'
delta = 'delta'
json = 'json'
parquet = 'parquet'

Module contents