Source code for feast.driver_test_data

# This module generates dummy data to be used for tests and examples.
import itertools
from enum import Enum

import numpy as np
import pandas as pd
from pytz import FixedOffset, timezone, utc

from feast.infra.offline_stores.offline_utils import (
    DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
)


[docs]class EventTimestampType(Enum): TZ_NAIVE = 0 TZ_AWARE_UTC = 1 TZ_AWARE_FIXED_OFFSET = 2 TZ_AWARE_US_PACIFIC = 3
def _convert_event_timestamp(event_timestamp: pd.Timestamp, t: EventTimestampType): if t == EventTimestampType.TZ_NAIVE: return event_timestamp elif t == EventTimestampType.TZ_AWARE_UTC: return event_timestamp.replace(tzinfo=utc) elif t == EventTimestampType.TZ_AWARE_FIXED_OFFSET: return event_timestamp.replace(tzinfo=utc).astimezone(FixedOffset(60)) elif t == EventTimestampType.TZ_AWARE_US_PACIFIC: return event_timestamp.replace(tzinfo=utc).astimezone(timezone("US/Pacific"))
[docs]def create_orders_df( customers, drivers, start_date, end_date, order_count, locations=None, ) -> pd.DataFrame: """ Example df generated by this function (if locations): | order_id | driver_id | customer_id | origin_id | destination_id | order_is_success | event_timestamp | +----------+-----------+-------------+-----------+----------------+------------------+---------------------+ | 100 | 5004 | 1007 | 1 | 18 | 0 | 2021-03-10 19:31:15 | | 101 | 5003 | 1006 | 24 | 42 | 0 | 2021-03-11 22:02:50 | | 102 | 5010 | 1005 | 19 | 12 | 0 | 2021-03-13 00:34:24 | | 103 | 5010 | 1001 | 35 | 8 | 1 | 2021-03-14 03:05:59 | """ df = pd.DataFrame() df["order_id"] = [order_id for order_id in range(100, 100 + order_count)] df["driver_id"] = np.random.choice(drivers, order_count) df["customer_id"] = np.random.choice(customers, order_count) if locations: location_pairs = np.array(list(itertools.permutations(locations, 2))) locations_sample = location_pairs[ np.random.choice(len(location_pairs), order_count) ].T df["origin_id"] = locations_sample[0] df["destination_id"] = locations_sample[1] df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32) df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [ _convert_event_timestamp( pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"), EventTimestampType(idx % 4), ) for idx, dt in enumerate( pd.date_range(start=start_date, end=end_date, periods=order_count) ) ] df.sort_values( by=[ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, "order_id", "driver_id", "customer_id", ], inplace=True, ) return df
[docs]def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame: """ Example df generated by this function: | event_timestamp | driver_id | conv_rate | acc_rate | avg_daily_trips | created | |------------------+-----------+-----------+----------+-----------------+------------------| | 2021-03-17 19:31 | 5010 | 0.229297 | 0.685843 | 861 | 2021-03-24 19:34 | | 2021-03-17 20:31 | 5010 | 0.781655 | 0.861280 | 769 | 2021-03-24 19:34 | | 2021-03-17 21:31 | 5010 | 0.150333 | 0.525581 | 778 | 2021-03-24 19:34 | | 2021-03-17 22:31 | 5010 | 0.951701 | 0.228883 | 570 | 2021-03-24 19:34 | | 2021-03-17 23:31 | 5010 | 0.819598 | 0.262503 | 473 | 2021-03-24 19:34 | | | ... | ... | ... | ... | | | 2021-03-24 16:31 | 5001 | 0.061585 | 0.658140 | 477 | 2021-03-24 19:34 | | 2021-03-24 17:31 | 5001 | 0.088949 | 0.303897 | 618 | 2021-03-24 19:34 | | 2021-03-24 18:31 | 5001 | 0.096652 | 0.747421 | 480 | 2021-03-24 19:34 | | 2021-03-17 19:31 | 5005 | 0.142936 | 0.707596 | 466 | 2021-03-24 19:34 | | 2021-03-17 19:31 | 5005 | 0.142936 | 0.707596 | 466 | 2021-03-24 19:34 | """ df_hourly = pd.DataFrame( { "event_timestamp": [ pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") for dt in pd.date_range( start=start_date, end=end_date, freq="1H", closed="left" ) ] # include a fixed timestamp for get_historical_features in the quickstart + [ pd.Timestamp( year=2021, month=4, day=12, hour=7, minute=0, second=0, tz="UTC" ) ] } ) df_all_drivers = pd.DataFrame() for driver in drivers: df_hourly_copy = df_hourly.copy() df_hourly_copy["driver_id"] = driver df_all_drivers = pd.concat([df_hourly_copy, df_all_drivers]) df_all_drivers.reset_index(drop=True, inplace=True) rows = df_all_drivers["event_timestamp"].count() df_all_drivers["conv_rate"] = np.random.random(size=rows).astype(np.float32) df_all_drivers["acc_rate"] = np.random.random(size=rows).astype(np.float32) df_all_drivers["avg_daily_trips"] = np.random.randint(0, 1000, size=rows).astype( np.int32 ) df_all_drivers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) # Create duplicate rows that should be filtered by created timestamp # TODO: These duplicate rows area indirectly being filtered out by the point in time join already. We need to # inject a bad row at a timestamp where we know it will get joined to the entity dataframe, and then test that # we are actually filtering it with the created timestamp late_row = df_all_drivers[rows // 2 : rows // 2 + 1] df_all_drivers = pd.concat([df_all_drivers, late_row, late_row], ignore_index=True) return df_all_drivers
[docs]def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.DataFrame: """ Example df generated by this function: | event_timestamp | customer_id | current_balance | avg_passenger_count | lifetime_trip_count | created | |------------------+-------------+-----------------+---------------------+---------------------+------------------| | 2021-03-17 19:31 | 1010 | 0.889188 | 0.049057 | 412 | 2021-03-24 19:38 | | 2021-03-18 19:31 | 1010 | 0.979273 | 0.212630 | 639 | 2021-03-24 19:38 | | 2021-03-19 19:31 | 1010 | 0.976549 | 0.176881 | 70 | 2021-03-24 19:38 | | 2021-03-20 19:31 | 1010 | 0.273697 | 0.325012 | 68 | 2021-03-24 19:38 | | 2021-03-21 19:31 | 1010 | 0.438262 | 0.313009 | 192 | 2021-03-24 19:38 | | | ... | ... | ... | ... | | | 2021-03-19 19:31 | 1001 | 0.738860 | 0.857422 | 344 | 2021-03-24 19:38 | | 2021-03-20 19:31 | 1001 | 0.848397 | 0.745989 | 106 | 2021-03-24 19:38 | | 2021-03-21 19:31 | 1001 | 0.301552 | 0.185873 | 812 | 2021-03-24 19:38 | | 2021-03-22 19:31 | 1001 | 0.943030 | 0.561219 | 322 | 2021-03-24 19:38 | | 2021-03-23 19:31 | 1001 | 0.354919 | 0.810093 | 273 | 2021-03-24 19:38 | """ df_daily = pd.DataFrame( { "event_timestamp": [ pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") for dt in pd.date_range( start=start_date, end=end_date, freq="1D", closed="left" ) ] } ) df_all_customers = pd.DataFrame() for customer in customers: df_daily_copy = df_daily.copy() df_daily_copy["customer_id"] = customer df_all_customers = pd.concat([df_daily_copy, df_all_customers]) df_all_customers.reset_index(drop=True, inplace=True) rows = df_all_customers["event_timestamp"].count() df_all_customers["current_balance"] = np.random.random(size=rows).astype(np.float32) df_all_customers["avg_passenger_count"] = np.random.random(size=rows).astype( np.float32 ) df_all_customers["lifetime_trip_count"] = np.random.randint( 0, 1000, size=rows ).astype(np.int32) # TODO: Remove created timestamp in order to test whether its really optional df_all_customers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) return df_all_customers
[docs]def create_location_stats_df(locations, start_date, end_date) -> pd.DataFrame: """ Example df generated by this function: | event_timestamp | location_id | temperature | created | +------------------+-------------+-------------+------------------+ | 2021-03-17 19:31 | 1 | 74 | 2021-03-24 19:38 | | 2021-03-17 20:31 | 24 | 63 | 2021-03-24 19:38 | | 2021-03-17 21:31 | 19 | 65 | 2021-03-24 19:38 | | 2021-03-17 22:31 | 35 | 86 | 2021-03-24 19:38 | """ df_hourly = pd.DataFrame( { "event_timestamp": [ pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") for dt in pd.date_range( start=start_date, end=end_date, freq="1H", closed="left" ) ] } ) df_all_locations = pd.DataFrame() for location in locations: df_hourly_copy = df_hourly.copy() df_hourly_copy["location_id"] = location df_all_locations = pd.concat([df_hourly_copy, df_all_locations]) df_all_locations.reset_index(drop=True, inplace=True) rows = df_all_locations["event_timestamp"].count() df_all_locations["temperature"] = np.random.randint(50, 100, size=rows).astype( np.int32 ) # TODO: Remove created timestamp in order to test whether its really optional df_all_locations["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) return df_all_locations
[docs]def create_global_daily_stats_df(start_date, end_date) -> pd.DataFrame: """ Example df generated by this function: | event_timestamp | num_rides | avg_ride_length | created | |------------------+-------------+-----------------+------------------| | 2021-03-17 19:00 | 99 | 0.889188 | 2021-03-24 19:38 | | 2021-03-18 19:00 | 52 | 0.979273 | 2021-03-24 19:38 | | 2021-03-19 19:00 | 66 | 0.976549 | 2021-03-24 19:38 | | 2021-03-20 19:00 | 84 | 0.273697 | 2021-03-24 19:38 | | 2021-03-21 19:00 | 89 | 0.438262 | 2021-03-24 19:38 | | | ... | ... | | | 2021-03-24 19:00 | 54 | 0.738860 | 2021-03-24 19:38 | | 2021-03-25 19:00 | 58 | 0.848397 | 2021-03-24 19:38 | | 2021-03-26 19:00 | 69 | 0.301552 | 2021-03-24 19:38 | | 2021-03-27 19:00 | 63 | 0.943030 | 2021-03-24 19:38 | | 2021-03-28 19:00 | 79 | 0.354919 | 2021-03-24 19:38 | """ df_daily = pd.DataFrame( { "event_timestamp": [ pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") for dt in pd.date_range( start=start_date, end=end_date, freq="1D", closed="left" ) ] } ) rows = df_daily["event_timestamp"].count() df_daily["num_rides"] = np.random.randint(50, 100, size=rows).astype(np.int32) df_daily["avg_ride_length"] = np.random.random(size=rows).astype(np.float32) # TODO: Remove created timestamp in order to test whether its really optional df_daily["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) return df_daily
[docs]def create_field_mapping_df(start_date, end_date) -> pd.DataFrame: """ Example df generated by this function: | event_timestamp | column_name | created | |------------------+-------------+------------------| | 2021-03-17 19:00 | 99 | 2021-03-24 19:38 | | 2021-03-17 19:00 | 22 | 2021-03-24 19:38 | | 2021-03-17 19:00 | 7 | 2021-03-24 19:38 | | 2021-03-17 19:00 | 45 | 2021-03-24 19:38 | """ size = 10 df = pd.DataFrame() df["column_name"] = np.random.randint(1, 100, size=size).astype(np.int32) df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [ _convert_event_timestamp( pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"), EventTimestampType(idx % 4), ) for idx, dt in enumerate( pd.date_range(start=start_date, end=end_date, periods=size) ) ] df["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) return df