Source code for feast.data_source

# Copyright 2020 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import enum
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Optional, Tuple

from feast import type_map
from feast.data_format import StreamFormat
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig, get_data_source_class_from_type
from feast.value_type import ValueType


class SourceType(enum.Enum):
    """
    DataSource value type. Used to define source types in DataSource.
    """

    UNKNOWN = 0
    BATCH_FILE = 1
    BATCH_BIGQUERY = 2
    STREAM_KAFKA = 3
    STREAM_KINESIS = 4


class KafkaOptions:
    """
    DataSource Kafka options used to source features from Kafka messages
    """

    def __init__(
        self,
        bootstrap_servers: str,
        message_format: StreamFormat,
        topic: str,
    ):
        self._bootstrap_servers = bootstrap_servers
        self._message_format = message_format
        self._topic = topic

    @property
    def bootstrap_servers(self):
        """
        Returns a comma-separated list of Kafka bootstrap servers
        """
        return self._bootstrap_servers

    @bootstrap_servers.setter
    def bootstrap_servers(self, bootstrap_servers):
        """
        Sets a comma-separated list of Kafka bootstrap servers
        """
        self._bootstrap_servers = bootstrap_servers

    @property
    def message_format(self):
        """
        Returns the data format that is used to encode the feature data in Kafka messages
        """
        return self._message_format

    @message_format.setter
    def message_format(self, message_format):
        """
        Sets the data format that is used to encode the feature data in Kafka messages
        """
        self._message_format = message_format

    @property
    def topic(self):
        """
        Returns the Kafka topic to collect feature data from
        """
        return self._topic

    @topic.setter
    def topic(self, topic):
        """
        Sets the Kafka topic to collect feature data from
        """
        self._topic = topic
    @classmethod
    def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
        """
        Creates a KafkaOptions from a protobuf representation of a kafka option

        Args:
            kafka_options_proto: A protobuf representation of a DataSource

        Returns:
            Returns a KafkaOptions object based on the kafka_options protobuf
        """
        kafka_options = cls(
            bootstrap_servers=kafka_options_proto.bootstrap_servers,
            message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
            topic=kafka_options_proto.topic,
        )

        return kafka_options

    def to_proto(self) -> DataSourceProto.KafkaOptions:
        """
        Converts a KafkaOptions object to its protobuf representation.

        Returns:
            A KafkaOptions protobuf
        """
        kafka_options_proto = DataSourceProto.KafkaOptions(
            bootstrap_servers=self.bootstrap_servers,
            message_format=self.message_format.to_proto(),
            topic=self.topic,
        )

        return kafka_options_proto
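
# Illustrative sketch (not part of the original module): shows how
# KafkaOptions round-trips through its protobuf representation. The broker
# addresses, Avro schema, and topic below are placeholder values.
def _example_kafka_options_roundtrip() -> None:
    from feast.data_format import AvroFormat

    options = KafkaOptions(
        bootstrap_servers="broker1:9092,broker2:9092",  # placeholder brokers
        message_format=AvroFormat(schema_json='{"type": "string"}'),
        topic="driver_trips",  # placeholder topic
    )
    # to_proto() emits a DataSourceProto.KafkaOptions; from_proto() rebuilds
    # an equivalent Python object from it.
    restored = KafkaOptions.from_proto(options.to_proto())
    assert restored.bootstrap_servers == options.bootstrap_servers
    assert restored.topic == options.topic
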
class KinesisOptions:
    """
    DataSource Kinesis options used to source features from Kinesis records
    """

    def __init__(
        self,
        record_format: StreamFormat,
        region: str,
        stream_name: str,
    ):
        self._record_format = record_format
        self._region = region
        self._stream_name = stream_name

    @property
    def record_format(self):
        """
        Returns the data format used to encode the feature data in the Kinesis records.
        """
        return self._record_format

    @record_format.setter
    def record_format(self, record_format):
        """
        Sets the data format used to encode the feature data in the Kinesis records.
        """
        self._record_format = record_format

    @property
    def region(self):
        """
        Returns the AWS region of Kinesis stream
        """
        return self._region

    @region.setter
    def region(self, region):
        """
        Sets the AWS region of Kinesis stream
        """
        self._region = region

    @property
    def stream_name(self):
        """
        Returns the Kinesis stream name to obtain feature data from
        """
        return self._stream_name

    @stream_name.setter
    def stream_name(self, stream_name):
        """
        Sets the Kinesis stream name to obtain feature data from
        """
        self._stream_name = stream_name

    @classmethod
    def from_proto(cls, kinesis_options_proto: DataSourceProto.KinesisOptions):
        """
        Creates a KinesisOptions from a protobuf representation of a kinesis option

        Args:
            kinesis_options_proto: A protobuf representation of a DataSource

        Returns:
            Returns a KinesisOptions object based on the kinesis_options protobuf
        """
        kinesis_options = cls(
            record_format=StreamFormat.from_proto(kinesis_options_proto.record_format),
            region=kinesis_options_proto.region,
            stream_name=kinesis_options_proto.stream_name,
        )

        return kinesis_options
    def to_proto(self) -> DataSourceProto.KinesisOptions:
        """
        Converts a KinesisOptions object to its protobuf representation.

        Returns:
            A KinesisOptions protobuf
        """
        kinesis_options_proto = DataSourceProto.KinesisOptions(
            record_format=self.record_format.to_proto(),
            region=self.region,
            stream_name=self.stream_name,
        )

        return kinesis_options_proto
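
# Illustrative sketch (not part of the original module): builds a
# DataSourceProto.KinesisOptions by hand and parses it with
# KinesisOptions.from_proto. The region and stream name are placeholders.
def _example_kinesis_options_from_proto() -> None:
    from feast.data_format import ProtoFormat

    proto = DataSourceProto.KinesisOptions(
        record_format=ProtoFormat(class_path="com.example.Trip").to_proto(),
        region="us-west-2",  # placeholder AWS region
        stream_name="driver-trips",  # placeholder stream name
    )
    options = KinesisOptions.from_proto(proto)
    assert options.region == "us-west-2"
    assert isinstance(options.record_format, ProtoFormat)
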
class DataSource(ABC):
    """
    DataSource that can be used to source features.

    Args:
        event_timestamp_column (optional): Event timestamp column used for point in
            time joins of feature values.
        created_timestamp_column (optional): Timestamp column indicating when the
            row was created, used for deduplicating rows.
        field_mapping (optional): A dictionary mapping of column names in this data
            source to feature names in a feature table or view. Only used for feature
            columns, not entity or timestamp columns.
        date_partition_column (optional): Timestamp column used for partitioning.
    """

    _event_timestamp_column: str
    _created_timestamp_column: str
    _field_mapping: Dict[str, str]
    _date_partition_column: str

    def __init__(
        self,
        event_timestamp_column: Optional[str] = None,
        created_timestamp_column: Optional[str] = None,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = None,
    ):
        """Creates a DataSource object."""
        self._event_timestamp_column = (
            event_timestamp_column if event_timestamp_column else ""
        )
        self._created_timestamp_column = (
            created_timestamp_column if created_timestamp_column else ""
        )
        self._field_mapping = field_mapping if field_mapping else {}
        self._date_partition_column = (
            date_partition_column if date_partition_column else ""
        )

    def __eq__(self, other):
        if not isinstance(other, DataSource):
            raise TypeError("Comparisons should only involve DataSource class objects.")

        if (
            self.event_timestamp_column != other.event_timestamp_column
            or self.created_timestamp_column != other.created_timestamp_column
            or self.field_mapping != other.field_mapping
            or self.date_partition_column != other.date_partition_column
        ):
            return False

        return True

    @property
    def field_mapping(self) -> Dict[str, str]:
        """
        Returns the field mapping of this data source.
        """
        return self._field_mapping

    @field_mapping.setter
    def field_mapping(self, field_mapping):
        """
        Sets the field mapping of this data source.
        """
        self._field_mapping = field_mapping

    @property
    def event_timestamp_column(self) -> str:
        """
        Returns the event timestamp column of this data source.
        """
        return self._event_timestamp_column

    @event_timestamp_column.setter
    def event_timestamp_column(self, event_timestamp_column):
        """
        Sets the event timestamp column of this data source.
        """
        self._event_timestamp_column = event_timestamp_column

    @property
    def created_timestamp_column(self) -> str:
        """
        Returns the created timestamp column of this data source.
        """
        return self._created_timestamp_column

    @created_timestamp_column.setter
    def created_timestamp_column(self, created_timestamp_column):
        """
        Sets the created timestamp column of this data source.
        """
        self._created_timestamp_column = created_timestamp_column

    @property
    def date_partition_column(self) -> str:
        """
        Returns the date partition column of this data source.
        """
        return self._date_partition_column

    @date_partition_column.setter
    def date_partition_column(self, date_partition_column):
        """
        Sets the date partition column of this data source.
        """
        self._date_partition_column = date_partition_column

    @staticmethod
    @abstractmethod
    def from_proto(data_source: DataSourceProto) -> Any:
        """
        Converts data source config in protobuf spec to a DataSource class object.

        Args:
            data_source: A protobuf representation of a DataSource.

        Returns:
            A DataSource class object.

        Raises:
            ValueError: The type of DataSource could not be identified.
        """
        if data_source.data_source_class_type:
            cls = get_data_source_class_from_type(data_source.data_source_class_type)
            return cls.from_proto(data_source)

        if data_source.file_options.file_format and data_source.file_options.file_url:
            from feast.infra.offline_stores.file_source import FileSource

            data_source_obj = FileSource.from_proto(data_source)
        elif (
            data_source.bigquery_options.table_ref or data_source.bigquery_options.query
        ):
            from feast.infra.offline_stores.bigquery_source import BigQuerySource

            data_source_obj = BigQuerySource.from_proto(data_source)
        elif data_source.redshift_options.table or data_source.redshift_options.query:
            from feast.infra.offline_stores.redshift_source import RedshiftSource

            data_source_obj = RedshiftSource.from_proto(data_source)
        elif data_source.snowflake_options.table or data_source.snowflake_options.query:
            from feast.infra.offline_stores.snowflake_source import SnowflakeSource

            data_source_obj = SnowflakeSource.from_proto(data_source)
        elif (
            data_source.kafka_options.bootstrap_servers
            and data_source.kafka_options.topic
            and data_source.kafka_options.message_format
        ):
            data_source_obj = KafkaSource.from_proto(data_source)
        elif (
            data_source.kinesis_options.record_format
            and data_source.kinesis_options.region
            and data_source.kinesis_options.stream_name
        ):
            data_source_obj = KinesisSource.from_proto(data_source)
        else:
            raise ValueError("Could not identify the source type being added.")

        return data_source_obj
    @abstractmethod
    def to_proto(self) -> DataSourceProto:
        """
        Converts a DataSource object to its protobuf representation.
        """
        raise NotImplementedError
    def validate(self, config: RepoConfig):
        """
        Validates the underlying data source.

        Args:
            config: Configuration object used to configure a feature store.
        """
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        """
        Returns the callable method that returns Feast type given the raw column type.
        """
        raise NotImplementedError

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        """
        Returns the list of column names and raw column types.

        Args:
            config: Configuration object used to configure a feature store.
        """
        raise NotImplementedError

    def get_table_query_string(self) -> str:
        """
        Returns a string that can directly be used to reference this table in SQL.
        """
        raise NotImplementedError
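
# Illustrative sketch (not part of the original module): a minimal custom
# DataSource subclass showing which members the ABC requires. The class name
# and the trivial type mapping are assumptions made for illustration only.
class _ExampleCustomSource(DataSource):
    def __init__(self, event_timestamp_column: str):
        super().__init__(event_timestamp_column=event_timestamp_column)

    @staticmethod
    def from_proto(data_source: DataSourceProto) -> Any:
        return _ExampleCustomSource(
            event_timestamp_column=data_source.event_timestamp_column
        )

    def to_proto(self) -> DataSourceProto:
        # CUSTOM_SOURCE marks user-defined sources; real custom sources also
        # set data_source_class_type so that DataSource.from_proto can locate
        # the implementing class.
        data_source_proto = DataSourceProto(type=DataSourceProto.CUSTOM_SOURCE)
        data_source_proto.event_timestamp_column = self.event_timestamp_column
        return data_source_proto

    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        # Pretend every upstream column holds strings.
        return lambda _raw_type: ValueType.STRING
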
class KafkaSource(DataSource):
    def validate(self, config: RepoConfig):
        pass

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        pass

    def __init__(
        self,
        event_timestamp_column: str,
        bootstrap_servers: str,
        message_format: StreamFormat,
        topic: str,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )
        self._kafka_options = KafkaOptions(
            bootstrap_servers=bootstrap_servers,
            message_format=message_format,
            topic=topic,
        )

    def __eq__(self, other):
        if not isinstance(other, KafkaSource):
            raise TypeError(
                "Comparisons should only involve KafkaSource class objects."
            )

        if (
            self.kafka_options.bootstrap_servers
            != other.kafka_options.bootstrap_servers
            or self.kafka_options.message_format != other.kafka_options.message_format
            or self.kafka_options.topic != other.kafka_options.topic
        ):
            return False

        return True

    @property
    def kafka_options(self):
        """
        Returns the kafka options of this data source
        """
        return self._kafka_options

    @kafka_options.setter
    def kafka_options(self, kafka_options):
        """
        Sets the kafka options of this data source
        """
        self._kafka_options = kafka_options

    @staticmethod
    def from_proto(data_source: DataSourceProto):
        return KafkaSource(
            field_mapping=dict(data_source.field_mapping),
            bootstrap_servers=data_source.kafka_options.bootstrap_servers,
            message_format=StreamFormat.from_proto(
                data_source.kafka_options.message_format
            ),
            topic=data_source.kafka_options.topic,
            event_timestamp_column=data_source.event_timestamp_column,
            created_timestamp_column=data_source.created_timestamp_column,
            date_partition_column=data_source.date_partition_column,
        )

    def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
            type=DataSourceProto.STREAM_KAFKA,
            field_mapping=self.field_mapping,
            kafka_options=self.kafka_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        # Note: this reuses the Redshift type mapping rather than defining a
        # Kafka-specific one.
        return type_map.redshift_to_feast_value_type
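
# Illustrative sketch (not part of the original module): serializes a
# KafkaSource and recovers it through the generic DataSource.from_proto
# dispatcher, which routes on the populated kafka_options fields. All
# literal values are placeholders.
def _example_kafka_source_dispatch() -> None:
    from feast.data_format import AvroFormat

    source = KafkaSource(
        event_timestamp_column="event_timestamp",
        bootstrap_servers="broker1:9092",  # placeholder broker
        message_format=AvroFormat(schema_json='{"type": "string"}'),
        topic="driver_trips",  # placeholder topic
    )
    restored = DataSource.from_proto(source.to_proto())
    assert isinstance(restored, KafkaSource)
    assert restored.kafka_options.topic == "driver_trips"
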
class RequestDataSource(DataSource):
    """
    RequestDataSource that can be used to provide input features for on demand transforms

    Args:
        name: Name of the request data source
        schema: Schema mapping from the input feature name to a ValueType
    """

    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        raise NotImplementedError

    _name: str
    _schema: Dict[str, ValueType]

    def __init__(
        self,
        name: str,
        schema: Dict[str, ValueType],
    ):
        """Creates a RequestDataSource object."""
        super().__init__()
        self._name = name
        self._schema = schema

    @property
    def name(self) -> str:
        """
        Returns the name of this data source
        """
        return self._name

    @property
    def schema(self) -> Dict[str, ValueType]:
        """
        Returns the schema for this request data source
        """
        return self._schema

    def validate(self, config: RepoConfig):
        pass

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        pass

    @staticmethod
    def from_proto(data_source: DataSourceProto):
        schema_pb = data_source.request_data_options.schema
        schema = {}
        for key in schema_pb.keys():
            schema[key] = ValueType(schema_pb.get(key))
        return RequestDataSource(
            name=data_source.request_data_options.name, schema=schema
        )

    def to_proto(self) -> DataSourceProto:
        schema_pb = {}
        for key, value in self._schema.items():
            schema_pb[key] = value.value
        options = DataSourceProto.RequestDataOptions(name=self._name, schema=schema_pb)
        data_source_proto = DataSourceProto(
            type=DataSourceProto.REQUEST_SOURCE, request_data_options=options
        )
        return data_source_proto
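
# Illustrative sketch (not part of the original module): a RequestDataSource
# describing two request-time inputs, round-tripped through its own
# from_proto (the generic DataSource.from_proto dispatcher has no branch for
# request sources). The name and schema keys are placeholders.
def _example_request_data_source_roundtrip() -> None:
    source = RequestDataSource(
        name="vals_to_add",  # placeholder name
        schema={"val_to_add": ValueType.INT64, "multiplier": ValueType.DOUBLE},
    )
    restored = RequestDataSource.from_proto(source.to_proto())
    assert restored.name == source.name
    assert restored.schema == source.schema
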
class KinesisSource(DataSource):
    def validate(self, config: RepoConfig):
        pass

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        pass

    @staticmethod
    def from_proto(data_source: DataSourceProto):
        return KinesisSource(
            field_mapping=dict(data_source.field_mapping),
            record_format=StreamFormat.from_proto(
                data_source.kinesis_options.record_format
            ),
            region=data_source.kinesis_options.region,
            stream_name=data_source.kinesis_options.stream_name,
            event_timestamp_column=data_source.event_timestamp_column,
            created_timestamp_column=data_source.created_timestamp_column,
            date_partition_column=data_source.date_partition_column,
        )

    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        pass

    def __init__(
        self,
        event_timestamp_column: str,
        created_timestamp_column: str,
        record_format: StreamFormat,
        region: str,
        stream_name: str,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )
        self._kinesis_options = KinesisOptions(
            record_format=record_format, region=region, stream_name=stream_name
        )

    def __eq__(self, other):
        if other is None:
            return False

        if not isinstance(other, KinesisSource):
            raise TypeError(
                "Comparisons should only involve KinesisSource class objects."
            )

        if (
            self.kinesis_options.record_format != other.kinesis_options.record_format
            or self.kinesis_options.region != other.kinesis_options.region
            or self.kinesis_options.stream_name != other.kinesis_options.stream_name
        ):
            return False

        return True

    @property
    def kinesis_options(self):
        """
        Returns the kinesis options of this data source
        """
        return self._kinesis_options

    @kinesis_options.setter
    def kinesis_options(self, kinesis_options):
        """
        Sets the kinesis options of this data source
        """
        self._kinesis_options = kinesis_options

    def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
            type=DataSourceProto.STREAM_KINESIS,
            field_mapping=self.field_mapping,
            kinesis_options=self.kinesis_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
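
# Illustrative sketch (not part of the original module): constructs a
# KinesisSource and checks the proto it emits. The record format, region,
# and stream name are placeholder values.
def _example_kinesis_source_to_proto() -> None:
    from feast.data_format import ProtoFormat

    source = KinesisSource(
        event_timestamp_column="event_timestamp",
        created_timestamp_column="created_timestamp",
        record_format=ProtoFormat(class_path="com.example.Trip"),  # placeholder
        region="us-west-2",  # placeholder AWS region
        stream_name="driver-trips",  # placeholder stream name
    )
    proto = source.to_proto()
    assert proto.type == DataSourceProto.STREAM_KINESIS
    assert proto.kinesis_options.stream_name == "driver-trips"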