# Copyright 2021 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
from pydantic import StrictBool, StrictStr
from pydantic.typing import Literal, Union
from feast import Entity, FeatureView, utils
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.DynamoDBTable_pb2 import (
DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import get_user_agent, log_exceptions_and_usage, tracing_span
try:
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError
raise FeastExtrasDependencyImportError("aws", str(e))
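
# Note: if this import fails, the missing AWS dependencies (boto3/botocore) can
# typically be installed with ``pip install "feast[aws]"``; see the Feast
# documentation for the extras available in your version.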
logger = logging.getLogger(__name__)
class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
    """Online store config for AWS DynamoDB."""
type: Literal["dynamodb"] = "dynamodb"
"""Online store type selector"""
batch_size: int = 40
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""
endpoint_url: Union[str, None] = None
"""DynamoDB local development endpoint Url, i.e. http://localhost:8000"""
region: StrictStr
"""AWS Region Name"""
table_name_template: StrictStr = "{project}.{table_name}"
"""DynamoDB table name template"""
consistent_reads: StrictBool = False
"""Whether to read from Dynamodb by forcing consistent reads"""
class DynamoDBOnlineStore(OnlineStore):
"""
AWS DynamoDB implementation of the online store interface.
Attributes:
_dynamodb_client: Boto3 DynamoDB client.
_dynamodb_resource: Boto3 DynamoDB resource.
"""
_dynamodb_client = None
_dynamodb_resource = None
    @log_exceptions_and_usage(online_store="dynamodb")
def update(
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
"""
        Update tables in the DynamoDB Online Store, creating tables for the feature views
        to keep and deleting tables for the removed ones.

        Args:
            config: The RepoConfig for the current FeatureStore.
            tables_to_delete: Tables to delete from the DynamoDB Online Store.
            tables_to_keep: Tables to keep in the DynamoDB Online Store.
            entities_to_delete: Entities to delete; not used by this online store.
            entities_to_keep: Entities to keep; not used by this online store.
            partial: Whether to apply a partial update; not used by this online store.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client = self._get_dynamodb_client(
online_config.region, online_config.endpoint_url
)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
for table_instance in tables_to_keep:
try:
dynamodb_resource.create_table(
TableName=_get_table_name(online_config, config, table_instance),
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "entity_id", "AttributeType": "S"}
],
BillingMode="PAY_PER_REQUEST",
)
except ClientError as ce:
# If the table creation fails with ResourceInUseException,
# it means the table already exists or is being created.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceInUseException":
raise
for table_instance in tables_to_keep:
dynamodb_client.get_waiter("table_exists").wait(
TableName=_get_table_name(online_config, config, table_instance)
)
for table_to_delete in tables_to_delete:
_delete_table_idempotent(
dynamodb_resource,
_get_table_name(online_config, config, table_to_delete),
)
    def teardown(
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
):
"""
Delete tables from the DynamoDB Online Store.
        Args:
            config: The RepoConfig for the current FeatureStore.
            tables: Tables to delete from the DynamoDB Online Store.
            entities: Entities to delete; not used by this online store.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
for table in tables:
_delete_table_idempotent(
dynamodb_resource, _get_table_name(online_config, config, table)
)
    @log_exceptions_and_usage(online_store="dynamodb")
def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
"""
Write a batch of feature rows to online DynamoDB store.
        Note: This method uses a DynamoDB ``batch_writer`` to automatically handle any
        unprocessed items and resend them as needed, which is useful when loading a lot
        of data at a time.
Args:
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key,
a dict containing feature values, an event timestamp for the row, and
the created timestamp for the row if it exists.
progress: Optional function to be called once every mini-batch of rows is written to
the online store. Can be used to display progress.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)
self._write_batch_non_duplicates(table_instance, data, progress, config)
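
    # Illustrative sketch of one ``data`` row for this method, assuming a hypothetical
    # feature view keyed on a "driver_id" join key (names and values are placeholders):
    #
    #     entity_key = EntityKeyProto(
    #         join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1001)]
    #     )
    #     features = {"conv_rate": ValueProto(double_val=0.85)}
    #     data = [(entity_key, features, datetime.utcnow(), None)]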
    @log_exceptions_and_usage(online_store="dynamodb")
def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
        Retrieve feature values from the online DynamoDB store.

        Args:
            config: The RepoConfig for the current FeatureStore.
            table: Feast FeatureView.
            entity_keys: a list of entity keys that should be read from the FeatureStore.
            requested_features: Optional list of requested feature names; this implementation
                returns all feature values stored for each entity key.

        Returns:
            A list, aligned with ``entity_keys``, of (event timestamp, feature values) tuples;
            entries for entity keys with no stored item are ``(None, None)``.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
entity_ids = [
compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
for entity_key in entity_keys
]
batch_size = online_config.batch_size
entity_ids_iter = iter(entity_ids)
while True:
batch = list(itertools.islice(entity_ids_iter, batch_size))
batch_result: List[
Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
] = []
            # No more entity ids left to read
if len(batch) == 0:
break
batch_entity_ids = {
table_instance.name: {
"Keys": [{"entity_id": entity_id} for entity_id in batch],
"ConsistentRead": online_config.consistent_reads,
}
}
with tracing_span(name="remote_call"):
response = dynamodb_resource.batch_get_item(
RequestItems=batch_entity_ids,
)
response = response.get("Responses")
table_responses = response.get(table_instance.name)
if table_responses:
table_responses = self._sort_dynamodb_response(
table_responses, entity_ids
)
entity_idx = 0
for tbl_res in table_responses:
entity_id = tbl_res["entity_id"]
while entity_id != batch[entity_idx]:
batch_result.append((None, None))
entity_idx += 1
res = {}
for feature_name, value_bin in tbl_res["values"].items():
val = ValueProto()
val.ParseFromString(value_bin.value)
res[feature_name] = val
batch_result.append(
(datetime.fromisoformat(tbl_res["event_ts"]), res)
)
entity_idx += 1
# Not all entities in a batch may have responses
# Pad with remaining values in batch that were not found
batch_size_nones = ((None, None),) * (len(batch) - len(batch_result))
batch_result.extend(batch_size_nones)
result.extend(batch_result)
return result
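
    # In practice this method is invoked through FeatureStore.get_online_features; an
    # illustrative call, assuming a hypothetical "driver_hourly_stats" feature view:
    #
    #     store.get_online_features(
    #         features=["driver_hourly_stats:conv_rate"],
    #         entity_rows=[{"driver_id": 1001}],
    #     ).to_dict()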
def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
return self._dynamodb_client
def _get_dynamodb_resource(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_resource is None:
self._dynamodb_resource = _initialize_dynamodb_resource(
region, endpoint_url
)
return self._dynamodb_resource
def _sort_dynamodb_response(self, responses: list, order: list):
"""DynamoDB Batch Get Item doesn't return items in a particular order."""
# Assign an index to order
order_with_index = {value: idx for idx, value in enumerate(order)}
# Sort table responses by index
table_responses_ordered = [
(order_with_index[tbl_res["entity_id"]], tbl_res) for tbl_res in responses
]
table_responses_ordered = sorted(
table_responses_ordered, key=lambda tup: tup[0]
)
_, table_responses_ordered = zip(*table_responses_ordered)
return table_responses_ordered
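
    # For example (illustrative): order=["b", "a"] with responses=[{"entity_id": "a"},
    # {"entity_id": "b"}] is returned re-ordered as ({"entity_id": "b"}, {"entity_id": "a"}).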
@log_exceptions_and_usage(online_store="dynamodb")
def _write_batch_non_duplicates(
self,
table_instance,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
config: RepoConfig,
):
"""Deduplicate write batch request items on ``entity_id`` primary key."""
with table_instance.batch_writer(overwrite_by_pkeys=["entity_id"]) as batch:
for entity_key, features, timestamp, created_ts in data:
entity_id = compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
batch.put_item(
Item={
"entity_id": entity_id, # PartitionKey
"event_ts": str(utils.make_tzaware(timestamp)),
"values": {
k: v.SerializeToString()
for k, v in features.items() # Serialized Features
},
}
)
if progress:
progress(1)
def _initialize_dynamodb_client(region: str, endpoint_url: Optional[str] = None):
return boto3.client(
"dynamodb",
region_name=region,
endpoint_url=endpoint_url,
config=Config(user_agent=get_user_agent()),
)
def _initialize_dynamodb_resource(region: str, endpoint_url: Optional[str] = None):
return boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)
# TODO(achals): This form of user-facing templating is experimental.
# Please refer to https://github.com/feast-dev/feast/issues/2438 before building on top of it.
def _get_table_name(
online_config: DynamoDBOnlineStoreConfig, config: RepoConfig, table: FeatureView
) -> str:
return online_config.table_name_template.format(
project=config.project, table_name=table.name
)
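

# For example, with the default template "{project}.{table_name}", a hypothetical
# project "my_project" and a feature view named "driver_hourly_stats", this yields
# the DynamoDB table name "my_project.driver_hourly_stats".
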
def _delete_table_idempotent(
dynamodb_resource,
table_name: str,
):
try:
table = dynamodb_resource.Table(table_name)
table.delete()
logger.info(f"Dynamo table {table_name} was deleted")
except ClientError as ce:
# If the table deletion fails with ResourceNotFoundException,
# it means the table has already been deleted.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceNotFoundException":
raise
else:
logger.warning(f"Trying to delete table that doesn't exist: {table_name}")
class DynamoDBTable(InfraObject):
"""
A DynamoDB table managed by Feast.
Attributes:
name: The name of the table.
region: The region of the table.
        endpoint_url: Optional local DynamoDB endpoint URL, e.g. http://localhost:8000.
_dynamodb_client: Boto3 DynamoDB client.
_dynamodb_resource: Boto3 DynamoDB resource.
"""
region: str
    endpoint_url: Optional[str] = None
_dynamodb_client = None
_dynamodb_resource = None
def __init__(self, name: str, region: str, endpoint_url: Optional[str] = None):
super().__init__(name)
self.region = region
self.endpoint_url = endpoint_url
    def to_infra_object_proto(self) -> InfraObjectProto:
dynamodb_table_proto = self.to_proto()
return InfraObjectProto(
infra_object_class_type=DYNAMODB_INFRA_OBJECT_CLASS_TYPE,
dynamodb_table=dynamodb_table_proto,
)
    def to_proto(self) -> Any:
dynamodb_table_proto = DynamoDBTableProto()
dynamodb_table_proto.name = self.name
dynamodb_table_proto.region = self.region
return dynamodb_table_proto
    @staticmethod
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
return DynamoDBTable(
name=infra_object_proto.dynamodb_table.name,
region=infra_object_proto.dynamodb_table.region,
)
    @staticmethod
def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
return DynamoDBTable(
name=dynamodb_table_proto.name,
region=dynamodb_table_proto.region,
)
    def update(self):
dynamodb_client = self._get_dynamodb_client(self.region, self.endpoint_url)
dynamodb_resource = self._get_dynamodb_resource(self.region, self.endpoint_url)
try:
dynamodb_resource.create_table(
TableName=f"{self.name}",
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "entity_id", "AttributeType": "S"}
],
BillingMode="PAY_PER_REQUEST",
)
except ClientError as ce:
# If the table creation fails with ResourceInUseException,
# it means the table already exists or is being created.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceInUseException":
raise
dynamodb_client.get_waiter("table_exists").wait(TableName=f"{self.name}")
    def teardown(self):
dynamodb_resource = self._get_dynamodb_resource(self.region, self.endpoint_url)
_delete_table_idempotent(dynamodb_resource, self.name)
def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
return self._dynamodb_client
def _get_dynamodb_resource(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_resource is None:
self._dynamodb_resource = _initialize_dynamodb_resource(
region, endpoint_url
)
return self._dynamodb_resource
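

# Illustrative sketch: DynamoDBTable objects are normally created and applied by Feast's
# infra diffing, but a table can also be driven directly (names are placeholders):
#
#     table = DynamoDBTable(name="my_project.driver_hourly_stats", region="us-west-2")
#     table.update()    # create the table if needed and wait until it exists
#     table.teardown()  # delete the table; a missing table is ignored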