Source code for feast.type_map

# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple, Type

import numpy as np
import pandas as pd
import pyarrow
from google.protobuf.json_format import MessageToDict
from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType
from google.protobuf.timestamp_pb2 import Timestamp

from feast.protos.feast.types.Value_pb2 import (
    BoolList,
    BytesList,
    DoubleList,
    FloatList,
    Int32List,
    Int64List,
    StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ProtoValue
from feast.value_type import ValueType


[docs]def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: """ Converts field value Proto to Dict and returns each field's Feast Value Type value in their respective Python value. Args: field_value_proto: Field value Proto Returns: Python native type representation/version of the given field_value_proto """ field_value_dict = MessageToDict(field_value_proto) for k, v in field_value_dict.items(): if "List" in k: val = v.get("val", []) else: val = v if k == "int64Val": return int(val) if k == "bytesVal": return bytes(val) if (k == "int64ListVal") or (k == "int32ListVal"): return [int(item) for item in val] if (k == "floatListVal") or (k == "doubleListVal"): return [float(item) for item in val] if k == "stringListVal": return [str(item) for item in val] if k == "bytesListVal": return [bytes(item) for item in val] if k == "boolListVal": return [bool(item) for item in val] if k in ["int32Val", "floatVal", "doubleVal", "stringVal", "boolVal"]: return val else: raise TypeError( f"Casting to Python native type for type {k} failed. " f"Type {k} not found" )
[docs]def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: value_type_to_pandas_type: Dict[ValueType, str] = { ValueType.FLOAT: "float", ValueType.INT32: "int", ValueType.INT64: "int", ValueType.STRING: "str", ValueType.DOUBLE: "float", ValueType.BYTES: "bytes", ValueType.BOOL: "bool", ValueType.UNIX_TIMESTAMP: "datetime", } if value_type in value_type_to_pandas_type: return value_type_to_pandas_type[value_type] raise TypeError( f"Casting to pandas type for type {value_type} failed. " f"Type {value_type} not found" )
[docs]def python_type_to_feast_value_type( name: str, value: Any = None, recurse: bool = True, type_name: Optional[str] = None ) -> ValueType: """ Finds the equivalent Feast Value Type for a Python value. Both native and Pandas types are supported. This function will recursively look for nested types when arrays are detected. All types must be homogenous. Args: name: Name of the value or field value: Value that will be inspected recurse: Whether to recursively look for nested types in arrays Returns: Feast Value Type """ type_name = type_name or type(value).__name__ type_map = { "int": ValueType.INT64, "str": ValueType.STRING, "float": ValueType.DOUBLE, "bytes": ValueType.BYTES, "float64": ValueType.DOUBLE, "float32": ValueType.FLOAT, "int64": ValueType.INT64, "uint64": ValueType.INT64, "int32": ValueType.INT32, "uint32": ValueType.INT32, "uint8": ValueType.INT32, "int8": ValueType.INT32, "bool": ValueType.BOOL, "timedelta": ValueType.UNIX_TIMESTAMP, "Timestamp": ValueType.UNIX_TIMESTAMP, "datetime": ValueType.UNIX_TIMESTAMP, "datetime64[ns]": ValueType.UNIX_TIMESTAMP, "datetime64[ns, tz]": ValueType.UNIX_TIMESTAMP, "category": ValueType.STRING, } if type_name in type_map: return type_map[type_name] if type_name == "ndarray" or isinstance(value, list): if recurse: # Convert to list type list_items = pd.core.series.Series(value) # This is the final type which we infer from the list common_item_value_type = None for item in list_items: if isinstance(item, ProtoValue): current_item_value_type: ValueType = _proto_value_to_value_type( item ) else: # Get the type from the current item, only one level deep current_item_value_type = python_type_to_feast_value_type( name=name, value=item, recurse=False ) # Validate whether the type stays consistent if ( common_item_value_type and not common_item_value_type == current_item_value_type ): raise ValueError( f"List value type for field {name} is inconsistent. " f"{common_item_value_type} different from " f"{current_item_value_type}." ) common_item_value_type = current_item_value_type if common_item_value_type is None: return ValueType.UNKNOWN return ValueType[common_item_value_type.name + "_LIST"] else: assert value raise ValueError( f"Value type for field {name} is {value.dtype.__str__()} but " f"recursion is not allowed. Array types can only be one level " f"deep." ) assert value return type_map[value.dtype.__str__()]
[docs]def python_values_to_feast_value_type( name: str, values: Any, recurse: bool = True ) -> ValueType: inferred_dtype = ValueType.UNKNOWN for row in values: current_dtype = python_type_to_feast_value_type( name, value=row, recurse=recurse ) if inferred_dtype is ValueType.UNKNOWN: inferred_dtype = current_dtype else: if current_dtype != inferred_dtype and current_dtype not in ( ValueType.UNKNOWN, ValueType.NULL, ): raise TypeError( f"Input entity {name} has mixed types, {current_dtype} and {inferred_dtype}. That is not allowed. " ) if inferred_dtype in (ValueType.UNKNOWN, ValueType.NULL): raise ValueError( f"field {name} cannot have all null values for type inference." ) return inferred_dtype
def _type_err(item, dtype): raise TypeError(f'Value "{item}" is of type {type(item)} not of type {dtype}') PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: Dict[ ValueType, Tuple[GeneratedProtocolMessageType, str, List[Type]] ] = { ValueType.FLOAT_LIST: ( FloatList, "float_list_val", [np.float32, np.float64, float], ), ValueType.DOUBLE_LIST: ( DoubleList, "double_list_val", [np.float64, np.float32, float], ), ValueType.INT32_LIST: (Int32List, "int32_list_val", [np.int32, int]), ValueType.INT64_LIST: (Int64List, "int64_list_val", [np.int64, np.int32, int]), ValueType.UNIX_TIMESTAMP_LIST: ( Int64List, "int64_list_val", [np.int64, np.int32, int], ), ValueType.STRING_LIST: (StringList, "string_list_val", [np.str_, str]), ValueType.BOOL_LIST: (BoolList, "bool_list_val", [np.bool_, bool]), ValueType.BYTES_LIST: (BytesList, "bytes_list_val", [np.bytes_, bytes]), } PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: Dict[ ValueType, Tuple[str, Any, Optional[Set[Type]]] ] = { ValueType.INT32: ("int32_val", lambda x: int(x), None), ValueType.INT64: ( "int64_val", lambda x: int(x.timestamp()) if isinstance(x, pd._libs.tslibs.timestamps.Timestamp) else int(x), None, ), ValueType.FLOAT: ("float_val", lambda x: float(x), None), ValueType.DOUBLE: ("double_val", lambda x: x, {float, np.float64}), ValueType.STRING: ("string_val", lambda x: str(x), None), ValueType.BYTES: ("bytes_val", lambda x: x, {bytes}), ValueType.BOOL: ("bool_val", lambda x: x, {bool}), } def _python_value_to_proto_value(feast_value_type: ValueType, value: Any) -> ProtoValue: """ Converts a Python (native, pandas) value to a Feast Proto Value based on a provided value type Args: feast_value_type: The target value type value: Value that will be converted Returns: Feast Value Proto """ # Detect list type and handle separately if "list" in feast_value_type.name.lower(): # Feature can be list but None is still valid if value is None: return ProtoValue() if feast_value_type in PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: proto_type, field_name, valid_types = PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE[ feast_value_type ] f = { field_name: proto_type( val=[ item if type(item) in valid_types else _type_err(item, valid_types[0]) for item in value ] ) } return ProtoValue(**f) # Handle scalar types below else: if pd.isnull(value): return ProtoValue() if feast_value_type == ValueType.UNIX_TIMESTAMP: if isinstance(value, datetime): return ProtoValue(int64_val=int(value.timestamp())) elif isinstance(value, Timestamp): return ProtoValue(int64_val=int(value.ToSeconds())) return ProtoValue(int64_val=int(value)) if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: ( field_name, func, valid_scalar_types, ) = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type] if valid_scalar_types: assert type(value) in valid_scalar_types kwargs = {field_name: func(value)} return ProtoValue(**kwargs) raise Exception(f"Unsupported data type: ${str(type(value))}")
[docs]def python_value_to_proto_value( value: Any, feature_type: ValueType = ValueType.UNKNOWN ) -> ProtoValue: value_type = feature_type if value is not None and feature_type == ValueType.UNKNOWN: if isinstance(value, (list, np.ndarray)): value_type = ( feature_type if len(value) == 0 else python_type_to_feast_value_type("", value) ) else: value_type = python_type_to_feast_value_type("", value) return _python_value_to_proto_value(value_type, value)
def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType: """ Returns Feast ValueType given Feast ValueType string. Args: proto_str: str Returns: A variant of ValueType. """ proto_str = proto_value.WhichOneof("val") type_map = { "int32_val": ValueType.INT32, "int64_val": ValueType.INT64, "double_val": ValueType.DOUBLE, "float_val": ValueType.FLOAT, "string_val": ValueType.STRING, "bytes_val": ValueType.BYTES, "bool_val": ValueType.BOOL, "int32_list_val": ValueType.INT32_LIST, "int64_list_val": ValueType.INT64_LIST, "double_list_val": ValueType.DOUBLE_LIST, "float_list_val": ValueType.FLOAT_LIST, "string_list_val": ValueType.STRING_LIST, "bytes_list_val": ValueType.BYTES_LIST, "bool_list_val": ValueType.BOOL_LIST, None: ValueType.NULL, } return type_map[proto_str]
[docs]def pa_to_feast_value_type(pa_type_as_str: str) -> ValueType: if re.match(r"^timestamp", pa_type_as_str): return ValueType.INT64 type_map = { "int32": ValueType.INT32, "int64": ValueType.INT64, "double": ValueType.DOUBLE, "float": ValueType.FLOAT, "string": ValueType.STRING, "binary": ValueType.BYTES, "bool": ValueType.BOOL, "list<item: int32>": ValueType.INT32_LIST, "list<item: int64>": ValueType.INT64_LIST, "list<item: double>": ValueType.DOUBLE_LIST, "list<item: float>": ValueType.FLOAT_LIST, "list<item: string>": ValueType.STRING_LIST, "list<item: binary>": ValueType.BYTES_LIST, "list<item: bool>": ValueType.BOOL_LIST, "null": ValueType.NULL, } return type_map[pa_type_as_str]
[docs]def bq_to_feast_value_type(bq_type_as_str: str) -> ValueType: type_map: Dict[str, ValueType] = { "DATETIME": ValueType.UNIX_TIMESTAMP, "TIMESTAMP": ValueType.UNIX_TIMESTAMP, "INTEGER": ValueType.INT64, "INT64": ValueType.INT64, "STRING": ValueType.STRING, "FLOAT": ValueType.DOUBLE, "FLOAT64": ValueType.DOUBLE, "BYTES": ValueType.BYTES, "BOOL": ValueType.BOOL, "ARRAY<INT64>": ValueType.INT64_LIST, "ARRAY<FLOAT64>": ValueType.DOUBLE_LIST, "ARRAY<STRING>": ValueType.STRING_LIST, "ARRAY<BYTES>": ValueType.BYTES_LIST, "ARRAY<BOOL>": ValueType.BOOL_LIST, "NULL": ValueType.NULL, } return type_map[bq_type_as_str]
[docs]def redshift_to_feast_value_type(redshift_type_as_str: str) -> ValueType: # Type names from https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html type_map = { "int2": ValueType.INT32, "int4": ValueType.INT32, "int8": ValueType.INT64, "numeric": ValueType.DOUBLE, "float4": ValueType.FLOAT, "float8": ValueType.DOUBLE, "bool": ValueType.BOOL, "character": ValueType.STRING, "varchar": ValueType.STRING, "timestamp": ValueType.UNIX_TIMESTAMP, "timestamptz": ValueType.UNIX_TIMESTAMP, # skip date, geometry, hllsketch, time, timetz } return type_map[redshift_type_as_str.lower()]
[docs]def pa_to_redshift_value_type(pa_type: pyarrow.DataType) -> str: # PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html # Redshift type: https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html pa_type_as_str = str(pa_type).lower() if pa_type_as_str.startswith("timestamp"): if "tz=" in pa_type_as_str: return "timestamptz" else: return "timestamp" if pa_type_as_str.startswith("date"): return "date" if pa_type_as_str.startswith("decimal"): # PyArrow decimal types (e.g. "decimal(38,37)") luckily directly map to the Redshift type. return pa_type_as_str # We have to take into account how arrow types map to parquet types as well. # For example, null type maps to int32 in parquet, so we have to use int4 in Redshift. # Other mappings have also been adjusted accordingly. type_map = { "null": "int4", "bool": "bool", "int8": "int4", "int16": "int4", "int32": "int4", "int64": "int8", "uint8": "int4", "uint16": "int4", "uint32": "int8", "uint64": "int8", "float": "float4", "double": "float8", "binary": "varchar", "string": "varchar", } return type_map[pa_type_as_str]