feast.infra.utils package
Subpackages
Submodules
feast.infra.utils.aws_utils module
- feast.infra.utils.aws_utils.delete_api_gateway(api_gateway_client, api_gateway_id: str) Dict [source]
Delete the API Gateway with the given ID.
- Parameters
api_gateway_client – API Gateway V2 Client
api_gateway_id – API Gateway ID to delete
Returns: The delete_api API response dict.
- feast.infra.utils.aws_utils.delete_lambda_function(lambda_client, function_name: str) Dict [source]
Delete the AWS Lambda function by name.
- Parameters
lambda_client – AWS Lambda client
function_name – Name of the AWS Lambda function
Returns: The delete_function API response dict.
- feast.infra.utils.aws_utils.delete_s3_directory(s3_resource, bucket: str, key: str)[source]
Delete an S3 directory recursively.
- feast.infra.utils.aws_utils.download_s3_directory(s3_resource, bucket: str, key: str, local_dir: str)[source]
Download an S3 directory to a local directory.
- feast.infra.utils.aws_utils.execute_redshift_query_and_unload_to_s3(redshift_data_client, cluster_id: str, database: str, user: str, s3_path: str, iam_role: str, query: str) None [source]
Unload Redshift query results to S3.
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
s3_path – S3 directory where the unloaded data is written
iam_role – IAM Role for Redshift to assume during the UNLOAD command. The role must grant permission to write to the S3 location.
query – The SQL query to execute
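For illustration, a minimal usage sketch of this helper built from the signature documented above; the cluster, database, role ARN, bucket and query are placeholders, not values from the source:
from feast.infra.utils import aws_utils

# Placeholders: substitute your own cluster, database, IAM role and bucket.
redshift_data_client = aws_utils.get_redshift_data_client("us-west-2")
aws_utils.execute_redshift_query_and_unload_to_s3(
    redshift_data_client,
    cluster_id="my-cluster",
    database="dev",
    user="awsuser",
    s3_path="s3://my-bucket/unload/",
    iam_role="arn:aws:iam::123456789012:role/RedshiftS3Access",
    query="SELECT * FROM my_feature_table",
)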
- feast.infra.utils.aws_utils.execute_redshift_statement(redshift_data_client, cluster_id: str, database: str, user: str, query: str) str [source]
Execute Redshift statement synchronously. Waits for the query to finish.
Raises RedshiftCredentialsError if the statement couldn’t be executed due to a validation error. Raises RedshiftQueryError if the query runs but finishes with errors.
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
query – The SQL query to execute
Returns: Statement ID
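A hedged usage sketch, assuming the client comes from get_redshift_data_client (documented below) and using placeholder cluster and database names:
from feast.infra.utils import aws_utils

client = aws_utils.get_redshift_data_client("us-west-2")
# Blocks until the statement finishes; raises on credential or query errors.
statement_id = aws_utils.execute_redshift_statement(
    client,
    cluster_id="my-cluster",
    database="dev",
    user="awsuser",
    query="SELECT COUNT(*) FROM my_feature_table",
)
# The returned statement ID can be passed to get_redshift_statement_result.
result = aws_utils.get_redshift_statement_result(client, statement_id)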
- feast.infra.utils.aws_utils.execute_redshift_statement_async(redshift_data_client, cluster_id: str, database: str, user: str, query: str) dict [source]
Execute Redshift statement asynchronously. Does not wait for the query to finish.
Raises RedshiftCredentialsError if the statement couldn’t be executed due to a validation error.
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
query – The SQL query to execute
Returns: JSON response
- feast.infra.utils.aws_utils.get_bucket_and_key(s3_path: str) Tuple[str, str] [source]
Get the S3 bucket and key given the full path.
For example, get_bucket_and_key("s3://foo/bar/test.file") returns ("foo", "bar/test.file").
If s3_path doesn’t start with "s3://", a ValueError is raised.
- feast.infra.utils.aws_utils.get_first_api_gateway(api_gateway_client, api_gateway_name: str) Optional[Dict] [source]
Get the first API Gateway with the given name. Note that API Gateways can share the same name; they are identified by a unique AWS-generated ID. This method therefore lists all API Gateways and returns the first one with a matching name. If no matching name is found, None is returned.
- Parameters
api_gateway_client – API Gateway V2 Client
api_gateway_name – Name of the API Gateway
Returns: Either a dictionary containing the get_api response, or None if it doesn’t exist
- feast.infra.utils.aws_utils.get_lambda_function(lambda_client, function_name: str) Optional[Dict] [source]
Get the AWS Lambda function by name, or return None if it doesn’t exist.
- Parameters
lambda_client – AWS Lambda client
function_name – Name of the AWS Lambda function
Returns: Either a dictionary containing the get_function API response, or None if it doesn’t exist.
- feast.infra.utils.aws_utils.get_redshift_data_client(aws_region: str)[source]
Get the Redshift Data API Service client for the given AWS region.
- feast.infra.utils.aws_utils.get_redshift_statement_result(redshift_data_client, statement_id: str) dict [source]
Get the Redshift statement result
- feast.infra.utils.aws_utils.get_s3_resource(aws_region: str)[source]
Get the S3 resource for the given AWS region.
- feast.infra.utils.aws_utils.temporarily_upload_df_to_redshift(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, table_name: str, df: pandas.core.frame.DataFrame) Iterator[None] [source]
Uploads a Pandas DataFrame to Redshift as a new table with cleanup logic.
This is essentially the same as upload_df_to_redshift (check out its docstring for full details), but unlike it, this method is a generator and should be used with a with block. For example:
>>> with temporarily_upload_df_to_redshift(...):
>>>     # Use `table_name` table in Redshift here
>>> # `table_name` will not exist at this point, since it's cleaned up by the `with` block
- feast.infra.utils.aws_utils.unload_redshift_query_to_df(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, query: str) pandas.core.frame.DataFrame [source]
Unload Redshift query results to S3 and return the results as a Pandas DataFrame.
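A minimal sketch of pulling query results into a DataFrame with this helper; all identifiers and paths are placeholders, and the IAM role must be able to write to the S3 staging path:
from feast.infra.utils import aws_utils

df = aws_utils.unload_redshift_query_to_df(
    aws_utils.get_redshift_data_client("us-west-2"),
    cluster_id="my-cluster",
    database="dev",
    user="awsuser",
    s3_resource=aws_utils.get_s3_resource("us-west-2"),
    s3_path="s3://my-bucket/staging/unload",
    iam_role="arn:aws:iam::123456789012:role/RedshiftS3Access",
    query="SELECT * FROM my_feature_table",
)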
- feast.infra.utils.aws_utils.unload_redshift_query_to_pa(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, query: str) pyarrow.lib.Table [source]
Unload Redshift query results to S3 and return the results as a PyArrow Table.
- feast.infra.utils.aws_utils.update_lambda_function_environment(lambda_client, function_name: str, environment: Dict[str, Any]) None [source]
Update the AWS Lambda function environment. The update is retried multiple times in case another action is currently being run on the lambda (e.g. it’s being created or updated in parallel).
- Parameters
lambda_client – AWS Lambda client
function_name – Name of the AWS Lambda function
environment – The desired lambda environment
- feast.infra.utils.aws_utils.upload_arrow_table_to_redshift(table: Union[pyarrow.lib.Table, pathlib.Path], redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, iam_role: str, s3_path: str, table_name: str, schema: Optional[pyarrow.lib.Schema] = None, fail_if_exists: bool = True)[source]
Uploads an Arrow Table to a new or existing Redshift table.
- Here’s how the upload process works (see the sketch after this entry):
The PyArrow Table is serialized into Parquet format on local disk
The Parquet file is uploaded to S3
The S3 file is loaded into Redshift as a new table through the COPY command
The local disk and S3 paths are cleaned up
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
s3_resource – S3 Resource object
s3_path – S3 path where the Parquet file is temporarily uploaded
iam_role – IAM Role for Redshift to assume during the COPY command. The role must grant permission to read the S3 location.
table_name – The name of the new Redshift table where we copy the dataframe
table – The Arrow Table or Path to parquet dataset to upload
schema – (Optional) Arrow Schema provided by the client, which will be converted into the Redshift table schema
fail_if_exists – Whether to fail if a table with this name already exists; if False, data is appended to the existing table
- Raises
RedshiftTableNameTooLong – The specified table name is too long.
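The sketch below illustrates the four steps above using the lower-level helpers documented in this module; it is not this function’s actual implementation, and the table name, paths, cluster and role ARN are placeholders (the real helper also handles table creation, schema mapping and error handling):
import os

import pyarrow as pa
import pyarrow.parquet as pq

from feast.infra.utils import aws_utils

# 1. Serialize the Arrow Table into Parquet format on local disk.
table = pa.table({"driver_id": [1001, 1002], "conv_rate": [0.5, 0.7]})
pq.write_table(table, "/tmp/upload.parquet")

# 2. Upload the Parquet file to S3.
s3_resource = aws_utils.get_s3_resource("us-west-2")
s3_resource.Object("my-bucket", "staging/upload.parquet").upload_file("/tmp/upload.parquet")

# 3. Load the S3 file into Redshift through the COPY command.
client = aws_utils.get_redshift_data_client("us-west-2")
aws_utils.execute_redshift_statement(
    client,
    cluster_id="my-cluster",
    database="dev",
    user="awsuser",
    query="COPY my_table FROM 's3://my-bucket/staging/upload.parquet' "
          "IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftS3Access' FORMAT AS PARQUET",
)

# 4. Clean up the local and S3 staging files.
os.remove("/tmp/upload.parquet")
s3_resource.Object("my-bucket", "staging/upload.parquet").delete()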
- feast.infra.utils.aws_utils.upload_df_to_redshift(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, table_name: str, df: pandas.core.frame.DataFrame)[source]
Uploads a Pandas DataFrame to Redshift as a new table.
The caller is responsible for deleting the table when no longer necessary.
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
s3_resource – S3 Resource object
s3_path – S3 path where the Parquet file is temporarily uploaded
iam_role – IAM Role for Redshift to assume during the COPY command. The role must grant permission to read the S3 location.
table_name – The name of the new Redshift table where we copy the dataframe
df – The Pandas DataFrame to upload
- Raises
RedshiftTableNameTooLong – The specified table name is too long.
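A hedged usage sketch based on the documented signature; identifiers and paths are placeholders, and the caller is expected to drop the table afterwards (or use temporarily_upload_df_to_redshift above):
import pandas as pd

from feast.infra.utils import aws_utils

df = pd.DataFrame({"driver_id": [1001, 1002], "conv_rate": [0.5, 0.7]})
aws_utils.upload_df_to_redshift(
    aws_utils.get_redshift_data_client("us-west-2"),
    cluster_id="my-cluster",
    database="dev",
    user="awsuser",
    s3_resource=aws_utils.get_s3_resource("us-west-2"),
    s3_path="s3://my-bucket/staging/df.parquet",
    iam_role="arn:aws:iam::123456789012:role/RedshiftS3Access",
    table_name="feast_df_upload_example",
    df=df,
)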
- feast.infra.utils.aws_utils.upload_df_to_s3(s3_resource, s3_path: str, df: pandas.core.frame.DataFrame) None [source]
Uploads a Pandas DataFrame to S3 as a Parquet file.
- Parameters
s3_resource – S3 Resource object
s3_path – S3 path where the Parquet file is temporarily uploaded
df – The Pandas DataFrame to upload
Returns: None
- feast.infra.utils.aws_utils.wait_for_redshift_statement(redshift_data_client, statement: dict) None [source]
Waits for the Redshift statement to finish. Raises RedshiftQueryError if the statement didn’t succeed.
We use exponential backoff for checking the query state until it’s no longer running. The backoff starts at 0.1 seconds and doubles until it reaches 30 seconds, after which it stays fixed.
- Parameters
redshift_data_client – Redshift Data API Service client
statement – The redshift statement to wait for (result of execute_redshift_statement)
Returns: None
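The polling policy described above amounts to a loop along these lines (an illustrative sketch, not the module’s actual code; the real helper inspects the Redshift Data API statement status and raises RedshiftQueryError on failure):
import time

def wait_with_backoff(is_finished, initial_delay=0.1, max_delay=30.0):
    """Poll is_finished() with exponential backoff: 0.1s, 0.2s, 0.4s, ... capped at 30s."""
    delay = initial_delay
    while not is_finished():
        time.sleep(delay)
        delay = min(delay * 2, max_delay)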
feast.infra.utils.hbase_utils module
- class feast.infra.utils.hbase_utils.HbaseConstants[source]
Bases: object
Constants to be used by the Hbase Online Store.
- CREATED_TS = 'created_ts'
- DEFAULT_COLUMN_FAMILY = 'default'
- DEFAULT_CREATED_TS = 'default:created_ts'
- DEFAULT_EVENT_TS = 'default:event_ts'
- EVENT_TS = 'event_ts'
- class feast.infra.utils.hbase_utils.HbaseUtils(conn: Optional[happybase.connection.Connection] = None, host: Optional[str] = None, port: Optional[int] = None, timeout=None)[source]
Bases: object
Utils class to manage different Hbase operations.
- conn
happybase Connection to connect to hbase.
- host
hostname of the hbase thrift server.
- port
port on which the thrift server is running.
- timeout
socket timeout in milliseconds.
- batch(table_name: str)[source]
Returns a ‘Batch’ instance that can be used for mass data manipulation in the hbase table.
- Parameters
table_name – Name of the Hbase table.
- check_if_table_exist(table_name: str)[source]
Check if a table exists in hbase.
- Parameters
table_name – Name of the Hbase table.
- create_table(table_name: str, colm_family: List[str])[source]
Create a table in the hbase online store.
- Parameters
table_name – Name of the Hbase table.
colm_family – List of names of column families to be created in the hbase table.
- create_table_with_default_cf(table_name: str)[source]
Create a table in the hbase online store with one column family, “default”.
- Parameters
table_name – Name of the Hbase table.
- put(table_name: str, row_key: str, data: dict)[source]
Store data in the hbase table.
- Parameters
table_name – Name of the Hbase table.
row_key – Row key of the row to be inserted into the hbase table.
data – Mapping of column family name:column name to column values
- row(table_name: str, row_key, columns=None, timestamp=None, include_timestamp=False)[source]
Fetch a row of data from the hbase table.
- Parameters
table_name – Name of the Hbase table.
row_key – Row key of the row to be fetched from the hbase table.
columns – the names of the columns to be fetched.
timestamp – the maximum timestamp (version) the returned cells can have.
include_timestamp – whether (column, timestamp) pairs should be returned instead of only column values.
- rows(table_name: str, row_keys, columns=None, timestamp=None, include_timestamp=False)[source]
Fetch multiple rows of data from the hbase table.
- Parameters
table_name – Name of the Hbase table.
row_keys – List of row keys of the rows to be fetched from the hbase table.
columns – the names of the columns to be fetched.
timestamp – the maximum timestamp (version) the returned cells can have.
include_timestamp – whether (column, timestamp) pairs should be returned instead of only column values.
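A hedged usage sketch of HbaseUtils based on the methods documented above; host, port, table name and row data are placeholders, and a reachable happybase Thrift server is assumed:
from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils

hbase = HbaseUtils(host="localhost", port=9090)

table_name = "example_feature_table"
if not hbase.check_if_table_exist(table_name):
    hbase.create_table_with_default_cf(table_name)

# Values are keyed as "<column family>:<column>", e.g. the default event timestamp column.
hbase.put(
    table_name,
    row_key="driver_1001",
    data={HbaseConstants.DEFAULT_EVENT_TS: b"2022-01-01T00:00:00"},
)
row = hbase.row(table_name, row_key="driver_1001")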
feast.infra.utils.snowflake_utils module
- feast.infra.utils.snowflake_utils.chunk_helper(lst: pandas.core.frame.DataFrame, n: int) Iterator[Tuple[int, pandas.core.frame.DataFrame]] [source]
Helper generator that chunks a DataFrame efficiently, yielding each chunk together with its index, as if enumerate were called on the sequence.
- feast.infra.utils.snowflake_utils.copy_uploaded_data_to_table(cursor: snowflake.connector.cursor.SnowflakeCursor, stage_name: str, columns: List[str], table_name: str, database: Optional[str] = None, schema: Optional[str] = None, compression: str = 'gzip', on_error: str = 'abort_statement', quote_identifiers: bool = True, auto_create_table: bool = False, create_temp_table: bool = False)[source]
- feast.infra.utils.snowflake_utils.create_file_format(compression: str, compression_map: Dict[str, str], cursor: snowflake.connector.cursor.SnowflakeCursor) str [source]
- feast.infra.utils.snowflake_utils.create_temporary_sfc_stage(cursor: snowflake.connector.cursor.SnowflakeCursor) str [source]
- feast.infra.utils.snowflake_utils.execute_snowflake_statement(conn: snowflake.connector.connection.SnowflakeConnection, query) snowflake.connector.cursor.SnowflakeCursor [source]
- feast.infra.utils.snowflake_utils.get_snowflake_conn(config, autocommit=True) snowflake.connector.connection.SnowflakeConnection [source]
- feast.infra.utils.snowflake_utils.parse_private_key_path(key_path: str, private_key_passphrase: str) bytes [source]
- feast.infra.utils.snowflake_utils.upload_df(df: pandas.core.frame.DataFrame, cursor: snowflake.connector.cursor.SnowflakeCursor, stage_name: str, chunk_size: Optional[int] = None, parallel: int = 4, compression: str = 'gzip')[source]
- Parameters
df – Dataframe we’d like to write back.
cursor – cursor to be used to communicate with Snowflake.
stage_name – stage name in Snowflake connection.
chunk_size – Number of elements to be inserted once, if not provided all elements will be dumped once (Default value = None).
parallel – Number of threads to be used when uploading chunks, default follows documentation at: https://docs.snowflake.com/en/sql-reference/sql/put.html#optional-parameters (Default value = 4).
compression – The compression used on the Parquet files; can only be gzip or snappy. Gzip supposedly gives better compression, while snappy is faster. Use whichever is more appropriate (Default value = ‘gzip’).
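For illustration, a hedged sketch composing the staging helpers documented in this module; the connection parameters and table name are placeholders, and production code would normally obtain the connection via get_snowflake_conn with a Feast Snowflake config:
import pandas as pd
import snowflake.connector

from feast.infra.utils import snowflake_utils

# Placeholder credentials; use get_snowflake_conn(config) in real code.
conn = snowflake.connector.connect(account="my_account", user="my_user", password="my_password")
cursor = conn.cursor()

df = pd.DataFrame({"name": ["Mark", "Luke"], "balance": [10, 20]})

# Stage the DataFrame as compressed Parquet files, then COPY them into a table.
stage_name = snowflake_utils.create_temporary_sfc_stage(cursor)
snowflake_utils.upload_df(df, cursor, stage_name)
snowflake_utils.copy_uploaded_data_to_table(
    cursor, stage_name, list(df.columns), "CUSTOMERS", auto_create_table=True
)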
- feast.infra.utils.snowflake_utils.upload_local_pq(path: pathlib.Path, cursor: snowflake.connector.cursor.SnowflakeCursor, stage_name: str, parallel: int = 4)[source]
- Parameters
path – Path to parquet dataset on disk
cursor – cursor to be used to communicate with Snowflake.
stage_name – stage name in Snowflake connection.
parallel – Number of threads to be used when uploading chunks, default follows documentation at: https://docs.snowflake.com/en/sql-reference/sql/put.html#optional-parameters (Default value = 4).
- feast.infra.utils.snowflake_utils.write_pandas(conn: snowflake.connector.connection.SnowflakeConnection, df: pandas.core.frame.DataFrame, table_name: str, database: Optional[str] = None, schema: Optional[str] = None, chunk_size: Optional[int] = None, compression: str = 'gzip', on_error: str = 'abort_statement', parallel: int = 4, quote_identifiers: bool = True, auto_create_table: bool = False, create_temp_table: bool = False)[source]
Allows users to efficiently write a pandas DataFrame back to Snowflake.
It works by dumping the DataFrame into Parquet files, uploading them and finally copying their data into the table.
Returns whether all files were ingested correctly, the number of chunks uploaded, and the number of rows ingested, together with the full output of the COPY INTO command for debugging purposes.
- Example usage:
import pandas
from snowflake.connector.pandas_tools import write_pandas
df = pandas.DataFrame([('Mark', 10), ('Luke', 20)], columns=['name', 'balance'])
success, nchunks, nrows, _ = write_pandas(cnx, df, 'customers')
- Parameters
conn – Connection to be used to communicate with Snowflake.
df – Dataframe we’d like to write back.
table_name – Table name where we want to insert into.
database – Database the schema and table are in; if not provided, the default one will be used (Default value = None).
schema – Schema the table is in; if not provided, the default one will be used (Default value = None).
chunk_size – Number of elements to be inserted once, if not provided all elements will be dumped once (Default value = None).
compression – The compression used on the Parquet files; can only be gzip or snappy. Gzip supposedly gives better compression, while snappy is faster. Use whichever is more appropriate (Default value = ‘gzip’).
on_error – Action to take when COPY INTO statements fail, default follows documentation at: https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#copy-options-copyoptions (Default value = ‘abort_statement’).
parallel – Number of threads to be used when uploading chunks, default follows documentation at: https://docs.snowflake.com/en/sql-reference/sql/put.html#optional-parameters (Default value = 4).
quote_identifiers – By default, identifiers, specifically database, schema, table and column names (from df.columns) will be quoted. If set to False, identifiers are passed on to Snowflake without quoting. I.e. identifiers will be coerced to uppercase by Snowflake. (Default value = True)
auto_create_table – When true, automatically creates a table with corresponding columns for each column in the passed-in DataFrame. The table will not be created if it already exists.
create_temp_table – Will create the auto-created table as a temporary table.
- feast.infra.utils.snowflake_utils.write_parquet(conn: snowflake.connector.connection.SnowflakeConnection, path: pathlib.Path, dataset_schema: pyarrow.lib.Schema, table_name: str, database: Optional[str] = None, schema: Optional[str] = None, compression: str = 'gzip', on_error: str = 'abort_statement', parallel: int = 4, quote_identifiers: bool = True, auto_create_table: bool = False, create_temp_table: bool = False)[source]