Source code for feast.infra.offline_stores.bigquery_source

from typing import Callable, Dict, Iterable, Optional, Tuple

from feast import type_map
from feast.data_source import DataSource
from feast.errors import DataSourceNotFoundException
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig
from feast.value_type import ValueType


[docs]class BigQuerySource(DataSource): def __init__( self, event_timestamp_column: Optional[str] = "", table_ref: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = "", query: Optional[str] = None, ): self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query) super().__init__( event_timestamp_column, created_timestamp_column, field_mapping, date_partition_column, ) def __eq__(self, other): if not isinstance(other, BigQuerySource): raise TypeError( "Comparisons should only involve BigQuerySource class objects." ) return ( self.bigquery_options.table_ref == other.bigquery_options.table_ref and self.bigquery_options.query == other.bigquery_options.query and self.event_timestamp_column == other.event_timestamp_column and self.created_timestamp_column == other.created_timestamp_column and self.field_mapping == other.field_mapping ) @property def table_ref(self): return self._bigquery_options.table_ref @property def query(self): return self._bigquery_options.query @property def bigquery_options(self): """ Returns the bigquery options of this data source """ return self._bigquery_options @bigquery_options.setter def bigquery_options(self, bigquery_options): """ Sets the bigquery options of this data source """ self._bigquery_options = bigquery_options
[docs] @staticmethod def from_proto(data_source: DataSourceProto): assert data_source.HasField("bigquery_options") return BigQuerySource( field_mapping=dict(data_source.field_mapping), table_ref=data_source.bigquery_options.table_ref, event_timestamp_column=data_source.event_timestamp_column, created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, query=data_source.bigquery_options.query, )
[docs] def to_proto(self) -> DataSourceProto: data_source_proto = DataSourceProto( type=DataSourceProto.BATCH_BIGQUERY, field_mapping=self.field_mapping, bigquery_options=self.bigquery_options.to_proto(), ) data_source_proto.event_timestamp_column = self.event_timestamp_column data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column return data_source_proto
[docs] def validate(self, config: RepoConfig): if not self.query: from google.api_core.exceptions import NotFound from google.cloud import bigquery client = bigquery.Client() try: client.get_table(self.table_ref) except NotFound: raise DataSourceNotFoundException(self.table_ref)
[docs] def get_table_query_string(self) -> str: """Returns a string that can directly be used to reference this table in SQL""" if self.table_ref: return f"`{self.table_ref}`" else: return f"({self.query})"
[docs] @staticmethod def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: return type_map.bq_to_feast_value_type
[docs] def get_table_column_names_and_types( self, config: RepoConfig ) -> Iterable[Tuple[str, str]]: from google.cloud import bigquery client = bigquery.Client() if self.table_ref is not None: table_schema = client.get_table(self.table_ref).schema if not isinstance(table_schema[0], bigquery.schema.SchemaField): raise TypeError("Could not parse BigQuery table schema.") name_type_pairs = [(field.name, field.field_type) for field in table_schema] else: bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1" queryRes = client.query(bq_columns_query).result() name_type_pairs = [ (schema_field.name, schema_field.field_type) for schema_field in queryRes.schema ] return name_type_pairs
class BigQueryOptions: """ DataSource BigQuery options used to source features from BigQuery query """ def __init__(self, table_ref: Optional[str], query: Optional[str]): self._table_ref = table_ref self._query = query @property def query(self): """ Returns the BigQuery SQL query referenced by this source """ return self._query @query.setter def query(self, query): """ Sets the BigQuery SQL query referenced by this source """ self._query = query @property def table_ref(self): """ Returns the table ref of this BQ table """ return self._table_ref @table_ref.setter def table_ref(self, table_ref): """ Sets the table ref of this BQ table """ self._table_ref = table_ref @classmethod def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions): """ Creates a BigQueryOptions from a protobuf representation of a BigQuery option Args: bigquery_options_proto: A protobuf representation of a DataSource Returns: Returns a BigQueryOptions object based on the bigquery_options protobuf """ bigquery_options = cls( table_ref=bigquery_options_proto.table_ref, query=bigquery_options_proto.query, ) return bigquery_options def to_proto(self) -> DataSourceProto.BigQueryOptions: """ Converts an BigQueryOptionsProto object to its protobuf representation. Returns: BigQueryOptionsProto protobuf """ bigquery_options_proto = DataSourceProto.BigQueryOptions( table_ref=self.table_ref, query=self.query, ) return bigquery_options_proto