Source code for feast.infra.offline_stores.bigquery_source

from typing import Callable, Dict, Iterable, List, Optional, Tuple

from typeguard import typechecked

from feast import type_map
from feast.data_source import DataSource
from feast.errors import DataSourceNoNameException, DataSourceNotFoundException
from feast.feature_logging import LoggingDestination
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.FeatureService_pb2 import (
    LoggingConfig as LoggingConfigProto,
)
from feast.protos.feast.core.SavedDataset_pb2 import (
    SavedDatasetStorage as SavedDatasetStorageProto,
)
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import get_user_agent
from feast.value_type import ValueType


@typechecked
class BigQuerySource(DataSource):
    def __init__(
        self,
        *,
        name: Optional[str] = None,
        timestamp_field: Optional[str] = None,
        table: Optional[str] = None,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        query: Optional[str] = None,
        description: Optional[str] = "",
        tags: Optional[Dict[str, str]] = None,
        owner: Optional[str] = "",
    ):
        """Create a BigQuerySource from an existing table or query.

        Args:
            name (optional): Name for the source. Defaults to the table if not specified, in which
                case the table must be specified.
            timestamp_field (optional): Event timestamp field used for point-in-time joins of
                feature values.
            table (optional): The BigQuery table where the features are stored. Exactly one of
                'table' and 'query' must be specified.
            created_timestamp_column (optional): Timestamp column indicating when the row was
                created, used for deduplicating rows.
            field_mapping (optional): A dictionary mapping of column names in this data source to
                feature names in a feature table or view. Only used for feature columns, not
                entities or timestamp columns.
            query (optional): The query to be executed to obtain the features. Exactly one of
                'table' and 'query' must be specified.
            description (optional): A human-readable description.
            tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
            owner (optional): The owner of the bigquery source, typically the email of the primary
                maintainer.

        Example:
            >>> from feast import BigQuerySource
            >>> my_bigquery_source = BigQuerySource(table="gcp_project:bq_dataset.bq_table")
        """
        if table is None and query is None:
            raise ValueError('No "table" or "query" argument provided.')

        self.bigquery_options = BigQueryOptions(table=table, query=query)

        # If no name, use the table as the default name.
        if name is None and table is None:
            raise DataSourceNoNameException()
        name = name or table
        assert name

        super().__init__(
            name=name,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            field_mapping=field_mapping,
            description=description,
            tags=tags,
            owner=owner,
        )

    # Note: Python requires redefining hash in child classes that override __eq__
    def __hash__(self):
        return super().__hash__()

    def __eq__(self, other):
        if not isinstance(other, BigQuerySource):
            raise TypeError(
                "Comparisons should only involve BigQuerySource class objects."
            )

        return (
            super().__eq__(other)
            and self.table == other.table
            and self.query == other.query
        )

    @property
    def table(self):
        return self.bigquery_options.table

    @property
    def query(self):
        return self.bigquery_options.query

    @staticmethod
    def from_proto(data_source: DataSourceProto):
        assert data_source.HasField("bigquery_options")

        return BigQuerySource(
            name=data_source.name,
            field_mapping=dict(data_source.field_mapping),
            table=data_source.bigquery_options.table,
            timestamp_field=data_source.timestamp_field,
            created_timestamp_column=data_source.created_timestamp_column,
            query=data_source.bigquery_options.query,
            description=data_source.description,
            tags=dict(data_source.tags),
            owner=data_source.owner,
        )

    def to_proto(self) -> DataSourceProto:
        data_source_proto = DataSourceProto(
            name=self.name,
            type=DataSourceProto.BATCH_BIGQUERY,
            field_mapping=self.field_mapping,
            bigquery_options=self.bigquery_options.to_proto(),
            description=self.description,
            tags=self.tags,
            owner=self.owner,
            timestamp_field=self.timestamp_field,
            created_timestamp_column=self.created_timestamp_column,
        )

        return data_source_proto

    def validate(self, config: RepoConfig):
        if not self.query:
            from google.api_core.exceptions import NotFound
            from google.cloud import bigquery

            client = bigquery.Client()
            try:
                client.get_table(self.table)
            except NotFound:
                raise DataSourceNotFoundException(self.table)

    def get_table_query_string(self) -> str:
        """Returns a string that can directly be used to reference this table in SQL"""
        if self.table:
            return f"`{self.table}`"
        else:
            return f"({self.query})"

    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        return type_map.bq_to_feast_value_type

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        try:
            from google.api_core import client_info as http_client_info
        except ImportError as e:
            from feast.errors import FeastExtrasDependencyImportError

            raise FeastExtrasDependencyImportError("gcp", str(e))

        from google.cloud import bigquery

        project_id = (
            config.offline_store.billing_project_id or config.offline_store.project_id
        )
        client = bigquery.Client(
            project=project_id,
            location=config.offline_store.location,
            client_info=http_client_info.ClientInfo(user_agent=get_user_agent()),
        )
        if self.table:
            schema = client.get_table(self.table).schema
            if not isinstance(schema[0], bigquery.schema.SchemaField):
                raise TypeError("Could not parse BigQuery table schema.")
        else:
            bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 0"
            query_res = client.query(bq_columns_query).result()
            schema = query_res.schema

        name_type_pairs: List[Tuple[str, str]] = []
        for field in schema:
            bq_type_as_str = field.field_type
            if field.mode == "REPEATED":
                bq_type_as_str = "ARRAY<" + bq_type_as_str + ">"
            name_type_pairs.append((field.name, bq_type_as_str))

        return name_type_pairs
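
# Usage sketch (illustrative, not part of the original module): constructing a
# BigQuerySource from a query instead of a table. The project/dataset/table
# names below are hypothetical placeholders; when only "query" is given, an
# explicit "name" is required because there is no table to default to.
#
# >>> from feast import BigQuerySource
# >>> driver_stats_source = BigQuerySource(
# ...     name="driver_hourly_stats",
# ...     query="SELECT * FROM `my_project.my_dataset.driver_hourly_stats`",
# ...     timestamp_field="event_timestamp",
# ...     created_timestamp_column="created",
# ... )
# >>> driver_stats_source.get_table_query_string()
# '(SELECT * FROM `my_project.my_dataset.driver_hourly_stats`)'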

class BigQueryOptions:
    """
    Configuration options for a BigQuery data source.
    """

    def __init__(
        self,
        table: Optional[str],
        query: Optional[str],
    ):
        self.table = table or ""
        self.query = query or ""

    @classmethod
    def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
        """
        Creates a BigQueryOptions from a protobuf representation of a BigQuery option.

        Args:
            bigquery_options_proto: A protobuf representation of a DataSource

        Returns:
            A BigQueryOptions object based on the bigquery_options protobuf.
        """
        bigquery_options = cls(
            table=bigquery_options_proto.table,
            query=bigquery_options_proto.query,
        )

        return bigquery_options

    def to_proto(self) -> DataSourceProto.BigQueryOptions:
        """
        Converts a BigQueryOptions object to its protobuf representation.

        Returns:
            A BigQueryOptionsProto protobuf.
        """
        bigquery_options_proto = DataSourceProto.BigQueryOptions(
            table=self.table,
            query=self.query,
        )

        return bigquery_options_proto
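
# Illustrative sketch (not part of the original module): BigQueryOptions
# round-trips losslessly through its protobuf form, which is how
# BigQuerySource.to_proto()/from_proto() persist the table or query reference.
# The table name is a hypothetical placeholder.
#
# >>> options = BigQueryOptions(table="my_project.my_dataset.my_table", query=None)
# >>> restored = BigQueryOptions.from_proto(options.to_proto())
# >>> restored.table
# 'my_project.my_dataset.my_table'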

class SavedDatasetBigQueryStorage(SavedDatasetStorage):
    _proto_attr_name = "bigquery_storage"

    bigquery_options: BigQueryOptions

    def __init__(self, table: str):
        self.bigquery_options = BigQueryOptions(table=table, query=None)

    @staticmethod
    def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage:
        return SavedDatasetBigQueryStorage(
            table=BigQueryOptions.from_proto(storage_proto.bigquery_storage).table
        )

    def to_proto(self) -> SavedDatasetStorageProto:
        return SavedDatasetStorageProto(
            bigquery_storage=self.bigquery_options.to_proto()
        )

    def to_data_source(self) -> DataSource:
        return BigQuerySource(table=self.bigquery_options.table)
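
# Usage sketch (assumed, not part of the original module): a
# SavedDatasetBigQueryStorage names the BigQuery table into which a saved
# dataset is materialized; it is typically passed as the "storage" argument of
# FeatureStore.create_saved_dataset(). The table name below is a hypothetical
# placeholder.
#
# >>> storage = SavedDatasetBigQueryStorage(table="my_project.my_dataset.my_training_set")
# >>> storage.to_data_source().table
# 'my_project.my_dataset.my_training_set'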

class BigQueryLoggingDestination(LoggingDestination):
    _proto_kind = "bigquery_destination"

    table: str

    def __init__(self, *, table_ref):
        self.table = table_ref

    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
        return BigQueryLoggingDestination(
            table_ref=config_proto.bigquery_destination.table_ref,
        )

    def to_data_source(self) -> DataSource:
        return BigQuerySource(table=self.table)

    def to_proto(self) -> LoggingConfigProto:
        return LoggingConfigProto(
            bigquery_destination=LoggingConfigProto.BigQueryDestination(
                table_ref=self.table
            )
        )
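
# Usage sketch (assumed, not part of the original module): a
# BigQueryLoggingDestination directs feature-serving logs to a BigQuery table.
# The table_ref below is a hypothetical placeholder; the destination is
# typically wrapped in a feast.feature_logging.LoggingConfig attached to a
# FeatureService.
#
# >>> destination = BigQueryLoggingDestination(table_ref="my_project.my_dataset.feature_logs")
# >>> destination.to_proto().bigquery_destination.table_ref
# 'my_project.my_dataset.feature_logs'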