# Source code for feast.dqm.profilers.ge_profiler

import json
from types import FunctionType
from typing import Any, Callable, Dict, List

import dill
import great_expectations as ge
import numpy as np
import pandas as pd
from great_expectations.core import ExpectationSuite
from great_expectations.dataset import PandasDataset

from feast.dqm.profilers.profiler import (
    Profile,
    Profiler,
    ValidationError,
    ValidationReport,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfile as GEValidationProfileProto,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfiler as GEValidationProfilerProto,
)
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus


def _prepare_dataset(dataset: PandasDataset) -> PandasDataset:
    """
    Return a deep copy of ``dataset`` with fixes that keep Great Expectations from crashing.

    For every column of the input dataset:
      * datetime columns are rendered as "%Y-%m-%dT%H:%M:%S" strings;
      * float32 columns are widened to float64;
      * when a companion "<column>__status" column exists, values whose status
        equals FieldStatus.NOT_FOUND are replaced with NaN.

    The input dataset itself is never written to; all changes go to the copy.
    """
    prepared = dataset.copy(deep=True)

    for name in dataset.columns:
        original = dataset[name]

        if pd.api.types.is_datetime64_any_dtype(original):
            # GE cannot parse Timestamp or other pandas datetime types
            prepared[name] = original.dt.strftime("%Y-%m-%dT%H:%M:%S")

        if original.dtype == np.float32:
            # GE turns expectation arguments into native Python floats, which can
            # break comparisons against float32 values -> widen to double up front
            prepared[name] = original.astype(np.float64)

        status_name = f"{name}__status"
        if status_name not in dataset.columns:
            continue

        not_found = dataset[status_name] == FieldStatus.NOT_FOUND
        prepared[name] = prepared[name].mask(not_found, np.nan)

    return prepared


def _add_feature_metadata(dataset: PandasDataset) -> PandasDataset:
    """
    Add per-feature metadata columns to ``dataset`` in place and return it.

    A column counts as a feature column when its name contains "__". For each one:
      * "<column>__timestamp" is set to a copy of "event_timestamp" (only when
        that column exists);
      * "<column>__status" is set to FieldStatus.PRESENT, or to
        FieldStatus.NOT_FOUND where the feature value is NA.

    Only the columns present when the call starts are considered.
    """
    has_event_timestamp = "event_timestamp" in dataset.columns
    feature_columns = [name for name in dataset.columns if "__" in name]

    for name in feature_columns:
        if has_event_timestamp:
            dataset[f"{name}__timestamp"] = dataset["event_timestamp"]

        status_name = f"{name}__status"
        dataset[status_name] = FieldStatus.PRESENT
        missing = dataset[name].isna()
        dataset[status_name] = dataset[status_name].mask(
            missing, FieldStatus.NOT_FOUND
        )

    return dataset


class GEProfile(Profile):
    """
    Great Expectations flavour of the abstract Profile.

    Holds an ExpectationSuite and validates a dataset by running every
    expectation of that suite against it.
    """

    expectation_suite: ExpectationSuite

    def __init__(self, expectation_suite: ExpectationSuite):
        self.expectation_suite = expectation_suite

    def validate(self, df: pd.DataFrame) -> "GEValidationReport":
        """
        Validate ``df`` against the stored expectation suite.

        Steps:
          1. wrap the pandas dataframe in a GE PandasDataset;
          2. apply the GE compatibility fixes from _prepare_dataset;
          3. run every expectation of the suite with result_format="COMPLETE".

        Returns a GEValidationReport, which exposes the GE result as a list of
        generic ValidationErrors.
        """
        prepared = _prepare_dataset(PandasDataset(df))
        ge_result = ge.validate(
            prepared,
            expectation_suite=self.expectation_suite,
            result_format="COMPLETE",
        )
        return GEValidationReport(ge_result)

    def to_proto(self):
        """Serialize the expectation suite as UTF-8 encoded JSON in a GEValidationProfileProto."""
        suite_json = json.dumps(self.expectation_suite.to_json_dict())
        return GEValidationProfileProto(expectation_suite=suite_json.encode())

    @classmethod
    def from_proto(cls, proto: GEValidationProfileProto) -> "GEProfile":
        """Rebuild a GEProfile from the JSON-encoded suite stored in ``proto``."""
        suite_kwargs = json.loads(proto.expectation_suite)
        return GEProfile(expectation_suite=ExpectationSuite(**suite_kwargs))

    def __repr__(self):
        serialized = [
            expectation.to_json_dict()
            for expectation in self.expectation_suite.expectations
        ]
        expectations = json.dumps(serialized, indent=2)
        return f"<GEProfile with expectations: {expectations}>"


class GEProfiler(Profiler):
    """
    GEProfiler is an implementation of abstract Profiler for integration with Great Expectations.
    It wraps around user defined profiler that should accept dataset (in a form of pandas dataframe)
    and return ExpectationSuite.
    """

    def __init__(
        self,
        user_defined_profiler: Callable[[pd.DataFrame], ExpectationSuite],
        with_feature_metadata: bool = False,
    ):
        """
        Args:
            user_defined_profiler: function that receives the prepared dataset and
                returns the ExpectationSuite describing it.
            with_feature_metadata: when True, "<feature>__timestamp" / "<feature>__status"
                columns are added to the dataset (see _add_feature_metadata) before it
                is passed to the user defined profiler.
        """
        self.user_defined_profiler = user_defined_profiler
        self.with_feature_metadata = with_feature_metadata

    def analyze_dataset(self, df: pd.DataFrame) -> Profile:
        """
        Generate GEProfile with ExpectationSuite (set of expectations)
        from a given pandas dataframe by applying user defined profiler.

        Some fixes are also applied to the dataset (see _prepare_dataset function)
        to make it compatible with GE.

        Return GEProfile
        """
        dataset = PandasDataset(df)
        dataset = _prepare_dataset(dataset)

        if self.with_feature_metadata:
            dataset = _add_feature_metadata(dataset)

        return GEProfile(expectation_suite=self.user_defined_profiler(dataset))

    def to_proto(self):
        """
        Serialize the user defined profiler into a GEValidationProfilerProto.

        The profiler is rebuilt around its code object with an empty globals dict,
        so module-level names it references are NOT carried over. Positional
        defaults, keyword-only defaults and closure cells ARE carried over. Without
        them, a profiler defined inside another function cannot be rebuilt at all,
        and a profiler with defaulted parameters loses those defaults.
        """
        fun = self.user_defined_profiler
        # keep only the code and drop context for now
        # ToDo (pyalex): include some context, but not all (dill tries to pull too much)
        udp = FunctionType(
            fun.__code__,
            {},
            argdefs=fun.__defaults__,
            # FunctionType raises ValueError for code with free variables
            # unless a closure of matching length is supplied.
            closure=fun.__closure__,
        )
        # FunctionType's constructor takes no keyword-only defaults; copy them over.
        udp.__kwdefaults__ = fun.__kwdefaults__
        return GEValidationProfilerProto(
            profiler=GEValidationProfilerProto.UserDefinedProfiler(
                body=dill.dumps(udp, recurse=False)
            )
        )

    @classmethod
    def from_proto(cls, proto: GEValidationProfilerProto) -> "GEProfiler":
        """
        Rebuild a GEProfiler from the dill-serialized profiler body in ``proto``.

        to_proto stores only the profiler function, so with_feature_metadata is
        reset to its default (False) here.
        """
        # SECURITY: dill.loads can execute arbitrary code; only load protos that
        # come from a trusted registry.
        return GEProfiler(user_defined_profiler=dill.loads(proto.profiler.body))


class GEValidationReport(ValidationReport):
    """
    ValidationReport backed by a Great Expectations validation result.

    Translates GE's per-expectation results into generic ValidationErrors.
    """

    def __init__(self, validation_result: Dict[Any, Any]):
        # Result object returned by ge.validate(...); indexed with "success" / "results".
        self._validation_result = validation_result

    @property
    def is_success(self) -> bool:
        """True when GE reports that every expectation passed."""
        return self._validation_result["success"]

    @property
    def errors(self) -> List["ValidationError"]:
        """One ValidationError per failed expectation, in GE's result order."""
        return [
            ValidationError(
                check_name=res.expectation_config.expectation_type,
                # Not every expectation targets a single column (e.g. table-level
                # row-count checks have no "column" kwarg); indexing would raise
                # KeyError and hide every other failure, so fall back to None.
                column_name=res.expectation_config.kwargs.get("column"),
                check_config=res.expectation_config.kwargs,
                missing_count=res["result"].get("missing_count"),
                missing_percent=res["result"].get("missing_percent"),
                unexpected_count=res["result"].get("unexpected_count"),
                unexpected_percent=res["result"].get("unexpected_percent"),
            )
            for res in self._validation_result["results"]
            if not res["success"]
        ]

    def __repr__(self):
        failed_expectations = [
            res.to_json_dict()
            for res in self._validation_result["results"]
            if not res["success"]
        ]
        return json.dumps(failed_expectations, indent=2)


def ge_profiler(*args, with_feature_metadata=False):
    """
    Decorator that turns a user defined profiling function into a GEProfiler.

    Usable both bare and with arguments:

        @ge_profiler
        def my_profiler(dataset): ...

        @ge_profiler(with_feature_metadata=True)
        def my_profiler(dataset): ...

    When called with a positional argument, that first argument is wrapped
    immediately. Any further positional arguments are ignored. Otherwise the
    decorating function is returned.
    """

    def _build(fun):
        return GEProfiler(
            user_defined_profiler=fun, with_feature_metadata=with_feature_metadata
        )

    return _build(args[0]) if args else _build