# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Optional,
Sequence,
Set,
Sized,
Tuple,
Type,
Union,
cast,
)
import numpy as np
import pandas as pd
from google.protobuf.timestamp_pb2 import Timestamp
from feast.protos.feast.types.Value_pb2 import (
BoolList,
BytesList,
DoubleList,
FloatList,
Int32List,
Int64List,
StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ProtoValue
from feast.value_type import ListType, ValueType

if TYPE_CHECKING:
import pyarrow

# null timestamps get converted to -9223372036854775808
NULL_TIMESTAMP_INT_VALUE = np.datetime64("NaT").astype(int)


def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any:
"""
    Converts a Feast Value proto to the corresponding native Python value.
Args:
field_value_proto: Field value Proto
Returns:
Python native type representation/version of the given field_value_proto
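
    Example (an illustrative sketch, assuming the scalar ``int64_val`` field is set):
        >>> feast_value_type_to_python_type(ProtoValue(int64_val=42))
        42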
"""
val_attr = field_value_proto.WhichOneof("val")
if val_attr is None:
return None
val = getattr(field_value_proto, val_attr)
    # If it's a *_LIST type, extract the underlying list.
if hasattr(val, "val"):
val = list(val.val)
# Convert UNIX_TIMESTAMP values to `datetime`
if val_attr == "unix_timestamp_list_val":
val = [
datetime.fromtimestamp(v, tz=timezone.utc)
if v != NULL_TIMESTAMP_INT_VALUE
else None
for v in val
]
elif val_attr == "unix_timestamp_val":
val = (
datetime.fromtimestamp(val, tz=timezone.utc)
if val != NULL_TIMESTAMP_INT_VALUE
else None
)
return val


def feast_value_type_to_pandas_type(value_type: ValueType) -> Any:
value_type_to_pandas_type: Dict[ValueType, str] = {
ValueType.FLOAT: "float",
ValueType.INT32: "int",
ValueType.INT64: "int",
ValueType.STRING: "str",
ValueType.DOUBLE: "float",
ValueType.BYTES: "bytes",
ValueType.BOOL: "bool",
ValueType.UNIX_TIMESTAMP: "datetime64[ns]",
}
if value_type.name.endswith("_LIST"):
return "object"
if value_type in value_type_to_pandas_type:
return value_type_to_pandas_type[value_type]
raise TypeError(
f"Casting to pandas type for type {value_type} failed. "
f"Type {value_type} not found"
)
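

# A minimal usage sketch for the mapping above; list types always map to "object":
#   feast_value_type_to_pandas_type(ValueType.INT64)        -> "int"
#   feast_value_type_to_pandas_type(ValueType.STRING_LIST)  -> "object"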


def python_type_to_feast_value_type(
name: str, value: Any = None, recurse: bool = True, type_name: Optional[str] = None
) -> ValueType:
"""
Finds the equivalent Feast Value Type for a Python value. Both native
and Pandas types are supported. This function will recursively look
    for nested types when arrays are detected. All types must be homogeneous.
Args:
name: Name of the value or field
value: Value that will be inspected
        recurse: Whether to recursively look for nested types in arrays
        type_name: Optional type name that overrides ``type(value)`` for the lookup
Returns:
Feast Value Type
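
    Example (an illustrative sketch):
        >>> python_type_to_feast_value_type("age", 42) == ValueType.INT64
        True
        >>> python_type_to_feast_value_type("scores", np.array([1.0, 2.0])) == ValueType.DOUBLE_LIST
        True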
"""
type_name = (type_name or type(value).__name__).lower()
type_map = {
"int": ValueType.INT64,
"str": ValueType.STRING,
"string": ValueType.STRING, # pandas.StringDtype
"float": ValueType.DOUBLE,
"bytes": ValueType.BYTES,
"float64": ValueType.DOUBLE,
"float32": ValueType.FLOAT,
"int64": ValueType.INT64,
"uint64": ValueType.INT64,
"int32": ValueType.INT32,
"uint32": ValueType.INT32,
"int16": ValueType.INT32,
"uint16": ValueType.INT32,
"uint8": ValueType.INT32,
"int8": ValueType.INT32,
"bool": ValueType.BOOL,
"boolean": ValueType.BOOL,
"timedelta": ValueType.UNIX_TIMESTAMP,
"timestamp": ValueType.UNIX_TIMESTAMP,
"datetime": ValueType.UNIX_TIMESTAMP,
"datetime64[ns]": ValueType.UNIX_TIMESTAMP,
"datetime64[ns, tz]": ValueType.UNIX_TIMESTAMP,
"category": ValueType.STRING,
}
if type_name in type_map:
return type_map[type_name]
if isinstance(value, np.ndarray) and str(value.dtype) in type_map:
item_type = type_map[str(value.dtype)]
return ValueType[item_type.name + "_LIST"]
if isinstance(value, (list, np.ndarray)):
        # If the value is an ndarray whose dtype we couldn't map above, it is most
        # probably an array of "object", so iterate over the items and infer the
        # type of each one.
if not recurse:
raise ValueError(
f"Value type for field {name} is {type(value)} but "
f"recursion is not allowed. Array types can only be one level "
f"deep."
)
# This is the final type which we infer from the list
common_item_value_type = None
for item in value:
if isinstance(item, ProtoValue):
current_item_value_type: ValueType = _proto_value_to_value_type(item)
else:
# Get the type from the current item, only one level deep
current_item_value_type = python_type_to_feast_value_type(
name=name, value=item, recurse=False
)
# Validate whether the type stays consistent
if (
common_item_value_type
                and common_item_value_type != current_item_value_type
):
raise ValueError(
f"List value type for field {name} is inconsistent. "
f"{common_item_value_type} different from "
f"{current_item_value_type}."
)
common_item_value_type = current_item_value_type
if common_item_value_type is None:
return ValueType.UNKNOWN
return ValueType[common_item_value_type.name + "_LIST"]
raise ValueError(
f"Value with native type {type_name} "
f"cannot be converted into Feast value type"
)


def python_values_to_feast_value_type(
name: str, values: Any, recurse: bool = True
) -> ValueType:
inferred_dtype = ValueType.UNKNOWN
for row in values:
current_dtype = python_type_to_feast_value_type(
name, value=row, recurse=recurse
)
if inferred_dtype is ValueType.UNKNOWN:
inferred_dtype = current_dtype
else:
if current_dtype != inferred_dtype and current_dtype not in (
ValueType.UNKNOWN,
ValueType.NULL,
):
raise TypeError(
f"Input entity {name} has mixed types, {current_dtype} and {inferred_dtype}. That is not allowed. "
)
if inferred_dtype in (ValueType.UNKNOWN, ValueType.NULL):
raise ValueError(
f"field {name} cannot have all null values for type inference."
)
return inferred_dtype
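

# A minimal usage sketch: the type is inferred from the rows and must be consistent.
#   python_values_to_feast_value_type("age", [1, 2, 3]) == ValueType.INT64  # True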


def _convert_value_type_str_to_value_type(type_str: str) -> ValueType:
type_map = {
"UNKNOWN": ValueType.UNKNOWN,
"BYTES": ValueType.BYTES,
"STRING": ValueType.STRING,
"INT32": ValueType.INT32,
"INT64": ValueType.INT64,
"DOUBLE": ValueType.DOUBLE,
"FLOAT": ValueType.FLOAT,
"BOOL": ValueType.BOOL,
"NULL": ValueType.NULL,
"UNIX_TIMESTAMP": ValueType.UNIX_TIMESTAMP,
"BYTES_LIST": ValueType.BYTES_LIST,
"STRING_LIST": ValueType.STRING_LIST,
"INT32_LIST ": ValueType.INT32_LIST,
"INT64_LIST": ValueType.INT64_LIST,
"DOUBLE_LIST": ValueType.DOUBLE_LIST,
"FLOAT_LIST": ValueType.FLOAT_LIST,
"BOOL_LIST": ValueType.BOOL_LIST,
"UNIX_TIMESTAMP_LIST": ValueType.UNIX_TIMESTAMP_LIST,
}
return type_map[type_str]


def _type_err(item, dtype):
raise TypeError(f'Value "{item}" is of type {type(item)} not of type {dtype}')


PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE: Dict[
ValueType, Tuple[ListType, str, List[Type]]
] = {
ValueType.FLOAT_LIST: (
FloatList,
"float_list_val",
[np.float32, np.float64, float],
),
ValueType.DOUBLE_LIST: (
DoubleList,
"double_list_val",
[np.float64, np.float32, float],
),
ValueType.INT32_LIST: (Int32List, "int32_list_val", [np.int64, np.int32, int]),
ValueType.INT64_LIST: (Int64List, "int64_list_val", [np.int64, np.int32, int]),
ValueType.UNIX_TIMESTAMP_LIST: (
Int64List,
"int64_list_val",
[np.datetime64, np.int64, np.int32, int, datetime, Timestamp],
),
ValueType.STRING_LIST: (StringList, "string_list_val", [np.str_, str]),
ValueType.BOOL_LIST: (BoolList, "bool_list_val", [np.bool_, bool]),
ValueType.BYTES_LIST: (BytesList, "bytes_list_val", [np.bytes_, bytes]),
}


PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: Dict[
ValueType, Tuple[str, Any, Optional[Set[Type]]]
] = {
ValueType.INT32: ("int32_val", lambda x: int(x), None),
ValueType.INT64: (
"int64_val",
lambda x: int(x.timestamp())
if isinstance(x, pd._libs.tslibs.timestamps.Timestamp)
else int(x),
None,
),
ValueType.FLOAT: ("float_val", lambda x: float(x), None),
ValueType.DOUBLE: ("double_val", lambda x: x, {float, np.float64}),
ValueType.STRING: ("string_val", lambda x: str(x), None),
ValueType.BYTES: ("bytes_val", lambda x: x, {bytes}),
ValueType.BOOL: ("bool_val", lambda x: x, {bool, np.bool_, int, np.int_}),
}


def _python_datetime_to_int_timestamp(
values: Sequence[Any],
) -> Sequence[Union[int, np.int_]]:
# Fast path for Numpy array.
    if isinstance(values, np.ndarray) and np.issubdtype(values.dtype, np.datetime64):
if values.ndim != 1:
raise ValueError("Only 1 dimensional arrays are supported.")
return cast(Sequence[np.int_], values.astype("datetime64[s]").astype(np.int_))
int_timestamps = []
for value in values:
if isinstance(value, datetime):
int_timestamps.append(int(value.timestamp()))
elif isinstance(value, Timestamp):
int_timestamps.append(int(value.ToSeconds()))
elif isinstance(value, np.datetime64):
int_timestamps.append(value.astype("datetime64[s]").astype(np.int_))
        # type(np.nan) is float, so a bare float here is treated as a NaN sentinel.
        elif isinstance(value, type(np.nan)):
int_timestamps.append(NULL_TIMESTAMP_INT_VALUE)
else:
int_timestamps.append(int(value))
return int_timestamps
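

# A minimal behaviour sketch: datetime-like inputs collapse to epoch seconds.
#   _python_datetime_to_int_timestamp([datetime(2021, 1, 1, tzinfo=timezone.utc)])
#   # -> [1609459200]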


def _python_value_to_proto_value(
feast_value_type: ValueType, values: List[Any]
) -> List[ProtoValue]:
"""
Converts a Python (native, pandas) value to a Feast Proto Value based
on a provided value type
Args:
feast_value_type: The target value type
values: List of Values that will be converted
Returns:
List of Feast Value Proto
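
    Example (an illustrative sketch):
        >>> _python_value_to_proto_value(ValueType.INT64, [3])[0].int64_val
        3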
"""
    # TODO: use a better sample for type checks (more than one element)
    sample = next(filter(_non_empty_value, values), None)  # first non-empty value
# Detect list type and handle separately
if "list" in feast_value_type.name.lower():
        # A feature can be a list type, but None is still a valid value
if feast_value_type in PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE:
proto_type, field_name, valid_types = PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE[
feast_value_type
]
if sample is not None and not all(
type(item) in valid_types for item in sample
):
first_invalid = next(
item for item in sample if type(item) not in valid_types
)
raise _type_err(first_invalid, valid_types[0])
if feast_value_type == ValueType.UNIX_TIMESTAMP_LIST:
int_timestamps_lists = (
_python_datetime_to_int_timestamp(value) for value in values
)
return [
# ProtoValue does actually accept `np.int_` but the typing complains.
ProtoValue(unix_timestamp_list_val=Int64List(val=ts)) # type: ignore
for ts in int_timestamps_lists
]
if feast_value_type == ValueType.BOOL_LIST:
                # ProtoValue does not support np.bool_, so convert each element to a Python bool.
return [
ProtoValue(**{field_name: proto_type(val=[bool(e) for e in value])}) # type: ignore
if value is not None
else ProtoValue()
for value in values
]
return [
ProtoValue(**{field_name: proto_type(val=value)}) # type: ignore
if value is not None
else ProtoValue()
for value in values
]
# Handle scalar types below
else:
if sample is None:
# all input values are None
return [ProtoValue()] * len(values)
if feast_value_type == ValueType.UNIX_TIMESTAMP:
int_timestamps = _python_datetime_to_int_timestamp(values)
# ProtoValue does actually accept `np.int_` but the typing complains.
return [ProtoValue(unix_timestamp_val=ts) for ts in int_timestamps] # type: ignore
(
field_name,
func,
valid_scalar_types,
) = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type]
if valid_scalar_types:
            if (sample == 0 or sample == 0.0) and feast_value_type != ValueType.BOOL:
                # NumPy converts 0 to an int, but in the feature view definition the
                # column type may be float, so when the sample is 0 type validation
                # must pass if the scalar type is either int or float.
                allowed_types = {np.int64, int, np.float64, float}
assert (
type(sample) in allowed_types
), f"Type `{type(sample)}` not in {allowed_types}"
else:
assert (
type(sample) in valid_scalar_types
), f"Type `{type(sample)}` not in {valid_scalar_types}"
if feast_value_type == ValueType.BOOL:
            # ProtoValue does not support np.bool_, so convert to a Python bool first.
return [
ProtoValue(
**{
field_name: func(
bool(value) if type(value) is np.bool_ else value # type: ignore
)
}
)
if not pd.isnull(value)
else ProtoValue()
for value in values
]
if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE:
return [
ProtoValue(**{field_name: func(value)})
if not pd.isnull(value)
else ProtoValue()
for value in values
]
    raise Exception(f"Unsupported data type: {str(type(values[0]))}")


def python_values_to_proto_values(
values: List[Any], feature_type: ValueType = ValueType.UNKNOWN
) -> List[ProtoValue]:
value_type = feature_type
    sample = next(filter(_non_empty_value, values), None)  # first non-empty value
if sample is not None and feature_type == ValueType.UNKNOWN:
if isinstance(sample, (list, np.ndarray)):
value_type = (
feature_type
if len(sample) == 0
else python_type_to_feast_value_type("", sample)
)
else:
value_type = python_type_to_feast_value_type("", sample)
if value_type == ValueType.UNKNOWN:
raise TypeError("Couldn't infer value type from empty value")
return _python_value_to_proto_value(value_type, values)
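

# A minimal usage sketch: with feature_type left as UNKNOWN, the value type is
# inferred from the first non-empty value.
#   python_values_to_proto_values([1.5, 2.5])[0].double_val  # -> 1.5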


def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType:
"""
    Returns the Feast ValueType corresponding to whichever field of the ``val``
    oneof is set in the given Value proto.
    Args:
        proto_value: Feast Value proto
Returns:
A variant of ValueType.
"""
proto_str = proto_value.WhichOneof("val")
type_map = {
"int32_val": ValueType.INT32,
"int64_val": ValueType.INT64,
"double_val": ValueType.DOUBLE,
"float_val": ValueType.FLOAT,
"string_val": ValueType.STRING,
"bytes_val": ValueType.BYTES,
"bool_val": ValueType.BOOL,
"int32_list_val": ValueType.INT32_LIST,
"int64_list_val": ValueType.INT64_LIST,
"double_list_val": ValueType.DOUBLE_LIST,
"float_list_val": ValueType.FLOAT_LIST,
"string_list_val": ValueType.STRING_LIST,
"bytes_list_val": ValueType.BYTES_LIST,
"bool_list_val": ValueType.BOOL_LIST,
None: ValueType.NULL,
}
return type_map[proto_str]


def pa_to_feast_value_type(pa_type_as_str: str) -> ValueType:
is_list = False
if pa_type_as_str.startswith("list<item: "):
is_list = True
pa_type_as_str = pa_type_as_str.replace("list<item: ", "").replace(">", "")
if pa_type_as_str.startswith("timestamp"):
value_type = ValueType.UNIX_TIMESTAMP
else:
type_map = {
"int32": ValueType.INT32,
"int64": ValueType.INT64,
"double": ValueType.DOUBLE,
"float": ValueType.FLOAT,
"string": ValueType.STRING,
"binary": ValueType.BYTES,
"bool": ValueType.BOOL,
"null": ValueType.NULL,
}
value_type = type_map[pa_type_as_str]
if is_list:
value_type = ValueType[value_type.name + "_LIST"]
return value_type
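

# A minimal usage sketch (string forms of pyarrow types):
#   pa_to_feast_value_type("timestamp[us]") == ValueType.UNIX_TIMESTAMP   # True
#   pa_to_feast_value_type("list<item: int64>") == ValueType.INT64_LIST   # True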


def bq_to_feast_value_type(bq_type_as_str: str) -> ValueType:
is_list = False
if bq_type_as_str.startswith("ARRAY<"):
is_list = True
bq_type_as_str = bq_type_as_str[6:-1]
type_map: Dict[str, ValueType] = {
"DATETIME": ValueType.UNIX_TIMESTAMP,
"TIMESTAMP": ValueType.UNIX_TIMESTAMP,
"INTEGER": ValueType.INT64,
"INT64": ValueType.INT64,
"STRING": ValueType.STRING,
"FLOAT": ValueType.DOUBLE,
"FLOAT64": ValueType.DOUBLE,
"BYTES": ValueType.BYTES,
"BOOL": ValueType.BOOL,
"BOOLEAN": ValueType.BOOL, # legacy sql data type
"NULL": ValueType.NULL,
}
value_type = type_map[bq_type_as_str]
if is_list:
value_type = ValueType[value_type.name + "_LIST"]
return value_type
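

# A minimal usage sketch (BigQuery standard SQL type names):
#   bq_to_feast_value_type("ARRAY<INT64>") == ValueType.INT64_LIST  # True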


def mssql_to_feast_value_type(mssql_type_as_str: str) -> ValueType:
type_map = {
"bigint": ValueType.FLOAT,
"binary": ValueType.BYTES,
"bit": ValueType.BOOL,
"char": ValueType.STRING,
"date": ValueType.UNIX_TIMESTAMP,
"datetime": ValueType.UNIX_TIMESTAMP,
"float": ValueType.FLOAT,
"nchar": ValueType.STRING,
"nvarchar": ValueType.STRING,
"nvarchar(max)": ValueType.STRING,
"real": ValueType.FLOAT,
"smallint": ValueType.INT32,
"tinyint": ValueType.INT32,
"varbinary": ValueType.BYTES,
"varchar": ValueType.STRING,
"None": ValueType.NULL,
# skip date, geometry, hllsketch, time, timetz
}
if mssql_type_as_str.lower() not in type_map:
raise ValueError(f"Mssql type not supported by feast {mssql_type_as_str}")
return type_map[mssql_type_as_str.lower()]


def pa_to_mssql_type(pa_type: "pyarrow.DataType") -> str:
# PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html
# MS Sql types: https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16
pa_type_as_str = str(pa_type).lower()
if pa_type_as_str.startswith("timestamp"):
if "tz=" in pa_type_as_str:
return "datetime2"
else:
return "datetime"
if pa_type_as_str.startswith("date"):
return "date"
if pa_type_as_str.startswith("decimal"):
return pa_type_as_str
    # We have to take into account how arrow types map to parquet types as well,
    # and the SQL Server mappings below have been adjusted accordingly.
type_map = {
"null": "None",
"bool": "bit",
"int8": "tinyint",
"int16": "smallint",
"int32": "int",
"int64": "bigint",
"uint8": "tinyint",
"uint16": "smallint",
"uint32": "int",
"uint64": "bigint",
"float": "float",
"double": "real",
"binary": "binary",
"string": "varchar",
}
    if pa_type_as_str not in type_map:
        raise ValueError(f"MS SQL Server type not supported by Feast: {pa_type_as_str}")
    return type_map[pa_type_as_str]


def redshift_to_feast_value_type(redshift_type_as_str: str) -> ValueType:
# Type names from https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html
type_map = {
"int2": ValueType.INT32,
"int4": ValueType.INT32,
"int8": ValueType.INT64,
"numeric": ValueType.DOUBLE,
"float4": ValueType.FLOAT,
"float8": ValueType.DOUBLE,
"bool": ValueType.BOOL,
"character": ValueType.STRING,
"varchar": ValueType.STRING,
"timestamp": ValueType.UNIX_TIMESTAMP,
"timestamptz": ValueType.UNIX_TIMESTAMP,
# skip date, geometry, hllsketch, time, timetz
}
return type_map[redshift_type_as_str.lower()]


def snowflake_type_to_feast_value_type(snowflake_type: str) -> ValueType:
type_map = {
"BINARY": ValueType.BYTES,
"VARCHAR": ValueType.STRING,
"NUMBER32": ValueType.INT32,
"NUMBER64": ValueType.INT64,
"NUMBERwSCALE": ValueType.DOUBLE,
"DOUBLE": ValueType.DOUBLE,
"BOOLEAN": ValueType.BOOL,
"DATE": ValueType.UNIX_TIMESTAMP,
"TIMESTAMP": ValueType.UNIX_TIMESTAMP,
"TIMESTAMP_TZ": ValueType.UNIX_TIMESTAMP,
"TIMESTAMP_LTZ": ValueType.UNIX_TIMESTAMP,
"TIMESTAMP_NTZ": ValueType.UNIX_TIMESTAMP,
}
return type_map[snowflake_type]


def _convert_value_name_to_snowflake_udf(value_name: str, project_name: str) -> str:
name_map = {
"BYTES": f"feast_{project_name}_snowflake_binary_to_bytes_proto",
"STRING": f"feast_{project_name}_snowflake_varchar_to_string_proto",
"INT32": f"feast_{project_name}_snowflake_number_to_int32_proto",
"INT64": f"feast_{project_name}_snowflake_number_to_int64_proto",
"DOUBLE": f"feast_{project_name}_snowflake_float_to_double_proto",
"FLOAT": f"feast_{project_name}_snowflake_float_to_double_proto",
"BOOL": f"feast_{project_name}_snowflake_boolean_to_bool_proto",
"UNIX_TIMESTAMP": f"feast_{project_name}_snowflake_timestamp_to_unix_timestamp_proto",
}
return name_map[value_name].upper()
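

# A minimal usage sketch, with an assumed project name "demo":
#   _convert_value_name_to_snowflake_udf("INT64", "demo")
#   # -> "FEAST_DEMO_SNOWFLAKE_NUMBER_TO_INT64_PROTO"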


def pa_to_redshift_value_type(pa_type: "pyarrow.DataType") -> str:
# PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html
# Redshift type: https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html
pa_type_as_str = str(pa_type).lower()
if pa_type_as_str.startswith("timestamp"):
if "tz=" in pa_type_as_str:
return "timestamptz"
else:
return "timestamp"
if pa_type_as_str.startswith("date"):
return "date"
if pa_type_as_str.startswith("decimal"):
# PyArrow decimal types (e.g. "decimal(38,37)") luckily directly map to the Redshift type.
return pa_type_as_str
if pa_type_as_str.startswith("list"):
return "super"
# We have to take into account how arrow types map to parquet types as well.
# For example, null type maps to int32 in parquet, so we have to use int4 in Redshift.
# Other mappings have also been adjusted accordingly.
type_map = {
"null": "int4",
"bool": "bool",
"int8": "int4",
"int16": "int4",
"int32": "int4",
"int64": "int8",
"uint8": "int4",
"uint16": "int4",
"uint32": "int8",
"uint64": "int8",
"float": "float4",
"double": "float8",
"binary": "varchar",
"string": "varchar",
}
return type_map[pa_type_as_str]


def _non_empty_value(value: Any) -> bool:
"""
    Check that there's enough data to use for type inference.
    For a primitive type, just check that the value is not None.
    For an iterable, check that it has at least one element (len > 0).
    Strings are a special case: an empty string "" is still considered non-empty.
"""
return value is not None and (
not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str)
)
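

# A minimal behaviour sketch:
#   _non_empty_value(None)  -> False
#   _non_empty_value([])    -> False
#   _non_empty_value("")    -> True   (empty strings still count as data)
#   _non_empty_value(0)     -> True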


def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType:
# TODO not all spark types are convertible
# Current non-convertible types: interval, map, struct, structfield, decimal, binary
type_map: Dict[str, ValueType] = {
"null": ValueType.UNKNOWN,
"byte": ValueType.BYTES,
"string": ValueType.STRING,
"int": ValueType.INT32,
"short": ValueType.INT32,
"bigint": ValueType.INT64,
"long": ValueType.INT64,
"double": ValueType.DOUBLE,
"float": ValueType.FLOAT,
"boolean": ValueType.BOOL,
"timestamp": ValueType.UNIX_TIMESTAMP,
"array<byte>": ValueType.BYTES_LIST,
"array<string>": ValueType.STRING_LIST,
"array<int>": ValueType.INT32_LIST,
"array<bigint>": ValueType.INT64_LIST,
"array<double>": ValueType.DOUBLE_LIST,
"array<float>": ValueType.FLOAT_LIST,
"array<boolean>": ValueType.BOOL_LIST,
"array<timestamp>": ValueType.UNIX_TIMESTAMP_LIST,
}
    # TODO: Find a better way of doing this.
    if not isinstance(spark_type_as_str, str):
        return ValueType.NULL
    return type_map.get(spark_type_as_str.lower(), ValueType.NULL)


def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[np.dtype]:
# TODO recheck all typing (also tz for timestamp)
# https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics
type_map = defaultdict(
lambda: np.dtype("O"),
{
"boolean": np.dtype("bool"),
"double": np.dtype("float64"),
"float": np.dtype("float64"),
"int": np.dtype("int64"),
"bigint": np.dtype("int64"),
"smallint": np.dtype("int64"),
"timestamp": np.dtype("datetime64[ns]"),
},
)
return (type_map[t] for _, t in dtypes)
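

# A minimal usage sketch; unmapped Spark types fall back to np.dtype("O"):
#   list(spark_schema_to_np_dtypes([("id", "bigint"), ("name", "string")]))
#   # -> [dtype('int64'), dtype('O')]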


def arrow_to_pg_type(t_str: str) -> str:
try:
if t_str.startswith("timestamp") or t_str.startswith("datetime"):
return "timestamptz" if "tz=" in t_str else "timestamp"
return {
"null": "null",
"bool": "boolean",
"int8": "tinyint",
"int16": "smallint",
"int32": "int",
"int64": "bigint",
"list<item: int32>": "int[]",
"list<item: int64>": "bigint[]",
"list<item: bool>": "boolean[]",
"list<item: double>": "double precision[]",
"list<item: timestamp[us]>": "timestamp[]",
"uint8": "smallint",
"uint16": "int",
"uint32": "bigint",
"uint64": "bigint",
"float": "float",
"double": "double precision",
"binary": "binary",
"string": "text",
}[t_str]
except KeyError:
raise ValueError(f"Unsupported type: {t_str}")


def pg_type_to_feast_value_type(type_str: str) -> ValueType:
type_map: Dict[str, ValueType] = {
"boolean": ValueType.BOOL,
"bytea": ValueType.BYTES,
"char": ValueType.STRING,
"bigint": ValueType.INT64,
"smallint": ValueType.INT32,
"integer": ValueType.INT32,
"real": ValueType.DOUBLE,
"double precision": ValueType.DOUBLE,
"boolean[]": ValueType.BOOL_LIST,
"bytea[]": ValueType.BYTES_LIST,
"char[]": ValueType.STRING_LIST,
"smallint[]": ValueType.INT32_LIST,
"integer[]": ValueType.INT32_LIST,
"text": ValueType.STRING,
"text[]": ValueType.STRING_LIST,
"character[]": ValueType.STRING_LIST,
"bigint[]": ValueType.INT64_LIST,
"real[]": ValueType.DOUBLE_LIST,
"double precision[]": ValueType.DOUBLE_LIST,
"character": ValueType.STRING,
"character varying": ValueType.STRING,
"date": ValueType.UNIX_TIMESTAMP,
"time without time zone": ValueType.UNIX_TIMESTAMP,
"timestamp without time zone": ValueType.UNIX_TIMESTAMP,
"timestamp without time zone[]": ValueType.UNIX_TIMESTAMP_LIST,
"date[]": ValueType.UNIX_TIMESTAMP_LIST,
"time without time zone[]": ValueType.UNIX_TIMESTAMP_LIST,
"timestamp with time zone": ValueType.UNIX_TIMESTAMP,
"timestamp with time zone[]": ValueType.UNIX_TIMESTAMP_LIST,
"numeric[]": ValueType.DOUBLE_LIST,
"numeric": ValueType.DOUBLE,
"uuid": ValueType.STRING,
"uuid[]": ValueType.STRING_LIST,
}
    value = type_map.get(type_str.lower(), ValueType.UNKNOWN)
if value == ValueType.UNKNOWN:
print("unknown type:", type_str)
return value


def feast_value_type_to_pa(
feast_type: ValueType, timestamp_unit: str = "us"
) -> "pyarrow.DataType":
import pyarrow
type_map = {
ValueType.INT32: pyarrow.int32(),
ValueType.INT64: pyarrow.int64(),
ValueType.DOUBLE: pyarrow.float64(),
ValueType.FLOAT: pyarrow.float32(),
ValueType.STRING: pyarrow.string(),
ValueType.BYTES: pyarrow.binary(),
ValueType.BOOL: pyarrow.bool_(),
ValueType.UNIX_TIMESTAMP: pyarrow.timestamp(timestamp_unit),
ValueType.INT32_LIST: pyarrow.list_(pyarrow.int32()),
ValueType.INT64_LIST: pyarrow.list_(pyarrow.int64()),
ValueType.DOUBLE_LIST: pyarrow.list_(pyarrow.float64()),
ValueType.FLOAT_LIST: pyarrow.list_(pyarrow.float32()),
ValueType.STRING_LIST: pyarrow.list_(pyarrow.string()),
ValueType.BYTES_LIST: pyarrow.list_(pyarrow.binary()),
ValueType.BOOL_LIST: pyarrow.list_(pyarrow.bool_()),
ValueType.UNIX_TIMESTAMP_LIST: pyarrow.list_(pyarrow.timestamp(timestamp_unit)),
ValueType.NULL: pyarrow.null(),
}
return type_map[feast_type]
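

# A minimal usage sketch:
#   feast_value_type_to_pa(ValueType.INT64_LIST)      # -> list<item: int64>
#   feast_value_type_to_pa(ValueType.UNIX_TIMESTAMP)  # -> timestamp[us]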


def pg_type_code_to_pg_type(code: int) -> str:
    # Maps PostgreSQL type OIDs (from the pg_type catalog) to type names.
return {
16: "boolean",
17: "bytea",
20: "bigint",
21: "smallint",
23: "integer",
25: "text",
700: "real",
701: "double precision",
1000: "boolean[]",
1001: "bytea[]",
1005: "smallint[]",
1007: "integer[]",
1009: "text[]",
1014: "character[]",
1016: "bigint[]",
1021: "real[]",
1022: "double precision[]",
1042: "character",
1043: "character varying",
1082: "date",
1083: "time without time zone",
1114: "timestamp without time zone",
1115: "timestamp without time zone[]",
1182: "date[]",
1183: "time without time zone[]",
1184: "timestamp with time zone",
1185: "timestamp with time zone[]",
1231: "numeric[]",
1700: "numeric",
2950: "uuid",
2951: "uuid[]",
}[code]


def pg_type_code_to_arrow(code: int) -> "pyarrow.DataType":
    return feast_value_type_to_pa(
pg_type_to_feast_value_type(pg_type_code_to_pg_type(code))
)


def athena_to_feast_value_type(athena_type_as_str: str) -> ValueType:
# Type names from https://docs.aws.amazon.com/athena/latest/ug/data-types.html
type_map = {
"null": ValueType.UNKNOWN,
"boolean": ValueType.BOOL,
"tinyint": ValueType.INT32,
"smallint": ValueType.INT32,
"int": ValueType.INT32,
"bigint": ValueType.INT64,
"double": ValueType.DOUBLE,
"float": ValueType.FLOAT,
"binary": ValueType.BYTES,
"char": ValueType.STRING,
"varchar": ValueType.STRING,
"string": ValueType.STRING,
"timestamp": ValueType.UNIX_TIMESTAMP,
# skip date,decimal,array,map,struct
}
return type_map[athena_type_as_str.lower()]


def pa_to_athena_value_type(pa_type: "pyarrow.DataType") -> str:
# PyArrow types: https://arrow.apache.org/docs/python/api/datatypes.html
# Type names from https://docs.aws.amazon.com/athena/latest/ug/data-types.html
pa_type_as_str = str(pa_type).lower()
if pa_type_as_str.startswith("timestamp"):
return "timestamp"
if pa_type_as_str.startswith("date"):
return "date"
if pa_type_as_str.startswith("python_values_to_proto_values"):
return pa_type_as_str
    # We have to take into account how arrow types map to parquet types as well,
    # and the Athena mappings below have been adjusted accordingly.
type_map = {
"null": "null",
"bool": "boolean",
"int8": "tinyint",
"int16": "smallint",
"int32": "int",
"int64": "bigint",
"uint8": "tinyint",
"uint16": "tinyint",
"uint32": "tinyint",
"uint64": "tinyint",
"float": "float",
"double": "double",
"binary": "binary",
"string": "string",
}
return type_map[pa_type_as_str]