Source code for feast.infra.infra_object

# Copyright 2021 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, List

from feast.errors import FeastInvalidInfraObjectType
from feast.importer import import_class
from feast.protos.feast.core.DatastoreTable_pb2 import (
    DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.DynamoDBTable_pb2 import (
    DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto

DATASTORE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.datastore.DatastoreTable"
DYNAMODB_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.dynamodb.DynamoDBTable"
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.sqlite.SqliteTable"


[docs]class InfraObject(ABC): """ Represents a single infrastructure object (e.g. online store table) managed by Feast. """ @abstractmethod def __init__(self, name: str): self._name = name @property def name(self) -> str: return self._name
[docs] @abstractmethod def to_infra_object_proto(self) -> InfraObjectProto: """Converts an InfraObject to its protobuf representation, wrapped in an InfraObjectProto.""" pass
[docs] @abstractmethod def to_proto(self) -> Any: """Converts an InfraObject to its protobuf representation.""" pass
def __lt__(self, other) -> bool: return self.name < other.name
[docs] @staticmethod @abstractmethod def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any: """ Returns an InfraObject created from a protobuf representation. Args: infra_object_proto: A protobuf representation of an InfraObject. Raises: FeastInvalidInfraObjectType: The type of InfraObject could not be identified. """ if infra_object_proto.infra_object_class_type: cls = _get_infra_object_class_from_type( infra_object_proto.infra_object_class_type ) return cls.from_infra_object_proto(infra_object_proto) raise FeastInvalidInfraObjectType()
[docs] @staticmethod def from_proto(infra_object_proto: Any) -> Any: """ Converts a protobuf representation of a subclass to an object of that subclass. Args: infra_object_proto: A protobuf representation of an InfraObject. Raises: FeastInvalidInfraObjectType: The type of InfraObject could not be identified. """ if isinstance(infra_object_proto, DatastoreTableProto): infra_object_class_type = DATASTORE_INFRA_OBJECT_CLASS_TYPE elif isinstance(infra_object_proto, DynamoDBTableProto): infra_object_class_type = DYNAMODB_INFRA_OBJECT_CLASS_TYPE elif isinstance(infra_object_proto, SqliteTableProto): infra_object_class_type = SQLITE_INFRA_OBJECT_CLASS_TYPE else: raise FeastInvalidInfraObjectType() cls = _get_infra_object_class_from_type(infra_object_class_type) return cls.from_proto(infra_object_proto)
[docs] @abstractmethod def update(self): """ Deploys or updates the infrastructure object. """ pass
[docs] @abstractmethod def teardown(self): """ Tears down the infrastructure object. """ pass
[docs]@dataclass class Infra: """ Represents the set of infrastructure managed by Feast. Args: infra_objects: A list of InfraObjects, each representing one infrastructure object. """ infra_objects: List[InfraObject] = field(default_factory=list)
[docs] def to_proto(self) -> InfraProto: """ Converts Infra to its protobuf representation. Returns: An InfraProto protobuf. """ infra_proto = InfraProto() for infra_object in self.infra_objects: infra_object_proto = infra_object.to_infra_object_proto() infra_proto.infra_objects.append(infra_object_proto) return infra_proto
[docs] @classmethod def from_proto(cls, infra_proto: InfraProto): """ Returns an Infra object created from a protobuf representation. """ infra = cls() infra.infra_objects += [ InfraObject.from_infra_object_proto(infra_object_proto) for infra_object_proto in infra_proto.infra_objects ] return infra
def _get_infra_object_class_from_type(infra_object_class_type: str): module_name, infra_object_class_name = infra_object_class_type.rsplit(".", 1) return import_class(module_name, infra_object_class_name)