cdp_data.utils package

Submodules

cdp_data.utils.db_utils module

cdp_data.utils.db_utils.connect_to_database(infrastructure_slug: str) -> None

Convenience function that reduces the imports and code needed to connect to a CDP database.

cdp_data.utils.db_utils.expand_models_from_pd_column(data: DataFrame, model_col: str, model_attr_rename_lut: Dict[str, str], tqdm_kws: Dict[str, Any] | None = None) -> DataFrame

cdp_data.utils.db_utils.load_from_model_reference(model_ref: ReferenceDocLoader) -> Model

Load a CDP database model from a ReferenceDocLoader or potentially from cache.

Parameters:
model_ref: fireo.queries.query_wrapper.ReferenceDocLoader

The model reference to load.

Returns:
model: Model

The loaded (retrieved from database or cache) CDP database model.

See also

cdp_data.utils.db_utils.load_model_from_reference_joiner
cdp_data.utils.db_utils.load_model_from_pd_columns

Notes

LRU cache size is 1024 items.
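
As a rough illustration of what a 1024-item LRU cache implies for repeated lookups, the following minimal sketch uses `functools.lru_cache` from the standard library. The `load_model_by_id` function and the `FETCH_COUNT` counter are hypothetical stand-ins for a database round trip; how `load_from_model_reference` actually keys its cache is not stated here.

```python
from functools import lru_cache

# Hypothetical counter standing in for database round trips; not part of cdp_data.
FETCH_COUNT = {"calls": 0}


@lru_cache(maxsize=1024)  # same capacity as stated in the note above
def load_model_by_id(model_id: str) -> dict:
    """Pretend to fetch a model; repeated ids are served from the cache."""
    FETCH_COUNT["calls"] += 1
    return {"id": model_id}


# Three lookups, but only two distinct ids reach the pretend database.
for model_id in ["event-a", "event-b", "event-a"]:
    load_model_by_id(model_id)

cache_info = load_model_by_id.cache_info()
print(FETCH_COUNT["calls"], cache_info.hits, cache_info.misses)
```

Once more than 1024 distinct keys have been requested, the least recently used entries are evicted and would be fetched again on their next use.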

cdp_data.utils.db_utils.load_model_from_pd_columns(data: DataFrame, join_id_col: str, model_ref_col: str, drop_original_model_ref: bool = True, tqdm_kws: Dict[str, Any] | None = None) -> DataFrame

Load a model reference and attach the loaded model back to the original DataFrame.

Parameters:
data: pd.DataFrame

The DataFrame containing a column of model ReferenceDocLoader objects to fetch. The loaded models are attached back to this DataFrame.

join_id_col: str

The column name to use for joining the original provided DataFrame to the loaded models DataFrame.

model_ref_col: str

The column name which contains the model ReferenceDocLoader objects.

drop_original_model_ref: bool

Whether to drop the original model_ref_col after loading and joining all models to the DataFrame. Default: True (drop the original model_ref_col).

tqdm_kws: Dict[str, Any]

A dictionary with extra keyword arguments to provide to tqdm progress bars. Must not include the desc keyword argument.

Returns:
data: pd.DataFrame

A DataFrame with all of the original data and all the models loaded from the original DataFrame’s model_ref_col ReferenceDocLoader objects.

Notes

This function loads all models using a thread pool. Because of this threading, the row order of the result DataFrame may differ from that of the original DataFrame.

Additionally, this function utilizes an LRU cache during model loading.
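
The ordering caveat above can be sketched with the standard library alone. This example assumes results are collected as each task finishes (via `as_completed`), which is one common way a thread pool produces reordered output; whether cdp_data collects results this way is an assumption. The `load_event` function and the session ids are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def load_event(session_id: str) -> dict:
    """Hypothetical loader standing in for a threaded model fetch."""
    return {"id": session_id, "event": f"event-for-{session_id}"}


session_ids = ["s3", "s1", "s2"]

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(load_event, sid) for sid in session_ids]
    # as_completed yields futures as they finish, not in submission order.
    loaded = [future.result() for future in as_completed(futures)]

# Restore the original order by looking up each row's position via its id.
position = {sid: index for index, sid in enumerate(session_ids)}
restored = sorted(loaded, key=lambda row: position[row["id"]])
print([row["id"] for row in restored])
```

With a DataFrame, the same idea applies: if the original row order matters, realign the result on the join id column rather than relying on positional order.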

Examples

Fetch sessions from a CDP database and then fetch and attach all referenced events to each session.

>>> from cdp_backend.database import models as db_models
>>> from cdp_data.utils import db_utils
>>> import pandas as pd
>>> # Connect to the database, then fetch sessions and unpack them into a DataFrame
>>> db_utils.connect_to_database("cdp-seattle-21723dcf")
>>> sessions = pd.DataFrame([
...     s.to_dict() for s in db_models.Session.collection.fetch()
... ])
>>> # Fetch all models in the `event_ref` column and join on session id
>>> event_attached = db_utils.load_model_from_pd_columns(
...     sessions,
...     join_id_col="id",
...     model_ref_col="event_ref",
... )

cdp_data.utils.fs_utils module

cdp_data.utils.fs_utils.connect_to_filestore(infrastructure_slug: str) -> GCSFileSystem

Convenience function that reduces the imports and code needed to connect to a CDP file store.

cdp_data.utils.incremental_average module

class cdp_data.utils.incremental_average.IncrementalAverage

Bases: object

add(addition: int | float | ndarray) -> int | float | ndarray

Add a new value to the average.

property current_mean: int | float | ndarray | None

Get the current mean.

property current_size: int

Get the number of values that have been averaged.

class cdp_data.utils.incremental_average.IncrementalStats

Bases: object

add(addition: int | float | ndarray) -> IncrementalStats

Add a new value to the mean (and check for a new max and min).

property current_max: int | float | ndarray | None

Get the current max.

property current_mean: int | float | ndarray | None

Get the current mean.

property current_min: int | float | ndarray | None

Get the current min.

property current_size: int

Get the number of values that have been added.

cdp_data.utils.incremental_average.update_average(current_average: int | float | ndarray, current_size: int, addition: int | float | ndarray) -> int | float | ndarray
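
No description is given for `update_average`, so the following is only a sketch of the standard incremental-mean update that a function with this signature might perform. It assumes `current_size` is the number of values averaged before the new addition; the name `sketch_update_average` is hypothetical and this is not the library's implementation.

```python
def sketch_update_average(current_average, current_size, addition):
    """Hypothetical incremental mean update (not the cdp_data implementation).

    Assumes current_size counts the values already included in current_average.
    """
    return current_average + (addition - current_average) / (current_size + 1)


values = [2.0, 4.0, 9.0]

# Seed the running mean with the first value, then fold in the rest one by one.
mean = values[0]
size = 1
for value in values[1:]:
    mean = sketch_update_average(mean, size, value)
    size += 1

print(mean, size)
```

The update never needs the full history of values, only the current mean and count, which is what makes this form suitable for streaming over large collections.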

Module contents

Utils package for cdp_data.

cdp_data.utils.connect_to_infrastructure(infrastructure_slug: str) -> GCSFileSystem

Convenience function that reduces the imports and code needed to connect to a CDP infrastructure.