Source code for feast.entity

# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Dict, Optional

import yaml
from google.protobuf import json_format
from google.protobuf.json_format import MessageToDict, MessageToJson

from feast.loaders import yaml as feast_yaml
from feast.protos.feast.core.Entity_pb2 import Entity as EntityV2Proto
from feast.protos.feast.core.Entity_pb2 import EntityMeta as EntityMetaProto
from feast.protos.feast.core.Entity_pb2 import EntitySpecV2 as EntitySpecProto
from feast.usage import log_exceptions
from feast.value_type import ValueType


class Entity:
    """
    Represents a collection of entities and associated metadata.

    Args:
        name: Name of the entity.
        value_type (optional): The type of the entity, such as string or float.
        description (optional): Additional information to describe the entity.
        join_key (optional): A property that uniquely identifies different entities
            within the collection. Used as a key for joining entities with their
            associated features. If not specified, defaults to the name of the entity.
        labels (optional): User-defined metadata in dictionary form.
    """

    _name: str
    _value_type: ValueType
    _description: str
    _join_key: str
    _labels: Dict[str, str]
    _created_timestamp: Optional[datetime]
    _last_updated_timestamp: Optional[datetime]

    @log_exceptions
    def __init__(
        self,
        name: str,
        value_type: ValueType = ValueType.UNKNOWN,
        description: str = "",
        join_key: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
    ):
        """Creates an Entity object."""
        self._name = name
        self._description = description
        self._value_type = value_type
        # A missing or empty join key falls back to the entity name.
        self._join_key = join_key if join_key else name
        self._labels = dict() if labels is None else labels
        # Timestamps are only populated when the entity is built from a proto.
        self._created_timestamp = None
        self._last_updated_timestamp = None

    def __hash__(self) -> int:
        # Built only from fields that __eq__ compares, so entities that compare
        # equal also hash equal (mixing in id(self) would break that contract).
        return hash((self.name, self.join_key))

    def __eq__(self, other):
        """
        Compares name, description, value type, join key and labels.

        Timestamps are not part of the comparison.

        Raises:
            TypeError: The other operand is not an Entity.
        """
        if not isinstance(other, Entity):
            raise TypeError("Comparisons should only involve Entity class objects.")

        if (
            self.labels != other.labels
            or self.name != other.name
            or self.description != other.description
            or self.value_type != other.value_type
            or self.join_key != other.join_key
        ):
            return False

        return True

    def __str__(self):
        """Returns the JSON rendering of this entity's protobuf representation."""
        return str(MessageToJson(self.to_proto()))

    @property
    def name(self) -> str:
        """
        Gets the name of this entity.
        """
        return self._name

    @name.setter
    def name(self, name):
        """
        Sets the name of this entity.
        """
        self._name = name

    @property
    def description(self) -> str:
        """
        Gets the description of this entity.
        """
        return self._description

    @description.setter
    def description(self, description):
        """
        Sets the description of this entity.
        """
        self._description = description

    @property
    def join_key(self) -> str:
        """
        Gets the join key of this entity.
        """
        return self._join_key

    @join_key.setter
    def join_key(self, join_key):
        """
        Sets the join key of this entity.
        """
        self._join_key = join_key

    @property
    def value_type(self) -> ValueType:
        """
        Gets the type of this entity.
        """
        return self._value_type

    @value_type.setter
    def value_type(self, value_type: ValueType):
        """
        Sets the type of this entity.
        """
        self._value_type = value_type

    @property
    def labels(self) -> Dict[str, str]:
        """
        Gets the labels of this entity.
        """
        return self._labels

    @labels.setter
    def labels(self, labels: Dict[str, str]):
        """
        Sets the labels of this entity.
        """
        self._labels = labels

    @property
    def created_timestamp(self) -> Optional[datetime]:
        """
        Gets the created_timestamp of this entity.
        """
        return self._created_timestamp

    @property
    def last_updated_timestamp(self) -> Optional[datetime]:
        """
        Gets the last_updated_timestamp of this entity.
        """
        return self._last_updated_timestamp

    def is_valid(self):
        """
        Validates the state of this entity locally.

        Raises:
            ValueError: The entity does not have a name or does not have a type.
        """
        if not self.name:
            raise ValueError("No name found in entity.")

        if not self.value_type:
            # f-string so the offending value is interpolated into the message
            # instead of the placeholder being printed literally.
            raise ValueError(f"No type found in entity {self.value_type}")

    @classmethod
    def from_yaml(cls, yml: str):
        """
        Creates an entity from a YAML string body or a file path.

        Args:
            yml: Either a file path containing a yaml file or a YAML string.

        Returns:
            An Entity object based on the YAML file.
        """
        return cls.from_dict(feast_yaml.yaml_loader(yml, load_single=True))

    @classmethod
    def from_dict(cls, entity_dict):
        """
        Creates an entity from a dict.

        Keys that do not correspond to a field of the entity protobuf are
        ignored.

        Args:
            entity_dict: A dict representation of an entity.

        Returns:
            An Entity object based on the entity dict.
        """
        entity_proto = json_format.ParseDict(
            entity_dict, EntityV2Proto(), ignore_unknown_fields=True
        )
        return cls.from_proto(entity_proto)

    @classmethod
    def from_proto(cls, entity_proto: EntityV2Proto):
        """
        Creates an entity from a protobuf representation of an entity.

        Args:
            entity_proto: A protobuf representation of an entity.

        Returns:
            An Entity object based on the entity protobuf.
        """
        entity = cls(
            name=entity_proto.spec.name,
            description=entity_proto.spec.description,
            value_type=ValueType(entity_proto.spec.value_type),
            # Copy into a plain dict so the entity does not alias the proto's
            # map container and matches the Dict[str, str] annotation.
            labels=dict(entity_proto.spec.labels),
            join_key=entity_proto.spec.join_key,
        )

        # Timestamps are optional in the proto; only copy the ones that are set.
        if entity_proto.meta.HasField("created_timestamp"):
            entity._created_timestamp = entity_proto.meta.created_timestamp.ToDatetime()
        if entity_proto.meta.HasField("last_updated_timestamp"):
            entity._last_updated_timestamp = (
                entity_proto.meta.last_updated_timestamp.ToDatetime()
            )

        return entity

    def to_proto(self) -> EntityV2Proto:
        """
        Converts an entity object to its protobuf representation.

        Returns:
            An EntityV2Proto protobuf.
        """
        meta = EntityMetaProto()
        if self._created_timestamp:
            meta.created_timestamp.FromDatetime(self._created_timestamp)
        if self._last_updated_timestamp:
            meta.last_updated_timestamp.FromDatetime(self._last_updated_timestamp)

        # Reuse the spec builder so both conversions stay in sync.
        return EntityV2Proto(spec=self.to_spec_proto(), meta=meta)

    def to_dict(self) -> Dict:
        """
        Converts entity to dict.

        Returns:
            Dictionary object representation of entity.
        """
        entity_dict = MessageToDict(self.to_proto())

        # Remove meta when empty for more readable exports
        if entity_dict["meta"] == {}:
            del entity_dict["meta"]

        return entity_dict

    def to_yaml(self):
        """
        Converts an entity to a YAML string.

        Returns:
            An entity string returned in YAML format.
        """
        entity_dict = self.to_dict()
        return yaml.dump(entity_dict, allow_unicode=True, sort_keys=False)

    def to_spec_proto(self) -> EntitySpecProto:
        """
        Converts an Entity object to its spec protobuf representation.
        Used when passing an EntitySpecV2 object to a Feast request.

        Returns:
            An EntitySpecV2 protobuf.
        """
        return EntitySpecProto(
            name=self.name,
            description=self.description,
            value_type=self.value_type.value,
            labels=self.labels,
            join_key=self.join_key,
        )

    def _update_from_entity(self, entity):
        """
        Replaces the fields of this entity with those of another entity.

        The labels mapping is assigned by reference, not copied, so both
        entities share it afterwards.

        Args:
            entity: Entity to use as a source of configuration.
        """
        self.name = entity.name
        self.description = entity.description
        self.value_type = entity.value_type
        self.labels = entity.labels
        self.join_key = entity.join_key
        self._created_timestamp = entity.created_timestamp
        self._last_updated_timestamp = entity.last_updated_timestamp