Source code for feast.type_map

# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from typing import Any, Dict, Union

import numpy as np
import pandas as pd
from google.protobuf.json_format import MessageToDict

from feast.protos.feast.types.Value_pb2 import (
    BoolList,
    BytesList,
    DoubleList,
    FloatList,
    Int32List,
    Int64List,
    StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ProtoValue
from feast.value_type import ValueType


[docs]def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: """ Converts field value Proto to Dict and returns each field's Feast Value Type value in their respective Python value. Args: field_value_proto: Field value Proto Returns: Python native type representation/version of the given field_value_proto """ field_value_dict = MessageToDict(field_value_proto) for k, v in field_value_dict.items(): if k == "int64Val": return int(v) if k == "bytesVal": return bytes(v) if (k == "int64ListVal") or (k == "int32ListVal"): return [int(item) for item in v["val"]] if (k == "floatListVal") or (k == "doubleListVal"): return [float(item) for item in v["val"]] if k == "stringListVal": return [str(item) for item in v["val"]] if k == "bytesListVal": return [bytes(item) for item in v["val"]] if k == "boolListVal": return [bool(item) for item in v["val"]] if k in ["int32Val", "floatVal", "doubleVal", "stringVal", "boolVal"]: return v else: raise TypeError( f"Casting to Python native type for type {k} failed. " f"Type {k} not found" )
[docs]def python_type_to_feast_value_type( name: str, value, recurse: bool = True ) -> ValueType: """ Finds the equivalent Feast Value Type for a Python value. Both native and Pandas types are supported. This function will recursively look for nested types when arrays are detected. All types must be homogenous. Args: name: Name of the value or field value: Value that will be inspected recurse: Whether to recursively look for nested types in arrays Returns: Feast Value Type """ type_name = type(value).__name__ type_map = { "int": ValueType.INT64, "str": ValueType.STRING, "float": ValueType.DOUBLE, "bytes": ValueType.BYTES, "float64": ValueType.DOUBLE, "float32": ValueType.FLOAT, "int64": ValueType.INT64, "uint64": ValueType.INT64, "int32": ValueType.INT32, "uint32": ValueType.INT32, "uint8": ValueType.INT32, "int8": ValueType.INT32, "bool": ValueType.BOOL, "timedelta": ValueType.UNIX_TIMESTAMP, "datetime64[ns]": ValueType.UNIX_TIMESTAMP, "datetime64[ns, tz]": ValueType.UNIX_TIMESTAMP, "category": ValueType.STRING, } if type_name in type_map: return type_map[type_name] if type_name == "ndarray" or isinstance(value, list): if recurse: # Convert to list type list_items = pd.core.series.Series(value) # This is the final type which we infer from the list common_item_value_type = None for item in list_items: if isinstance(item, ProtoValue): current_item_value_type = _proto_str_to_value_type( str(item.WhichOneof("val")) ) else: # Get the type from the current item, only one level deep current_item_value_type = python_type_to_feast_value_type( name=name, value=item, recurse=False ) # Validate whether the type stays consistent if ( common_item_value_type and not common_item_value_type == current_item_value_type ): raise ValueError( f"List value type for field {name} is inconsistent. " f"{common_item_value_type} different from " f"{current_item_value_type}." ) common_item_value_type = current_item_value_type if common_item_value_type is None: raise ValueError( f"field {name} cannot have null values for type inference." ) return ValueType[common_item_value_type.name + "_LIST"] else: raise ValueError( f"Value type for field {name} is {value.dtype.__str__()} but " f"recursion is not allowed. Array types can only be one level " f"deep." ) return type_map[value.dtype.__str__()]
def _type_err(item, dtype): raise ValueError(f'Value "{item}" is of type {type(item)} not of type {dtype}') def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: """ Converts a Python (native, pandas) value to a Feast Proto Value based on a provided value type Args: feast_value_type: The target value type value: Value that will be converted Returns: Feast Value Proto """ # Detect list type and handle separately if "list" in feast_value_type.name.lower(): if feast_value_type == ValueType.FLOAT_LIST: return ProtoValue( float_list_val=FloatList( val=[ item if type(item) in [np.float32, np.float64, float] else _type_err(item, np.float32) for item in value ] ) ) if feast_value_type == ValueType.DOUBLE_LIST: return ProtoValue( double_list_val=DoubleList( val=[ item if type(item) in [np.float64, np.float32, float] else _type_err(item, np.float64) for item in value ] ) ) if feast_value_type == ValueType.INT32_LIST: return ProtoValue( int32_list_val=Int32List( val=[ item if type(item) is np.int32 else _type_err(item, np.int32) for item in value ] ) ) if feast_value_type == ValueType.INT64_LIST: return ProtoValue( int64_list_val=Int64List( val=[ item if type(item) in [np.int64, np.int32] else _type_err(item, np.int64) for item in value ] ) ) if feast_value_type == ValueType.UNIX_TIMESTAMP_LIST: return ProtoValue( int64_list_val=Int64List( val=[ item if type(item) in [np.int64, np.int32] else _type_err(item, np.int64) for item in value ] ) ) if feast_value_type == ValueType.STRING_LIST: return ProtoValue( string_list_val=StringList( val=[ item if type(item) in [np.str_, str] else _type_err(item, np.str_) for item in value ] ) ) if feast_value_type == ValueType.BOOL_LIST: return ProtoValue( bool_list_val=BoolList( val=[ item if type(item) in [np.bool_, bool] else _type_err(item, np.bool_) for item in value ] ) ) if feast_value_type == ValueType.BYTES_LIST: return ProtoValue( bytes_list_val=BytesList( val=[ item if type(item) in [np.bytes_, bytes] else _type_err(item, np.bytes_) for item in value ] ) ) # Handle scalar types below else: if pd.isnull(value): return ProtoValue() elif feast_value_type == ValueType.INT32: return ProtoValue(int32_val=int(value)) elif feast_value_type == ValueType.INT64: return ProtoValue(int64_val=int(value)) elif feast_value_type == ValueType.UNIX_TIMESTAMP: return ProtoValue(int64_val=int(value)) elif feast_value_type == ValueType.FLOAT: return ProtoValue(float_val=float(value)) elif feast_value_type == ValueType.DOUBLE: assert type(value) is float or np.float64 return ProtoValue(double_val=value) elif feast_value_type == ValueType.STRING: return ProtoValue(string_val=str(value)) elif feast_value_type == ValueType.BYTES: assert type(value) is bytes return ProtoValue(bytes_val=value) elif feast_value_type == ValueType.BOOL: assert type(value) is bool return ProtoValue(bool_val=value) raise Exception(f"Unsupported data type: ${str(type(value))}")
[docs]def python_value_to_proto_value( value: Any, feature_type: ValueType = None ) -> ProtoValue: value_type = ( python_type_to_feast_value_type("", value) if value is not None else feature_type ) return _python_value_to_proto_value(value_type, value)
def _proto_str_to_value_type(proto_str: str) -> ValueType: """ Returns Feast ValueType given Feast ValueType string. Args: proto_str: str Returns: A variant of ValueType. """ type_map = { "int32_val": ValueType.INT32, "int64_val": ValueType.INT64, "double_val": ValueType.DOUBLE, "float_val": ValueType.FLOAT, "string_val": ValueType.STRING, "bytes_val": ValueType.BYTES, "bool_val": ValueType.BOOL, "int32_list_val": ValueType.INT32_LIST, "int64_list_val": ValueType.INT64_LIST, "double_list_val": ValueType.DOUBLE_LIST, "float_list_val": ValueType.FLOAT_LIST, "string_list_val": ValueType.STRING_LIST, "bytes_list_val": ValueType.BYTES_LIST, "bool_list_val": ValueType.BOOL_LIST, } return type_map[proto_str]
[docs]def pa_to_feast_value_type(pa_type_as_str: str) -> ValueType: if re.match(r"^timestamp", pa_type_as_str): return ValueType.INT64 type_map = { "int32": ValueType.INT32, "int64": ValueType.INT64, "double": ValueType.DOUBLE, "float": ValueType.FLOAT, "string": ValueType.STRING, "binary": ValueType.BYTES, "bool": ValueType.BOOL, "list<item: int32>": ValueType.INT32_LIST, "list<item: int64>": ValueType.INT64_LIST, "list<item: double>": ValueType.DOUBLE_LIST, "list<item: float>": ValueType.FLOAT_LIST, "list<item: string>": ValueType.STRING_LIST, "list<item: binary>": ValueType.BYTES_LIST, "list<item: bool>": ValueType.BOOL_LIST, } return type_map[pa_type_as_str]
[docs]def bq_to_feast_value_type(bq_type_as_str): type_map: Dict[ValueType, Union[str, Dict[str, Any]]] = { "DATETIME": ValueType.STRING, # Update to ValueType.UNIX_TIMESTAMP once #1520 lands. "TIMESTAMP": ValueType.STRING, # Update to ValueType.UNIX_TIMESTAMP once #1520 lands. "INTEGER": ValueType.INT64, "INT64": ValueType.INT64, "STRING": ValueType.STRING, "FLOAT": ValueType.DOUBLE, "FLOAT64": ValueType.DOUBLE, "BYTES": ValueType.BYTES, "BOOL": ValueType.BOOL, "ARRAY<INT64>": ValueType.INT64_LIST, "ARRAY<FLOAT64>": ValueType.DOUBLE_LIST, "ARRAY<STRING>": ValueType.STRING_LIST, "ARRAY<BYTES>": ValueType.BYTES_LIST, "ARRAY<BOOL>": ValueType.BOOL_LIST, } return type_map[bq_type_as_str]
[docs]def redshift_to_feast_value_type(redshift_type_as_str: str) -> ValueType: # Type names from https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html type_map = { "int2": ValueType.INT32, "int4": ValueType.INT32, "int8": ValueType.INT64, "numeric": ValueType.DOUBLE, "float4": ValueType.FLOAT, "float8": ValueType.DOUBLE, "bool": ValueType.BOOL, "character": ValueType.STRING, "varchar": ValueType.STRING, "timestamp": ValueType.UNIX_TIMESTAMP, "timestamptz": ValueType.UNIX_TIMESTAMP, # skip date, geometry, hllsketch, time, timetz } return type_map[redshift_type_as_str.lower()]
[docs]def pa_to_redshift_value_type(pa_type_as_str: str) -> str: # PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html # Redshift type: https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html pa_type_as_str = pa_type_as_str.lower() if pa_type_as_str.startswith("timestamp"): if "tz=" in pa_type_as_str: return "timestamptz" else: return "timestamp" if pa_type_as_str.startswith("date"): return "date" if pa_type_as_str.startswith("decimal"): # PyArrow decimal types (e.g. "decimal(38,37)") luckily directly map to the Redshift type. return pa_type_as_str # We have to take into account how arrow types map to parquet types as well. # For example, null type maps to int32 in parquet, so we have to use int4 in Redshift. # Other mappings have also been adjusted accordingly. type_map = { "null": "int4", "bool": "bool", "int8": "int4", "int16": "int4", "int32": "int4", "int64": "int8", "uint8": "int4", "uint16": "int4", "uint32": "int8", "uint64": "int8", "float": "float4", "double": "float8", "binary": "varchar", "string": "varchar", } return type_map[pa_type_as_str]