Source code for feast.infra.offline_stores.file_source

from typing import Callable, Dict, Iterable, List, Optional, Tuple

from pyarrow._fs import FileSystem
from pyarrow._s3fs import S3FileSystem
from pyarrow.parquet import ParquetDataset
from typeguard import typechecked

from feast import type_map
from feast.data_format import FileFormat, ParquetFormat
from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.FeatureService_pb2 import (
    LoggingConfig as LoggingConfigProto,
)
from feast.protos.feast.core.SavedDataset_pb2 import (
    SavedDatasetStorage as SavedDatasetStorageProto,
)
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.value_type import ValueType

@typechecked
class FileSource(DataSource):
    def __init__(
        self,
        *,
        path: str,
        name: Optional[str] = "",
        event_timestamp_column: Optional[str] = "",
        file_format: Optional[FileFormat] = None,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        s3_endpoint_override: Optional[str] = None,
        description: Optional[str] = "",
        tags: Optional[Dict[str, str]] = None,
        owner: Optional[str] = "",
        timestamp_field: Optional[str] = "",
    ):
        """
        Creates a FileSource object.

        Args:
            path: File path to the file containing feature data. Must contain an
                event timestamp column, entity columns, and feature columns.
            name (optional): Name for the file source. Defaults to the path.
            event_timestamp_column (optional): (Deprecated in favor of timestamp_field)
                Event timestamp column used for point in time joins of feature values.
            created_timestamp_column (optional): Timestamp column indicating when the
                row was created, used for deduplicating rows.
            file_format (optional): Explicitly set the file format. Allows Feast to
                bypass inferring the file format.
            field_mapping: A dictionary mapping of column names in this data source
                to feature names in a feature table or view. Only used for feature
                columns, not entity or timestamp columns.
            s3_endpoint_override (optional): Overrides the AWS S3 endpoint with a
                custom S3 storage endpoint.
            description (optional): A human-readable description.
            tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
            owner (optional): The owner of the file source, typically the email of the
                primary maintainer.
            timestamp_field (optional): Event timestamp field used for point in time
                joins of feature values.

        Examples:
            >>> from feast import FileSource
            >>> file_source = FileSource(path="my_features.parquet", timestamp_field="event_timestamp")
        """
        self.file_options = FileOptions(
            file_format=file_format,
            uri=path,
            s3_endpoint_override=s3_endpoint_override,
        )

        super().__init__(
            name=name if name else path,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            field_mapping=field_mapping,
            description=description,
            tags=tags,
            owner=owner,
        )

    # Note: Python requires redefining hash in child classes that override __eq__
    def __hash__(self):
        return super().__hash__()

    def __eq__(self, other):
        if not isinstance(other, FileSource):
            raise TypeError("Comparisons should only involve FileSource class objects.")

        return (
            super().__eq__(other)
            and self.path == other.path
            and self.file_options.file_format == other.file_options.file_format
            and self.file_options.s3_endpoint_override
            == other.file_options.s3_endpoint_override
        )

    @property
    def path(self) -> str:
        """Returns the path of this file data source."""
        return self.file_options.uri

    @property
    def file_format(self) -> Optional[FileFormat]:
        """Returns the file format of this file data source."""
        return self.file_options.file_format

    @property
    def s3_endpoint_override(self) -> Optional[str]:
        """Returns the s3 endpoint override of this file data source."""
        return self.file_options.s3_endpoint_override
    @staticmethod
    def from_proto(data_source: DataSourceProto):
        return FileSource(
            name=data_source.name,
            field_mapping=dict(data_source.field_mapping),
            file_format=FileFormat.from_proto(data_source.file_options.file_format),
            path=data_source.file_options.uri,
            timestamp_field=data_source.timestamp_field,
            created_timestamp_column=data_source.created_timestamp_column,
            s3_endpoint_override=data_source.file_options.s3_endpoint_override,
            description=data_source.description,
            tags=dict(data_source.tags),
            owner=data_source.owner,
        )
    def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
            name=self.name,
            type=DataSourceProto.BATCH_FILE,
            field_mapping=self.field_mapping,
            file_options=self.file_options.to_proto(),
            description=self.description,
            tags=self.tags,
            owner=self.owner,
        )

        data_source_proto.timestamp_field = self.timestamp_field
        data_source_proto.created_timestamp_column = self.created_timestamp_column

        return data_source_proto
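    # Sketch (not part of the module): to_proto and from_proto are symmetric,
    # so a FileSource should survive a serialization round trip. The file name
    # below is illustrative only.
    #
    #     >>> original = FileSource(path="my_features.parquet", timestamp_field="event_timestamp")
    #     >>> FileSource.from_proto(original.to_proto()) == original
    #     True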
    def validate(self, config: RepoConfig):
        # TODO: validate a FileSource
        pass
    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        return type_map.pa_to_feast_value_type
    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        filesystem, path = FileSource.create_filesystem_and_path(
            self.path, self.file_options.s3_endpoint_override
        )

        # Add support for different file format paths based on the S3 filesystem.
        if filesystem is None:
            schema = ParquetDataset(path, use_legacy_dataset=False).schema
            if hasattr(schema, "names") and hasattr(schema, "types"):
                # Newer versions of pyarrow don't have this method,
                # but these fields are good enough.
                pass
            else:
                schema = schema.to_arrow_schema()
        else:
            schema = ParquetDataset(path, filesystem=filesystem).schema

        return zip(schema.names, map(str, schema.types))
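    # Sketch: inspecting the schema of a local Parquet source. The file, the
    # columns shown, and the repo_config object are hypothetical, and reading
    # requires the file to exist, so the doctest is skipped.
    #
    #     >>> source = FileSource(path="my_features.parquet", timestamp_field="event_timestamp")
    #     >>> dict(source.get_table_column_names_and_types(config=repo_config))  # doctest: +SKIP
    #     {'driver_id': 'int64', 'conv_rate': 'float', 'event_timestamp': 'timestamp[us]'}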
    @staticmethod
    def create_filesystem_and_path(
        path: str, s3_endpoint_override: str
    ) -> Tuple[Optional[FileSystem], str]:
        if path.startswith("s3://"):
            s3fs = S3FileSystem(
                endpoint_override=s3_endpoint_override if s3_endpoint_override else None
            )
            return s3fs, path.replace("s3://", "")
        else:
            return None, path
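    # Sketch of the scheme handling above: s3:// URIs yield an S3FileSystem plus
    # the path with the scheme stripped, while local paths pass through with no
    # filesystem. The bucket and endpoint below are hypothetical placeholders.
    #
    #     >>> FileSource.create_filesystem_and_path("data/my_features.parquet", "")
    #     (None, 'data/my_features.parquet')
    #     >>> fs, path = FileSource.create_filesystem_and_path(
    #     ...     "s3://my-bucket/my_features.parquet", "http://localhost:9000"
    #     ... )
    #     >>> path
    #     'my-bucket/my_features.parquet'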
    def get_table_query_string(self) -> str:
        pass
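# Usage sketch (the bucket path and MinIO-style endpoint URL are hypothetical
# placeholders): a FileSource pointing at S3-compatible storage, using
# s3_endpoint_override to reach a non-AWS endpoint.
#
#     >>> source = FileSource(
#     ...     name="driver_stats",
#     ...     path="s3://my-bucket/driver_stats.parquet",
#     ...     timestamp_field="event_timestamp",
#     ...     s3_endpoint_override="http://localhost:9000",
#     ... )
#     >>> source.path, source.s3_endpoint_override
#     ('s3://my-bucket/driver_stats.parquet', 'http://localhost:9000')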
class FileOptions:
    """
    Configuration options for a file data source.

    Attributes:
        uri: File source url, e.g. s3:// or local file.
        s3_endpoint_override: Custom s3 endpoint (used only with s3 uri).
        file_format: File source format, e.g. parquet.
    """

    uri: str
    file_format: Optional[FileFormat]
    s3_endpoint_override: str

    def __init__(
        self,
        uri: str,
        file_format: Optional[FileFormat],
        s3_endpoint_override: Optional[str],
    ):
        """
        Initializes a FileOptions object.

        Args:
            uri: File source url, e.g. s3:// or local file.
            file_format (optional): File source format, e.g. parquet.
            s3_endpoint_override (optional): Custom s3 endpoint (used only with s3 uri).
        """
        self.uri = uri
        self.file_format = file_format
        self.s3_endpoint_override = s3_endpoint_override or ""
    @classmethod
    def from_proto(cls, file_options_proto: DataSourceProto.FileOptions):
        """
        Creates a FileOptions object from a protobuf representation of a file option.

        Args:
            file_options_proto: A protobuf representation of a datasource.

        Returns:
            A FileOptions object based on the file_options protobuf.
        """
        file_options = cls(
            file_format=FileFormat.from_proto(file_options_proto.file_format),
            uri=file_options_proto.uri,
            s3_endpoint_override=file_options_proto.s3_endpoint_override,
        )
        return file_options
    def to_proto(self) -> DataSourceProto.FileOptions:
        """
        Converts a FileOptions object to its protobuf representation.

        Returns:
            A FileOptions protobuf.
        """
        file_options_proto = DataSourceProto.FileOptions(
            file_format=(
                None if self.file_format is None else self.file_format.to_proto()
            ),
            uri=self.uri,
            s3_endpoint_override=self.s3_endpoint_override,
        )
        return file_options_proto
class SavedDatasetFileStorage(SavedDatasetStorage):
    _proto_attr_name = "file_storage"

    file_options: FileOptions

    def __init__(
        self,
        path: str,
        file_format: FileFormat = ParquetFormat(),
        s3_endpoint_override: Optional[str] = None,
    ):
        self.file_options = FileOptions(
            file_format=file_format,
            s3_endpoint_override=s3_endpoint_override,
            uri=path,
        )
    @staticmethod
    def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage:
        file_options = FileOptions.from_proto(storage_proto.file_storage)
        return SavedDatasetFileStorage(
            path=file_options.uri,
            file_format=file_options.file_format,
            s3_endpoint_override=file_options.s3_endpoint_override,
        )
    def to_proto(self) -> SavedDatasetStorageProto:
        return SavedDatasetStorageProto(file_storage=self.file_options.to_proto())
    def to_data_source(self) -> DataSource:
        return FileSource(
            path=self.file_options.uri,
            file_format=self.file_options.file_format,
            s3_endpoint_override=self.file_options.s3_endpoint_override,
        )
    @staticmethod
    def from_data_source(data_source: DataSource) -> "SavedDatasetStorage":
        assert isinstance(data_source, FileSource)
        return SavedDatasetFileStorage(
            path=data_source.path,
            file_format=data_source.file_format
            if data_source.file_format
            else ParquetFormat(),
            s3_endpoint_override=data_source.s3_endpoint_override,
        )
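# Sketch: converting between FileSource and SavedDatasetFileStorage. Note that
# from_data_source falls back to ParquetFormat when the source has no explicit
# file format, so a round trip can add a format the original source lacked.
# The file name below is illustrative only.
#
#     >>> source = FileSource(path="my_features.parquet", timestamp_field="event_timestamp")
#     >>> storage = SavedDatasetFileStorage.from_data_source(source)
#     >>> storage.to_data_source().path
#     'my_features.parquet'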
class FileLoggingDestination(LoggingDestination):
    _proto_kind = "file_destination"

    path: str
    s3_endpoint_override: str
    partition_by: Optional[List[str]]

    def __init__(
        self,
        *,
        path: str,
        s3_endpoint_override="",
        partition_by: Optional[List[str]] = None,
    ):
        self.path = path
        self.s3_endpoint_override = s3_endpoint_override
        self.partition_by = partition_by
    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
        return FileLoggingDestination(
            path=config_proto.file_destination.path,
            s3_endpoint_override=config_proto.file_destination.s3_endpoint_override,
            partition_by=list(config_proto.file_destination.partition_by)
            if config_proto.file_destination.partition_by
            else None,
        )
    def to_proto(self) -> LoggingConfigProto:
        return LoggingConfigProto(
            file_destination=LoggingConfigProto.FileDestination(
                path=self.path,
                s3_endpoint_override=self.s3_endpoint_override,
                partition_by=self.partition_by,
            )
        )
    def to_data_source(self) -> DataSource:
        return FileSource(
            path=self.path,
            file_format=ParquetFormat(),
            s3_endpoint_override=self.s3_endpoint_override,
        )
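# Sketch: a logging destination is just a path plus optional S3 endpoint and
# partition columns, and it converts to a Parquet-backed FileSource. The log
# path below is a hypothetical placeholder.
#
#     >>> destination = FileLoggingDestination(path="data/feature_logs")
#     >>> isinstance(destination.to_data_source().file_format, ParquetFormat)
#     True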