# Source code for feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source

import logging
from typing import Dict, Optional

import pandas as pd
import pytest
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

from feast.data_source import DataSource
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import (
    PostgreSQLOfflineStoreConfig,
    PostgreSQLSource,
)
from feast.infra.utils.postgres.connection_utils import df_to_postgres_table
from tests.integration.feature_repos.universal.data_source_creator import (
    DataSourceCreator,
)
from tests.integration.feature_repos.universal.online_store_creator import (
    OnlineStoreCreator,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Credentials and database name handed to the PostgreSQL test container
# through its POSTGRES_* environment variables.
POSTGRES_USER = "test"
POSTGRES_PASSWORD = "test"
POSTGRES_DB = "test"


@pytest.fixture(scope="session")
def postgres_container():
    """Start a PostgreSQL Docker container shared by the whole test session.

    The container is created from the ``postgres:latest`` image with port
    5432 exposed and the ``POSTGRES_USER`` / ``POSTGRES_PASSWORD`` /
    ``POSTGRES_DB`` environment variables taken from the module constants.
    The fixture blocks until the container log contains the readiness
    message, then yields the container to the tests.

    Yields:
        The started ``DockerContainer``. It is stopped when the fixture is
        torn down, and also when the readiness wait fails.
    """
    container = (
        DockerContainer("postgres:latest")
        .with_exposed_ports(5432)
        .with_env("POSTGRES_USER", POSTGRES_USER)
        .with_env("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
        .with_env("POSTGRES_DB", POSTGRES_DB)
    )
    container.start()

    # Everything after start() lives in try/finally so that the container is
    # stopped even when the readiness wait raises before the yield is reached
    # (previously the started container would have been left running).
    try:
        log_string_to_wait_for = "database system is ready to accept connections"
        waited = wait_for_logs(
            container=container,
            predicate=log_string_to_wait_for,
            timeout=30,
            interval=10,
        )
        logger.info("Waited for %s seconds until postgres container was up", waited)

        yield container
    finally:
        container.stop()
class PostgreSQLDataSourceCreator(DataSourceCreator, OnlineStoreCreator):
    """Test helper that serves PostgreSQL as both offline and online store.

    Both configurations point at the container provided by the
    ``postgres_container`` fixture, reached on ``localhost`` through the
    host port mapped to container port 5432.
    """

    def __init__(
        self, project_name: str, fixture_request: pytest.FixtureRequest, **kwargs
    ):
        """Resolve the shared container and build the offline store config.

        Args:
            project_name: Project name; forwarded to the base initializer and
                used as the prefix for created table names.
            fixture_request: pytest request object used to look up the
                ``postgres_container`` fixture.
            **kwargs: Accepted for signature compatibility; not used here.

        Raises:
            RuntimeError: If the ``postgres_container`` fixture resolves to a
                falsy value.
        """
        super().__init__(
            project_name,
        )

        self.project_name = project_name
        self.container = fixture_request.getfixturevalue("postgres_container")
        if not self.container:
            raise RuntimeError(
                "In order to use this data source "
                "'feast.infra.offline_stores.contrib.postgres_offline_store.tests' "
                "must be include into pytest plugins"
            )

        # Database name and credentials are read back from the environment
        # the container was configured with.
        self.offline_store_config = PostgreSQLOfflineStoreConfig(
            type="postgres",
            host="localhost",
            port=self.container.get_exposed_port(5432),
            database=self.container.env["POSTGRES_DB"],
            db_schema="public",
            user=self.container.env["POSTGRES_USER"],
            password=self.container.env["POSTGRES_PASSWORD"],
        )

    def create_data_source(
        self,
        df: pd.DataFrame,
        destination_name: str,
        suffix: Optional[str] = None,
        timestamp_field="ts",
        created_timestamp_column="created_ts",
        field_mapping: Optional[Dict[str, str]] = None,
    ) -> DataSource:
        """Load ``df`` into a PostgreSQL table and return a source reading it.

        Args:
            df: Data to upload.
            destination_name: Table name suffix; the final table name is
                ``"<project_name>_<destination_name>"``.
            suffix: Not used by this implementation.
            timestamp_field: Passed through to ``PostgreSQLSource``.
            created_timestamp_column: Passed through to ``PostgreSQLSource``.
            field_mapping: Passed through to ``PostgreSQLSource``. When it is
                ``None`` or empty, ``{"ts_1": "ts"}`` is used instead.

        Returns:
            A ``PostgreSQLSource`` whose query selects every column of the
            prefixed table.
        """
        destination_name = self.get_prefixed_table_name(destination_name)

        # The upload is skipped if no offline store config is available; the
        # returned source still refers to the prefixed table name.
        if self.offline_store_config:
            df_to_postgres_table(self.offline_store_config, df, destination_name)

        return PostgreSQLSource(
            name=destination_name,
            query=f"SELECT * FROM {destination_name}",
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
            field_mapping=field_mapping or {"ts_1": "ts"},
        )

    def create_offline_store_config(self) -> PostgreSQLOfflineStoreConfig:
        """Return the offline store config built in ``__init__``."""
        assert self.offline_store_config
        return self.offline_store_config

    def get_prefixed_table_name(self, suffix: str) -> str:
        """Return ``suffix`` prefixed with the project name and an underscore."""
        return f"{self.project_name}_{suffix}"

    def create_online_store(self) -> Dict[str, str]:
        """Return online store settings pointing at the same container.

        Uses the module-level ``POSTGRES_*`` constants for the database name
        and credentials, and the ``feature_store`` schema.
        """
        assert self.container
        return {
            "type": "postgres",
            "host": "localhost",
            "port": self.container.get_exposed_port(5432),
            "database": POSTGRES_DB,
            "db_schema": "feature_store",
            "user": POSTGRES_USER,
            "password": POSTGRES_PASSWORD,
        }

    def create_saved_dataset_destination(self):
        """Return ``None``; no saved dataset destination is provided yet."""
        # FIXME: ...
        return None

    def teardown(self):
        """Do nothing; this class performs no cleanup of its own."""
        pass