Source code for feast.diff.infra_diff

from dataclasses import dataclass
from typing import Generic, Iterable, List, Tuple, TypeVar

from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.infra.infra_object import (
    DATASTORE_INFRA_OBJECT_CLASS_TYPE,
    DYNAMODB_INFRA_OBJECT_CLASS_TYPE,
    SQLITE_INFRA_OBJECT_CLASS_TYPE,
    InfraObject,
)
from feast.protos.feast.core.DatastoreTable_pb2 import (
    DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.DynamoDBTable_pb2 import (
    DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto

InfraObjectProto = TypeVar(
    "InfraObjectProto", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto
)


[docs]@dataclass class InfraObjectDiff(Generic[InfraObjectProto]): name: str infra_object_type: str current_infra_object: InfraObjectProto new_infra_object: InfraObjectProto infra_object_property_diffs: List[PropertyDiff] transition_type: TransitionType
[docs]@dataclass class InfraDiff: infra_object_diffs: List[InfraObjectDiff] def __init__(self): self.infra_object_diffs = []
[docs] def update(self): """Apply the infrastructure changes specified in this object.""" for infra_object_diff in self.infra_object_diffs: if infra_object_diff.transition_type in [ TransitionType.DELETE, TransitionType.UPDATE, ]: infra_object = InfraObject.from_proto( infra_object_diff.current_infra_object ) infra_object.teardown() elif infra_object_diff.transition_type in [ TransitionType.CREATE, TransitionType.UPDATE, ]: infra_object = InfraObject.from_proto( infra_object_diff.new_infra_object ) infra_object.update()
[docs] def to_string(self): from colorama import Fore, Style log_string = "" message_action_map = { TransitionType.CREATE: ("Created", Fore.GREEN), TransitionType.DELETE: ("Deleted", Fore.RED), TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX), TransitionType.UPDATE: ("Updated", Fore.YELLOW), } for infra_object_diff in self.infra_object_diffs: if infra_object_diff.transition_type == TransitionType.UNCHANGED: continue action, color = message_action_map[infra_object_diff.transition_type] log_string += f"{action} {infra_object_diff.infra_object_type} {Style.BRIGHT + color}{infra_object_diff.name}{Style.RESET_ALL}\n" if infra_object_diff.transition_type == TransitionType.UPDATE: for _p in infra_object_diff.infra_object_property_diffs: log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n" log_string = ( f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to infrastructure" if not log_string else log_string ) return log_string
[docs]def tag_infra_proto_objects_for_keep_delete_add( existing_objs: Iterable[InfraObjectProto], desired_objs: Iterable[InfraObjectProto] ) -> Tuple[ Iterable[InfraObjectProto], Iterable[InfraObjectProto], Iterable[InfraObjectProto] ]: existing_obj_names = {e.name for e in existing_objs} desired_obj_names = {e.name for e in desired_objs} objs_to_add = [e for e in desired_objs if e.name not in existing_obj_names] objs_to_keep = [e for e in desired_objs if e.name in existing_obj_names] objs_to_delete = [e for e in existing_objs if e.name not in desired_obj_names] return objs_to_keep, objs_to_delete, objs_to_add
[docs]def diff_infra_protos( current_infra_proto: InfraProto, new_infra_proto: InfraProto ) -> InfraDiff: infra_diff = InfraDiff() infra_object_class_types_to_str = { DATASTORE_INFRA_OBJECT_CLASS_TYPE: "datastore table", DYNAMODB_INFRA_OBJECT_CLASS_TYPE: "dynamodb table", SQLITE_INFRA_OBJECT_CLASS_TYPE: "sqlite table", } for infra_object_class_type in infra_object_class_types_to_str: current_infra_objects = get_infra_object_protos_by_type( current_infra_proto, infra_object_class_type ) new_infra_objects = get_infra_object_protos_by_type( new_infra_proto, infra_object_class_type ) ( infra_objects_to_keep, infra_objects_to_delete, infra_objects_to_add, ) = tag_infra_proto_objects_for_keep_delete_add( current_infra_objects, new_infra_objects, ) for e in infra_objects_to_add: infra_diff.infra_object_diffs.append( InfraObjectDiff( e.name, infra_object_class_types_to_str[infra_object_class_type], None, e, [], TransitionType.CREATE, ) ) for e in infra_objects_to_delete: infra_diff.infra_object_diffs.append( InfraObjectDiff( e.name, infra_object_class_types_to_str[infra_object_class_type], e, None, [], TransitionType.DELETE, ) ) for e in infra_objects_to_keep: current_infra_object = [ _e for _e in current_infra_objects if _e.name == e.name ][0] infra_diff.infra_object_diffs.append( diff_between( current_infra_object, e, infra_object_class_types_to_str[infra_object_class_type], ) ) return infra_diff
[docs]def get_infra_object_protos_by_type( infra_proto: InfraProto, infra_object_class_type: str ) -> List[InfraObjectProto]: return [ InfraObject.from_infra_object_proto(infra_object).to_proto() for infra_object in infra_proto.infra_objects if infra_object.infra_object_class_type == infra_object_class_type ]
FIELDS_TO_IGNORE = {"project"}
[docs]def diff_between( current: InfraObjectProto, new: InfraObjectProto, infra_object_type: str ) -> InfraObjectDiff: assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name property_diffs = [] transition: TransitionType = TransitionType.UNCHANGED if current != new: for _field in current.DESCRIPTOR.fields: if _field.name in FIELDS_TO_IGNORE: continue if getattr(current, _field.name) != getattr(new, _field.name): transition = TransitionType.UPDATE property_diffs.append( PropertyDiff( _field.name, getattr(current, _field.name), getattr(new, _field.name), ) ) return InfraObjectDiff( new.name, infra_object_type, current, new, property_diffs, transition, )