feast.infra.utils package

Subpackages

Submodules

feast.infra.utils.aws_utils module

exception feast.infra.utils.aws_utils.RedshiftStatementNotFinishedError[source]

Bases: Exception

feast.infra.utils.aws_utils.delete_api_gateway(api_gateway_client, api_gateway_id: str) Dict[source]

Delete the API Gateway with the given ID.

Parameters
  • api_gateway_client – API Gateway V2 Client.

  • api_gateway_id – API Gateway ID to delete.

Returns: The delete_api API response dict.

feast.infra.utils.aws_utils.delete_lambda_function(lambda_client, function_name: str) Dict[source]

Delete the AWS Lambda function by name.

Parameters
  • lambda_client – AWS Lambda client.

  • function_name – Name of the AWS Lambda function.

Returns: The delete_function API response dict

feast.infra.utils.aws_utils.delete_redshift_table(redshift_data_client, cluster_id: str, database: str, user: str, table_name: str)[source]
feast.infra.utils.aws_utils.delete_s3_directory(s3_resource, bucket: str, key: str)[source]

Delete S3 directory recursively

feast.infra.utils.aws_utils.download_s3_directory(s3_resource, bucket: str, key: str, local_dir: str)[source]

Download the S3 directory to a local disk

feast.infra.utils.aws_utils.execute_redshift_query_and_unload_to_s3(redshift_data_client, cluster_id: str, database: str, user: str, s3_path: str, iam_role: str, query: str) None[source]

Unload Redshift Query results to S3

Parameters
  • redshift_data_client – Redshift Data API Service client

  • cluster_id – Redshift Cluster Identifier

  • database – Redshift Database Name

  • user – Redshift username

  • s3_path – S3 directory where the unloaded data is written

  • iam_role – IAM Role for Redshift to assume during the UNLOAD command. The role must grant permission to write to the S3 location.

  • query – The SQL query to execute

feast.infra.utils.aws_utils.execute_redshift_statement(redshift_data_client, cluster_id: str, database: str, user: str, query: str) str[source]

Execute Redshift statement synchronously. Waits for the query to finish.

Raises RedshiftCredentialsError if the statement couldn’t be executed due to a validation error. Raises RedshiftQueryError if the query runs but finishes with errors.

Parameters
  • redshift_data_client – Redshift Data API Service client

  • cluster_id – Redshift Cluster Identifier

  • database – Redshift Database Name

  • user – Redshift username

  • query – The SQL query to execute

Returns: Statement ID

feast.infra.utils.aws_utils.execute_redshift_statement_async(redshift_data_client, cluster_id: str, database: str, user: str, query: str) dict[source]

Execute Redshift statement asynchronously. Does not wait for the query to finish.

Raises RedshiftCredentialsError if the statement couldn’t be executed due to a validation error.

Parameters
  • redshift_data_client – Redshift Data API Service client

  • cluster_id – Redshift Cluster Identifier

  • database – Redshift Database Name

  • user – Redshift username

  • query – The SQL query to execute

Returns: JSON response

feast.infra.utils.aws_utils.get_account_id() str[source]

Get AWS Account ID

feast.infra.utils.aws_utils.get_bucket_and_key(s3_path: str) Tuple[str, str][source]

Get the S3 bucket and key given the full path.

For example, get_bucket_and_key("s3://foo/bar/test.file") returns ("foo", "bar/test.file").

If the s3_path doesn’t start with "s3://", a ValueError is raised.
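The documented behaviour can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not Feast's actual implementation; the name `split_s3_path` is hypothetical.

```python
from typing import Tuple


def split_s3_path(s3_path: str) -> Tuple[str, str]:
    """Hypothetical stand-in that mirrors the documented behaviour."""
    prefix = "s3://"
    if not s3_path.startswith(prefix):
        raise ValueError(f"{s3_path!r} does not start with {prefix!r}")
    # The text up to the first "/" after the prefix is the bucket; the rest is the key.
    bucket, _, key = s3_path[len(prefix):].partition("/")
    return bucket, key


bucket, key = split_s3_path("s3://foo/bar/test.file")
print(bucket, key)
```

Splitting on the first "/" only keeps any further slashes inside the key, which matches the example in the docstring.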

feast.infra.utils.aws_utils.get_first_api_gateway(api_gateway_client, api_gateway_name: str) Optional[Dict][source]

Get the first API Gateway with the given name. Note that API Gateways can share the same name; they are identified by an AWS-generated ID, which is unique. This method therefore lists all API Gateways and returns the first one with a matching name. If no matching name is found, None is returned.

Parameters
  • api_gateway_client – API Gateway V2 Client.

  • api_gateway_name – Name of the API Gateway.

Returns: Either a dictionary containing the get_api response, or None if it doesn’t exist
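The "first match by name" lookup described above can be sketched on plain dictionaries. This is an illustration only: the helper name `first_api_with_name`, the sample IDs, and the assumption that each listed API is a dict with "ApiId" and "Name" keys are not taken from this page.

```python
from typing import Dict, List, Optional


def first_api_with_name(apis: List[Dict], name: str) -> Optional[Dict]:
    """Return the first API description whose "Name" matches, or None."""
    return next((api for api in apis if api.get("Name") == name), None)


# Two gateways share a name but have distinct IDs (made-up values).
apis = [
    {"ApiId": "a1", "Name": "feast-server"},
    {"ApiId": "b2", "Name": "other"},
    {"ApiId": "c3", "Name": "feast-server"},
]

match = first_api_with_name(apis, "feast-server")
missing = first_api_with_name(apis, "does-not-exist")
```

Because names are not unique, callers that need a specific gateway should hold on to the ID from the returned dictionary rather than look it up by name again.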

feast.infra.utils.aws_utils.get_lambda_function(lambda_client, function_name: str) Optional[Dict][source]

Get the AWS Lambda function by name or return None if it doesn’t exist.

Parameters
  • lambda_client – AWS Lambda client.

  • function_name – Name of the AWS Lambda function.

Returns: Either a dictionary containing the get_function API response, or None if it doesn’t exist.

feast.infra.utils.aws_utils.get_redshift_data_client(aws_region: str)[source]

Get the Redshift Data API Service client for the given AWS region.

feast.infra.utils.aws_utils.get_redshift_statement_result(redshift_data_client, statement_id: str) dict[source]

Get the Redshift statement result

feast.infra.utils.aws_utils.get_s3_resource(aws_region: str)[source]

Get the S3 resource for the given AWS region.

feast.infra.utils.aws_utils.temporarily_upload_arrow_table_to_redshift(table: Union[pyarrow.lib.Table, pathlib.Path], redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, iam_role: str, s3_path: str, table_name: str, schema: Optional[pyarrow.lib.Schema] = None, fail_if_exists: bool = True) Iterator[None][source]

Uploads an Arrow Table to Redshift as a new table with cleanup logic.

This is essentially the same as upload_arrow_table_to_redshift (see its docstring for full details), but unlike that function, this one is a generator and should be used in a with block. For example:

>>> with temporarily_upload_arrow_table_to_redshift(...): 
>>>     # Use `table_name` table in Redshift here
>>> # `table_name` will not exist at this point, since it's cleaned up by the `with` block
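
The create-then-clean-up pattern can be sketched with `contextlib.contextmanager`. This is a minimal illustration, not Feast's code: `temporary_table` and the in-memory `existing_tables` set are hypothetical stand-ins for the upload and the drop-table step, and the `try`/`finally` is a choice made in this sketch rather than a claim about the library.

```python
from contextlib import contextmanager
from typing import Iterator, Set

# Hypothetical in-memory stand-in for the tables present in a database.
existing_tables: Set[str] = set()


@contextmanager
def temporary_table(table_name: str) -> Iterator[None]:
    existing_tables.add(table_name)  # stands in for the upload step
    try:
        yield  # the body of the `with` block runs here
    finally:
        # Runs on normal exit and also when the body raises.
        existing_tables.discard(table_name)  # stands in for dropping the table


with temporary_table("my_temp_table"):
    exists_inside = "my_temp_table" in existing_tables

exists_after = "my_temp_table" in existing_tables
```
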
feast.infra.utils.aws_utils.temporarily_upload_df_to_redshift(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, table_name: str, df: pandas.core.frame.DataFrame) Iterator[None][source]

Uploads a Pandas DataFrame to Redshift as a new table with cleanup logic.

This is essentially the same as upload_df_to_redshift (see its docstring for full details), but unlike that function, this one is a generator and should be used in a with block. For example:

>>> with temporarily_upload_df_to_redshift(...): 
>>>     # Use `table_name` table in Redshift here
>>> # `table_name` will not exist at this point, since it's cleaned up by the `with` block
feast.infra.utils.aws_utils.unload_redshift_query_to_df(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, query: str) pandas.core.frame.DataFrame[source]

Unload Redshift Query results to S3 and get the results in Pandas DataFrame format

feast.infra.utils.aws_utils.unload_redshift_query_to_pa(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, query: str) pyarrow.lib.Table[source]

Unload Redshift Query results to S3 and get the results in PyArrow Table format

feast.infra.utils.aws_utils.update_lambda_function_environment(lambda_client, function_name: str, environment: Dict[str, Any]) None[source]

Update the AWS Lambda function environment. The update is retried multiple times in case another action is currently running on the Lambda function (e.g. it’s being created or updated in parallel).

Parameters
  • lambda_client – AWS Lambda client.

  • function_name – Name of the AWS Lambda function.

  • environment – The desired Lambda environment.

feast.infra.utils.aws_utils.upload_arrow_table_to_redshift(table: Union[pyarrow.lib.Table, pathlib.Path], redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, iam_role: str, s3_path: str, table_name: str, schema: Optional[pyarrow.lib.Schema] = None, fail_if_exists: bool = True)[source]

Uploads an Arrow Table to Redshift to a new or existing table.

Here’s how the upload process works:
  1. The PyArrow Table is serialized into Parquet format on local disk

  2. The Parquet file is uploaded to S3

  3. The S3 file is loaded into Redshift as a new table through the COPY command

  4. The local disk and S3 paths are cleaned up

Parameters
  • redshift_data_client – Redshift Data API Service client

  • cluster_id – Redshift Cluster Identifier

  • database – Redshift Database Name

  • user – Redshift username

  • s3_resource – S3 Resource object

  • s3_path – S3 path where the Parquet file is temporarily uploaded

  • iam_role – IAM Role for Redshift to assume during the COPY command. The role must grant permission to read the S3 location.

  • table_name – The name of the new Redshift table where we copy the dataframe

  • table – The Arrow Table or Path to parquet dataset to upload

  • schema – Optional Arrow Schema provided by the client, which is converted into the Redshift table schema

  • fail_if_exists – If True, fail when a table with this name already exists; otherwise, append the data to the existing table

Raises

RedshiftTableNameTooLong – The specified table name is too long.

feast.infra.utils.aws_utils.upload_df_to_redshift(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, table_name: str, df: pandas.core.frame.DataFrame)[source]

Uploads a Pandas DataFrame to Redshift as a new table.

The caller is responsible for deleting the table when no longer necessary.

Parameters
  • redshift_data_client – Redshift Data API Service client

  • cluster_id – Redshift Cluster Identifier

  • database – Redshift Database Name

  • user – Redshift username

  • s3_resource – S3 Resource object

  • s3_path – S3 path where the Parquet file is temporarily uploaded

  • iam_role – IAM Role for Redshift to assume during the COPY command. The role must grant permission to read the S3 location.

  • table_name – The name of the new Redshift table where we copy the dataframe

  • df – The Pandas DataFrame to upload

Raises

RedshiftTableNameTooLong – The specified table name is too long.

feast.infra.utils.aws_utils.upload_df_to_s3(s3_resource, s3_path: str, df: pandas.core.frame.DataFrame) None[source]

Uploads a Pandas DataFrame to S3 as a parquet file

Parameters
  • s3_resource – S3 Resource object

  • s3_path – S3 path where the Parquet file is temporarily uploaded

  • df – The Pandas DataFrame to upload

Returns: None

feast.infra.utils.aws_utils.wait_for_redshift_statement(redshift_data_client, statement: dict) None[source]

Waits for the Redshift statement to finish. Raises RedshiftQueryError if the statement didn’t succeed.

We use exponential backoff when checking the query state until it is no longer running. The backoff starts at 0.1 seconds and doubles on each check until it reaches 30 seconds, after which it stays fixed.

Parameters
  • redshift_data_client – Redshift Data API Service client

  • statement – The Redshift statement to wait for (the dict returned by execute_redshift_statement_async)

Returns: None
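
The backoff schedule described above (start at 0.1 seconds, double, cap at 30 seconds) can be sketched as a small polling loop. This is an illustration under stated assumptions, not the library's implementation: `wait_until_finished`, `is_running`, and the injectable `sleep` are hypothetical names introduced so the schedule can be observed without actually waiting.

```python
import time
from typing import Callable, List


def wait_until_finished(
    is_running: Callable[[], bool],
    sleep: Callable[[float], None] = time.sleep,
    initial_delay: float = 0.1,
    max_delay: float = 30.0,
) -> None:
    """Poll `is_running` with a doubling delay that is capped at `max_delay`."""
    delay = initial_delay
    while is_running():
        sleep(delay)
        delay = min(delay * 2, max_delay)


# Record the delays instead of sleeping; the fake statement "runs" for 11 checks.
delays: List[float] = []
states = iter([True] * 11 + [False])
wait_until_finished(lambda: next(states), sleep=delays.append)
print(delays)
```

With these values the delay reaches the 30-second cap on the tenth wait and stays there for every later check.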

feast.infra.utils.hbase_utils module

class feast.infra.utils.hbase_utils.HbaseConstants[source]

Bases: object

Constants to be used by the Hbase Online Store.

CREATED_TS = 'created_ts'
DEFAULT_COLUMN_FAMILY = 'default'
DEFAULT_CREATED_TS = 'default:created_ts'
DEFAULT_EVENT_TS = 'default:event_ts'
EVENT_TS = 'event_ts'
static get_col_from_feature(feature)[source]

Given the feature name, add the column family to get the column name.

static get_feature_from_col(col)[source]

Given the column name, exclude the column family to get the feature name.
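
The relationship between feature names and column names can be sketched as follows, based on the constants listed above (for example, 'event_ts' versus 'default:event_ts'). This is a simplified illustration that works on `str` values; the function names are hypothetical, and the input types handled by the real static methods may differ.

```python
DEFAULT_COLUMN_FAMILY = "default"


def col_from_feature(feature: str) -> str:
    """Prefix the feature name with the column family."""
    return f"{DEFAULT_COLUMN_FAMILY}:{feature}"


def feature_from_col(col: str) -> str:
    """Drop the column family prefix, if there is one."""
    _family, sep, feature = col.partition(":")
    return feature if sep else col


col = col_from_feature("event_ts")
feature = feature_from_col(col)
```
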

class feast.infra.utils.hbase_utils.HbaseUtils(conn: Optional[happybase.connection.Connection] = None, host: Optional[str] = None, port: Optional[int] = None, timeout=None)[source]

Bases: object

Utils class to manage different Hbase operations.

conn

happybase Connection to connect to hbase.

host

hostname of the hbase thrift server.

port

port on which the thrift server is running.

timeout

socket timeout in milliseconds.

batch(table_name: str)[source]

Returns a ‘Batch’ instance that can be used for mass data manipulation in the hbase table.

Parameters

table_name – Name of the Hbase table.

check_if_table_exist(table_name: str)[source]

Check if table exists in hbase.

Parameters

table_name – Name of the Hbase table.

close_conn()[source]

Closes the happybase connection.

create_table(table_name: str, colm_family: List[str])[source]

Create table in hbase online store.

Parameters
  • table_name – Name of the Hbase table.

  • colm_family – List of names of column families to be created in the hbase table.

create_table_with_default_cf(table_name: str)[source]

Create table in hbase online store with one column family “default”.

Parameters

table_name – Name of the Hbase table.

delete_table(table: str)[source]

Deletes the hbase table given the table name.

print_table(table_name)[source]

Prints the table scanning all the rows of the hbase table.

put(table_name: str, row_key: str, data: dict)[source]

Store data in the hbase table.

Parameters
  • table_name – Name of the Hbase table.

  • row_key – Row key of the row to be inserted to hbase table.

  • data – Mapping of column family name:column name to column values

row(table_name: str, row_key, columns=None, timestamp=None, include_timestamp=False)[source]

Fetch a row of data from the hbase table.

Parameters
  • table_name – Name of the Hbase table.

  • row_key – Row key of the row to be fetched from the hbase table.

  • columns – Names of the columns to be fetched.

  • timestamp – Timestamp that specifies the maximum version the cells can have.

  • include_timestamp – Specifies whether (column, timestamp) is returned instead of only the column.

rows(table_name: str, row_keys, columns=None, timestamp=None, include_timestamp=False)[source]

Fetch multiple rows of data from the hbase table.

Parameters
  • table_name – Name of the Hbase table.

  • row_keys – List of row keys of the rows to be fetched from the hbase table.

  • columns – Names of the columns to be fetched.

  • timestamp – Timestamp that specifies the maximum version the cells can have.

  • include_timestamp – Specifies whether (column, timestamp) is returned instead of only the column.

feast.infra.utils.hbase_utils.main()[source]

feast.infra.utils.snowflake_utils module

feast.infra.utils.snowflake_utils.chunk_helper(lst: pandas.core.frame.DataFrame, n: int) Iterator[Tuple[int, pandas.core.frame.DataFrame]][source]

Helper generator that chunks a sequence efficiently and yields each chunk together with its index, as if enumerate had been called on the sequence of chunks.
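
The chunk-with-index idea can be sketched on a plain list. This is a minimal illustration, not the library's code: the documented helper takes a pandas DataFrame, whereas the hypothetical `chunk_with_index` below accepts any sliceable sequence so that it runs with the standard library alone.

```python
from typing import Iterator, Sequence, Tuple


def chunk_with_index(items: Sequence, n: int) -> Iterator[Tuple[int, Sequence]]:
    """Yield (chunk_index, chunk) pairs, each chunk holding at most n items."""
    for start in range(0, len(items), n):
        yield start // n, items[start:start + n]


chunks = list(chunk_with_index([1, 2, 3, 4, 5], 2))
print(chunks)  # [(0, [1, 2]), (1, [3, 4]), (2, [5])]
```
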

feast.infra.utils.snowflake_utils.copy_uploaded_data_to_table(cursor: snowflake.connector.cursor.SnowflakeCursor, stage_name: str, columns: List[str], table_name: str, database: Optional[str] = None, schema: Optional[str] = None, compression: str = 'gzip', on_error: str = 'abort_statement', quote_identifiers: bool = True, auto_create_table: bool = False, create_temp_table: bool = False)[source]
feast.infra.utils.snowflake_utils.create_file_format(compression: str, compression_map: Dict[str, str], cursor: snowflake.connector.cursor.SnowflakeCursor) str[source]
feast.infra.utils.snowflake_utils.create_temporary_sfc_stage(cursor: snowflake.connector.cursor.SnowflakeCursor) str[source]
feast.infra.utils.snowflake_utils.execute_snowflake_statement(conn: snowflake.connector.connection.SnowflakeConnection, query) snowflake.connector.cursor.SnowflakeCursor[source]
feast.infra.utils.snowflake_utils.get_snowflake_conn(config, autocommit=True) snowflake.connector.connection.SnowflakeConnection[source]
feast.infra.utils.snowflake_utils.parse_private_key_path(key_path: str, private_key_passphrase: str) bytes[source]
feast.infra.utils.snowflake_utils.upload_df(df: pandas.core.frame.DataFrame, cursor: snowflake.connector.cursor.SnowflakeCursor, stage_name: str, chunk_size: Optional[int] = None, parallel: int = 4, compression: str = 'gzip')[source]
Parameters
  • df – Dataframe we’d like to write back.

  • cursor – cursor to be used to communicate with Snowflake.

  • stage_name – stage name in Snowflake connection.

  • chunk_size – Number of elements to be inserted at once; if not provided, all elements will be dumped at once (Default value = None).

  • parallel – Number of threads to be used when uploading chunks, default follows documentation at: https://docs.snowflake.com/en/sql-reference/sql/put.html#optional-parameters (Default value = 4).

  • compression – The compression used on the Parquet files; can only be gzip or snappy. Gzip supposedly gives better compression, while snappy is faster. Use whichever is more appropriate (Default value = ‘gzip’).

feast.infra.utils.snowflake_utils.upload_local_pq(path: pathlib.Path, cursor: snowflake.connector.cursor.SnowflakeCursor, stage_name: str, parallel: int = 4)[source]
feast.infra.utils.snowflake_utils.write_pandas(conn: snowflake.connector.connection.SnowflakeConnection, df: pandas.core.frame.DataFrame, table_name: str, database: Optional[str] = None, schema: Optional[str] = None, chunk_size: Optional[int] = None, compression: str = 'gzip', on_error: str = 'abort_statement', parallel: int = 4, quote_identifiers: bool = True, auto_create_table: bool = False, create_temp_table: bool = False)[source]

Allows users to most efficiently write back a pandas DataFrame to Snowflake.

It works by dumping the DataFrame into Parquet files, uploading them and finally copying their data into the table.

Returns whether all files were ingested correctly, number of chunks uploaded, and number of rows ingested with all of the COPY INTO command’s output for debugging purposes.

Example usage:

import pandas
from snowflake.connector.pandas_tools import write_pandas

df = pandas.DataFrame([('Mark', 10), ('Luke', 20)], columns=['name', 'balance'])
success, nchunks, nrows, _ = write_pandas(cnx, df, 'customers')

Parameters
  • conn – Connection to be used to communicate with Snowflake.

  • df – Dataframe we’d like to write back.

  • table_name – Table name where we want to insert into.

  • database – Database that the schema and table are in; if not provided, the default one will be used (Default value = None).

  • schema – Schema that the table is in; if not provided, the default one will be used (Default value = None).

  • chunk_size – Number of elements to be inserted at once; if not provided, all elements will be dumped at once (Default value = None).

  • compression – The compression used on the Parquet files; can only be gzip or snappy. Gzip supposedly gives better compression, while snappy is faster. Use whichever is more appropriate (Default value = ‘gzip’).

  • on_error – Action to take when COPY INTO statements fail, default follows documentation at: https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#copy-options-copyoptions (Default value = ‘abort_statement’).

  • parallel – Number of threads to be used when uploading chunks, default follows documentation at: https://docs.snowflake.com/en/sql-reference/sql/put.html#optional-parameters (Default value = 4).

  • quote_identifiers – By default, identifiers (specifically database, schema, table and column names from df.columns) will be quoted. If set to False, identifiers are passed on to Snowflake without quoting, in which case Snowflake coerces them to uppercase. (Default value = True)

  • auto_create_table – When true, automatically creates a table with a corresponding column for each column in the passed-in DataFrame. The table will not be created if it already exists.

  • create_temp_table – Will make the auto-created table as a temporary table
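
The effect of quote_identifiers on a fully qualified table name can be sketched as below. This is an illustration only, not the function's actual code: `qualified_name` is a hypothetical helper, and it does not escape double quotes embedded in identifiers.

```python
from typing import Optional


def qualified_name(
    table_name: str,
    database: Optional[str] = None,
    schema: Optional[str] = None,
    quote_identifiers: bool = True,
) -> str:
    """Join the database, schema and table parts, quoting each one if requested."""
    parts = [part for part in (database, schema, table_name) if part]
    if quote_identifiers:
        parts = [f'"{part}"' for part in parts]
    return ".".join(parts)


quoted = qualified_name("customers", database="db", schema="public")
unquoted = qualified_name("customers", database="db", schema="public", quote_identifiers=False)
```

Quoted names keep their exact case, while the unquoted form leaves case handling to Snowflake, as described for quote_identifiers above.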

feast.infra.utils.snowflake_utils.write_parquet(conn: snowflake.connector.connection.SnowflakeConnection, path: pathlib.Path, dataset_schema: pyarrow.lib.Schema, table_name: str, database: Optional[str] = None, schema: Optional[str] = None, compression: str = 'gzip', on_error: str = 'abort_statement', parallel: int = 4, quote_identifiers: bool = True, auto_create_table: bool = False, create_temp_table: bool = False)[source]

Module contents