Source code for feast.type_map

# Copyright 2019 The Feast Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
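# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.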
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
from datetime import datetime, timezone
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Set,
    Sized,
    Tuple,
    Type,
    Union,
    cast,
)

import numpy as np
import pandas as pd
import pyarrow
from google.protobuf.timestamp_pb2 import Timestamp

from feast.protos.feast.types.Value_pb2 import (
    BoolList,
    BytesList,
    DoubleList,
    FloatList,
    Int32List,
    Int64List,
    StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ProtoValue
from feast.value_type import ListType, ValueType

# Sentinel integer for a missing timestamp: numpy's "not a time" value (NaT)
# cast to int. Null timestamps get converted to -9223372036854775808, and
# feast_value_type_to_python_type maps any stored timestamp equal to this
# sentinel back to None.
NULL_TIMESTAMP_INT_VALUE = np.datetime64("NaT").astype(int)

def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any:
    """
    Unwrap a Feast value proto into its native Python representation.

    Args:
        field_value_proto: Field value Proto

    Returns:
        None when no field of the ``val`` oneof is set; a Python list for
        ``*_list_val`` fields; timezone-aware UTC ``datetime`` objects (or
        None for the null-timestamp sentinel) for unix timestamp fields;
        otherwise the raw value stored in the populated field.
    """
    populated_field = field_value_proto.WhichOneof("val")
    if populated_field is None:
        return None

    payload = getattr(field_value_proto, populated_field)

    # List messages wrap their items in a repeated ``val`` attribute.
    if hasattr(payload, "val"):
        payload = list(payload.val)

    def _to_utc_datetime(seconds):
        # The sentinel marks a null timestamp and must surface as None.
        if seconds == NULL_TIMESTAMP_INT_VALUE:
            return None
        return datetime.fromtimestamp(seconds, tz=timezone.utc)

    if populated_field == "unix_timestamp_list_val":
        return [_to_utc_datetime(seconds) for seconds in payload]
    if populated_field == "unix_timestamp_val":
        return _to_utc_datetime(payload)
    return payload
def feast_value_type_to_pandas_type(value_type: ValueType) -> Any:
    """
    Return the pandas dtype name used to hold values of a Feast value type.

    Args:
        value_type: Feast value type to translate.

    Returns:
        "object" for every ``*_LIST`` type, otherwise the dtype string
        registered for the scalar type.

    Raises:
        TypeError: If the value type is neither a list type nor one of the
            mapped scalar types.
    """
    value_type_to_pandas_type: Dict[ValueType, str] = {
        ValueType.FLOAT: "float",
        ValueType.INT32: "int",
        ValueType.INT64: "int",
        ValueType.STRING: "str",
        ValueType.DOUBLE: "float",
        ValueType.BYTES: "bytes",
        ValueType.BOOL: "bool",
        ValueType.UNIX_TIMESTAMP: "datetime64[ns]",
    }
    # List-typed features are stored as generic Python objects in pandas.
    if value_type.name.endswith("_LIST"):
        return "object"
    if value_type in value_type_to_pandas_type:
        return value_type_to_pandas_type[value_type]
    raise TypeError(
        f"Casting to pandas type for type {value_type} failed. "
        f"Type {value_type} not found"
    )
def python_type_to_feast_value_type(
    name: str, value: Any = None, recurse: bool = True, type_name: Optional[str] = None
) -> ValueType:
    """
    Finds the equivalent Feast Value Type for a Python value. Both native
    and Pandas types are supported. This function will recursively look
    for nested types when arrays are detected. All types must be homogenous.

    Args:
        name: Name of the value or field
        value: Value that will be inspected
        recurse: Whether to recursively look for nested types in arrays
        type_name: Optional type name to use instead of the name of
            ``type(value)``; it is lower-cased before the lookup.

    Returns:
        Feast Value Type

    Raises:
        ValueError: If the type cannot be mapped, if a nested array is found
            while ``recurse`` is False, or if list items have differing types.
    """
    type_name = (type_name or type(value).__name__).lower()

    type_map = {
        "int": ValueType.INT64,
        "str": ValueType.STRING,
        "string": ValueType.STRING,  # pandas.StringDtype
        "float": ValueType.DOUBLE,
        "bytes": ValueType.BYTES,
        "float64": ValueType.DOUBLE,
        "float32": ValueType.FLOAT,
        "int64": ValueType.INT64,
        "uint64": ValueType.INT64,
        "int32": ValueType.INT32,
        "uint32": ValueType.INT32,
        "int16": ValueType.INT32,
        "uint16": ValueType.INT32,
        "uint8": ValueType.INT32,
        "int8": ValueType.INT32,
        "bool": ValueType.BOOL,
        "timedelta": ValueType.UNIX_TIMESTAMP,
        "timestamp": ValueType.UNIX_TIMESTAMP,
        "datetime": ValueType.UNIX_TIMESTAMP,
        "datetime64[ns]": ValueType.UNIX_TIMESTAMP,
        "datetime64[ns, tz]": ValueType.UNIX_TIMESTAMP,
        "category": ValueType.STRING,
    }

    if type_name in type_map:
        return type_map[type_name]

    # A typed numpy array maps directly to the list variant of its dtype.
    if isinstance(value, np.ndarray) and str(value.dtype) in type_map:
        item_type = type_map[str(value.dtype)]
        return ValueType[item_type.name + "_LIST"]

    if isinstance(value, (list, np.ndarray)):
        # if the value's type is "ndarray" and we couldn't infer from "value.dtype"
        # this is most probably array of "object",
        # so we need to iterate over objects and try to infer type of each item
        if not recurse:
            raise ValueError(
                f"Value type for field {name} is {type(value)} but "
                f"recursion is not allowed. Array types can only be one level "
                f"deep."
            )

        # This is the final type which we infer from the list
        common_item_value_type = None
        for item in value:
            if isinstance(item, ProtoValue):
                current_item_value_type: ValueType = _proto_value_to_value_type(item)
            else:
                # Get the type from the current item, only one level deep
                current_item_value_type = python_type_to_feast_value_type(
                    name=name, value=item, recurse=False
                )
            # Validate whether the type stays consistent
            if (
                common_item_value_type
                and not common_item_value_type == current_item_value_type
            ):
                raise ValueError(
                    f"List value type for field {name} is inconsistent. "
                    f"{common_item_value_type} different from "
                    f"{current_item_value_type}."
                )
            common_item_value_type = current_item_value_type

        # An empty list carries no information about its item type.
        if common_item_value_type is None:
            return ValueType.UNKNOWN
        return ValueType[common_item_value_type.name + "_LIST"]

    raise ValueError(
        f"Value with native type {type_name} "
        f"cannot be converted into Feast value type"
    )
def python_values_to_feast_value_type(
    name: str, values: Any, recurse: bool = True
) -> ValueType:
    """
    Infer a single Feast value type shared by every entry in ``values``.

    Args:
        name: Name of the field, used in error messages.
        values: Iterable of Python values to inspect.
        recurse: Forwarded to ``python_type_to_feast_value_type``.

    Returns:
        The value type inferred from the entries.

    Raises:
        TypeError: If an entry resolves to a concrete type that differs from
            the type already inferred.
        ValueError: If nothing better than UNKNOWN/NULL could be inferred.
    """
    undetermined = (ValueType.UNKNOWN, ValueType.NULL)
    inferred_dtype = ValueType.UNKNOWN

    for row in values:
        current_dtype = python_type_to_feast_value_type(
            name, value=row, recurse=recurse
        )

        # Nothing inferred yet: adopt whatever this row produced.
        if inferred_dtype is ValueType.UNKNOWN:
            inferred_dtype = current_dtype
            continue

        # Matching or undetermined rows never conflict with the inferred type.
        if current_dtype == inferred_dtype or current_dtype in undetermined:
            continue

        raise TypeError(
            f"Input entity {name} has mixed types, {current_dtype} and {inferred_dtype}. That is not allowed. "
        )

    if inferred_dtype in undetermined:
        raise ValueError(
            f"field {name} cannot have all null values for type inference."
        )
    return inferred_dtype
def _type_err(item, dtype):
    """Raise a TypeError reporting that ``item`` is not of the expected ``dtype``."""
    raise TypeError(f'Value "{item}" is of type {type(item)} not of type {dtype}')


# For each list value type: (proto list message class, name of the ProtoValue
# field, element types accepted when validating the sampled list).
PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: Dict[
    ValueType, Tuple[ListType, str, List[Type]]
] = {
    ValueType.FLOAT_LIST: (
        FloatList,
        "float_list_val",
        [np.float32, np.float64, float],
    ),
    ValueType.DOUBLE_LIST: (
        DoubleList,
        "double_list_val",
        [np.float64, np.float32, float],
    ),
    ValueType.INT32_LIST: (Int32List, "int32_list_val", [np.int64, np.int32, int]),
    ValueType.INT64_LIST: (Int64List, "int64_list_val", [np.int64, np.int32, int]),
    ValueType.UNIX_TIMESTAMP_LIST: (
        Int64List,
        "int64_list_val",
        [np.datetime64, np.int64, np.int32, int, datetime, Timestamp],
    ),
    ValueType.STRING_LIST: (StringList, "string_list_val", [np.str_, str]),
    ValueType.BOOL_LIST: (BoolList, "bool_list_val", [np.bool_, bool]),
    ValueType.BYTES_LIST: (BytesList, "bytes_list_val", [np.bytes_, bytes]),
}

# For each scalar value type: (name of the ProtoValue field, converter applied
# to every value, optional set of exact types the sampled value must have).
PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: Dict[
    ValueType, Tuple[str, Any, Optional[Set[Type]]]
] = {
    ValueType.INT32: ("int32_val", lambda x: int(x), None),
    ValueType.INT64: (
        "int64_val",
        lambda x: int(x.timestamp())
        if isinstance(x, pd._libs.tslibs.timestamps.Timestamp)
        else int(x),
        None,
    ),
    ValueType.FLOAT: ("float_val", lambda x: float(x), None),
    ValueType.DOUBLE: ("double_val", lambda x: x, {float, np.float64}),
    ValueType.STRING: ("string_val", lambda x: str(x), None),
    ValueType.BYTES: ("bytes_val", lambda x: x, {bytes}),
    ValueType.BOOL: ("bool_val", lambda x: x, {bool, np.bool_}),
}


def _python_datetime_to_int_timestamp(
    values: Sequence[Any],
) -> Sequence[Union[int, np.int_]]:
    """
    Convert timestamp-like values to integer timestamps in seconds.

    Args:
        values: Sequence of ``datetime``, protobuf ``Timestamp``,
            ``np.datetime64`` or int-convertible values, or a 1-dimensional
            numpy array with a datetime64 dtype.

    Returns:
        Integer timestamps, one per input value.

    Raises:
        ValueError: If a datetime64 numpy array with more than one dimension
            is passed.
    """
    # Fast path for Numpy array. The dtype has to be tested with
    # np.issubdtype: a dtype object is never an instance of np.datetime64,
    # so an isinstance check would leave this branch unreachable.
    if isinstance(values, np.ndarray) and np.issubdtype(
        values.dtype, np.datetime64
    ):
        if values.ndim != 1:
            raise ValueError("Only 1 dimensional arrays are supported.")
        return cast(Sequence[np.int_], values.astype("datetime64[s]").astype(np.int_))

    int_timestamps = []
    for value in values:
        if isinstance(value, datetime):
            int_timestamps.append(int(value.timestamp()))
        elif isinstance(value, Timestamp):
            int_timestamps.append(int(value.ToSeconds()))
        elif isinstance(value, np.datetime64):
            int_timestamps.append(value.astype("datetime64[s]").astype(np.int_))
        else:
            int_timestamps.append(int(value))
    return int_timestamps


def _python_value_to_proto_value(
    feast_value_type: ValueType, values: List[Any]
) -> List[ProtoValue]:
    """
    Converts a Python (native, pandas) value to a Feast Proto Value based
    on a provided value type

    Args:
        feast_value_type: The target value type
        values: List of Values that will be converted

    Returns:
        List of Feast Value Proto

    Raises:
        TypeError: If an item of the sampled list has a type that is not
            accepted for the target list type.
        AssertionError: If the sampled scalar has a type that is not accepted
            for the target scalar type.
    """
    # ToDo: make a better sample for type checks (more than one element)
    sample = next(filter(_non_empty_value, values), None)  # first not empty value

    # Detect list type and handle separately
    if "list" in feast_value_type.name.lower():
        # Feature can be list but None is still valid
        if feast_value_type in PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE:
            proto_type, field_name, valid_types = PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE[
                feast_value_type
            ]

            if sample is not None and not all(
                type(item) in valid_types for item in sample
            ):
                first_invalid = next(
                    item for item in sample if type(item) not in valid_types
                )
                raise _type_err(first_invalid, valid_types[0])

            if feast_value_type == ValueType.UNIX_TIMESTAMP_LIST:
                # A missing list yields an empty ProtoValue, consistent with
                # the other list branches below.
                return [
                    # ProtoValue does actually accept `np.int_` but the typing complains.
                    ProtoValue(
                        unix_timestamp_list_val=Int64List(
                            val=_python_datetime_to_int_timestamp(value)  # type: ignore
                        )
                    )
                    if value is not None
                    else ProtoValue()
                    for value in values
                ]
            if feast_value_type == ValueType.BOOL_LIST:
                # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_.
                return [
                    ProtoValue(**{field_name: proto_type(val=[bool(e) for e in value])})  # type: ignore
                    if value is not None
                    else ProtoValue()
                    for value in values
                ]
            return [
                ProtoValue(**{field_name: proto_type(val=value)})  # type: ignore
                if value is not None
                else ProtoValue()
                for value in values
            ]

    # Handle scalar types below
    else:
        if sample is None:
            # all input values are None; build one message per row so the
            # returned entries do not alias a single shared object
            return [ProtoValue() for _ in values]

        if feast_value_type == ValueType.UNIX_TIMESTAMP:
            int_timestamps = _python_datetime_to_int_timestamp(values)
            # ProtoValue does actually accept `np.int_` but the typing complains.
            return [ProtoValue(unix_timestamp_val=ts) for ts in int_timestamps]  # type: ignore

        (
            field_name,
            func,
            valid_scalar_types,
        ) = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type]
        if valid_scalar_types:
            assert type(sample) in valid_scalar_types
        if feast_value_type == ValueType.BOOL:
            # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_.
            return [
                ProtoValue(
                    **{
                        field_name: func(
                            bool(value) if type(value) is np.bool_ else value  # type: ignore
                        )
                    }
                )
                if not pd.isnull(value)
                else ProtoValue()
                for value in values
            ]
        if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE:
            return [
                ProtoValue(**{field_name: func(value)})
                if not pd.isnull(value)
                else ProtoValue()
                for value in values
            ]

    raise Exception(f"Unsupported data type: ${str(type(values[0]))}")
def python_values_to_proto_values(
    values: List[Any], feature_type: ValueType = ValueType.UNKNOWN
) -> List[ProtoValue]:
    """
    Convert Python values to Feast proto values, inferring the type if needed.

    Args:
        values: Values to convert.
        feature_type: Target value type; when left as UNKNOWN the type is
            inferred from the first non-empty value.

    Returns:
        One proto value per input value.

    Raises:
        TypeError: If the value type is still UNKNOWN after inference.
    """
    first_usable = next((item for item in values if _non_empty_value(item)), None)

    resolved_type = feature_type
    if feature_type == ValueType.UNKNOWN and first_usable is not None:
        is_empty_sequence = (
            isinstance(first_usable, (list, np.ndarray)) and len(first_usable) == 0
        )
        # An empty list/array gives nothing to infer from, so the declared
        # type is kept in that case.
        if not is_empty_sequence:
            resolved_type = python_type_to_feast_value_type("", first_usable)

    if resolved_type == ValueType.UNKNOWN:
        raise TypeError("Couldn't infer value type from empty value")

    return _python_value_to_proto_value(resolved_type, values)
def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType:
    """
    Returns the Feast ValueType matching the populated field of a value proto.

    Args:
        proto_value: Feast value proto whose ``val`` oneof is inspected.

    Returns:
        A variant of ValueType; ValueType.NULL when no field is set.
    """
    proto_str = proto_value.WhichOneof("val")
    type_map = {
        "int32_val": ValueType.INT32,
        "int64_val": ValueType.INT64,
        "double_val": ValueType.DOUBLE,
        "float_val": ValueType.FLOAT,
        "string_val": ValueType.STRING,
        "bytes_val": ValueType.BYTES,
        "bool_val": ValueType.BOOL,
        # Timestamp fields are written by the Python -> proto conversion in
        # this module, so they must be resolvable here as well.
        "unix_timestamp_val": ValueType.UNIX_TIMESTAMP,
        "int32_list_val": ValueType.INT32_LIST,
        "int64_list_val": ValueType.INT64_LIST,
        "double_list_val": ValueType.DOUBLE_LIST,
        "float_list_val": ValueType.FLOAT_LIST,
        "string_list_val": ValueType.STRING_LIST,
        "bytes_list_val": ValueType.BYTES_LIST,
        "bool_list_val": ValueType.BOOL_LIST,
        "unix_timestamp_list_val": ValueType.UNIX_TIMESTAMP_LIST,
        None: ValueType.NULL,
    }

    return type_map[proto_str]
def pa_to_feast_value_type(pa_type_as_str: str) -> ValueType:
    """
    Translate the string form of a PyArrow type into a Feast value type.

    Args:
        pa_type_as_str: PyArrow type rendered as a string, e.g. "int64" or
            "list<item: int64>".

    Returns:
        The matching Feast value type; the ``*_LIST`` variant for list types.

    Raises:
        KeyError: If the (element) type is not in the mapping.
    """
    is_list = False
    if pa_type_as_str.startswith("list<item: "):
        is_list = True
        # Reduce "list<item: T>" to the element type "T".
        pa_type_as_str = pa_type_as_str.replace("list<item: ", "").replace(">", "")

    if pa_type_as_str.startswith("timestamp"):
        value_type = ValueType.UNIX_TIMESTAMP
    else:
        type_map = {
            "int32": ValueType.INT32,
            "int64": ValueType.INT64,
            "double": ValueType.DOUBLE,
            "float": ValueType.FLOAT,
            "string": ValueType.STRING,
            "binary": ValueType.BYTES,
            "bool": ValueType.BOOL,
            "null": ValueType.NULL,
        }
        value_type = type_map[pa_type_as_str]

    if is_list:
        value_type = ValueType[value_type.name + "_LIST"]

    return value_type
def bq_to_feast_value_type(bq_type_as_str: str) -> ValueType:
    """
    Translate a BigQuery type name into a Feast value type.

    Args:
        bq_type_as_str: BigQuery type name, optionally wrapped as
            "ARRAY<...>".

    Returns:
        The matching Feast value type; the ``*_LIST`` variant for arrays.

    Raises:
        KeyError: If the (element) type is not in the mapping.
    """
    is_list = False
    if bq_type_as_str.startswith("ARRAY<"):
        is_list = True
        # Drop the leading "ARRAY<" and the trailing ">".
        bq_type_as_str = bq_type_as_str[6:-1]

    type_map: Dict[str, ValueType] = {
        "DATETIME": ValueType.UNIX_TIMESTAMP,
        "TIMESTAMP": ValueType.UNIX_TIMESTAMP,
        "INTEGER": ValueType.INT64,
        "INT64": ValueType.INT64,
        "STRING": ValueType.STRING,
        "FLOAT": ValueType.DOUBLE,
        "FLOAT64": ValueType.DOUBLE,
        "BYTES": ValueType.BYTES,
        "BOOL": ValueType.BOOL,
        "BOOLEAN": ValueType.BOOL,  # legacy sql data type
        "NULL": ValueType.NULL,
    }

    value_type = type_map[bq_type_as_str]
    if is_list:
        value_type = ValueType[value_type.name + "_LIST"]

    return value_type
def redshift_to_feast_value_type(redshift_type_as_str: str) -> ValueType:
    """
    Translate a Redshift type name (case-insensitive) into a Feast value type.

    Raises:
        KeyError: If the lower-cased type name is not supported.
    """
    # Redshift type names grouped by the Feast type they map to.
    # date, geometry, hllsketch, time and timetz are intentionally skipped.
    names_by_value_type = {
        ValueType.INT32: ("int2", "int4"),
        ValueType.INT64: ("int8",),
        ValueType.DOUBLE: ("numeric", "float8"),
        ValueType.FLOAT: ("float4",),
        ValueType.BOOL: ("bool",),
        ValueType.STRING: ("character", "varchar"),
        ValueType.UNIX_TIMESTAMP: ("timestamp", "timestamptz"),
    }
    lookup = {
        redshift_name: value_type
        for value_type, redshift_names in names_by_value_type.items()
        for redshift_name in redshift_names
    }
    normalized = redshift_type_as_str.lower()
    return lookup[normalized]
def snowflake_python_type_to_feast_value_type(
    snowflake_python_type_as_str: str,
) -> ValueType:
    """
    Translate a Python/numpy dtype name reported for Snowflake data into a
    Feast value type. The lookup is case-insensitive.

    Raises:
        KeyError: If the lower-cased dtype name is not supported.
    """
    # dtype names grouped by the Feast type they map to.
    names_by_value_type = {
        ValueType.STRING: ("str", "object"),
        ValueType.DOUBLE: ("float64",),
        ValueType.INT64: ("int64", "uint64"),
        ValueType.INT32: ("int32", "uint32", "int16", "uint16", "uint8", "int8"),
        ValueType.UNIX_TIMESTAMP: ("datetime64[ns]",),
        ValueType.BOOL: ("bool",),
    }
    lookup = {
        dtype_name: value_type
        for value_type, dtype_names in names_by_value_type.items()
        for dtype_name in dtype_names
    }
    normalized = snowflake_python_type_as_str.lower()
    return lookup[normalized]
def pa_to_redshift_value_type(pa_type: pyarrow.DataType) -> str:
    """
    Pick the Redshift column type used to store values of a PyArrow type.

    The decision is made on the lower-cased string form of ``pa_type``.

    Raises:
        KeyError: If the type matches no prefix rule and is not in the table.
    """
    arrow_name = str(pa_type).lower()

    if arrow_name.startswith("timestamp"):
        return "timestamptz" if "tz=" in arrow_name else "timestamp"

    if arrow_name.startswith("date"):
        return "date"

    if arrow_name.startswith("decimal"):
        # PyArrow decimal types (e.g. "decimal(38,37)") luckily directly map to the Redshift type.
        return arrow_name

    if arrow_name.startswith("list"):
        return "super"

    # We have to take into account how arrow types map to parquet types as well.
    # For example, null type maps to int32 in parquet, so we have to use int4 in Redshift.
    # Other mappings have also been adjusted accordingly.
    arrow_names_by_redshift_type = {
        "int4": ("null", "int8", "int16", "int32", "uint8", "uint16"),
        "bool": ("bool",),
        "int8": ("int64", "uint32", "uint64"),
        "float4": ("float",),
        "float8": ("double",),
        "varchar": ("binary", "string"),
    }
    lookup = {
        arrow_type: redshift_type
        for redshift_type, arrow_types in arrow_names_by_redshift_type.items()
        for arrow_type in arrow_types
    }
    return lookup[arrow_name]
def _non_empty_value(value: Any) -> bool: """ Check that there's enough data we can use for type inference. If primitive type - just checking that it's not None If iterable - checking that there's some elements (len > 0) String is special case: "" - empty string is considered non empty """ return value is not None and ( not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str) )
def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType:
    """
    Translate a Spark type name (case-insensitive) into a Feast value type.

    Args:
        spark_type_as_str: Spark type name such as "bigint" or
            "array<string>".

    Returns:
        The matching Feast value type, or ValueType.NULL when the argument is
        not a string or names a type that is not in the mapping.
    """
    # TODO not all spark types are convertible
    # Current non-convertible types: interval, map, struct, structfield, decimal, binary
    type_map: Dict[str, ValueType] = {
        "null": ValueType.UNKNOWN,
        "byte": ValueType.BYTES,
        "string": ValueType.STRING,
        "int": ValueType.INT32,
        "short": ValueType.INT32,
        "bigint": ValueType.INT64,
        "long": ValueType.INT64,
        "double": ValueType.DOUBLE,
        "float": ValueType.FLOAT,
        "boolean": ValueType.BOOL,
        "timestamp": ValueType.UNIX_TIMESTAMP,
        "array<byte>": ValueType.BYTES_LIST,
        "array<string>": ValueType.STRING_LIST,
        "array<int>": ValueType.INT32_LIST,
        "array<bigint>": ValueType.INT64_LIST,
        "array<double>": ValueType.DOUBLE_LIST,
        "array<float>": ValueType.FLOAT_LIST,
        "array<boolean>": ValueType.BOOL_LIST,
        "array<timestamp>": ValueType.UNIX_TIMESTAMP_LIST,
    }
    # TODO: Find better way of doing this.
    if not isinstance(spark_type_as_str, str):
        return ValueType.NULL
    # Normalise the case once so the membership test and the lookup agree.
    return type_map.get(spark_type_as_str.lower(), ValueType.NULL)
def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[np.dtype]:
    """
    Lazily yield the numpy dtype for each ``(column, spark type)`` pair.

    Spark types without an explicit entry fall back to the object dtype.
    """
    # TODO recheck all typing (also tz for timestamp)
    float64 = np.dtype("float64")
    int64 = np.dtype("int64")

    np_dtype_by_spark_type = defaultdict(lambda: np.dtype("O"))
    np_dtype_by_spark_type.update(
        {
            "boolean": np.dtype("bool"),
            "double": float64,
            "float": float64,
            "int": int64,
            "bigint": int64,
            "smallint": int64,
            "timestamp": np.dtype("datetime64[ns]"),
        }
    )

    return (np_dtype_by_spark_type[spark_type] for _column, spark_type in dtypes)
def arrow_to_pg_type(t_str: str) -> str:
    """
    Translate the string form of an Arrow type into a Postgres type name.

    Raises:
        ValueError: If the Arrow type has no registered Postgres counterpart.
    """
    # Timestamp-like types are decided by prefix and time-zone marker alone.
    if t_str.startswith(("timestamp", "datetime")):
        if "tz=" in t_str:
            return "timestamptz"
        return "timestamp"

    pg_name_by_arrow_name = {
        # scalars
        "null": "null",
        "bool": "boolean",
        "int8": "tinyint",
        "int16": "smallint",
        "int32": "int",
        "int64": "bigint",
        "uint8": "smallint",
        "uint16": "int",
        "uint32": "bigint",
        "uint64": "bigint",
        "float": "float",
        "double": "double precision",
        "binary": "binary",
        "string": "text",
        # lists
        "list<item: int32>": "int[]",
        "list<item: int64>": "bigint[]",
        "list<item: bool>": "boolean[]",
        "list<item: double>": "double precision[]",
        "list<item: timestamp[us]>": "timestamp[]",
    }
    try:
        return pg_name_by_arrow_name[t_str]
    except KeyError:
        raise ValueError(f"Unsupported type: {t_str}")
def pg_type_to_feast_value_type(type_str: str) -> ValueType:
    """
    Translate a Postgres type name (case-insensitive) into a Feast value type.

    Unsupported names yield ValueType.UNKNOWN and are reported on stdout.
    """
    # Postgres type names grouped by the Feast type they map to.
    names_by_value_type: Dict[ValueType, Tuple[str, ...]] = {
        ValueType.BOOL: ("boolean",),
        ValueType.BYTES: ("bytea",),
        ValueType.STRING: (
            "char",
            "text",
            "character",
            "character varying",
            "uuid",
        ),
        ValueType.INT64: ("bigint",),
        ValueType.INT32: ("smallint", "integer"),
        ValueType.DOUBLE: ("real", "double precision", "numeric"),
        ValueType.UNIX_TIMESTAMP: (
            "date",
            "time without time zone",
            "timestamp without time zone",
            "timestamp with time zone",
        ),
        ValueType.BOOL_LIST: ("boolean[]",),
        ValueType.BYTES_LIST: ("bytea[]",),
        ValueType.STRING_LIST: ("char[]", "text[]", "character[]", "uuid[]"),
        ValueType.INT64_LIST: ("bigint[]",),
        ValueType.INT32_LIST: ("smallint[]", "integer[]"),
        ValueType.DOUBLE_LIST: ("real[]", "double precision[]", "numeric[]"),
        ValueType.UNIX_TIMESTAMP_LIST: (
            "timestamp without time zone[]",
            "date[]",
            "time without time zone[]",
            "timestamp with time zone[]",
        ),
    }
    type_map: Dict[str, ValueType] = {
        pg_name: value_type
        for value_type, pg_names in names_by_value_type.items()
        for pg_name in pg_names
    }

    value = type_map.get(type_str.lower(), ValueType.UNKNOWN)
    if value == ValueType.UNKNOWN:
        print("unknown type:", type_str)
    return value
def feast_value_type_to_pa(feast_type: ValueType) -> pyarrow.DataType:
    """
    Return the PyArrow data type used to represent a Feast value type.

    Raises:
        KeyError: If the value type has no registered PyArrow counterpart.
    """
    element_types = {
        ValueType.INT32: pyarrow.int32(),
        ValueType.INT64: pyarrow.int64(),
        ValueType.DOUBLE: pyarrow.float64(),
        ValueType.FLOAT: pyarrow.float32(),
        ValueType.STRING: pyarrow.string(),
        ValueType.BYTES: pyarrow.binary(),
        ValueType.BOOL: pyarrow.bool_(),
        ValueType.UNIX_TIMESTAMP: pyarrow.timestamp("us"),
    }

    arrow_types = dict(element_types)
    # Every scalar type has a "<NAME>_LIST" counterpart holding a list of the
    # same element type.
    for scalar_type, arrow_element in element_types.items():
        list_type = ValueType[scalar_type.name + "_LIST"]
        arrow_types[list_type] = pyarrow.list_(arrow_element)
    arrow_types[ValueType.NULL] = pyarrow.null()

    return arrow_types[feast_type]
def pg_type_code_to_pg_type(code: int) -> str:
    """
    Return the Postgres type name registered for a numeric type code.

    Raises:
        KeyError: If the code is not in the table.
    """
    scalar_names = {
        16: "boolean",
        17: "bytea",
        20: "bigint",
        21: "smallint",
        23: "integer",
        25: "text",
        700: "real",
        701: "double precision",
        1042: "character",
        1043: "character varying",
        1082: "date",
        1083: "time without time zone",
        1114: "timestamp without time zone",
        1184: "timestamp with time zone",
        1700: "numeric",
        2950: "uuid",
    }
    array_names = {
        1000: "boolean[]",
        1001: "bytea[]",
        1005: "smallint[]",
        1007: "integer[]",
        1009: "text[]",
        1014: "character[]",
        1016: "bigint[]",
        1021: "real[]",
        1022: "double precision[]",
        1115: "timestamp without time zone[]",
        1182: "date[]",
        1183: "time without time zone[]",
        1185: "timestamp with time zone[]",
        1231: "numeric[]",
        2951: "uuid[]",
    }
    names_by_code = {**scalar_names, **array_names}
    return names_by_code[code]
def pg_type_code_to_arrow(code: int) -> pyarrow.DataType:
    """
    Return the PyArrow data type for a Postgres numeric type code.

    The code is resolved to a Postgres type name, then to a Feast value type,
    and finally to the PyArrow type registered for that value type.

    Args:
        code: Postgres type code as accepted by ``pg_type_code_to_pg_type``.

    Returns:
        The result of ``feast_value_type_to_pa`` for the resolved value type.
    """
    return feast_value_type_to_pa(
        pg_type_to_feast_value_type(pg_type_code_to_pg_type(code))
    )