Source code for feast.infra.provider

from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas as pd
import pyarrow
from tqdm import tqdm

from feast import FeatureService, errors
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.importer import import_class
from feast.infra.infra_object import Infra
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDataset

PROVIDERS_CLASS_FOR_TYPE = {
    "gcp": "feast.infra.gcp.GcpProvider",
    "aws": "feast.infra.aws.AwsProvider",
    "local": "feast.infra.local.LocalProvider",
    "azure": "feast.infra.contrib.azure_provider.AzureProvider",
}
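
# Illustrative sketch (not part of this module): how a built-in provider name
# resolves to its implementation class. The "local" entry, for example, is split
# into a module name and a class name:
#
#     module_name, class_name = PROVIDERS_CLASS_FOR_TYPE["local"].rsplit(".", 1)
#     # module_name == "feast.infra.local", class_name == "LocalProvider"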


class Provider(ABC):
    """
    A provider defines an implementation of a feature store object. It orchestrates the various
    components of a feature store, such as the offline store, online store, and materialization
    engine. It is configured through a RepoConfig object.
    """

    @abstractmethod
    def __init__(self, config: RepoConfig):
        pass

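    # Illustrative sketch (not part of this module): a custom provider subclasses
    # Provider and typically delegates to the configured offline and online stores.
    # `MyProvider` and its contents are hypothetical.
    #
    #     class MyProvider(Provider):
    #         def __init__(self, config: RepoConfig):
    #             self.config = config
    #         # ... implement the abstract methods below ...
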
    @abstractmethod
    def update_infra(
        self,
        project: str,
        tables_to_delete: Sequence[FeatureView],
        tables_to_keep: Sequence[FeatureView],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        """
        Reconciles cloud resources with the specified set of Feast objects.

        Args:
            project: Feast project to which the objects belong.
            tables_to_delete: Feature views whose corresponding infrastructure should be deleted.
            tables_to_keep: Feature views whose corresponding infrastructure should not be deleted,
                and may need to be updated.
            entities_to_delete: Entities whose corresponding infrastructure should be deleted.
            entities_to_keep: Entities whose corresponding infrastructure should not be deleted,
                and may need to be updated.
            partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so
                infrastructure corresponding to other feature views should not be touched.
        """
        pass

    def plan_infra(
        self, config: RepoConfig, desired_registry_proto: RegistryProto
    ) -> Infra:
        """
        Returns the Infra required to support the desired registry.

        Args:
            config: The RepoConfig for the current FeatureStore.
            desired_registry_proto: The desired registry, in proto form.
        """
        return Infra()

    @abstractmethod
    def teardown_infra(
        self,
        project: str,
        tables: Sequence[FeatureView],
        entities: Sequence[Entity],
    ):
        """
        Tears down all cloud resources for the specified set of Feast objects.

        Args:
            project: Feast project to which the objects belong.
            tables: Feature views whose corresponding infrastructure should be deleted.
            entities: Entities whose corresponding infrastructure should be deleted.
        """
        pass

    @abstractmethod
    def online_write_batch(
        self,
        config: RepoConfig,
        table: FeatureView,
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """
        Writes a batch of feature rows to the online store.

        If a tz-naive timestamp is passed to this method, it is assumed to be UTC.

        Args:
            config: The config for the current feature store.
            table: Feature view to which these feature rows correspond.
            data: A list of quadruplets containing feature data. Each quadruplet contains an entity
                key, a dict containing feature values, an event timestamp for the row, and the
                created timestamp for the row if it exists.
            progress: Function to be called once a batch of rows is written to the online store,
                used to show progress.
        """
        pass

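    # Illustrative sketch (not part of this module): building the `data` quadruplets
    # for online_write_batch. The join key "driver_id" and feature "conv_rate" are
    # made-up names for the example.
    #
    #     from datetime import datetime, timezone
    #
    #     entity_key = EntityKeyProto(
    #         join_keys=["driver_id"],
    #         entity_values=[ValueProto(int64_val=1001)],
    #     )
    #     row = (
    #         entity_key,
    #         {"conv_rate": ValueProto(double_val=0.85)},
    #         datetime.now(timezone.utc),  # event timestamp
    #         None,  # created timestamp, if available
    #     )
    #     provider.online_write_batch(config, feature_view, [row], progress=None)
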
    def ingest_df(
        self,
        feature_view: FeatureView,
        df: pd.DataFrame,
    ):
        """
        Persists a dataframe to the online store.

        Args:
            feature_view: The feature view to which the dataframe corresponds.
            df: The dataframe to be persisted.
        """
        pass

    def ingest_df_to_offline_store(
        self,
        feature_view: FeatureView,
        df: pyarrow.Table,
    ):
        """
        Persists an arrow table to the offline store.

        Args:
            feature_view: The feature view to which the table corresponds.
            df: The arrow table to be persisted.
        """
        pass

    @abstractmethod
    def materialize_single_feature_view(
        self,
        config: RepoConfig,
        feature_view: FeatureView,
        start_date: datetime,
        end_date: datetime,
        registry: BaseRegistry,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ) -> None:
        """
        Writes latest feature values in the specified time range to the online store.

        Args:
            config: The config for the current feature store.
            feature_view: The feature view to materialize.
            start_date: The start of the time range.
            end_date: The end of the time range.
            registry: The registry for the current feature store.
            project: Feast project to which the objects belong.
            tqdm_builder: A function to monitor the progress of materialization.
        """
        pass

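    # Illustrative sketch: a tqdm_builder callable takes the number of rows to be
    # materialized and returns a progress bar. The ncols value is an arbitrary choice.
    #
    #     def tqdm_builder(length: int) -> tqdm:
    #         return tqdm(total=length, ncols=100)
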
    @abstractmethod
    def get_historical_features(
        self,
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pd.DataFrame, str],
        registry: BaseRegistry,
        project: str,
        full_feature_names: bool,
    ) -> RetrievalJob:
        """
        Retrieves the point-in-time correct historical feature values for the specified entity rows.

        Args:
            config: The config for the current feature store.
            feature_views: A list containing all feature views that are referenced in the entity rows.
            feature_refs: The features to be retrieved.
            entity_df: A collection of rows containing all entity columns on which features need to
                be joined, as well as the timestamp column used for point-in-time joins. Either a
                pandas dataframe can be provided or a SQL query.
            registry: The registry for the current feature store.
            project: Feast project to which the feature views belong.
            full_feature_names: If True, feature names will be prefixed with the corresponding
                feature view name, changing them from the format "feature" to
                "feature_view__feature" (e.g. "daily_transactions" changes to
                "customer_fv__daily_transactions").

        Returns:
            A RetrievalJob that can be executed to get the features.
        """
        pass

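    # Illustrative sketch: an entity_df pairs entity keys with the timestamps used
    # for the point-in-time join. The column and feature names below are made up;
    # feature_refs use the "feature_view:feature" format.
    #
    #     entity_df = pd.DataFrame(
    #         {
    #             "driver_id": [1001, 1002],
    #             "event_timestamp": pd.to_datetime(
    #                 ["2021-04-12 10:59:42", "2021-04-12 08:12:10"], utc=True
    #             ),
    #         }
    #     )
    #     job = provider.get_historical_features(
    #         config, feature_views, ["driver_stats:conv_rate"], entity_df,
    #         registry, project, full_feature_names=False,
    #     )
    #     df = job.to_df()
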
    @abstractmethod
    def online_read(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """
        Reads feature values for the given entity keys.

        Args:
            config: The config for the current feature store.
            table: The feature view whose feature values should be read.
            entity_keys: The list of entity keys for which feature values should be read.
            requested_features: The list of features that should be read.

        Returns:
            A list of the same length as entity_keys. Each item in the list is a tuple where the
            first item is the event timestamp for the row, and the second item is a dict mapping
            feature names to values, which are returned in proto format.
        """
        pass

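    # Illustrative sketch: unpacking the result of online_read. Feature values come
    # back as Value protos, so a concrete field (e.g. double_val) must be selected.
    # The feature name "conv_rate" is made up for the example.
    #
    #     rows = provider.online_read(config, feature_view, [entity_key], ["conv_rate"])
    #     for event_ts, values in rows:
    #         if values is not None:
    #             print(event_ts, values["conv_rate"].double_val)
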
    @abstractmethod
    def retrieve_saved_dataset(
        self, config: RepoConfig, dataset: SavedDataset
    ) -> RetrievalJob:
        """
        Reads a saved dataset.

        Args:
            config: The config for the current feature store.
            dataset: A SavedDataset object containing all parameters necessary for retrieving
                the dataset.

        Returns:
            A RetrievalJob that can be executed to get the saved dataset.
        """
        pass

    @abstractmethod
    def write_feature_service_logs(
        self,
        feature_service: FeatureService,
        logs: Union[pyarrow.Table, Path],
        config: RepoConfig,
        registry: BaseRegistry,
    ):
        """
        Writes features and entities logged by a feature server to the offline store.

        The schema of the logs table is inferred from the specified feature service. Only feature
        services with configured logging are accepted.

        Args:
            feature_service: The feature service to be logged.
            logs: The logs, either as an arrow table or as a path to a parquet directory.
            config: The config for the current feature store.
            registry: The registry for the current feature store.
        """
        pass

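    # Illustrative sketch: logs may arrive either as an in-memory arrow table or as
    # a path to a parquet directory written by a feature server. The path below is
    # a made-up value.
    #
    #     provider.write_feature_service_logs(
    #         feature_service, Path("/var/log/feast/served_features"), config, registry
    #     )
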
    @abstractmethod
    def retrieve_feature_service_logs(
        self,
        feature_service: FeatureService,
        start_date: datetime,
        end_date: datetime,
        config: RepoConfig,
        registry: BaseRegistry,
    ) -> RetrievalJob:
        """
        Reads logged features for the specified time window.

        Args:
            feature_service: The feature service whose logs should be retrieved.
            start_date: The start of the window.
            end_date: The end of the window.
            config: The config for the current feature store.
            registry: The registry for the current feature store.

        Returns:
            A RetrievalJob that can be executed to get the feature service logs.
        """
        pass

    def get_feature_server_endpoint(self) -> Optional[str]:
        """Returns the endpoint for the feature server, if it exists."""
        return None


def get_provider(config: RepoConfig) -> Provider:
    if "." not in config.provider:
        if config.provider not in PROVIDERS_CLASS_FOR_TYPE:
            raise errors.FeastProviderNotImplementedError(config.provider)

        provider = PROVIDERS_CLASS_FOR_TYPE[config.provider]
    else:
        provider = config.provider

    # Split the provider into module and class names by finding the right-most dot.
    # For example, provider 'foo.bar.MyProvider' will be parsed into 'foo.bar' and 'MyProvider'.
    module_name, class_name = provider.rsplit(".", 1)

    cls = import_class(module_name, class_name, "Provider")

    return cls(config)
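
# Illustrative sketch (not part of this module): resolving a provider from a
# RepoConfig. All values below, including the custom class path, are made up
# for the example.
#
#     config = RepoConfig(
#         project="my_project",
#         registry="data/registry.db",
#         provider="local",  # or a custom path such as "my_pkg.provider.MyProvider"
#     )
#     provider = get_provider(config)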