Source code for feast.type_map

# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Union

import numpy as np
import pandas as pd
import pyarrow as pa
from google.protobuf.json_format import MessageToDict
from google.protobuf.timestamp_pb2 import Timestamp
from pyarrow.lib import TimestampType

from feast.protos.feast.types.Value_pb2 import (
    BoolList,
    BytesList,
    DoubleList,
    FloatList,
    Int32List,
    Int64List,
    StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ProtoValue
from feast.protos.feast.types.Value_pb2 import ValueType as ProtoValueType
from feast.value_type import ValueType


[docs]def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: """ Converts field value Proto to Dict and returns each field's Feast Value Type value in their respective Python value. Args: field_value_proto: Field value Proto Returns: Python native type representation/version of the given field_value_proto """ field_value_dict = MessageToDict(field_value_proto) for k, v in field_value_dict.items(): if k == "int64Val": return int(v) if k == "bytesVal": return bytes(v) if (k == "int64ListVal") or (k == "int32ListVal"): return [int(item) for item in v["val"]] if (k == "floatListVal") or (k == "doubleListVal"): return [float(item) for item in v["val"]] if k == "stringListVal": return [str(item) for item in v["val"]] if k == "bytesListVal": return [bytes(item) for item in v["val"]] if k == "boolListVal": return [bool(item) for item in v["val"]] if k in ["int32Val", "floatVal", "doubleVal", "stringVal", "boolVal"]: return v else: raise TypeError( f"Casting to Python native type for type {k} failed. " f"Type {k} not found" )
[docs]def python_type_to_feast_value_type( name: str, value, recurse: bool = True ) -> ValueType: """ Finds the equivalent Feast Value Type for a Python value. Both native and Pandas types are supported. This function will recursively look for nested types when arrays are detected. All types must be homogenous. Args: name: Name of the value or field value: Value that will be inspected recurse: Whether to recursively look for nested types in arrays Returns: Feast Value Type """ type_name = type(value).__name__ type_map = { "int": ValueType.INT64, "str": ValueType.STRING, "float": ValueType.DOUBLE, "bytes": ValueType.BYTES, "float64": ValueType.DOUBLE, "float32": ValueType.FLOAT, "int64": ValueType.INT64, "uint64": ValueType.INT64, "int32": ValueType.INT32, "uint32": ValueType.INT32, "uint8": ValueType.INT32, "int8": ValueType.INT32, "bool": ValueType.BOOL, "timedelta": ValueType.UNIX_TIMESTAMP, "datetime64[ns]": ValueType.UNIX_TIMESTAMP, "datetime64[ns, tz]": ValueType.UNIX_TIMESTAMP, "category": ValueType.STRING, } if type_name in type_map: return type_map[type_name] if type_name == "ndarray" or isinstance(value, list): if recurse: # Convert to list type list_items = pd.core.series.Series(value) # This is the final type which we infer from the list common_item_value_type = None for item in list_items: if isinstance(item, ProtoValue): current_item_value_type = _proto_str_to_value_type( item.WhichOneof("val") ) else: # Get the type from the current item, only one level deep current_item_value_type = python_type_to_feast_value_type( name=name, value=item, recurse=False ) # Validate whether the type stays consistent if ( common_item_value_type and not common_item_value_type == current_item_value_type ): raise ValueError( f"List value type for field {name} is inconsistent. " f"{common_item_value_type} different from " f"{current_item_value_type}." ) common_item_value_type = current_item_value_type if common_item_value_type is None: raise ValueError( f"field {name} cannot have null values for type inference." ) return ValueType[common_item_value_type.name + "_LIST"] else: raise ValueError( f"Value type for field {name} is {value.dtype.__str__()} but " f"recursion is not allowed. Array types can only be one level " f"deep." ) return type_map[value.dtype.__str__()]
def _pd_datetime_to_timestamp_proto(dtype, value) -> Timestamp: """ Converts a Pandas datetime to a Timestamp Proto Args: dtype: Pandas datatype value: Value of datetime Returns: Timestamp protobuf value """ if type(value) in [np.float64, np.float32, np.int32, np.int64]: return Timestamp(seconds=int(value)) if dtype.__str__() == "datetime64[ns]": # If timestamp does not contain timezone, we assume it is of local # timezone and adjust it to UTC local_timezone = datetime.now(timezone.utc).astimezone().tzinfo value = value.tz_localize(local_timezone).tz_convert("UTC").tz_localize(None) return Timestamp(seconds=int(value.timestamp())) if dtype.__str__() == "datetime64[ns, UTC]": return Timestamp(seconds=int(value.timestamp())) else: return Timestamp(seconds=np.datetime64(value).astype("int64") // 1000000) # type: ignore def _type_err(item, dtype): raise ValueError(f'Value "{item}" is of type {type(item)} not of type {dtype}') def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: """ Converts a Python (native, pandas) value to a Feast Proto Value based on a provided value type Args: feast_value_type: The target value type value: Value that will be converted Returns: Feast Value Proto """ # Detect list type and handle separately if "list" in feast_value_type.name.lower(): if feast_value_type == ValueType.FLOAT_LIST: return ProtoValue( float_list_val=FloatList( val=[ item if type(item) in [np.float32, np.float64] else _type_err(item, np.float32) for item in value ] ) ) if feast_value_type == ValueType.DOUBLE_LIST: return ProtoValue( double_list_val=DoubleList( val=[ item if type(item) in [np.float64, np.float32] else _type_err(item, np.float64) for item in value ] ) ) if feast_value_type == ValueType.INT32_LIST: return ProtoValue( int32_list_val=Int32List( val=[ item if type(item) is np.int32 else _type_err(item, np.int32) for item in value ] ) ) if feast_value_type == ValueType.INT64_LIST: return ProtoValue( int64_list_val=Int64List( val=[ item if type(item) in [np.int64, np.int32] else _type_err(item, np.int64) for item in value ] ) ) if feast_value_type == ValueType.UNIX_TIMESTAMP_LIST: return ProtoValue( int64_list_val=Int64List( val=[ item if type(item) in [np.int64, np.int32] else _type_err(item, np.int64) for item in value ] ) ) if feast_value_type == ValueType.STRING_LIST: return ProtoValue( string_list_val=StringList( val=[ item if type(item) in [np.str_, str] else _type_err(item, np.str_) for item in value ] ) ) if feast_value_type == ValueType.BOOL_LIST: return ProtoValue( bool_list_val=BoolList( val=[ item if type(item) in [np.bool_, bool] else _type_err(item, np.bool_) for item in value ] ) ) if feast_value_type == ValueType.BYTES_LIST: return ProtoValue( bytes_list_val=BytesList( val=[ item if type(item) in [np.bytes_, bytes] else _type_err(item, np.bytes_) for item in value ] ) ) # Handle scalar types below else: if pd.isnull(value): return ProtoValue() elif feast_value_type == ValueType.INT32: return ProtoValue(int32_val=int(value)) elif feast_value_type == ValueType.INT64: return ProtoValue(int64_val=int(value)) elif feast_value_type == ValueType.UNIX_TIMESTAMP: return ProtoValue(int64_val=int(value)) elif feast_value_type == ValueType.FLOAT: return ProtoValue(float_val=float(value)) elif feast_value_type == ValueType.DOUBLE: assert type(value) is float or np.float64 return ProtoValue(double_val=value) elif feast_value_type == ValueType.STRING: return ProtoValue(string_val=str(value)) elif feast_value_type == ValueType.BYTES: assert type(value) is bytes return ProtoValue(bytes_val=value) elif feast_value_type == ValueType.BOOL: assert type(value) is bool return ProtoValue(bool_val=value) raise Exception(f"Unsupported data type: ${str(type(value))}")
[docs]def python_value_to_proto_value( value: Any, feature_type: ValueType = None ) -> ProtoValue: value_type = ( python_type_to_feast_value_type("", value) if value is not None else feature_type ) return _python_value_to_proto_value(value_type, value)
def _proto_str_to_value_type(proto_str: str) -> ValueType: """ Returns Feast ValueType given Feast ValueType string. Args: proto_str: str Returns: A variant of ValueType. """ type_map = { "int32_val": ValueType.INT32, "int64_val": ValueType.INT64, "double_val": ValueType.DOUBLE, "float_val": ValueType.FLOAT, "string_val": ValueType.STRING, "bytes_val": ValueType.BYTES, "bool_val": ValueType.BOOL, "int32_list_val": ValueType.INT32_LIST, "int64_list_val": ValueType.INT64_LIST, "double_list_val": ValueType.DOUBLE_LIST, "float_list_val": ValueType.FLOAT_LIST, "string_list_val": ValueType.STRING_LIST, "bytes_list_val": ValueType.BYTES_LIST, "bool_list_val": ValueType.BOOL_LIST, } return type_map[proto_str]
[docs]def pa_to_feast_value_attr(pa_type: object): """ Returns the equivalent Feast ValueType string for the given pa.lib type. Args: pa_type (object): PyArrow type. Returns: str: Feast attribute name in Feast ValueType string-ed representation. """ # Mapping of PyArrow type to attribute name in Feast ValueType strings type_map = { "timestamp[ms]": "int64_val", "int32": "int32_val", "int64": "int64_val", "double": "double_val", "float": "float_val", "string": "string_val", "binary": "bytes_val", "bool": "bool_val", "list<item: int32>": "int32_list_val", "list<item: int64>": "int64_list_val", "list<item: double>": "double_list_val", "list<item: float>": "float_list_val", "list<item: string>": "string_list_val", "list<item: binary>": "bytes_list_val", "list<item: bool>": "bool_list_val", } return type_map[pa_type.__str__()]
[docs]def pa_to_value_type(pa_type: object): """ Returns the equivalent Feast ValueType for the given pa.lib type. Args: pa_type (object): PyArrow type. Returns: feast.types.Value_pb2.ValueType: Feast ValueType. """ # Mapping of PyArrow to attribute name in Feast ValueType type_map = { "timestamp[ms]": ProtoValueType.INT64, "int32": ProtoValueType.INT32, "int64": ProtoValueType.INT64, "double": ProtoValueType.DOUBLE, "float": ProtoValueType.FLOAT, "string": ProtoValueType.STRING, "binary": ProtoValueType.BYTES, "bool": ProtoValueType.BOOL, "list<item: int32>": ProtoValueType.INT32_LIST, "list<item: int64>": ProtoValueType.INT64_LIST, "list<item: double>": ProtoValueType.DOUBLE_LIST, "list<item: float>": ProtoValueType.FLOAT_LIST, "list<item: string>": ProtoValueType.STRING_LIST, "list<item: binary>": ProtoValueType.BYTES_LIST, "list<item: bool>": ProtoValueType.BOOL_LIST, } return type_map[pa_type.__str__()]
[docs]def pa_to_feast_value_type(value: Union[pa.lib.ChunkedArray, str]) -> ValueType: value_type = ( value.type.__str__() if isinstance(value, pa.lib.ChunkedArray) else value ) if re.match(r"^timestamp", value_type): return ValueType.INT64 type_map = { "int32": ValueType.INT32, "int64": ValueType.INT64, "double": ValueType.DOUBLE, "float": ValueType.FLOAT, "string": ValueType.STRING, "binary": ValueType.BYTES, "bool": ValueType.BOOL, "list<item: int32>": ValueType.INT32_LIST, "list<item: int64>": ValueType.INT64_LIST, "list<item: double>": ValueType.DOUBLE_LIST, "list<item: float>": ValueType.FLOAT_LIST, "list<item: string>": ValueType.STRING_LIST, "list<item: binary>": ValueType.BYTES_LIST, "list<item: bool>": ValueType.BOOL_LIST, } return type_map[value_type]
[docs]def pa_column_to_timestamp_proto_column(column: pa.lib.ChunkedArray) -> List[Timestamp]: if not isinstance(column.type, TimestampType): raise Exception("Only TimestampType columns are allowed") proto_column = [] for val in column: timestamp = Timestamp() timestamp.FromMicroseconds(micros=int(val.as_py().timestamp() * 1_000_000)) proto_column.append(timestamp) return proto_column
[docs]def pa_column_to_proto_column( feast_value_type: ValueType, column: pa.lib.ChunkedArray ) -> List[ProtoValue]: type_map: Dict[ValueType, Union[str, Dict[str, Any]]] = { ValueType.INT32: "int32_val", ValueType.INT64: "int64_val", ValueType.FLOAT: "float_val", ValueType.DOUBLE: "double_val", ValueType.STRING: "string_val", ValueType.BYTES: "bytes_val", ValueType.BOOL: "bool_val", ValueType.BOOL_LIST: {"bool_list_val": BoolList}, ValueType.BYTES_LIST: {"bytes_list_val": BytesList}, ValueType.STRING_LIST: {"string_list_val": StringList}, ValueType.FLOAT_LIST: {"float_list_val": FloatList}, ValueType.DOUBLE_LIST: {"double_list_val": DoubleList}, ValueType.INT32_LIST: {"int32_list_val": Int32List}, ValueType.INT64_LIST: {"int64_list_val": Int64List}, } value: Union[str, Dict[str, Any]] = type_map[feast_value_type] # Process list types if isinstance(value, dict): list_param_name = list(value.keys())[0] return [ ProtoValue(**{list_param_name: value[list_param_name](val=x.as_py())}) for x in column ] else: return [ProtoValue(**{value: x.as_py()}) for x in column]
[docs]def bq_to_feast_value_type(bq_type_as_str): type_map: Dict[ValueType, Union[str, Dict[str, Any]]] = { "DATETIME": ValueType.STRING, # Update to ValueType.UNIX_TIMESTAMP once #1520 lands. "TIMESTAMP": ValueType.STRING, # Update to ValueType.UNIX_TIMESTAMP once #1520 lands. "INTEGER": ValueType.INT64, "INT64": ValueType.INT64, "STRING": ValueType.STRING, "FLOAT": ValueType.DOUBLE, "FLOAT64": ValueType.DOUBLE, "BYTES": ValueType.BYTES, "BOOL": ValueType.BOOL, "ARRAY<INT64>": ValueType.INT64_LIST, "ARRAY<FLOAT64>": ValueType.DOUBLE_LIST, "ARRAY<STRING>": ValueType.STRING_LIST, "ARRAY<BYTES>": ValueType.BYTES_LIST, "ARRAY<BOOL>": ValueType.BOOL_LIST, } return type_map[bq_type_as_str]