feast.infra.utils package
Subpackages
Submodules
feast.infra.utils.aws_utils module
- exception feast.infra.utils.aws_utils.AthenaTableNameTooLong(table_name: str)[source]
Bases:
Exception
- feast.infra.utils.aws_utils.delete_api_gateway(api_gateway_client, api_gateway_id: str) Dict [source]
Delete the API Gateway with the given ID.
- Parameters
api_gateway_client – API Gateway V2 Client
api_gateway_id – API Gateway ID to delete
Returns: The delete_api API response dict.
- feast.infra.utils.aws_utils.delete_lambda_function(lambda_client, function_name: str) Dict [source]
Delete the AWS Lambda function by name.
- Parameters
lambda_client – AWS Lambda client
function_name – Name of the AWS Lambda function
Returns: The delete_function API response dict.
- feast.infra.utils.aws_utils.delete_redshift_table(redshift_data_client, cluster_id: str, database: str, user: str, table_name: str)[source]
- feast.infra.utils.aws_utils.delete_s3_directory(s3_resource, bucket: str, key: str)[source]
Delete an S3 directory recursively.
- feast.infra.utils.aws_utils.download_s3_directory(s3_resource, bucket: str, key: str, local_dir: str)[source]
Download the S3 directory to a local disk
- feast.infra.utils.aws_utils.drop_temp_table(athena_data_client, data_source: str, database: str, temp_table: str)[source]
- feast.infra.utils.aws_utils.execute_athena_query(athena_data_client, data_source: str, database: str, query: str, temp_table: Optional[str] = None) str [source]
Execute Athena statement synchronously. Waits for the query to finish.
Raises AthenaCredentialsError if the statement couldn't be executed due to a validation error. Raises AthenaQueryError if the query runs but finishes with errors.
- Parameters
athena_data_client – Athena Data API Service client
data_source – Athena data source name
database – Athena Database Name
query – The SQL query to execute
temp_table – temp table name to be deleted after query execution.
Returns: Statement ID
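A minimal usage sketch (the region, catalog, database, and query below are hypothetical placeholders):
>>> client = get_athena_data_client("us-west-2")
>>> execution_id = execute_athena_query(
...     client, data_source="AwsDataCatalog", database="my_db",
...     query="SELECT * FROM my_table LIMIT 10",
... )
>>> result = get_athena_query_result(client, execution_id)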
- feast.infra.utils.aws_utils.execute_athena_query_and_unload_to_s3(athena_data_client, data_source: str, database: str, query: str, temp_table: str) None [source]
Unload Athena Query results to S3
- Parameters
athena_data_client – Athena Data API Service client
data_source – Athena data source
database – Athena Database Name
query – The SQL query to execute
temp_table – temp table name to be deleted after query execution.
- feast.infra.utils.aws_utils.execute_athena_query_async(athena_data_client, data_source: str, database: str, query: str) dict [source]
Execute Athena statement asynchronously. Does not wait for the query to finish.
Raises AthenaCredentialsError if the statement couldn't be executed due to a validation error.
- Parameters
athena_data_client – Athena Data API Service client
data_source – Athena data source name
database – Athena Database Name
query – The SQL query to execute
Returns: JSON response
- feast.infra.utils.aws_utils.execute_redshift_query_and_unload_to_s3(redshift_data_client, cluster_id: str, database: str, user: str, s3_path: str, iam_role: str, query: str) None [source]
Unload Redshift Query results to S3
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
s3_path – S3 directory where the unloaded data is written
iam_role – IAM Role for Redshift to assume during the UNLOAD command. The role must grant permission to write to the S3 location.
query – The SQL query to execute
- feast.infra.utils.aws_utils.execute_redshift_statement(redshift_data_client, cluster_id: str, database: str, user: str, query: str) str [source]
Execute Redshift statement synchronously. Waits for the query to finish.
Raises RedshiftCredentialsError if the statement couldn't be executed due to a validation error. Raises RedshiftQueryError if the query runs but finishes with errors.
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
query – The SQL query to execute
Returns: Statement ID
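A minimal usage sketch (region, cluster, database, user, and query are hypothetical placeholders):
>>> client = get_redshift_data_client("us-west-2")
>>> statement_id = execute_redshift_statement(
...     client, cluster_id="my-cluster", database="dev", user="admin",
...     query="SELECT COUNT(*) FROM my_table",
... )
>>> result = get_redshift_statement_result(client, statement_id)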
- feast.infra.utils.aws_utils.execute_redshift_statement_async(redshift_data_client, cluster_id: str, database: str, user: str, query: str) dict [source]
Execute Redshift statement asynchronously. Does not wait for the query to finish.
Raises RedshiftCredentialsError if the statement couldn't be executed due to a validation error.
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
query – The SQL query to execute
Returns: JSON response
- feast.infra.utils.aws_utils.get_athena_data_client(aws_region: str)[source]
Get the Athena Data API Service client for the given AWS region.
- feast.infra.utils.aws_utils.get_athena_query_result(athena_data_client, query_execution_id: str) dict [source]
Get the Athena query result.
- feast.infra.utils.aws_utils.get_bucket_and_key(s3_path: str) Tuple[str, str] [source]
Get the S3 bucket and key given the full path.
For example, get_bucket_and_key("s3://foo/bar/test.file") returns ("foo", "bar/test.file").
If the s3_path doesn't start with "s3://", a ValueError is raised.
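The same example as a doctest:
>>> get_bucket_and_key("s3://foo/bar/test.file")
('foo', 'bar/test.file')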
- feast.infra.utils.aws_utils.get_first_api_gateway(api_gateway_client, api_gateway_name: str) Optional[Dict] [source]
Get the first API Gateway with the given name. Note that API Gateways can share the same name; they are uniquely identified by an AWS-generated ID. This method therefore lists all API Gateways and returns the first one with a matching name, or None if no match is found.
- Parameters
api_gateway_client – API Gateway V2 Client
api_gateway_name – Name of the API Gateway
Returns: Either a dictionary containing the get_api response, or None if it doesn't exist.
- feast.infra.utils.aws_utils.get_lambda_function(lambda_client, function_name: str) Optional[Dict] [source]
Get the AWS Lambda function by name, or return None if it doesn't exist.
- Parameters
lambda_client – AWS Lambda client
function_name – Name of the AWS Lambda function
Returns: Either a dictionary containing the get_function API response, or None if it doesn't exist.
- feast.infra.utils.aws_utils.get_redshift_data_client(aws_region: str)[source]
Get the Redshift Data API Service client for the given AWS region.
- feast.infra.utils.aws_utils.get_redshift_statement_result(redshift_data_client, statement_id: str) dict [source]
Get the Redshift statement result.
- feast.infra.utils.aws_utils.get_s3_resource(aws_region: str)[source]
Get the S3 resource for the given AWS region.
- feast.infra.utils.aws_utils.temporarily_upload_arrow_table_to_redshift(table: Union[pyarrow.lib.Table, pathlib.Path], redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, iam_role: str, s3_path: str, table_name: str, schema: Optional[pyarrow.lib.Schema] = None, fail_if_exists: bool = True) Iterator[None] [source]
Uploads an Arrow Table to Redshift as a new table, with cleanup logic.
This is essentially the same as upload_arrow_table_to_redshift (check out its docstring for full details), but unlike it, this method is a generator and should be used in a with block. For example:
>>> with temporarily_upload_arrow_table_to_redshift(...):
>>>     # Use `table_name` table in Redshift here
>>> # `table_name` will not exist at this point, since it's cleaned up by the `with` block
- feast.infra.utils.aws_utils.temporarily_upload_df_to_redshift(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, table_name: str, df: pandas.core.frame.DataFrame) Iterator[None] [source]
Uploads a Pandas DataFrame to Redshift as a new table, with cleanup logic.
This is essentially the same as upload_df_to_redshift (check out its docstring for full details), but unlike it, this method is a generator and should be used in a with block. For example:
>>> with temporarily_upload_df_to_redshift(...):
>>>     # Use `table_name` table in Redshift here
>>> # `table_name` will not exist at this point, since it's cleaned up by the `with` block
- feast.infra.utils.aws_utils.unload_athena_query_to_df(athena_data_client, data_source: str, database: str, s3_resource, s3_path: str, query: str, temp_table: str) pandas.core.frame.DataFrame [source]
Unload Athena Query results to S3 and get the results in Pandas DataFrame format
- feast.infra.utils.aws_utils.unload_athena_query_to_pa(athena_data_client, data_source: str, database: str, s3_resource, s3_path: str, query: str, temp_table: str) pyarrow.lib.Table [source]
Unload Athena Query results to S3 and get the results in PyArrow Table format
- feast.infra.utils.aws_utils.unload_redshift_query_to_df(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, query: str) pandas.core.frame.DataFrame [source]
Unload Redshift Query results to S3 and get the results in Pandas DataFrame format
- feast.infra.utils.aws_utils.unload_redshift_query_to_pa(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, query: str) pyarrow.lib.Table [source]
Unload Redshift Query results to S3 and get the results in PyArrow Table format
- feast.infra.utils.aws_utils.update_lambda_function_environment(lambda_client, function_name: str, environment: Dict[str, Any]) None [source]
Update AWS Lambda function environment. The function is retried multiple times in case another action is currently being run on the lambda (e.g. it’s being created or being updated in parallel). :param lambda_client: AWS Lambda client. :param function_name: Name of the AWS Lambda function. :param environment: The desired lambda environment.
- feast.infra.utils.aws_utils.upload_arrow_table_to_athena(table: Union[pyarrow.lib.Table, pathlib.Path], athena_client, data_source: str, database: str, s3_resource, s3_path: str, table_name: str, schema: Optional[pyarrow.lib.Schema] = None, fail_if_exists: bool = True)[source]
Uploads an Arrow Table to S3 (Athena).
- Here's how the upload process works:
1. The PyArrow Table is serialized into Parquet format on local disk
2. The Parquet file is uploaded to S3
3. An Athena (Data Catalog) table is created, with the S3 directory from step 2 set as its external location
4. The local disk and S3 paths are cleaned up
- Parameters
table – The Arrow Table or Path to parquet dataset to upload
athena_client – Athena API Service client
data_source – Athena data source
database – Athena Database Name
s3_resource – S3 Resource object
s3_path – S3 path where the Parquet file is temporarily uploaded
table_name – The name of the new Athena table where we copy the dataframe
schema – (Optional) Arrow Schema provided by the client, which will be converted into the Athena table schema
fail_if_exists – Whether to fail if a table with this name already exists; if False, data is appended to the existing table
- Raises
AthenaTableNameTooLong – The specified table name is too long.
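A minimal usage sketch (the region, bucket, and all names are hypothetical placeholders; the client is assumed to be the one returned by get_athena_data_client):
>>> import pyarrow as pa
>>> table = pa.table({"driver_id": [1001, 1002], "trips": [250, 373]})
>>> upload_arrow_table_to_athena(
...     table,
...     athena_client=get_athena_data_client("us-west-2"),
...     data_source="AwsDataCatalog",
...     database="my_database",
...     s3_resource=get_s3_resource("us-west-2"),
...     s3_path="s3://my-bucket/temp/my_table",
...     table_name="my_table",
... )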
- feast.infra.utils.aws_utils.upload_arrow_table_to_redshift(table: Union[pyarrow.lib.Table, pathlib.Path], redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, iam_role: str, s3_path: str, table_name: str, schema: Optional[pyarrow.lib.Schema] = None, fail_if_exists: bool = True)[source]
Uploads an Arrow Table to a new or existing Redshift table.
- Here’s how the upload process works:
1. The PyArrow Table is serialized into Parquet format on local disk
2. The Parquet file is uploaded to S3
3. The S3 file is loaded into Redshift as a new table via the COPY command
4. The local disk and S3 paths are cleaned up
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
s3_resource – S3 Resource object
s3_path – S3 path where the Parquet file is temporarily uploaded
iam_role – IAM Role for Redshift to assume during the COPY command. The role must grant permission to read the S3 location.
table_name – The name of the new Redshift table where we copy the dataframe
table – The Arrow Table or Path to parquet dataset to upload
schema – (Optional) Arrow Schema provided by the client, which will be converted into the Redshift table schema
fail_if_exists – Whether to fail if a table with this name already exists; if False, data is appended to the existing table
- Raises
RedshiftTableNameTooLong – The specified table name is too long.
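A minimal usage sketch (region, cluster, role ARN, and all names are hypothetical placeholders):
>>> import pyarrow as pa
>>> table = pa.table({"driver_id": [1001], "trips": [250]})
>>> upload_arrow_table_to_redshift(
...     table,
...     redshift_data_client=get_redshift_data_client("us-west-2"),
...     cluster_id="my-cluster",
...     database="dev",
...     user="admin",
...     s3_resource=get_s3_resource("us-west-2"),
...     iam_role="arn:aws:iam::123456789012:role/redshift-s3-access",
...     s3_path="s3://my-bucket/temp/my_table.parquet",
...     table_name="my_table",
... )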
- feast.infra.utils.aws_utils.upload_df_to_athena(athena_client, data_source: str, database: str, s3_resource, s3_path: str, table_name: str, df: pandas.core.frame.DataFrame)[source]
Uploads a Pandas DataFrame to S3 (Athena) as a new table.
The caller is responsible for deleting the table when no longer necessary.
- Parameters
athena_client – Athena API Service client
data_source – Athena Data Source
database – Athena Database Name
s3_resource – S3 Resource object
s3_path – S3 path where the Parquet file is temporarily uploaded
table_name – The name of the new Data Catalog table where we copy the dataframe
df – The Pandas DataFrame to upload
- Raises
AthenaTableNameTooLong – The specified table name is too long.
- feast.infra.utils.aws_utils.upload_df_to_redshift(redshift_data_client, cluster_id: str, database: str, user: str, s3_resource, s3_path: str, iam_role: str, table_name: str, df: pandas.core.frame.DataFrame)[source]
Uploads a Pandas DataFrame to Redshift as a new table.
The caller is responsible for deleting the table when no longer necessary.
- Parameters
redshift_data_client – Redshift Data API Service client
cluster_id – Redshift Cluster Identifier
database – Redshift Database Name
user – Redshift username
s3_resource – S3 Resource object
s3_path – S3 path where the Parquet file is temporarily uploaded
iam_role – IAM Role for Redshift to assume during the COPY command. The role must grant permission to read the S3 location.
table_name – The name of the new Redshift table where we copy the dataframe
df – The Pandas DataFrame to upload
- Raises
RedshiftTableNameTooLong – The specified table name is too long.
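A minimal end-to-end sketch (region, cluster, role ARN, and all names are hypothetical placeholders):
>>> import pandas as pd
>>> df = pd.DataFrame({"driver_id": [1001, 1002], "trips": [250, 373]})
>>> upload_df_to_redshift(
...     redshift_data_client=get_redshift_data_client("us-west-2"),
...     cluster_id="my-cluster",
...     database="dev",
...     user="admin",
...     s3_resource=get_s3_resource("us-west-2"),
...     s3_path="s3://my-bucket/temp/my_table.parquet",
...     iam_role="arn:aws:iam::123456789012:role/redshift-s3-access",
...     table_name="my_table",
...     df=df,
... )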
- feast.infra.utils.aws_utils.upload_df_to_s3(s3_resource, s3_path: str, df: pandas.core.frame.DataFrame) None [source]
Uploads a Pandas DataFrame to S3 as a parquet file
- Parameters
s3_resource – S3 Resource object
s3_path – S3 path where the Parquet file is temporarily uploaded
df – The Pandas DataFrame to upload
Returns: None
- feast.infra.utils.aws_utils.wait_for_athena_execution(athena_data_client, execution: dict) None [source]
Waits for the Athena statement to finish. Raises AthenaQueryError if the statement didn’t succeed.
We use exponential backoff to check the query state until it is no longer running. The backoff starts at 0.1 seconds and doubles until it reaches 30 seconds, after which it stays fixed.
- Parameters
athena_data_client – Athena Service boto3 client
execution – The Athena execution to wait for (result of execute_athena_query_async)
Returns: None
- feast.infra.utils.aws_utils.wait_for_redshift_statement(redshift_data_client, statement: dict) None [source]
Waits for the Redshift statement to finish. Raises RedshiftQueryError if the statement didn’t succeed.
We use exponential backoff to check the query state until it is no longer running. The backoff starts at 0.1 seconds and doubles until it reaches 30 seconds, after which it stays fixed.
- Parameters
redshift_data_client – Redshift Data API Service client
statement – The Redshift statement to wait for (result of execute_redshift_statement_async)
Returns: None
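A minimal sketch of the asynchronous pattern (cluster, database, user, and query are hypothetical placeholders):
>>> client = get_redshift_data_client("us-west-2")
>>> statement = execute_redshift_statement_async(
...     client, "my-cluster", "dev", "admin", "SELECT 1"
... )
>>> wait_for_redshift_statement(client, statement)  # blocks until the query finishes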
feast.infra.utils.hbase_utils module
- class feast.infra.utils.hbase_utils.HbaseConstants[source]
Bases:
object
Constants to be used by the Hbase Online Store.
- CREATED_TS = 'created_ts'
- DEFAULT_COLUMN_FAMILY = 'default'
- DEFAULT_CREATED_TS = 'default:created_ts'
- DEFAULT_EVENT_TS = 'default:event_ts'
- EVENT_TS = 'event_ts'
- class feast.infra.utils.hbase_utils.HbaseUtils(conn: Optional[happybase.connection.Connection] = None, host: Optional[str] = None, port: Optional[int] = None, timeout=None)[source]
Bases:
object
Utils class to manage different Hbase operations.
- conn
happybase Connection to connect to hbase.
- host
hostname of the hbase thrift server.
- port
port on which the thrift server is running.
- timeout
socket timeout in milliseconds.
- batch(table_name: str)[source]
Returns a ‘Batch’ instance that can be used for mass data manipulation in the hbase table.
- Parameters
table_name – Name of the Hbase table.
- check_if_table_exist(table_name: str)[source]
Check if the table exists in hbase.
- Parameters
table_name – Name of the Hbase table.
- create_table(table_name: str, colm_family: List[str])[source]
Create table in hbase online store.
- Parameters
table_name – Name of the Hbase table.
colm_family – List of names of column families to be created in the hbase table.
- create_table_with_default_cf(table_name: str)[source]
Create table in hbase online store with one column family “default”.
- Parameters
table_name – Name of the Hbase table.
- put(table_name: str, row_key: str, data: dict)[source]
Store data in the hbase table.
- Parameters
table_name – Name of the Hbase table.
row_key – Row key of the row to be inserted to hbase table.
data – Mapping of column family name:column name to column values
- row(table_name: str, row_key, columns=None, timestamp=None, include_timestamp=False)[source]
Fetch a row of data from the hbase table.
- Parameters
table_name – Name of the Hbase table.
row_key – Row key of the row to be fetched from the hbase table.
columns – Names of the columns to fetch.
timestamp – Maximum timestamp (version) the returned cells can have.
include_timestamp – Whether to return (value, timestamp) tuples instead of bare values.
- rows(table_name: str, row_keys, columns=None, timestamp=None, include_timestamp=False)[source]
Fetch multiple rows of data from the hbase table.
- Parameters
table_name – Name of the Hbase table.
row_keys – List of row keys of the rows to be fetched from the hbase table.
columns – Names of the columns to fetch.
timestamp – Maximum timestamp (version) the returned cells can have.
include_timestamp – Whether to return (value, timestamp) tuples instead of bare values.
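A minimal usage sketch (host, port, table name, and data are hypothetical; assumes a reachable HBase Thrift server):
>>> hbase = HbaseUtils(host="localhost", port=9090)
>>> hbase.create_table_with_default_cf("driver_stats")
>>> hbase.put("driver_stats", "1001", {"default:event_ts": b"2021-01-01T00:00:00"})
>>> row = hbase.row("driver_stats", "1001")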