Source code for feast.infra.registry.gcs

import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse

from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage


[docs]class GCSRegistryStore(RegistryStore): def __init__(self, registry_config: RegistryConfig, repo_path: Path): uri = registry_config.path try: import google.cloud.storage as storage except ImportError as e: from feast.errors import FeastExtrasDependencyImportError raise FeastExtrasDependencyImportError("gcp", str(e)) self.gcs_client = storage.Client() self._uri = urlparse(uri) self._bucket = self._uri.hostname self._blob = self._uri.path.lstrip("/")
[docs] @log_exceptions_and_usage(registry="gs") def get_registry_proto(self): import google.cloud.storage as storage from google.cloud.exceptions import NotFound file_obj = TemporaryFile() registry_proto = RegistryProto() try: bucket = self.gcs_client.get_bucket(self._bucket) except NotFound: raise Exception( f"No bucket named {self._bucket} exists; please create it first." ) if storage.Blob(bucket=bucket, name=self._blob).exists(self.gcs_client): self.gcs_client.download_blob_to_file( self._uri.geturl(), file_obj, timeout=30 ) file_obj.seek(0) registry_proto.ParseFromString(file_obj.read()) return registry_proto raise FileNotFoundError( f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?' )
[docs] @log_exceptions_and_usage(registry="gs") def update_registry_proto(self, registry_proto: RegistryProto): self._write_registry(registry_proto)
[docs] def teardown(self): from google.cloud.exceptions import NotFound gs_bucket = self.gcs_client.get_bucket(self._bucket) try: gs_bucket.delete_blob(self._blob) except NotFound: # If the blob deletion fails with NotFound, it has already been deleted. pass
def _write_registry(self, registry_proto: RegistryProto): registry_proto.version_id = str(uuid.uuid4()) registry_proto.last_updated.FromDatetime(datetime.utcnow()) # we have already checked the bucket exists so no need to do it again gs_bucket = self.gcs_client.get_bucket(self._bucket) blob = gs_bucket.blob(self._blob) file_obj = TemporaryFile() file_obj.write(registry_proto.SerializeToString()) file_obj.seek(0) blob.upload_from_file(file_obj)