Source code for feast.feature_logging

import abc
from typing import TYPE_CHECKING, Dict, Optional, Type, cast

import pyarrow as pa
from pytz import UTC

from feast.data_source import DataSource
from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE
from feast.errors import (
    FeastObjectNotFoundException,
    FeatureViewNotFoundException,
    OnDemandFeatureViewNotFoundException,
)
from feast.feature_view import DUMMY_ENTITY_ID
from feast.protos.feast.core.FeatureService_pb2 import (
    LoggingConfig as LoggingConfigProto,
)

if TYPE_CHECKING:
    from feast.feature_service import FeatureService
    from feast.infra.registry.base_registry import BaseRegistry


REQUEST_ID_FIELD = "__request_id"
LOG_TIMESTAMP_FIELD = "__log_timestamp"
LOG_DATE_FIELD = "__log_date"


class LoggingSource:
    """
    A logging source describes an object that produces logs (e.g., a feature
    service produces logs of served features). It should be able to provide
    the schema of the produced logs table and additional metadata that
    describes the logs data.
    """

    @abc.abstractmethod
    def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
        """Generate the schema for the logs destination."""
        raise NotImplementedError

    @abc.abstractmethod
    def get_log_timestamp_column(self) -> str:
        """Return the timestamp column that must exist in the generated schema."""
        raise NotImplementedError
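
# A minimal sketch of a custom LoggingSource (hypothetical, not part of
# Feast) showing the contract above: serve a fixed schema and reuse the
# system timestamp column defined earlier in this module.
# FeatureServiceLoggingSource below is the real implementation.
class _StaticSchemaLoggingSource(LoggingSource):
    def __init__(self, schema_fields: Dict[str, pa.DataType]):
        self._schema_fields = schema_fields

    def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
        fields = dict(self._schema_fields)
        fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)
        return pa.schema([pa.field(name, dtype) for name, dtype in fields.items()])

    def get_log_timestamp_column(self) -> str:
        return LOG_TIMESTAMP_FIELD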
class FeatureServiceLoggingSource(LoggingSource):
    def __init__(self, feature_service: "FeatureService", project: str):
        self._feature_service = feature_service
        self._project = project

    def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
        fields: Dict[str, pa.DataType] = {}

        for projection in self._feature_service.feature_view_projections:
            # The order of fields in the generated schema should match
            # the order created on the other side (inside the Go logger).
            # Otherwise, some offline stores might not accept parquet files
            # (produced by Go). The Go code can be found here:
            # https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51
            try:
                feature_view = registry.get_feature_view(
                    projection.name, self._project
                )
            except FeatureViewNotFoundException:
                try:
                    on_demand_feature_view = registry.get_on_demand_feature_view(
                        projection.name, self._project
                    )
                except OnDemandFeatureViewNotFoundException:
                    raise FeastObjectNotFoundException(
                        f"Can't recognize feature view with a name {projection.name}"
                    )

                for request_source in on_demand_feature_view.source_request_sources.values():
                    for field in request_source.schema:
                        fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]
            else:
                for entity_column in feature_view.entity_columns:
                    if entity_column.name == DUMMY_ENTITY_ID:
                        continue

                    join_key = projection.join_key_map.get(
                        entity_column.name, entity_column.name
                    )
                    fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[entity_column.dtype]

            for feature in projection.features:
                fields[f"{projection.name_to_use()}__{feature.name}"] = (
                    FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
                )
                fields[f"{projection.name_to_use()}__{feature.name}__timestamp"] = (
                    PA_TIMESTAMP_TYPE
                )
                fields[f"{projection.name_to_use()}__{feature.name}__status"] = (
                    pa.int32()
                )

        # system columns
        fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)
        fields[LOG_DATE_FIELD] = pa.date32()
        fields[REQUEST_ID_FIELD] = pa.string()

        return pa.schema(
            [pa.field(name, data_type) for name, data_type in fields.items()]
        )

    def get_log_timestamp_column(self) -> str:
        return LOG_TIMESTAMP_FIELD
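
# Illustrative only: for a single projection named "driver_stats" with one
# Float32 feature "conv_rate" and an Int64 entity join key "driver_id"
# (all names here are made up), get_schema() above would produce a schema
# equivalent to this hand-built one:
_example_logs_schema = pa.schema(
    [
        pa.field("driver_id", pa.int64()),
        pa.field("driver_stats__conv_rate", pa.float32()),
        pa.field("driver_stats__conv_rate__timestamp", PA_TIMESTAMP_TYPE),
        pa.field("driver_stats__conv_rate__status", pa.int32()),
        pa.field(LOG_TIMESTAMP_FIELD, pa.timestamp("us", tz=UTC)),
        pa.field(LOG_DATE_FIELD, pa.date32()),
        pa.field(REQUEST_ID_FIELD, pa.string()),
    ]
)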
class _DestinationRegistry(type):
    """Metaclass that collects LoggingDestination subclasses, keyed by their
    ``_proto_kind`` attribute, so that LoggingConfig.from_proto can look up
    the right class for a given LoggingConfig proto."""

    classes_by_proto_attr_name: Dict[str, Type["LoggingDestination"]] = {}

    def __new__(cls, name, bases, dct):
        kls = type.__new__(cls, name, bases, dct)
        if dct.get("_proto_kind"):
            cls.classes_by_proto_attr_name[dct["_proto_kind"]] = kls
        return kls
class LoggingDestination(metaclass=_DestinationRegistry):
    """
    A logging destination contains details about where exactly logs should be
    written inside an offline store. It is implementation specific: each
    offline store must implement its own LoggingDestination subclass.

    The kind of logging destination is determined by matching an attribute
    name in the LoggingConfig protobuf message against the ``_proto_kind``
    property of each subclass.
    """

    _proto_kind: str

    @classmethod
    @abc.abstractmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
        raise NotImplementedError

    @abc.abstractmethod
    def to_proto(self) -> LoggingConfigProto:
        raise NotImplementedError

    @abc.abstractmethod
    def to_data_source(self) -> DataSource:
        """
        Convert this object into a data source to read logs from an offline
        store.
        """
        raise NotImplementedError
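
# A minimal sketch of a LoggingDestination subclass (hypothetical, not
# shipped with Feast). Assigning ``_proto_kind`` is what registers the class
# with _DestinationRegistry; the "console" kind is routed through the
# ``custom_destination.kind`` field that LoggingConfig.from_proto below
# reads for custom destinations.
class _ConsoleLoggingDestination(LoggingDestination):
    _proto_kind = "console"

    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
        return cls()

    def to_proto(self) -> LoggingConfigProto:
        # Assumption: custom_destination.kind is a writable string field, as
        # implied by the read in LoggingConfig.from_proto below.
        proto = LoggingConfigProto()
        proto.custom_destination.kind = self._proto_kind
        return proto

    def to_data_source(self) -> DataSource:
        raise NotImplementedError(
            "sketch only; a real destination returns an offline store DataSource"
        )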
class LoggingConfig:
    destination: LoggingDestination
    sample_rate: float

    def __init__(self, destination: LoggingDestination, sample_rate: float = 1.0):
        self.destination = destination
        self.sample_rate = sample_rate

    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]:
        proto_kind = cast(str, config_proto.WhichOneof("destination"))
        if proto_kind is None:
            return None

        if proto_kind == "custom_destination":
            proto_kind = config_proto.custom_destination.kind

        destination_class = _DestinationRegistry.classes_by_proto_attr_name[proto_kind]
        return LoggingConfig(
            destination=destination_class.from_proto(config_proto),
            sample_rate=config_proto.sample_rate,
        )

    def to_proto(self) -> LoggingConfigProto:
        proto = self.destination.to_proto()
        proto.sample_rate = self.sample_rate
        return proto
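
if __name__ == "__main__":
    # Usage sketch, relying on the hypothetical _ConsoleLoggingDestination
    # defined above: round-tripping a config through its proto form exercises
    # the _DestinationRegistry dispatch in from_proto() and carries the
    # sample rate along (0.5 is chosen because it is exactly representable,
    # so the equality check is safe for a float proto field).
    config = LoggingConfig(destination=_ConsoleLoggingDestination(), sample_rate=0.5)
    restored = LoggingConfig.from_proto(config.to_proto())
    assert restored is not None and restored.sample_rate == 0.5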