Source code for feast.infra.offline_stores.offline_utils

import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, KeysView, List, Optional, Set, Tuple

import numpy as np
import pandas as pd
import pyarrow as pa
from jinja2 import BaseLoader, Environment
from pandas import Timestamp

from feast.data_source import DataSource
from feast.errors import (
    EntityTimestampInferenceException,
    FeastEntityDFMissingColumnsError,
)
from feast.feature_view import FeatureView
from feast.importer import import_class
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.type_map import feast_value_type_to_pa
from feast.utils import _get_requested_feature_views_to_features_dict, to_naive_utc

DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"


[docs]def infer_event_timestamp_from_entity_df(entity_schema: Dict[str, np.dtype]) -> str: if DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL in entity_schema.keys(): return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL datetime_columns = [ column for column, dtype in entity_schema.items() if pd.core.dtypes.common.is_datetime64_any_dtype(dtype) ] if len(datetime_columns) == 1: print( f"Using {datetime_columns[0]} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}." ) return datetime_columns[0] else: raise EntityTimestampInferenceException(DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL)
[docs]def assert_expected_columns_in_entity_df( entity_schema: Dict[str, np.dtype], join_keys: Set[str], entity_df_event_timestamp_col: str, ): entity_columns = set(entity_schema.keys()) expected_columns = join_keys | {entity_df_event_timestamp_col} missing_keys = expected_columns - entity_columns if len(missing_keys) != 0: raise FeastEntityDFMissingColumnsError(expected_columns, missing_keys)
# TODO: Remove project and registry from the interface and call sites.
[docs]def get_expected_join_keys( project: str, feature_views: List[FeatureView], registry: BaseRegistry ) -> Set[str]: join_keys = set() for feature_view in feature_views: for entity_column in feature_view.entity_columns: join_key = feature_view.projection.join_key_map.get( entity_column.name, entity_column.name ) join_keys.add(join_key) return join_keys
[docs]def get_entity_df_timestamp_bounds( entity_df: pd.DataFrame, event_timestamp_col: str ) -> Tuple[Timestamp, Timestamp]: event_timestamp_series = entity_df[event_timestamp_col] return event_timestamp_series.min(), event_timestamp_series.max()
[docs]@dataclass(frozen=True) class FeatureViewQueryContext: """Context object used to template a BigQuery and Redshift point-in-time SQL query""" name: str ttl: int entities: List[str] features: List[str] # feature reference format field_mapping: Dict[str, str] timestamp_field: str created_timestamp_column: Optional[str] table_subquery: str entity_selections: List[str] min_event_timestamp: Optional[str] max_event_timestamp: str
[docs]def get_feature_view_query_context( feature_refs: List[str], feature_views: List[FeatureView], registry: BaseRegistry, project: str, entity_df_timestamp_range: Tuple[datetime, datetime], ) -> List[FeatureViewQueryContext]: """ Build a query context containing all information required to template a BigQuery and Redshift point-in-time SQL query """ ( feature_views_to_feature_map, on_demand_feature_views_to_features, ) = _get_requested_feature_views_to_features_dict( feature_refs, feature_views, registry.list_on_demand_feature_views(project) ) query_context = [] for feature_view, features in feature_views_to_feature_map.items(): join_keys: List[str] = [] entity_selections: List[str] = [] for entity_column in feature_view.entity_columns: join_key = feature_view.projection.join_key_map.get( entity_column.name, entity_column.name ) join_keys.append(join_key) entity_selections.append(f"{entity_column.name} AS {join_key}") if isinstance(feature_view.ttl, timedelta): ttl_seconds = int(feature_view.ttl.total_seconds()) else: ttl_seconds = 0 reverse_field_mapping = { v: k for k, v in feature_view.batch_source.field_mapping.items() } features = [reverse_field_mapping.get(feature, feature) for feature in features] timestamp_field = reverse_field_mapping.get( feature_view.batch_source.timestamp_field, feature_view.batch_source.timestamp_field, ) created_timestamp_column = reverse_field_mapping.get( feature_view.batch_source.created_timestamp_column, feature_view.batch_source.created_timestamp_column, ) max_event_timestamp = to_naive_utc(entity_df_timestamp_range[1]).isoformat() min_event_timestamp = None if feature_view.ttl: min_event_timestamp = to_naive_utc( entity_df_timestamp_range[0] - feature_view.ttl ).isoformat() context = FeatureViewQueryContext( name=feature_view.projection.name_to_use(), ttl=ttl_seconds, entities=join_keys, features=features, field_mapping=feature_view.batch_source.field_mapping, timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, # TODO: Make created column optional and not hardcoded table_subquery=feature_view.batch_source.get_table_query_string(), entity_selections=entity_selections, min_event_timestamp=min_event_timestamp, max_event_timestamp=max_event_timestamp, ) query_context.append(context) return query_context
[docs]def build_point_in_time_query( feature_view_query_contexts: List[FeatureViewQueryContext], left_table_query_string: str, entity_df_event_timestamp_col: str, entity_df_columns: KeysView[str], query_template: str, full_feature_names: bool = False, ) -> str: """Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift""" template = Environment(loader=BaseLoader()).from_string(source=query_template) final_output_feature_names = list(entity_df_columns) final_output_feature_names.extend( [ ( f"{fv.name}__{fv.field_mapping.get(feature, feature)}" if full_feature_names else fv.field_mapping.get(feature, feature) ) for fv in feature_view_query_contexts for feature in fv.features ] ) # Add additional fields to dict template_context = { "left_table_query_string": left_table_query_string, "entity_df_event_timestamp_col": entity_df_event_timestamp_col, "unique_entity_keys": set( [entity for fv in feature_view_query_contexts for entity in fv.entities] ), "featureviews": [asdict(context) for context in feature_view_query_contexts], "full_feature_names": full_feature_names, "final_output_feature_names": final_output_feature_names, } query = template.render(template_context) return query
[docs]def get_temp_entity_table_name() -> str: """Returns a random table name for uploading the entity dataframe""" return "feast_entity_df_" + uuid.uuid4().hex
[docs]def get_offline_store_from_config(offline_store_config: Any) -> OfflineStore: """Creates an offline store corresponding to the given offline store config.""" module_name = offline_store_config.__module__ qualified_name = type(offline_store_config).__name__ class_name = qualified_name.replace("Config", "") offline_store_class = import_class(module_name, class_name, "OfflineStore") return offline_store_class()
[docs]def get_pyarrow_schema_from_batch_source( config: RepoConfig, batch_source: DataSource ) -> Tuple[pa.Schema, List[str]]: """Returns the pyarrow schema and column names for the given batch source.""" column_names_and_types = batch_source.get_table_column_names_and_types(config) pa_schema = [] column_names = [] for column_name, column_type in column_names_and_types: pa_schema.append( ( column_name, feast_value_type_to_pa( batch_source.source_datatype_to_feast_value_type()(column_type) ), ) ) column_names.append(column_name) return pa.schema(pa_schema), column_names