Source code for cdp_backend.pipeline.process_event_index_chunk_pipeline

#!/usr/bin/env python

from __future__ import annotations

from pathlib import Path
from typing import NamedTuple

import fireo
import pandas as pd
from prefect import Flow, task, unmapped

from ..database import functions as db_functions
from ..database import models as db_models
from ..file_store import functions as fs_functions
from .generate_event_index_pipeline import REMOTE_INDEX_CHUNK_DIR
from .pipeline_config import EventIndexPipelineConfig

###############################################################################


class AlmostCompleteIndexedEventGram(NamedTuple):
    event_id: str
    unstemmed_gram: str
    stemmed_gram: str
    context_span: str
    value: float
    datetime_weighted_value: float
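
# Illustrative example (an addition, not part of the original module): one
# scored n-gram row as it looks before upload. The tuple is "almost complete"
# because event_id has not yet been resolved to a db_models.Event reference;
# that lookup happens later in store_n_gram_chunk. All field values here are
# hypothetical.
_EXAMPLE_GRAM = AlmostCompleteIndexedEventGram(
    event_id="event-abc123",
    unstemmed_gram="city council",
    stemmed_gram="citi council",
    context_span="... the city council voted to approve ...",
    value=0.42,
    datetime_weighted_value=0.38,
)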
@task
def pull_chunk(
    credentials_file: str,
    bucket_name: str,
    filename: str,
) -> str:
    """Download a single index chunk file from the file store and return its local path."""
    return fs_functions.download_file(
        credentials_file=credentials_file,
        bucket=bucket_name,
        remote_filepath=filename,
        save_path=filename,
    )


@task
def chunk_n_grams(
    chunk_path: str,
    upload_batch_size: int = 500,
) -> list[list[AlmostCompleteIndexedEventGram]]:
    """
    Split the large n_grams dataframe into multiple lists of
    AlmostCompleteIndexedEventGram models for batched, mapped upload.
    """
    # Read index chunk
    n_grams_df = pd.read_parquet(chunk_path)

    # Split single large dataframe into many dataframes
    n_grams_dfs = [
        n_grams_df[i : i + upload_batch_size]
        for i in range(0, n_grams_df.shape[0], upload_batch_size)
    ]

    # Convert each dataframe into a list of indexed event grams
    event_gram_chunks: list[list[AlmostCompleteIndexedEventGram]] = []
    for n_gram_df_chunk in n_grams_dfs:
        event_gram_chunk: list[AlmostCompleteIndexedEventGram] = []
        for _, row in n_gram_df_chunk.iterrows():
            event_gram_chunk.append(
                AlmostCompleteIndexedEventGram(
                    event_id=row.event_id,
                    unstemmed_gram=row.unstemmed_gram,
                    stemmed_gram=row.stemmed_gram,
                    context_span=row.context_span,
                    value=row.tfidf,
                    datetime_weighted_value=row.datetime_weighted_tfidf,
                )
            )
        event_gram_chunks.append(event_gram_chunk)

    return event_gram_chunks


@task
def store_n_gram_chunk(
    n_gram_chunk: list[AlmostCompleteIndexedEventGram],
    credentials_file: str,
) -> None:
    """
    Write all IndexedEventGrams in a single batch.

    This isn't about atomicity; a single batch simply reduces the total
    upload time.
    """
    # Init batch
    batch = fireo.batch()

    # Trigger upserts for all items
    event_lut: dict[str, db_models.Event] = {}
    for almost_complete_ieg in n_gram_chunk:
        # Fetch each referenced Event at most once per chunk
        if almost_complete_ieg.event_id not in event_lut:
            event_lut[almost_complete_ieg.event_id] = db_models.Event.collection.get(
                f"{db_models.Event.collection_name}/{almost_complete_ieg.event_id}"
            )

        # Construct true ieg
        ieg = db_models.IndexedEventGram()
        ieg.event_ref = event_lut[almost_complete_ieg.event_id]
        ieg.unstemmed_gram = almost_complete_ieg.unstemmed_gram
        ieg.stemmed_gram = almost_complete_ieg.stemmed_gram
        ieg.context_span = almost_complete_ieg.context_span
        ieg.value = almost_complete_ieg.value
        ieg.datetime_weighted_value = almost_complete_ieg.datetime_weighted_value

        db_functions.upload_db_model(
            db_model=ieg,
            credentials_file=credentials_file,
            batch=batch,
        )

    # Commit
    batch.commit()
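
# A quick illustrative helper (an addition, not part of the original module)
# showing the batching arithmetic chunk_n_grams relies on: stepped slicing
# yields full batches of `upload_batch_size` rows plus a smaller final batch.
def _demo_batch_sizes(n_rows: int = 1203, upload_batch_size: int = 500) -> list[int]:
    df = pd.DataFrame({"x": range(n_rows)})
    chunks = [
        df[i : i + upload_batch_size]
        for i in range(0, df.shape[0], upload_batch_size)
    ]
    # With the defaults, this returns [500, 500, 203]
    return [len(c) for c in chunks]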
def create_event_index_upload_pipeline(
    config: EventIndexPipelineConfig,
    index_chunk: str | Path,
    upload_batch_size: int = 500,
) -> Flow:
    """
    Create the Prefect Flow object to preview, run, or visualize for uploading
    a generated index chunk for all events in the database.

    Parameters
    ----------
    config: EventIndexPipelineConfig
        Configuration options for the pipeline.
    index_chunk: str | Path
        Path to the index chunk file to process.
    upload_batch_size: int
        Number of n-grams to upload to the database in a single batch.
        Default: 500 (max)

    Returns
    -------
    flow: Flow
        The constructed CDP Event Index Pipeline as a Prefect Flow.
    """
    with Flow("CDP Event Index Pipeline") as flow:
        # Pull the file
        index_chunk_local = pull_chunk(
            credentials_file=config.google_credentials_file,
            bucket_name=config.validated_gcs_bucket_name,
            filename=f"{REMOTE_INDEX_CHUNK_DIR}/{index_chunk}",
        )

        # Route to remote database storage
        chunked_scored_n_grams = chunk_n_grams(
            index_chunk_local,
            upload_batch_size=upload_batch_size,
        )
        store_n_gram_chunk.map(
            n_gram_chunk=chunked_scored_n_grams,
            credentials_file=unmapped(config.google_credentials_file),
        )

    return flow
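
# A minimal usage sketch (an addition, not part of the original module):
# build the flow for a single uploaded chunk file and run it locally with
# Prefect 1.x's Flow.run(). The credentials path, bucket name, chunk
# filename, and the exact EventIndexPipelineConfig constructor arguments
# shown here are hypothetical placeholders.
if __name__ == "__main__":
    config = EventIndexPipelineConfig(
        google_credentials_file="creds.json",  # hypothetical path
        gcs_bucket_name="my-infra-bucket",  # hypothetical bucket
    )
    flow = create_event_index_upload_pipeline(
        config=config,
        index_chunk="n_grams_0.parquet",  # hypothetical chunk filename
        upload_batch_size=500,
    )
    state = flow.run()
    print(state)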