Source code for feast.data_source

# Copyright 2020 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import enum
import re
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow.parquet import ParquetFile

from feast import type_map
from feast.data_format import FileFormat, StreamFormat
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.value_type import ValueType


class SourceType(enum.Enum):
    """
    DataSource value type. Used to define source types in DataSource.
    """

    UNKNOWN = 0
    BATCH_FILE = 1
    BATCH_BIGQUERY = 2
    STREAM_KAFKA = 3
    STREAM_KINESIS = 4
class FileOptions:
    """
    DataSource File options used to source features from a file
    """

    def __init__(
        self,
        file_format: Optional[FileFormat],
        file_url: Optional[str],
    ):
        self._file_format = file_format
        self._file_url = file_url

    @property
    def file_format(self):
        """
        Returns the file format of this file
        """
        return self._file_format

    @file_format.setter
    def file_format(self, file_format):
        """
        Sets the file format of this file
        """
        self._file_format = file_format

    @property
    def file_url(self):
        """
        Returns the file url of this file
        """
        return self._file_url

    @file_url.setter
    def file_url(self, file_url):
        """
        Sets the file url of this file
        """
        self._file_url = file_url
    @classmethod
    def from_proto(cls, file_options_proto: DataSourceProto.FileOptions):
        """
        Creates a FileOptions from a protobuf representation of a file option

        Args:
            file_options_proto: A protobuf representation of a DataSource

        Returns:
            Returns a FileOptions object based on the file_options protobuf
        """
        file_options = cls(
            file_format=FileFormat.from_proto(file_options_proto.file_format),
            file_url=file_options_proto.file_url,
        )
        return file_options
    def to_proto(self) -> DataSourceProto.FileOptions:
        """
        Converts a FileOptions object to its protobuf representation.

        Returns:
            FileOptionsProto protobuf
        """
        file_options_proto = DataSourceProto.FileOptions(
            file_format=(
                None if self.file_format is None else self.file_format.to_proto()
            ),
            file_url=self.file_url,
        )
        return file_options_proto
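A minimal round-trip sketch for the options classes, assuming ParquetFormat from feast.data_format; the file path is illustrative. BigQueryOptions, KafkaOptions, and KinesisOptions below follow the same to_proto/from_proto symmetry.

from feast.data_format import ParquetFormat
from feast.data_source import FileOptions

# Serialize to a DataSourceProto.FileOptions message, then reconstruct an
# equivalent FileOptions object from that message.
options = FileOptions(file_format=ParquetFormat(), file_url="/data/my_features.parquet")
restored = FileOptions.from_proto(options.to_proto())
assert restored.file_url == "/data/my_features.parquet"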
class BigQueryOptions:
    """
    DataSource BigQuery options used to source features from BigQuery query
    """

    def __init__(self, table_ref: Optional[str], query: Optional[str]):
        self._table_ref = table_ref
        self._query = query

    @property
    def query(self):
        """
        Returns the BigQuery SQL query referenced by this source
        """
        return self._query

    @query.setter
    def query(self, query):
        """
        Sets the BigQuery SQL query referenced by this source
        """
        self._query = query

    @property
    def table_ref(self):
        """
        Returns the table ref of this BQ table
        """
        return self._table_ref

    @table_ref.setter
    def table_ref(self, table_ref):
        """
        Sets the table ref of this BQ table
        """
        self._table_ref = table_ref
    @classmethod
    def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
        """
        Creates a BigQueryOptions from a protobuf representation of a BigQuery option

        Args:
            bigquery_options_proto: A protobuf representation of a DataSource

        Returns:
            Returns a BigQueryOptions object based on the bigquery_options protobuf
        """
        bigquery_options = cls(
            table_ref=bigquery_options_proto.table_ref,
            query=bigquery_options_proto.query,
        )
        return bigquery_options
    def to_proto(self) -> DataSourceProto.BigQueryOptions:
        """
        Converts a BigQueryOptions object to its protobuf representation.

        Returns:
            BigQueryOptionsProto protobuf
        """
        bigquery_options_proto = DataSourceProto.BigQueryOptions(
            table_ref=self.table_ref,
            query=self.query,
        )
        return bigquery_options_proto
class KafkaOptions:
    """
    DataSource Kafka options used to source features from Kafka messages
    """

    def __init__(
        self,
        bootstrap_servers: str,
        message_format: StreamFormat,
        topic: str,
    ):
        self._bootstrap_servers = bootstrap_servers
        self._message_format = message_format
        self._topic = topic

    @property
    def bootstrap_servers(self):
        """
        Returns a comma-separated list of Kafka bootstrap servers
        """
        return self._bootstrap_servers

    @bootstrap_servers.setter
    def bootstrap_servers(self, bootstrap_servers):
        """
        Sets a comma-separated list of Kafka bootstrap servers
        """
        self._bootstrap_servers = bootstrap_servers

    @property
    def message_format(self):
        """
        Returns the data format that is used to encode the feature data in Kafka messages
        """
        return self._message_format

    @message_format.setter
    def message_format(self, message_format):
        """
        Sets the data format that is used to encode the feature data in Kafka messages
        """
        self._message_format = message_format

    @property
    def topic(self):
        """
        Returns the Kafka topic to collect feature data from
        """
        return self._topic

    @topic.setter
    def topic(self, topic):
        """
        Sets the Kafka topic to collect feature data from
        """
        self._topic = topic
    @classmethod
    def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
        """
        Creates a KafkaOptions from a protobuf representation of a Kafka option

        Args:
            kafka_options_proto: A protobuf representation of a DataSource

        Returns:
            Returns a KafkaOptions object based on the kafka_options protobuf
        """
        kafka_options = cls(
            bootstrap_servers=kafka_options_proto.bootstrap_servers,
            message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
            topic=kafka_options_proto.topic,
        )
        return kafka_options
    def to_proto(self) -> DataSourceProto.KafkaOptions:
        """
        Converts a KafkaOptions object to its protobuf representation.

        Returns:
            KafkaOptionsProto protobuf
        """
        kafka_options_proto = DataSourceProto.KafkaOptions(
            bootstrap_servers=self.bootstrap_servers,
            message_format=self.message_format.to_proto(),
            topic=self.topic,
        )
        return kafka_options_proto
class KinesisOptions:
    """
    DataSource Kinesis options used to source features from Kinesis records
    """

    def __init__(
        self,
        record_format: StreamFormat,
        region: str,
        stream_name: str,
    ):
        self._record_format = record_format
        self._region = region
        self._stream_name = stream_name

    @property
    def record_format(self):
        """
        Returns the data format used to encode the feature data in the Kinesis records.
        """
        return self._record_format

    @record_format.setter
    def record_format(self, record_format):
        """
        Sets the data format used to encode the feature data in the Kinesis records.
        """
        self._record_format = record_format

    @property
    def region(self):
        """
        Returns the AWS region of the Kinesis stream
        """
        return self._region

    @region.setter
    def region(self, region):
        """
        Sets the AWS region of the Kinesis stream
        """
        self._region = region

    @property
    def stream_name(self):
        """
        Returns the Kinesis stream name to obtain feature data from
        """
        return self._stream_name

    @stream_name.setter
    def stream_name(self, stream_name):
        """
        Sets the Kinesis stream name to obtain feature data from
        """
        self._stream_name = stream_name
    @classmethod
    def from_proto(cls, kinesis_options_proto: DataSourceProto.KinesisOptions):
        """
        Creates a KinesisOptions from a protobuf representation of a Kinesis option

        Args:
            kinesis_options_proto: A protobuf representation of a DataSource

        Returns:
            Returns a KinesisOptions object based on the kinesis_options protobuf
        """
        kinesis_options = cls(
            record_format=StreamFormat.from_proto(kinesis_options_proto.record_format),
            region=kinesis_options_proto.region,
            stream_name=kinesis_options_proto.stream_name,
        )
        return kinesis_options
    def to_proto(self) -> DataSourceProto.KinesisOptions:
        """
        Converts a KinesisOptions object to its protobuf representation.

        Returns:
            KinesisOptionsProto protobuf
        """
        kinesis_options_proto = DataSourceProto.KinesisOptions(
            record_format=self.record_format.to_proto(),
            region=self.region,
            stream_name=self.stream_name,
        )
        return kinesis_options_proto
class DataSource:
    """
    DataSource that can be used to source features
    """

    def __init__(
        self,
        event_timestamp_column: str,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        self._event_timestamp_column = event_timestamp_column
        self._created_timestamp_column = created_timestamp_column
        self._field_mapping = field_mapping if field_mapping else {}
        self._date_partition_column = date_partition_column

    def __eq__(self, other):
        if not isinstance(other, DataSource):
            raise TypeError(
                "Comparisons should only involve DataSource class objects."
            )

        if (
            self.event_timestamp_column != other.event_timestamp_column
            or self.created_timestamp_column != other.created_timestamp_column
            or self.field_mapping != other.field_mapping
            or self.date_partition_column != other.date_partition_column
        ):
            return False

        return True

    @property
    def field_mapping(self):
        """
        Returns the field mapping of this data source
        """
        return self._field_mapping

    @field_mapping.setter
    def field_mapping(self, field_mapping):
        """
        Sets the field mapping of this data source
        """
        self._field_mapping = field_mapping

    @property
    def event_timestamp_column(self):
        """
        Returns the event timestamp column of this data source
        """
        return self._event_timestamp_column

    @event_timestamp_column.setter
    def event_timestamp_column(self, event_timestamp_column):
        """
        Sets the event timestamp column of this data source
        """
        self._event_timestamp_column = event_timestamp_column

    @property
    def created_timestamp_column(self):
        """
        Returns the created timestamp column of this data source
        """
        return self._created_timestamp_column

    @created_timestamp_column.setter
    def created_timestamp_column(self, created_timestamp_column):
        """
        Sets the created timestamp column of this data source
        """
        self._created_timestamp_column = created_timestamp_column

    @property
    def date_partition_column(self):
        """
        Returns the date partition column of this data source
        """
        return self._date_partition_column

    @date_partition_column.setter
    def date_partition_column(self, date_partition_column):
        """
        Sets the date partition column of this data source
        """
        self._date_partition_column = date_partition_column
    @staticmethod
    def from_proto(data_source):
        """
        Convert data source config in FeatureTable spec to a DataSource class object.
        """
        if data_source.file_options.file_format and data_source.file_options.file_url:
            data_source_obj = FileSource(
                field_mapping=data_source.field_mapping,
                file_format=FileFormat.from_proto(data_source.file_options.file_format),
                path=data_source.file_options.file_url,
                event_timestamp_column=data_source.event_timestamp_column,
                created_timestamp_column=data_source.created_timestamp_column,
                date_partition_column=data_source.date_partition_column,
            )
        elif (
            data_source.bigquery_options.table_ref or data_source.bigquery_options.query
        ):
            data_source_obj = BigQuerySource(
                field_mapping=data_source.field_mapping,
                table_ref=data_source.bigquery_options.table_ref,
                event_timestamp_column=data_source.event_timestamp_column,
                created_timestamp_column=data_source.created_timestamp_column,
                date_partition_column=data_source.date_partition_column,
                query=data_source.bigquery_options.query,
            )
        elif (
            data_source.kafka_options.bootstrap_servers
            and data_source.kafka_options.topic
            and data_source.kafka_options.message_format
        ):
            data_source_obj = KafkaSource(
                field_mapping=data_source.field_mapping,
                bootstrap_servers=data_source.kafka_options.bootstrap_servers,
                message_format=StreamFormat.from_proto(
                    data_source.kafka_options.message_format
                ),
                topic=data_source.kafka_options.topic,
                event_timestamp_column=data_source.event_timestamp_column,
                created_timestamp_column=data_source.created_timestamp_column,
                date_partition_column=data_source.date_partition_column,
            )
        elif (
            data_source.kinesis_options.record_format
            and data_source.kinesis_options.region
            and data_source.kinesis_options.stream_name
        ):
            data_source_obj = KinesisSource(
                field_mapping=data_source.field_mapping,
                record_format=StreamFormat.from_proto(
                    data_source.kinesis_options.record_format
                ),
                region=data_source.kinesis_options.region,
                stream_name=data_source.kinesis_options.stream_name,
                event_timestamp_column=data_source.event_timestamp_column,
                created_timestamp_column=data_source.created_timestamp_column,
                date_partition_column=data_source.date_partition_column,
            )
        else:
            raise ValueError("Could not identify the source type being added")

        return data_source_obj
    def to_proto(self) -> DataSourceProto:
        """
        Converts a DataSource object to its protobuf representation.
        """
        raise NotImplementedError
    def _infer_event_timestamp_column(self, ts_column_type_regex_pattern):
        ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"
        USER_GUIDANCE = "Please specify event_timestamp_column explicitly."

        if isinstance(self, FileSource) or isinstance(self, BigQuerySource):
            event_timestamp_column, matched_flag = None, False
            for col_name, col_datatype in self.get_table_column_names_and_types():
                if re.match(ts_column_type_regex_pattern, col_datatype):
                    if matched_flag:
                        raise TypeError(
                            f"""
                            {ERROR_MSG_PREFIX} due to multiple possible columns satisfying
                            the criteria. {USER_GUIDANCE}
                            """
                        )
                    matched_flag = True
                    event_timestamp_column = col_name
            if matched_flag:
                return event_timestamp_column
            else:
                raise TypeError(
                    f"""
                    {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
                    {USER_GUIDANCE}
                    """
                )
        else:
            raise TypeError(
                f"""
                {ERROR_MSG_PREFIX} because this DataSource currently does not support this inference.
                {USER_GUIDANCE}
                """
            )
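A minimal sketch of how from_proto dispatches on whichever options field is populated, assuming the feast.data_source imports below; the table reference is illustrative. Because event_timestamp_column is passed explicitly, constructing the source does not contact BigQuery.

from feast.data_source import BigQuerySource, DataSource

bq = BigQuerySource(
    table_ref="my_project.my_dataset.driver_stats",  # hypothetical table ref
    event_timestamp_column="event_timestamp",
)
# from_proto sees bigquery_options.table_ref set and rebuilds a BigQuerySource.
restored = DataSource.from_proto(bq.to_proto())
assert isinstance(restored, BigQuerySource)
assert restored.table_ref == "my_project.my_dataset.driver_stats"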
class FileSource(DataSource):
    def __init__(
        self,
        event_timestamp_column: Optional[str] = None,
        file_url: Optional[str] = None,
        path: Optional[str] = None,
        file_format: Optional[FileFormat] = None,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        """Create a FileSource from a file containing feature data. Only Parquet format is supported.

        Args:
            path: File path to the file containing feature data. Must contain an
                event_timestamp column, entity columns, and feature columns.
            event_timestamp_column: Event timestamp column used for point-in-time
                joins of feature values.
            created_timestamp_column (optional): Timestamp column indicating when
                the row was created, used for deduplicating rows.
            file_url: [Deprecated] Please see path
            file_format (optional): Explicitly set the file format. Allows Feast to
                bypass inferring the file format.
            field_mapping: A dictionary mapping of column names in this data source
                to feature names in a feature table or view. Only used for feature
                columns, not entities or timestamp columns.

        Examples:
            >>> FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")
        """
        if path is None and file_url is None:
            raise ValueError(
                'No "path" argument provided. Please set "path" to the location of your file source.'
            )
        if file_url:
            from warnings import warn

            warn(
                'Argument "file_url" is being deprecated. Please use the "path" argument.'
            )
        else:
            file_url = path

        self._file_options = FileOptions(file_format=file_format, file_url=file_url)

        super().__init__(
            event_timestamp_column
            or self._infer_event_timestamp_column(r"timestamp\[\w\w\]"),
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )

    def __eq__(self, other):
        if not isinstance(other, FileSource):
            raise TypeError(
                "Comparisons should only involve FileSource class objects."
            )

        return (
            self.file_options.file_url == other.file_options.file_url
            and self.file_options.file_format == other.file_options.file_format
            and self.event_timestamp_column == other.event_timestamp_column
            and self.created_timestamp_column == other.created_timestamp_column
            and self.field_mapping == other.field_mapping
        )

    @property
    def file_options(self):
        """
        Returns the file options of this data source
        """
        return self._file_options

    @file_options.setter
    def file_options(self, file_options):
        """
        Sets the file options of this data source
        """
        self._file_options = file_options

    @property
    def path(self):
        """
        Returns the file path of this feature data source
        """
        return self._file_options.file_url
    def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
            type=DataSourceProto.BATCH_FILE,
            field_mapping=self.field_mapping,
            file_options=self.file_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        return type_map.pa_to_feast_value_type
    def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
        schema = ParquetFile(self.path).schema_arrow
        return zip(schema.names, map(str, schema.types))
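A minimal sketch of FileSource usage, assuming pandas and pyarrow are installed; the path and column names are illustrative. Writing a Parquet file with exactly one timestamp-typed column lets the constructor infer event_timestamp_column via the r"timestamp\[\w\w\]" pattern above.

import pandas as pd

from feast.data_source import FileSource

df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "trips_today": [5, 2],
        # The only timestamp-typed column, so inference is unambiguous.
        "datetime": pd.to_datetime(["2021-04-12 10:00:00", "2021-04-12 11:00:00"]),
    }
)
df.to_parquet("/tmp/driver_stats.parquet")

source = FileSource(path="/tmp/driver_stats.parquet")
assert source.event_timestamp_column == "datetime"
# get_table_column_names_and_types reads the same Parquet schema, yielding
# pairs such as ("trips_today", "int64") and ("datetime", "timestamp[us]");
# the exact timestamp unit depends on the pyarrow version.
print(dict(source.get_table_column_names_and_types()))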
class BigQuerySource(DataSource):
    def __init__(
        self,
        event_timestamp_column: Optional[str] = None,
        table_ref: Optional[str] = None,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
        query: Optional[str] = None,
    ):
        self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)

        super().__init__(
            event_timestamp_column
            or self._infer_event_timestamp_column("TIMESTAMP|DATETIME"),
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )

    def __eq__(self, other):
        if not isinstance(other, BigQuerySource):
            raise TypeError(
                "Comparisons should only involve BigQuerySource class objects."
            )

        return (
            self.bigquery_options.table_ref == other.bigquery_options.table_ref
            and self.bigquery_options.query == other.bigquery_options.query
            and self.event_timestamp_column == other.event_timestamp_column
            and self.created_timestamp_column == other.created_timestamp_column
            and self.field_mapping == other.field_mapping
        )

    @property
    def table_ref(self):
        return self._bigquery_options.table_ref

    @property
    def query(self):
        return self._bigquery_options.query

    @property
    def bigquery_options(self):
        """
        Returns the bigquery options of this data source
        """
        return self._bigquery_options

    @bigquery_options.setter
    def bigquery_options(self, bigquery_options):
        """
        Sets the bigquery options of this data source
        """
        self._bigquery_options = bigquery_options
    def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
            type=DataSourceProto.BATCH_BIGQUERY,
            field_mapping=self.field_mapping,
            bigquery_options=self.bigquery_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
    def get_table_query_string(self) -> str:
        """Returns a string that can directly be used to reference this table in SQL"""
        if self.table_ref:
            return f"`{self.table_ref}`"
        else:
            return f"({self.query})"
    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        return type_map.bq_to_feast_value_type
    def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
        from google.cloud import bigquery

        client = bigquery.Client()
        name_type_pairs = []
        if self.table_ref is not None:
            project_id, dataset_id, table_id = self.table_ref.split(".")
            bq_columns_query = f"""
                SELECT COLUMN_NAME, DATA_TYPE
                FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = '{table_id}'
            """
            table_schema = (
                client.query(bq_columns_query).result().to_dataframe_iterable()
            )
            for df in table_schema:
                name_type_pairs.extend(
                    list(zip(df["COLUMN_NAME"].to_list(), df["DATA_TYPE"].to_list()))
                )
        else:
            bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1"
            query_res = client.query(bq_columns_query).result()
            name_type_pairs = [
                (schema_field.name, schema_field.field_type)
                for schema_field in query_res.schema
            ]

        return name_type_pairs
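A minimal sketch of the two BigQuerySource modes; the table reference and query are illustrative, and no BigQuery call is made because event_timestamp_column is given explicitly.

from feast.data_source import BigQuerySource

table_source = BigQuerySource(
    table_ref="my_project.my_dataset.driver_stats",
    event_timestamp_column="event_timestamp",
)
query_source = BigQuerySource(
    query="SELECT driver_id, trips_today, event_timestamp FROM my_dataset.driver_stats",
    event_timestamp_column="event_timestamp",
)
# get_table_query_string returns a fragment usable directly in a FROM clause:
assert table_source.get_table_query_string() == "`my_project.my_dataset.driver_stats`"
assert query_source.get_table_query_string().startswith("(SELECT")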
class KafkaSource(DataSource):
    def __init__(
        self,
        event_timestamp_column: str,
        bootstrap_servers: str,
        message_format: StreamFormat,
        topic: str,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )
        self._kafka_options = KafkaOptions(
            bootstrap_servers=bootstrap_servers,
            message_format=message_format,
            topic=topic,
        )

    def __eq__(self, other):
        if not isinstance(other, KafkaSource):
            raise TypeError(
                "Comparisons should only involve KafkaSource class objects."
            )

        if (
            self.kafka_options.bootstrap_servers
            != other.kafka_options.bootstrap_servers
            or self.kafka_options.message_format != other.kafka_options.message_format
            or self.kafka_options.topic != other.kafka_options.topic
        ):
            return False

        return True

    @property
    def kafka_options(self):
        """
        Returns the kafka options of this data source
        """
        return self._kafka_options

    @kafka_options.setter
    def kafka_options(self, kafka_options):
        """
        Sets the kafka options of this data source
        """
        self._kafka_options = kafka_options
    def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
            type=DataSourceProto.STREAM_KAFKA,
            field_mapping=self.field_mapping,
            kafka_options=self.kafka_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
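A minimal sketch of a KafkaSource, assuming AvroFormat from feast.data_format takes an Avro schema JSON string; the servers, topic, and schema are illustrative.

from feast.data_format import AvroFormat
from feast.data_source import KafkaSource
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

# Hypothetical Avro schema describing the messages on the topic.
schema_json = '{"type": "record", "name": "DriverStats", "fields": []}'

kafka_source = KafkaSource(
    event_timestamp_column="event_timestamp",
    bootstrap_servers="kafka1:9092,kafka2:9092",
    message_format=AvroFormat(schema_json),
    topic="driver_stats",
)
assert kafka_source.to_proto().type == DataSourceProto.STREAM_KAFKA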
class KinesisSource(DataSource):
    def __init__(
        self,
        event_timestamp_column: str,
        created_timestamp_column: str,
        record_format: StreamFormat,
        region: str,
        stream_name: str,
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )
        self._kinesis_options = KinesisOptions(
            record_format=record_format, region=region, stream_name=stream_name
        )

    def __eq__(self, other):
        if not isinstance(other, KinesisSource):
            raise TypeError(
                "Comparisons should only involve KinesisSource class objects."
            )

        if (
            self.kinesis_options.record_format != other.kinesis_options.record_format
            or self.kinesis_options.region != other.kinesis_options.region
            or self.kinesis_options.stream_name != other.kinesis_options.stream_name
        ):
            return False

        return True

    @property
    def kinesis_options(self):
        """
        Returns the kinesis options of this data source
        """
        return self._kinesis_options

    @kinesis_options.setter
    def kinesis_options(self, kinesis_options):
        """
        Sets the kinesis options of this data source
        """
        self._kinesis_options = kinesis_options
    def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
            type=DataSourceProto.STREAM_KINESIS,
            field_mapping=self.field_mapping,
            kinesis_options=self.kinesis_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
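A minimal sketch of a KinesisSource round trip, assuming ProtoFormat from feast.data_format takes a protobuf class path; the region, stream name, and class path are illustrative.

from feast.data_format import ProtoFormat
from feast.data_source import DataSource, KinesisSource

kinesis_source = KinesisSource(
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created_timestamp",
    record_format=ProtoFormat("my.package.DriverStats"),  # hypothetical class path
    region="us-east-1",
    stream_name="driver-stats-stream",
)
# from_proto keys off the populated kinesis_options field and rebuilds the source.
restored = DataSource.from_proto(kinesis_source.to_proto())
assert isinstance(restored, KinesisSource)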