import time
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Union
import pandas
import pyarrow
from jinja2 import BaseLoader, Environment
from feast.data_source import BigQuerySource, DataSource
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
RetrievalJob,
_get_requested_feature_views_to_features_dict,
)
from feast.registry import Registry
from feast.repo_config import BigQueryOfflineStoreConfig, RepoConfig
try:
from google.api_core.exceptions import NotFound
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import bigquery
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError
raise FeastExtrasDependencyImportError("gcp", str(e))
[docs]class BigQueryOfflineStore(OfflineStore):
[docs] @staticmethod
def pull_latest_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> pyarrow.Table:
assert isinstance(data_source, BigQuerySource)
from_expression = data_source.get_table_query_string()
partition_by_join_key_string = ", ".join(join_key_columns)
if partition_by_join_key_string != "":
partition_by_join_key_string = (
"PARTITION BY " + partition_by_join_key_string
)
timestamps = [event_timestamp_column]
if created_timestamp_column:
timestamps.append(created_timestamp_column)
timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)
query = f"""
SELECT {field_string}
FROM (
SELECT {field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
)
WHERE _feast_row = 1
"""
return BigQueryOfflineStore._pull_query(query)
@staticmethod
def _pull_query(query: str) -> pyarrow.Table:
client = _get_bigquery_client()
query_job = client.query(query)
return query_job.to_arrow()
[docs] @staticmethod
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
) -> RetrievalJob:
# TODO: Add entity_df validation in order to fail before interacting with BigQuery
client = _get_bigquery_client()
if type(entity_df) is str:
entity_df_job = client.query(entity_df)
entity_df_result = entity_df_job.result() # also starts job
entity_df_event_timestamp_col = _infer_event_timestamp_from_bigquery_query(
entity_df_result
)
entity_df_sql_table = f"`{entity_df_job.destination.project}.{entity_df_job.destination.dataset_id}.{entity_df_job.destination.table_id}`"
elif isinstance(entity_df, pandas.DataFrame):
entity_df_event_timestamp_col = _infer_event_timestamp_from_dataframe(
entity_df
)
assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
table_id = _upload_entity_df_into_bigquery(
config.project, config.offline_store.dataset, entity_df, client
)
entity_df_sql_table = f"`{table_id}`"
else:
raise ValueError(
f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, "
f"but we found: {type(entity_df)} "
)
# Build a query context containing all information required to template the BigQuery SQL query
query_context = get_feature_view_query_context(
feature_refs, feature_views, registry, project
)
# TODO: Infer min_timestamp and max_timestamp from entity_df
# Generate the BigQuery SQL query from the query context
query = build_point_in_time_query(
query_context,
min_timestamp=datetime.now() - timedelta(days=365),
max_timestamp=datetime.now() + timedelta(days=1),
left_table_query_string=entity_df_sql_table,
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
)
job = BigQueryRetrievalJob(query=query, client=client)
return job
def _infer_event_timestamp_from_bigquery_query(entity_df_result) -> str:
if any(
schema_field.name == DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
for schema_field in entity_df_result.schema
):
return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
else:
datetime_columns = list(
filter(
lambda schema_field: schema_field.field_type == "TIMESTAMP",
entity_df_result.schema,
)
)
if len(datetime_columns) == 1:
print(
f"Using {datetime_columns[0].name} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
)
return datetime_columns[0].name
else:
raise ValueError(
f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
)
def _infer_event_timestamp_from_dataframe(entity_df: pandas.DataFrame) -> str:
if DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL in entity_df.columns:
return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
else:
datetime_columns = entity_df.select_dtypes(
include=["datetime", "datetimetz"]
).columns
if len(datetime_columns) == 1:
print(
f"Using {datetime_columns[0]} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
)
return datetime_columns[0]
else:
raise ValueError(
f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
)
[docs]class BigQueryRetrievalJob(RetrievalJob):
def __init__(self, query, client):
self.query = query
self.client = client
[docs] def to_df(self):
# TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
return df
[docs]@dataclass(frozen=True)
class FeatureViewQueryContext:
"""Context object used to template a BigQuery point-in-time SQL query"""
name: str
ttl: int
entities: List[str]
features: List[str] # feature reference format
table_ref: str
event_timestamp_column: str
created_timestamp_column: Optional[str]
query: str
table_subquery: str
entity_selections: List[str]
def _upload_entity_df_into_bigquery(project, dataset_name, entity_df, client) -> str:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table"""
# First create the BigQuery dataset if it doesn't exist
dataset = bigquery.Dataset(f"{client.project}.{dataset_name}")
dataset.location = "US"
try:
client.get_dataset(dataset)
except NotFound:
# Only create the dataset if it does not exist
client.create_dataset(dataset, exists_ok=True)
# Drop the index so that we dont have unnecessary columns
entity_df.reset_index(drop=True, inplace=True)
# Upload the dataframe into BigQuery, creating a temporary table
job_config = bigquery.LoadJobConfig()
table_id = f"{client.project}.{dataset_name}.entity_df_{project}_{int(time.time())}"
job = client.load_table_from_dataframe(entity_df, table_id, job_config=job_config,)
job.result()
# Ensure that the table expires after some time
table = client.get_table(table=table_id)
table.expires = datetime.utcnow() + timedelta(minutes=30)
client.update_table(table, ["expires"])
return table_id
[docs]def get_feature_view_query_context(
feature_refs: List[str],
feature_views: List[FeatureView],
registry: Registry,
project: str,
) -> List[FeatureViewQueryContext]:
"""Build a query context containing all information required to template a BigQuery point-in-time SQL query"""
feature_views_to_feature_map = _get_requested_feature_views_to_features_dict(
feature_refs, feature_views
)
query_context = []
for feature_view, features in feature_views_to_feature_map.items():
join_keys = []
entity_selections = []
reverse_field_mapping = {
v: k for k, v in feature_view.input.field_mapping.items()
}
for entity_name in feature_view.entities:
entity = registry.get_entity(entity_name, project)
join_keys.append(entity.join_key)
join_key_column = reverse_field_mapping.get(
entity.join_key, entity.join_key
)
entity_selections.append(f"{join_key_column} AS {entity.join_key}")
if isinstance(feature_view.ttl, timedelta):
ttl_seconds = int(feature_view.ttl.total_seconds())
else:
ttl_seconds = 0
assert isinstance(feature_view.input, BigQuerySource)
event_timestamp_column = feature_view.input.event_timestamp_column
created_timestamp_column = feature_view.input.created_timestamp_column
context = FeatureViewQueryContext(
name=feature_view.name,
ttl=ttl_seconds,
entities=join_keys,
features=features,
table_ref=feature_view.input.table_ref,
event_timestamp_column=reverse_field_mapping.get(
event_timestamp_column, event_timestamp_column
),
created_timestamp_column=reverse_field_mapping.get(
created_timestamp_column, created_timestamp_column
),
# TODO: Make created column optional and not hardcoded
query=feature_view.input.query,
table_subquery=feature_view.input.get_table_query_string(),
entity_selections=entity_selections,
)
query_context.append(context)
return query_context
[docs]def build_point_in_time_query(
feature_view_query_contexts: List[FeatureViewQueryContext],
min_timestamp: datetime,
max_timestamp: datetime,
left_table_query_string: str,
entity_df_event_timestamp_col: str,
):
"""Build point-in-time query between each feature view table and the entity dataframe"""
template = Environment(loader=BaseLoader()).from_string(
source=SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN
)
# Add additional fields to dict
template_context = {
"min_timestamp": min_timestamp,
"max_timestamp": max_timestamp,
"left_table_query_string": left_table_query_string,
"entity_df_event_timestamp_col": entity_df_event_timestamp_col,
"unique_entity_keys": set(
[entity for fv in feature_view_query_contexts for entity in fv.entities]
),
"featureviews": [asdict(context) for context in feature_view_query_contexts],
}
query = template.render(template_context)
return query
def _get_bigquery_client():
try:
client = bigquery.Client()
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
"local Google Cloud account"
)
except EnvironmentError as e:
raise FeastProviderLoginError(
"GCP error: "
+ str(e)
+ "\nIt may be necessary to set a default GCP project by running "
'"gcloud config set project your-project"'
)
return client
# TODO: Optimizations
# * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
# * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
# * Create temporary tables instead of keeping all tables in memory
SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
WITH entity_dataframe AS (
SELECT
*,
CONCAT(
{% for entity_key in unique_entity_keys %}
CAST({{entity_key}} AS STRING),
{% endfor %}
CAST({{entity_df_event_timestamp_col}} AS STRING)
) AS entity_row_unique_id
FROM {{ left_table_query_string }}
),
{% for featureview in featureviews %}
/*
This query template performs the point-in-time correctness join for a single feature set table
to the provided entity table.
1. Concatenate the timestamp and entities from the feature set table with the entity dataset.
Feature values are joined to this table later for improved efficiency.
featureview_timestamp is equal to null in rows from the entity dataset.
*/
{{ featureview.name }}__union_features AS (
SELECT
-- unique identifier for each row in the entity dataset.
entity_row_unique_id,
-- event_timestamp contains the timestamps to join onto
{{entity_df_event_timestamp_col}} AS event_timestamp,
-- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
NULL as {{ featureview.name }}_feature_timestamp,
-- created timestamp of the feature at the corresponding feature_timestamp
{{ 'NULL as created_timestamp,' if featureview.created_timestamp_column else '' }}
-- select only entities belonging to this feature set
{{ featureview.entities | join(', ')}},
-- boolean for filtering the dataset later
true AS is_entity_table
FROM entity_dataframe
UNION ALL
SELECT
NULL as entity_row_unique_id,
{{ featureview.event_timestamp_column }} as event_timestamp,
{{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
false AS is_entity_table
FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
),
/*
2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as
well as is_entity_table.
Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps
in the rows from the entity table should now contain the latest timestamps relative to the row's
event_timestamp.
For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the
feature_timestamp to null.
*/
{{ featureview.name }}__joined AS (
SELECT
entity_row_unique_id,
event_timestamp,
{{ featureview.entities | join(', ')}},
{% for feature in featureview.features %}
IF(event_timestamp >= {{ featureview.name }}_feature_timestamp {% if featureview.ttl == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureview.ttl }} second) < {{ featureview.name }}_feature_timestamp{% endif %}, {{ featureview.name }}__{{ feature }}, NULL) as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM (
SELECT
entity_row_unique_id,
event_timestamp,
{{ featureview.entities | join(', ')}},
{{ 'FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,' if featureview.created_timestamp_column else '' }}
FIRST_VALUE({{ featureview.name }}_feature_timestamp IGNORE NULLS) over w AS {{ featureview.name }}_feature_timestamp,
is_entity_table
FROM {{ featureview.name }}__union_features
WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC{{', created_timestamp DESC' if featureview.created_timestamp_column else ''}} ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
)
/*
3. Select only the rows from the entity table, and join the features from the original feature set table
to the dataset using the entity values, feature_timestamp, and created_timestamps.
*/
LEFT JOIN (
SELECT
{{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}},
{% for feature in featureview.features %}
{{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %}
) USING ({{ featureview.name }}_feature_timestamp,{{ ' created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entities | join(', ')}})
WHERE is_entity_table
),
/*
4. Finally, deduplicate the rows by selecting the first occurrence of each entity table entity_row_unique_id.
*/
{{ featureview.name }}__deduped AS (SELECT
k.*
FROM (
SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
FROM {{ featureview.name }}__joined row
GROUP BY entity_row_unique_id
)){% if loop.last %}{% else %}, {% endif %}
{% endfor %}
/*
Joins the outputs of multiple time travel joins to a single table.
*/
SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (entity_row_unique_id, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf
{% for featureview in featureviews %}
LEFT JOIN (
SELECT
entity_row_unique_id,
{% for feature in featureview.features %}
{{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.name }}__deduped
) USING (entity_row_unique_id)
{% endfor %}
ORDER BY {{entity_df_event_timestamp_col}}
"""