# Source code for feast.data_format
# Copyright 2020 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from feast.protos.feast.core.DataFormat_pb2 import FileFormat as FileFormatProto
from feast.protos.feast.core.DataFormat_pb2 import StreamFormat as StreamFormatProto
class FileFormat(ABC):
    """
    Defines an abstract file format used to encode feature data in files.
    """

    @abstractmethod
    def to_proto(self):
        """
        Convert this FileFormat into its protobuf representation.
        """
        pass

    def __eq__(self, other):
        # Two formats are considered equal iff their protobuf
        # representations are equal.
        return self.to_proto() == other.to_proto()

    @classmethod
    def from_proto(cls, proto):
        """
        Construct this FileFormat from its protobuf representation.

        Args:
            proto: FileFormat protobuf message.

        Returns:
            The matching FileFormat subclass instance, or None if no
            format is set on the proto.

        Raises:
            NotImplementedError: if the format specified in the given
                proto is not supported.
        """
        fmt = proto.WhichOneof("format")
        if fmt == "parquet_format":
            return ParquetFormat()
        if fmt is None:
            return None
        raise NotImplementedError(f"FileFormat is unsupported: {fmt}")

    def __str__(self):
        """
        String representation of the file format passed to spark
        """
        raise NotImplementedError()
class ParquetFormat(FileFormat):
    """
    Defines the Parquet data format.
    """

    def to_proto(self):
        """
        Convert this ParquetFormat into its protobuf representation.
        """
        # Implements the abstract FileFormat.to_proto. Without this
        # override, ParquetFormat could not be instantiated (abstract
        # method left unimplemented), so FileFormat.from_proto would
        # raise TypeError and __eq__ could never compare instances.
        return FileFormatProto(parquet_format=FileFormatProto.ParquetFormat())

    def __str__(self):
        # Format name understood by Spark's DataFrame reader/writer.
        return "parquet"
class StreamFormat(ABC):
    """
    Defines an abstract streaming data format used to encode feature data in streams.
    """

    @abstractmethod
    def to_proto(self):
        """
        Convert this StreamFormat into its protobuf representation.
        """
        pass

    def __eq__(self, other):
        # Two formats are considered equal iff their protobuf
        # representations are equal.
        return self.to_proto() == other.to_proto()

    @classmethod
    def from_proto(cls, proto):
        """
        Construct this StreamFormat from its protobuf representation.

        Args:
            proto: StreamFormat protobuf message.

        Returns:
            The matching StreamFormat subclass instance.

        Raises:
            NotImplementedError: if the format specified in the given
                proto is not supported (or not set).
        """
        fmt = proto.WhichOneof("format")
        if fmt == "avro_format":
            return AvroFormat(schema_json=proto.avro_format.schema_json)
        if fmt == "json_format":
            return JsonFormat(schema_json=proto.json_format.schema_json)
        if fmt == "proto_format":
            return ProtoFormat(class_path=proto.proto_format.class_path)
        raise NotImplementedError(f"StreamFormat is unsupported: {fmt}")
class AvroFormat(StreamFormat):
    """
    Defines the Avro streaming data format that encodes data in Avro format.
    """

    def __init__(self, schema_json: str):
        """
        Construct a new Avro data format.

        Args:
            schema_json: Avro schema definition in JSON
        """
        self.schema_json = schema_json

    def to_proto(self):
        """
        Convert this AvroFormat into its protobuf representation.
        """
        proto = StreamFormatProto.AvroFormat(schema_json=self.schema_json)
        return StreamFormatProto(avro_format=proto)
class JsonFormat(StreamFormat):
    """
    Defines the Json streaming data format that encodes data in Json format.
    """

    def __init__(self, schema_json: str):
        """
        Construct a new Json data format.

        For spark, uses pyspark ddl string format. Example shown here:
        https://vincent.doba.fr/posts/20211004_spark_data_description_language_for_defining_spark_schema/

        Args:
            schema_json: Json schema definition
        """
        self.schema_json = schema_json

    def to_proto(self):
        """
        Convert this JsonFormat into its protobuf representation.
        """
        proto = StreamFormatProto.JsonFormat(schema_json=self.schema_json)
        return StreamFormatProto(json_format=proto)
class ProtoFormat(StreamFormat):
    """
    Defines the Protobuf data format.
    """

    def __init__(self, class_path: str):
        """
        Construct a new Protobuf data format.

        Args:
            class_path: Class path to the Java Protobuf class that can be
                used to decode protobuf messages.
        """
        self.class_path = class_path

    def to_proto(self):
        """
        Convert this ProtoFormat into its protobuf representation.
        """
        return StreamFormatProto(
            proto_format=StreamFormatProto.ProtoFormat(class_path=self.class_path)
        )