Source code for cdp_backend.pipeline.pipeline_config
#!/usr/bin/env python
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, Union
from dataclasses_json import DataClassJsonMixin
from gcsfs import GCSFileSystem
###############################################################################
[docs]
@dataclass
class EventGatherPipelineConfig(DataClassJsonMixin):
    """
    Settings consumed by the CDP event gather pipeline.

    Parameters
    ----------
    google_credentials_file: str
        Location of the Google Service Account Credentials JSON file.
    get_events_function_path: str
        Path (function name included) to the function that supplies event data
        to the CDP event gather pipeline.
    gcs_bucket_name: Optional[str]
        Name of the Google Storage bucket that holds CDP generated files.
        Default: None (derived from the ``project_id`` entry of the Google
        Service Account Credentials JSON file)
    whisper_model_name: str
        Passthrough to sr_models.whisper.WhisperModel.
    whisper_model_confidence: Optional[float]
        Passthrough to sr_models.whisper.WhisperModel.
    default_event_gather_from_days_timedelta: int
        Default number of days to subtract from the current time; the result is
        handed to the provided get_events function as the `from_dt` datetime.
        Default: 2 (from_dt will be set to current datetime - 2 days)
    """

    # NOTE: field order is part of the generated __init__ signature.
    google_credentials_file: str
    get_events_function_path: str
    gcs_bucket_name: Optional[str] = None
    # Cache for the validated bucket name; excluded from __init__ and __repr__.
    _validated_gcs_bucket_name: Optional[str] = field(
        init=False,
        repr=False,
        default=None,
    )
    whisper_model_name: str = "medium"
    whisper_model_confidence: Optional[float] = None
    default_event_gather_from_days_timedelta: int = 2

    @property
    def validated_gcs_bucket_name(self) -> str:
        """
        GCS bucket name, checked once by listing the bucket.

        Returns
        -------
        str
            ``gcs_bucket_name`` when it was provided, otherwise
            ``"<project_id>.appspot.com"`` built from the credentials file.
            The value is cached after the first successful check.

        Raises
        ------
        ValueError
            Listing the bucket raised ``FileNotFoundError``.
        """
        # Fast path: a previous call already validated the bucket.
        cached = self._validated_gcs_bucket_name
        if cached is not None:
            return cached

        candidate = self.gcs_bucket_name
        if candidate is None:
            # No explicit bucket: read the project id out of the key file and
            # build the default "<project_id>.appspot.com" bucket name.
            with open(
                self.google_credentials_file,
                encoding="utf-8",
            ) as key_file:
                service_account_info = json.load(key_file, strict=False)
                project_id = service_account_info["project_id"]
            candidate = f"{project_id}.appspot.com"

        # Listing the bucket is the existence check.
        filesystem = GCSFileSystem(token=self.google_credentials_file)
        try:
            filesystem.ls(candidate)
        except FileNotFoundError as missing_bucket:
            raise ValueError(
                f"Provided or infered GCS bucket name does not exist. ('{candidate}')"
            ) from missing_bucket

        self._validated_gcs_bucket_name = candidate
        return self._validated_gcs_bucket_name
[docs]
@dataclass
class EventIndexPipelineConfig(DataClassJsonMixin):
    """
    Settings consumed by the CDP event index pipeline.

    Parameters
    ----------
    google_credentials_file: str
        Location of the Google Service Account Credentials JSON file.
    gcs_bucket_name: Optional[str]
        Name of the Google Storage bucket that holds CDP generated files.
        Default: None (derived from the ``project_id`` entry of the Google
        Service Account Credentials JSON file)
    datetime_weighting_days_decay: int
        Number of days during which grams from an event should be labeled as
        more relevant.
        Default: 30 (grams from events less than 30 days old will generally be
        valued higher than their pure relevance score)
    local_storage_dir: Union[str, Path]
        Local directory in which the chunked index is stored prior to upload.
        Default: "index/" (current working directory / index)
    """

    # NOTE: field order is part of the generated __init__ signature.
    google_credentials_file: str
    gcs_bucket_name: Optional[str] = None
    # Cache for the validated bucket name; excluded from __init__ and __repr__.
    _validated_gcs_bucket_name: Optional[str] = field(
        init=False,
        repr=False,
        default=None,
    )
    datetime_weighting_days_decay: int = 30
    local_storage_dir: Union[str, Path] = "index/"

    @property
    def validated_gcs_bucket_name(self) -> str:
        """
        GCS bucket name, checked once by listing the bucket.

        Returns
        -------
        str
            ``gcs_bucket_name`` when it was provided, otherwise
            ``"<project_id>.appspot.com"`` built from the credentials file.
            The value is cached after the first successful check.

        Raises
        ------
        ValueError
            Listing the bucket raised ``FileNotFoundError``.
        """
        # Fast path: a previous call already validated the bucket.
        cached = self._validated_gcs_bucket_name
        if cached is not None:
            return cached

        candidate = self.gcs_bucket_name
        if candidate is None:
            # No explicit bucket: read the project id out of the key file and
            # build the default "<project_id>.appspot.com" bucket name.
            with open(
                self.google_credentials_file,
                encoding="utf-8",
            ) as key_file:
                service_account_info = json.load(key_file, strict=False)
                project_id = service_account_info["project_id"]
            candidate = f"{project_id}.appspot.com"

        # Listing the bucket is the existence check.
        filesystem = GCSFileSystem(token=self.google_credentials_file)
        try:
            filesystem.ls(candidate)
        except FileNotFoundError as missing_bucket:
            raise ValueError(
                f"Provided or infered GCS bucket name does not exist. ('{candidate}')"
            ) from missing_bucket

        self._validated_gcs_bucket_name = candidate
        return self._validated_gcs_bucket_name