Source code for feast.infra.registry.s3

import os
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse

from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist
from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage

try:
    import boto3
except ImportError as e:
    from feast.errors import FeastExtrasDependencyImportError

    raise FeastExtrasDependencyImportError("aws", str(e))


[docs]class S3RegistryStore(RegistryStore): def __init__(self, registry_config: RegistryConfig, repo_path: Path): uri = registry_config.path self._uri = urlparse(uri) self._bucket = self._uri.hostname self._key = self._uri.path.lstrip("/") self._boto_extra_args = registry_config.s3_additional_kwargs or {} self.s3_client = boto3.resource( "s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL") )
[docs] @log_exceptions_and_usage(registry="s3") def get_registry_proto(self): file_obj = TemporaryFile() registry_proto = RegistryProto() try: from botocore.exceptions import ClientError except ImportError as e: from feast.errors import FeastExtrasDependencyImportError raise FeastExtrasDependencyImportError("aws", str(e)) try: bucket = self.s3_client.Bucket(self._bucket) self.s3_client.meta.client.head_bucket(Bucket=bucket.name) except ClientError as e: # If a client error is thrown, then check that it was a 404 error. # If it was a 404 error, then the bucket does not exist. error_code = int(e.response["Error"]["Code"]) if error_code == 404: raise S3RegistryBucketNotExist(self._bucket) else: raise S3RegistryBucketForbiddenAccess(self._bucket) from e try: obj = bucket.Object(self._key) obj.download_fileobj(file_obj) file_obj.seek(0) registry_proto.ParseFromString(file_obj.read()) return registry_proto except ClientError as e: raise FileNotFoundError( f"Error while trying to locate Registry at path {self._uri.geturl()}" ) from e
[docs] @log_exceptions_and_usage(registry="s3") def update_registry_proto(self, registry_proto: RegistryProto): self._write_registry(registry_proto)
[docs] def teardown(self): self.s3_client.Object(self._bucket, self._key).delete()
def _write_registry(self, registry_proto: RegistryProto): registry_proto.version_id = str(uuid.uuid4()) registry_proto.last_updated.FromDatetime(datetime.utcnow()) # we have already checked the bucket exists so no need to do it again file_obj = TemporaryFile() file_obj.write(registry_proto.SerializeToString()) file_obj.seek(0) self.s3_client.Bucket(self._bucket).put_object( Body=file_obj, Key=self._key, **self._boto_extra_args )