# Source code for feast.feature_view

# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union

from google.protobuf.duration_pb2 import Duration
from google.protobuf.json_format import MessageToJson

from feast import utils
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
    FeatureViewMeta as FeatureViewMetaProto,
)
from feast.protos.feast.core.FeatureView_pb2 import (
    FeatureViewSpec as FeatureViewSpecProto,
)
from feast.protos.feast.core.FeatureView_pb2 import (
    MaterializationInterval as MaterializationIntervalProto,
)
from feast.repo_config import RepoConfig
from feast.usage import log_exceptions
from feast.value_type import ValueType

# Show each distinct DeprecationWarning (e.g. the deprecated `input` argument of
# FeatureView) only once. Note: this edits the interpreter-wide warnings filter list
# at import time, so it also affects DeprecationWarnings raised by other code.
warnings.simplefilter(action="once", category=DeprecationWarning)


class FeatureView:
    """
    A FeatureView defines a logical grouping of servable features.

    Args:
        name: Name of the group of features.
        entities: The entities to which this group of features is associated.
        ttl: The amount of time this group of features lives. A ttl of 0 indicates that
            this group of features lives forever. Note that large TTLs or a ttl of 0 can
            result in extremely computationally intensive queries. May be None
            (``from_proto`` passes None when the serialized duration is zero).
        input: The source of data where this group of features is stored. Deprecated;
            use ``batch_source`` instead.
        batch_source (optional): The batch source of data where this group of features
            is stored.
        stream_source (optional): The stream source of data where this group of features
            is stored.
        features (optional): The set of features defined as part of this FeatureView.
            If left empty, ``infer_features_from_batch_source`` can populate it.
        tags (optional): A dictionary of key-value pairs used for organizing FeatureViews.
        online (optional): Boolean flag written to the ``online`` field of the protobuf
            spec; presumably marks the view as available for online serving. Defaults
            to True.
    """

    name: str
    entities: List[str]
    features: List[Feature]
    tags: Optional[Dict[str, str]]
    ttl: Optional[timedelta]
    online: bool
    input: DataSource
    batch_source: DataSource
    stream_source: Optional[DataSource] = None
    created_timestamp: Optional[datetime] = None
    last_updated_timestamp: Optional[datetime] = None
    materialization_intervals: List[Tuple[datetime, datetime]]

    @log_exceptions
    def __init__(
        self,
        name: str,
        entities: List[str],
        ttl: Optional[Union[Duration, timedelta]],
        input: Optional[DataSource] = None,
        batch_source: Optional[DataSource] = None,
        stream_source: Optional[DataSource] = None,
        features: Optional[List[Feature]] = None,
        tags: Optional[Dict[str, str]] = None,
        online: bool = True,
    ):
        """
        Creates a FeatureView object.

        Raises:
            ValueError: Neither ``input`` nor ``batch_source`` was provided, or a field
                mapping conflicts with an Entity or a Feature.
        """
        if input is not None:
            warnings.warn(
                (
                    "The argument 'input' is being deprecated. Please use 'batch_source' "
                    "instead. Feast 0.13 and onwards will not support the argument 'input'."
                ),
                DeprecationWarning,
            )

        # The deprecated `input` argument takes precedence over `batch_source`.
        _input = input or batch_source
        # Explicit check rather than `assert`, which is stripped under `python -O`.
        if _input is None:
            raise ValueError(
                "A FeatureView requires a data source: pass 'batch_source' "
                "(or the deprecated 'input' argument)."
            )

        _features = features or []

        # Reject entity/feature names that are keys of the source's field mapping;
        # the error tells the user to use the mapped name instead.
        field_mapping = _input.field_mapping or {}
        cols = list(entities) + [feat.name for feat in _features]
        for col in cols:
            if col in field_mapping:
                raise ValueError(
                    f"The field {col} is mapped to {field_mapping[col]} for this data source. "
                    f"Please either remove this field mapping or use {field_mapping[col]} as the "
                    f"Entity or Feature name."
                )

        self.name = name
        self.entities = entities
        self.features = _features
        self.tags = tags if tags is not None else {}

        if isinstance(ttl, Duration):
            # ToTimedelta() keeps the `nanos` component; reading only `ttl.seconds`
            # truncated sub-second TTLs (a 0.5s TTL became 0, i.e. "forever").
            self.ttl = ttl.ToTimedelta()
        else:
            self.ttl = ttl

        self.online = online
        # `input` mirrors `batch_source` because the `input` argument is deprecated.
        self.input = _input
        self.batch_source = _input
        self.stream_source = stream_source

        self.materialization_intervals = []

        self.created_timestamp: Optional[datetime] = None
        self.last_updated_timestamp: Optional[datetime] = None

    def __repr__(self):
        items = (f"{k} = {v}" for k, v in self.__dict__.items())
        return f"<{self.__class__.__name__}({', '.join(items)})>"

    def __str__(self):
        return str(MessageToJson(self.to_proto()))

    def __hash__(self):
        # Consistent with __eq__: equal feature views always share a name.
        return hash(self.name)

    def __getitem__(self, item) -> FeatureViewProjection:
        """
        Projects this feature view onto a subset of its features.

        Args:
            item: A list of feature names to keep.

        Returns:
            A FeatureViewProjection holding this view's features whose names appear
            in ``item``, in this view's feature order.

        Raises:
            TypeError: ``item`` is not a list.
        """
        if not isinstance(item, list):
            raise TypeError(
                "A FeatureView can only be indexed with a list of feature names, "
                f"got {type(item).__name__}."
            )
        referenced_features = [
            feature for feature in self.features if feature.name in item
        ]
        return FeatureViewProjection(self.name, referenced_features)

    def __eq__(self, other):
        """
        Compares tags, name, ttl, online flag, entities, features and sources.

        Entities and features are compared order-insensitively (both sides sorted).

        Raises:
            TypeError: ``other`` is not a FeatureView.
        """
        if not isinstance(other, FeatureView):
            raise TypeError(
                "Comparisons should only involve FeatureView class objects."
            )

        if (
            self.tags != other.tags
            or self.name != other.name
            or self.ttl != other.ttl
            or self.online != other.online
        ):
            return False

        if sorted(self.entities) != sorted(other.entities):
            return False
        if sorted(self.features) != sorted(other.features):
            return False
        if self.batch_source != other.batch_source:
            return False
        if self.stream_source != other.stream_source:
            return False

        return True

    def is_valid(self):
        """
        Validates the state of this feature view locally.

        Raises:
            ValueError: The feature view does not have a name or does not have entities.
        """
        if not self.name:
            raise ValueError("Feature view needs a name.")

        if not self.entities:
            raise ValueError("Feature view has no entities.")

    @staticmethod
    def _data_source_to_proto(data_source: DataSource):
        """Serializes a data source and records its fully-qualified class name in
        the proto's ``data_source_class_type`` field."""
        proto = data_source.to_proto()
        proto.data_source_class_type = (
            f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
        )
        return proto

    def to_proto(self) -> FeatureViewProto:
        """
        Converts a feature view object to its protobuf representation.

        Returns:
            A FeatureViewProto protobuf.
        """
        meta = FeatureViewMetaProto(materialization_intervals=[])
        if self.created_timestamp:
            meta.created_timestamp.FromDatetime(self.created_timestamp)
        if self.last_updated_timestamp:
            meta.last_updated_timestamp.FromDatetime(self.last_updated_timestamp)

        for start_time, end_time in self.materialization_intervals:
            interval_proto = MaterializationIntervalProto()
            interval_proto.start_time.FromDatetime(start_time)
            interval_proto.end_time.FromDatetime(end_time)
            meta.materialization_intervals.append(interval_proto)

        # A None ttl is left unset in the spec.
        ttl_duration = None
        if self.ttl is not None:
            ttl_duration = Duration()
            ttl_duration.FromTimedelta(self.ttl)

        batch_source_proto = self._data_source_to_proto(self.batch_source)

        stream_source_proto = None
        if self.stream_source:
            stream_source_proto = self._data_source_to_proto(self.stream_source)

        spec = FeatureViewSpecProto(
            name=self.name,
            entities=self.entities,
            features=[feature.to_proto() for feature in self.features],
            tags=self.tags,
            ttl=ttl_duration,
            online=self.online,
            batch_source=batch_source_proto,
            stream_source=stream_source_proto,
        )

        return FeatureViewProto(spec=spec, meta=meta)

    @classmethod
    def from_proto(cls, feature_view_proto: FeatureViewProto):
        """
        Creates a feature view from a protobuf representation of a feature view.

        Args:
            feature_view_proto: A protobuf representation of a feature view.

        Returns:
            A FeatureView object based on the feature view protobuf. A zero-length
            ttl in the proto becomes a ttl of None.
        """
        batch_source = DataSource.from_proto(feature_view_proto.spec.batch_source)
        stream_source = (
            DataSource.from_proto(feature_view_proto.spec.stream_source)
            if feature_view_proto.spec.HasField("stream_source")
            else None
        )
        feature_view = cls(
            name=feature_view_proto.spec.name,
            entities=[entity for entity in feature_view_proto.spec.entities],
            features=[
                Feature(
                    name=feature.name,
                    dtype=ValueType(feature.value_type),
                    labels=dict(feature.labels),
                )
                for feature in feature_view_proto.spec.features
            ],
            tags=dict(feature_view_proto.spec.tags),
            online=feature_view_proto.spec.online,
            ttl=(
                None
                if feature_view_proto.spec.ttl.seconds == 0
                and feature_view_proto.spec.ttl.nanos == 0
                else feature_view_proto.spec.ttl
            ),
            batch_source=batch_source,
            stream_source=stream_source,
        )

        if feature_view_proto.meta.HasField("created_timestamp"):
            feature_view.created_timestamp = (
                feature_view_proto.meta.created_timestamp.ToDatetime()
            )
        if feature_view_proto.meta.HasField("last_updated_timestamp"):
            feature_view.last_updated_timestamp = (
                feature_view_proto.meta.last_updated_timestamp.ToDatetime()
            )

        # Interval bounds are made timezone-aware via utils.make_tzaware.
        for interval in feature_view_proto.meta.materialization_intervals:
            feature_view.materialization_intervals.append(
                (
                    utils.make_tzaware(interval.start_time.ToDatetime()),
                    utils.make_tzaware(interval.end_time.ToDatetime()),
                )
            )

        return feature_view

    @property
    def most_recent_end_time(self) -> Optional[datetime]:
        """
        Retrieves the latest time up to which the feature view has been materialized.

        Returns:
            The latest time, or None if the feature view has not been materialized.
        """
        return max(
            (end_time for _, end_time in self.materialization_intervals),
            default=None,
        )

    def infer_features_from_batch_source(self, config: RepoConfig):
        """
        Infers the set of features associated to this feature view from the input source.

        Does nothing if features are already defined. Otherwise every batch-source
        column except the timestamp columns, entity columns, and columns whose names
        start or end with a double underscore becomes a feature, renamed through the
        source's field mapping.

        Args:
            config: Configuration object used to configure the feature store.

        Raises:
            RegistryInferenceFailure: The set of features could not be inferred.
        """
        if not self.features:
            columns_to_exclude = {
                self.batch_source.event_timestamp_column,
                self.batch_source.created_timestamp_column,
            } | set(self.entities)
            # Same None-tolerance as the field-mapping check in __init__.
            field_mapping = self.batch_source.field_mapping or {}

            for (
                col_name,
                col_datatype,
            ) in self.batch_source.get_table_column_names_and_types(config):
                # Double underscores often signal an internal-use column. re.search
                # (not re.match) so the `__$` alternative can match at the end.
                if col_name in columns_to_exclude or re.search(r"^__|__$", col_name):
                    continue
                feature_name = field_mapping.get(col_name, col_name)
                self.features.append(
                    Feature(
                        feature_name,
                        self.batch_source.source_datatype_to_feast_value_type()(
                            col_datatype
                        ),
                    )
                )

            if not self.features:
                raise RegistryInferenceFailure(
                    "FeatureView",
                    f"Could not infer Features for the FeatureView named {self.name}.",
                )