feast.infra.offline_stores.contrib.trino_offline_store package

Subpackages

Submodules

feast.infra.offline_stores.contrib.trino_offline_store.trino module

class feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStore[source]

Bases: feast.infra.offline_stores.offline_store.OfflineStore

static get_historical_features(config: feast.repo_config.RepoConfig, feature_views: List[feast.feature_view.FeatureView], feature_refs: List[str], entity_df: Union[pandas.core.frame.DataFrame, str], registry: feast.registry.Registry, project: str, full_feature_names: bool = False, user: str = 'user', auth: Optional[trino.auth.Authentication] = None, http_scheme: Optional[str] = None) feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoRetrievalJob[source]
static pull_all_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], event_timestamp_column: str, start_date: datetime.datetime, end_date: datetime.datetime, user: str = 'user', auth: Optional[trino.auth.Authentication] = None, http_scheme: Optional[str] = None) feast.infra.offline_stores.offline_store.RetrievalJob[source]

Returns a RetrievalJob for all join key columns, feature name columns, and the event timestamp column, restricted to rows whose event timestamp falls between start_date and end_date.

Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • event_timestamp_column – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

static pull_latest_from_table_or_query(config: feast.repo_config.RepoConfig, data_source: feast.data_source.DataSource, join_key_columns: List[str], feature_name_columns: List[str], event_timestamp_column: str, created_timestamp_column: Optional[str], start_date: datetime.datetime, end_date: datetime.datetime, user: str = 'user', auth: Optional[trino.auth.Authentication] = None, http_scheme: Optional[str] = None) feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoRetrievalJob[source]

This method pulls data from the offline store, and the FeatureStore class is used to write this data into the online store. This method is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).

Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed into this function.

Parameters
  • config – Repo configuration object

  • data_source – Data source to pull all of the columns from

  • join_key_columns – Columns of the join keys

  • feature_name_columns – Columns of the feature names needed

  • event_timestamp_column – Timestamp column

  • start_date – Starting date of query

  • end_date – Ending date of query

class feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoOfflineStoreConfig(*, type: pydantic.types.StrictStr = 'trino', host: pydantic.types.StrictStr, port: int, catalog: pydantic.types.StrictStr, connector: Dict[str, str], dataset: pydantic.types.StrictStr = 'feast')[source]

Bases: feast.repo_config.FeastConfigBaseModel

Offline store config for Trino

catalog: pydantic.types.StrictStr

Catalog of the Trino cluster

connector: Dict[str, str]

Trino connector to use, along with any extra connector parameters. Needs to contain at least the “type” key, for example {“type”: “bigquery”} or {“type”: “hive”, “file_format”: “parquet”}

dataset: pydantic.types.StrictStr

(optional) Trino Dataset name for temporary tables

host: pydantic.types.StrictStr

Host of the Trino cluster

port: int

Port of the Trino cluster

type: pydantic.types.StrictStr

Offline store type selector

class feast.infra.offline_stores.contrib.trino_offline_store.trino.TrinoRetrievalJob(query: str, client: feast.infra.offline_stores.contrib.trino_offline_store.trino_queries.Trino, config: feast.repo_config.RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]] = None, metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata] = None)[source]

Bases: feast.infra.offline_stores.offline_store.RetrievalJob

property full_feature_names: bool
property metadata: Optional[feast.infra.offline_stores.offline_store.RetrievalMetadata]

Return metadata information about retrieval. Should be available even before materializing the dataset itself.

property on_demand_feature_views: Optional[List[feast.on_demand_feature_view.OnDemandFeatureView]]
persist(storage: feast.saved_dataset.SavedDatasetStorage)[source]

Run the retrieval and persist the results in the same offline store used for read.

to_sql() str[source]

Returns the SQL query that will be executed in Trino to build the historical feature table

to_trino(destination_table: Optional[str] = None, timeout: int = 1800, retry_cadence: int = 10) Optional[str][source]

Triggers the execution of a historical feature retrieval query and exports the results to a Trino table. Runs for a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).

Parameters
  • timeout – An optional number of seconds for setting the time limit of the query job.

  • retry_cadence – An optional number of seconds for setting how often the job should be checked for completion.

Returns

Returns the destination table name.

feast.infra.offline_stores.contrib.trino_offline_store.trino_queries module

class feast.infra.offline_stores.contrib.trino_offline_store.trino_queries.Query(query_text: str, cursor: trino.dbapi.Cursor)[source]

Bases: object

cancel(*args)[source]
close()[source]
execute() feast.infra.offline_stores.contrib.trino_offline_store.trino_queries.Results[source]
class feast.infra.offline_stores.contrib.trino_offline_store.trino_queries.QueryStatus(value)[source]

Bases: enum.Enum

An enumeration.

CANCELLED = 4
COMPLETED = 3
ERROR = 2
PENDING = 0
RUNNING = 1
class feast.infra.offline_stores.contrib.trino_offline_store.trino_queries.Results(data: List[List[Any]], columns: List[Dict])[source]

Bases: object

Class for keeping track of the results of a Trino query

columns: List[Dict]
property columns_names: List[str]
data: List[List[Any]]
property pyarrow_schema: pyarrow.lib.Schema
property schema: Dict[str, str]
to_dataframe() pandas.core.frame.DataFrame[source]
class feast.infra.offline_stores.contrib.trino_offline_store.trino_queries.Trino(host: Optional[str] = None, port: Optional[int] = None, user: Optional[str] = None, catalog: Optional[str] = None, auth: Optional[Any] = None, http_scheme: Optional[str] = None)[source]

Bases: object

create_query(query_text: str) feast.infra.offline_stores.contrib.trino_offline_store.trino_queries.Query[source]

Create a Query object without executing it.

execute_query(query_text: str) feast.infra.offline_stores.contrib.trino_offline_store.trino_queries.Results[source]

Create a Query object and execute it.

feast.infra.offline_stores.contrib.trino_offline_store.trino_source module

class feast.infra.offline_stores.contrib.trino_offline_store.trino_source.SavedDatasetTrinoStorage(table: Optional[str] = None, query: Optional[str] = None)[source]

Bases: feast.saved_dataset.SavedDatasetStorage

static from_proto(storage_proto: feast.core.SavedDataset_pb2.SavedDatasetStorage) feast.saved_dataset.SavedDatasetStorage[source]
to_data_source() feast.data_source.DataSource[source]
to_proto() feast.core.SavedDataset_pb2.SavedDatasetStorage[source]
trino_options: feast.infra.offline_stores.contrib.trino_offline_store.trino_source.TrinoOptions
class feast.infra.offline_stores.contrib.trino_offline_store.trino_source.TrinoOptions(table: Optional[str], query: Optional[str])[source]

Bases: object

DataSource Trino options used to source features from Trino query

classmethod from_proto(trino_options_proto: feast.core.DataSource_pb2.TrinoOptions)[source]

Creates a TrinoOptions from a protobuf representation of a Trino option.

Parameters

trino_options_proto – A protobuf representation of the Trino options of a DataSource.

Returns

Returns a TrinoOptions object based on the trino_options protobuf

property query

Returns the Trino SQL query referenced by this source

property table

Returns the table ref of this Trino table

to_proto() feast.core.DataSource_pb2.TrinoOptions[source]

Converts a TrinoOptions object to its protobuf representation.

Returns

A TrinoOptionsProto protobuf.

class feast.infra.offline_stores.contrib.trino_offline_store.trino_source.TrinoSource(*, event_timestamp_column: Optional[str] = '', table: Optional[str] = None, created_timestamp_column: Optional[str] = '', field_mapping: Optional[Dict[str, str]] = None, query: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = '', tags: Optional[Dict[str, str]] = None, owner: Optional[str] = '', timestamp_field: Optional[str] = None)[source]

Bases: feast.data_source.DataSource

created_timestamp_column: str
date_partition_column: str
description: str
field_mapping: Dict[str, str]
static from_proto(data_source: feast.core.DataSource_pb2.DataSource)[source]

Converts data source config in protobuf spec to a DataSource class object.

Parameters

data_source – A protobuf representation of a DataSource.

Returns

A DataSource class object.

Raises

ValueError – The type of DataSource could not be identified.

get_table_column_names_and_types(config: feast.repo_config.RepoConfig) Iterable[Tuple[str, str]][source]

Returns the list of column names and raw column types.

Parameters

config – Configuration object used to configure a feature store.

get_table_query_string() str[source]

Returns a string that can directly be used to reference this table in SQL

name: str
owner: str
property query
static source_datatype_to_feast_value_type() Callable[[str], feast.value_type.ValueType][source]

Returns the callable method that returns Feast type given the raw column type.

property table
tags: Dict[str, str]
timestamp_field: str
to_proto() feast.core.DataSource_pb2.DataSource[source]

Converts this TrinoSource object to its DataSourceProto protobuf representation.

property trino_options

Returns the Trino options of this data source

validate(config: feast.repo_config.RepoConfig)[source]

Validates the underlying data source.

Parameters

config – Configuration object used to configure a feature store.

feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map module

feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map.pa_to_trino_value_type(pa_type_as_str: str) str[source]
feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map.trino_to_feast_value_type(trino_type_as_str: str) feast.value_type.ValueType[source]
feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map.trino_to_pa_value_type(trino_type_as_str: str) pyarrow.lib.DataType[source]

Module contents