Source code for cdp_data.datasets

#!/usr/bin/env python

import logging
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import pandas as pd
from cdp_backend.database import models as db_models
from cdp_backend.pipeline.transcript_model import Transcript
from cdp_backend.utils.file_utils import resource_copy
from dataclasses_json import dataclass_json
from fireo.models import Model
from gcsfs import GCSFileSystem
from tqdm.contrib.concurrent import process_map, thread_map

from .constants import DEFAULT_DATASET_STORAGE_DIR
from .utils import connect_to_infrastructure, db_utils

###############################################################################

log = logging.getLogger(__name__)

###############################################################################
# Fetch utils


@dataclass
class _VideoFetchParams:
    session_id: str
    session_key: str
    event_id: str
    video_uri: str
    parent_cache_dir: Path
    fs: GCSFileSystem
    raise_on_error: bool


@dataclass_json
@dataclass
class _MatchingVideo:
    session_key: str
    video_path: Optional[Path]


def _get_matching_video(
    fetch_params: _VideoFetchParams,
) -> _MatchingVideo:
    try:
        # Handle cache dir
        this_video_cache_dir = (
            fetch_params.parent_cache_dir
            / f"event-{fetch_params.event_id}"
            / f"session-{fetch_params.session_id}"
        )
        # Create cache dir (wrapped in try/except because this runs threaded)
        try:
            this_video_cache_dir.mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            pass

        # Download video if needed
        save_path = this_video_cache_dir / "video"
        if save_path.is_dir():
            raise IsADirectoryError(
                f"Video '{fetch_params.video_uri}', could not be saved because "
                f"'{save_path}' is a directory. Delete or move the directory to a "
                f"different location or change the target dataset cache dir."
            )
        elif save_path.is_file():
            log.debug(
                f"Skipping video '{fetch_params.video_uri}'. "
                f"A file already exists at target save path."
            )
        else:
            resource_copy(uri=fetch_params.video_uri, dst=save_path)

        return _MatchingVideo(
            session_key=fetch_params.session_key,
            video_path=save_path,
        )

    except Exception as e:
        if fetch_params.raise_on_error:
            raise FileNotFoundError(
                f"Something went wrong while fetching the video for session: "
                f"'{fetch_params.session_id}' from '{fetch_params.fs.project}' "
                f"Please check if a report has already been made to GitHub "
                f"(https://github.com/CouncilDataProject/cdp-data/issues). "
                f"If you cannot find an open issue for this session, "
                f"please create a new one. "
                f"In the meantime, please try rerunning your request with "
                f"`raise_on_error=False`"
            ) from e

        return _MatchingVideo(
            session_key=fetch_params.session_key,
            video_path=None,
        )


@dataclass
class _AudioFetchParams:
    session_id: str
    session_key: str
    event_id: str
    parent_cache_dir: Path
    fs: GCSFileSystem
    raise_on_error: bool


@dataclass_json
@dataclass
class _MatchingAudio:
    session_key: str
    audio_path: Optional[Path]


def _get_matching_audio(
    fetch_params: _AudioFetchParams,
) -> _MatchingAudio:
    try:
        # Get any DB transcript
        db_transcript = db_models.Transcript.collection.filter(
            "session_ref", "==", fetch_params.session_key
        ).get()

        # Get transcript file info
        db_transcript_file = db_transcript.file_ref.get()

        # Strip the transcript details from filename
        # Audio files are stored with the same URI as the transcript
        # but instead of `-cdp_{version}-transcript.json`
        # they simply end with `-audio.wav`
        transcript_uri_parts = db_transcript_file.uri.split("/")
        transcript_filename = transcript_uri_parts[-1]
        uri_base = "/".join(transcript_uri_parts[:-1])
        session_content_hash = transcript_filename[:64]
        audio_uri = "/".join([uri_base, f"{session_content_hash}-audio.wav"])

        # Handle cache dir
        this_audio_cache_dir = (
            fetch_params.parent_cache_dir
            / f"event-{fetch_params.event_id}"
            / f"session-{fetch_params.session_id}"
        )
        # Create cache dir (wrapped in try/except because this runs threaded)
        try:
            this_audio_cache_dir.mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            pass

        # Download audio if needed
        save_path = this_audio_cache_dir / "audio.wav"
        if save_path.is_dir():
            raise IsADirectoryError(
                f"Audio '{audio_uri}', could not be saved because "
                f"'{save_path}' is a directory. Delete or move the directory to a "
                f"different location or change the target dataset cache dir."
            )
        elif save_path.is_file():
            log.debug(
                f"Skipping audio '{audio_uri}'. "
                f"A file already exists at target save path."
            )
        else:
            fetch_params.fs.get(audio_uri, str(save_path))

        return _MatchingAudio(
            session_key=fetch_params.session_key,
            audio_path=save_path,
        )

    except Exception as e:
        if fetch_params.raise_on_error:
            raise FileNotFoundError(
                f"Something went wrong while fetching the video for session: "
                f"'{fetch_params.session_id}' from '{fetch_params.fs.project}' "
                f"Please check if a report has already been made to GitHub "
                f"(https://github.com/CouncilDataProject/cdp-data/issues). "
                f"If you cannot find an open issue for this session, "
                f"please create a new one. "
                f"In the meantime, please try rerunning your request with "
                f"`raise_on_error=False`"
            ) from e

        return _MatchingAudio(
            session_key=fetch_params.session_key,
            audio_path=None,
        )


@dataclass
class _TranscriptFetchParams:
    session_id: str
    session_key: str
    event_id: str
    transcript_selection: str
    parent_cache_dir: Path
    fs: GCSFileSystem
    raise_on_error: bool


@dataclass_json
@dataclass
class _MatchingTranscript:
    session_key: str
    transcript: Optional[db_models.Transcript]
    transcript_path: Optional[Path]


def _get_matching_db_transcript(
    fetch_params: _TranscriptFetchParams,
) -> _MatchingTranscript:
    try:
        # Get DB transcript
        db_transcript = (
            db_models.Transcript.collection.filter(
                "session_ref", "==", fetch_params.session_key
            )
            .order(f"-{fetch_params.transcript_selection}")
            .get()
        )

        # Get transcript file info
        db_transcript_file = db_transcript.file_ref.get()

        # Handle cache dir
        this_transcript_cache_dir = (
            fetch_params.parent_cache_dir
            / f"event-{fetch_params.event_id}"
            / f"session-{fetch_params.session_id}"
        )
        # Create cache dir (wrapped in try/except because this runs threaded)
        try:
            this_transcript_cache_dir.mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            pass

        # Download transcript if needed
        save_path = this_transcript_cache_dir / "transcript.json"
        if save_path.is_dir():
            raise IsADirectoryError(
                f"Transcript '{db_transcript_file.uri}', could not be saved because "
                f"'{save_path}' is a directory. Delete or move the directory to a "
                f"different location or change the target dataset cache dir."
            )
        elif save_path.is_file():
            log.debug(
                f"Skipping transcript '{db_transcript_file.uri}'. "
                f"A file already exists at target save path."
            )
        else:
            fetch_params.fs.get(db_transcript_file.uri, str(save_path))

        return _MatchingTranscript(
            session_key=fetch_params.session_key,
            transcript=db_transcript,
            transcript_path=save_path,
        )

    except Exception as e:
        if fetch_params.raise_on_error:
            raise FileNotFoundError(
                f"Something went wrong while fetching the transcript for session: "
                f"'{fetch_params.session_id}' from '{fetch_params.fs.project}' "
                f"Please check if a report has already been made to GitHub "
                f"(https://github.com/CouncilDataProject/cdp-data/issues). "
                f"If you cannot find an open issue for this session, "
                f"please create a new one. "
                f"In the meantime, please try rerunning your request with "
                f"`raise_on_error=False`"
            ) from e

        return _MatchingTranscript(
            session_key=fetch_params.session_key,
            transcript=None,
            transcript_path=None,
        )


@dataclass_json
@dataclass
class _TranscriptConversionParams:
    session_id: str
    session_key: str
    fs: GCSFileSystem
    transcript_path: Path
    raise_on_error: bool


@dataclass_json
@dataclass
class _ConvertedTranscript:
    session_key: str
    transcript_as_csv_path: Optional[Path]


def _convert_transcript_to_csv(
    convert_params: _TranscriptConversionParams,
) -> _ConvertedTranscript:
    # Safety around conversion: on any error, return None and that row will be dropped
    try:
        # Get storage name
        dest = convert_params.transcript_path.with_suffix(".csv")

        # If this path already exists, just attach and return
        if dest.exists():
            return _ConvertedTranscript(
                session_key=convert_params.session_key,
                transcript_as_csv_path=dest,
            )

        # The path doesn't exist, convert
        transcript_df = convert_transcript_to_dataframe(convert_params.transcript_path)
        transcript_df.to_csv(dest, index=False)

        return _ConvertedTranscript(
            session_key=convert_params.session_key,
            transcript_as_csv_path=dest,
        )

    except Exception as e:
        if convert_params.raise_on_error:
            raise ValueError(
                f"Something went wrong while converting the transcript for a session: "
                f"'{convert_params.session_id}' from '{convert_params.fs.project}' "
                f"Please check if a report has already been made to GitHub "
                f"(https://github.com/CouncilDataProject/cdp-data/issues). "
                f"If you cannot find an open issue for this session, "
                f"please create a new one. "
                f"In the meantime, please try rerunning your request with "
                f"`raise_on_error=False`"
            ) from e

        return _ConvertedTranscript(
            session_key=convert_params.session_key,
            transcript_as_csv_path=None,
        )


def _merge_dataclasses_to_df(
    data_objs: List[dataclass_json],
    df: pd.DataFrame,
    data_objs_key: str,
    df_key: str,
) -> pd.DataFrame:
    # Convert the fetched data objects to a dataframe
    fetched_objs = pd.DataFrame([obj.to_dict() for obj in data_objs])

    # Join to larger dataframe
    return df.join(
        fetched_objs.set_index(data_objs_key),
        on=df_key,
    )


###############################################################################


def replace_db_model_cols_with_id_cols(
    df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Replace all database model column values with the model ID.

    Example: an `event` column with event models will be replaced by
    an `event_id` column with just the event id.

    Parameters
    ----------
    df: pd.DataFrame
        The data to replace database models with just ids.

    Returns
    -------
    pd.DataFrame
        The updated DataFrame.
    """
    # Get a single row sample
    sample = df.loc[0]

    # Iter over cols, check type, and replace if type is a db model
    for col in df.columns:
        sample_col_val = sample[col]
        if isinstance(sample_col_val, Model):
            df[f"{sample_col_val.collection_name}_id"] = df[col].apply(lambda m: m.id)
            df = df.drop(columns=[col])

    return df


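# Illustrative usage sketch (an assumption, not part of the original module): for a
# dataframe whose `event` column holds fireo Event models, the function above drops
# that column and leaves an `event_id` column of plain document ids in its place.
#
#   sessions = replace_db_model_cols_with_id_cols(sessions)
#   # the `event` column is gone; `event_id` now holds the document ids

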
def replace_pathlib_path_cols_with_str_path_cols(
    df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Replace all pathlib Path column values with string column values.

    Example: a `transcript_path` column with a pathlib Path will be
    replaced with a normal Python string.

    Parameters
    ----------
    df: pd.DataFrame
        The data to replace paths in.

    Returns
    -------
    pd.DataFrame
        The updated DataFrame.
    """
    # Get a single row sample
    sample = df.loc[0]

    # Iter over cols, check type, and replace if type is a pathlib Path
    for col in df.columns:
        sample_col_val = sample[col]
        if isinstance(sample_col_val, Path):
            df[col] = df[col].apply(lambda p: str(p))

    return df


def replace_dataframe_cols_with_storage_replacements(
    df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Run various replacement functions against the dataframe to get the data
    to a point where it can be stored to disk.

    Parameters
    ----------
    df: pd.DataFrame
        The data to fix.

    Returns
    -------
    pd.DataFrame
        The updated DataFrame.

    See Also
    --------
    replace_db_model_cols_with_id_cols
        Function to replace database model column values with their ids.
    replace_pathlib_path_cols_with_str_path_cols
        Function to replace pathlib Path column values with normal Python strings.
    """
    # Replace everything
    for func in (
        replace_db_model_cols_with_id_cols,
        replace_pathlib_path_cols_with_str_path_cols,
    ):
        df = func(df)

    return df


def convert_transcript_to_dataframe(
    transcript: Union[str, Path, Transcript]
) -> pd.DataFrame:
    """
    Create a dataframe from only the sentence data from the provided transcript.

    Parameters
    ----------
    transcript: Union[str, Path, Transcript]
        The transcript to pull all sentences from.

    Returns
    -------
    pd.DataFrame:
        The sentences of the transcript.
    """
    # Read transcript if need be
    if isinstance(transcript, (str, Path)):
        with open(transcript) as open_f:
            transcript = Transcript.from_json(open_f.read())

    # Dump sentences to frame
    sentences = pd.DataFrame(transcript.sentences)

    # Drop the words col
    sentences = sentences.drop(columns=["words"])

    return sentences


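# Illustrative usage sketch (an assumption, not part of the original module): convert
# a locally cached transcript JSON to a sentence-level dataframe. The file path below
# is hypothetical; any transcript saved by `get_session_dataset` would work.
#
#   sentences = convert_transcript_to_dataframe("transcript.json")
#   print(sentences.columns)
#   print(sentences.head())

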
def get_session_dataset(  # noqa: C901
    infrastructure_slug: str,
    start_datetime: Optional[Union[str, datetime]] = None,
    end_datetime: Optional[Union[str, datetime]] = None,
    sample: Optional[Union[int, float]] = None,
    replace_py_objects: bool = False,
    store_full_metadata: bool = False,
    store_transcript: bool = False,
    transcript_selection: str = "created",
    store_transcript_as_csv: bool = False,
    store_video: bool = False,
    store_audio: bool = False,
    cache_dir: Optional[Union[str, Path]] = None,
    raise_on_error: bool = True,
    tqdm_kws: Union[Dict[str, Any], None] = None,
) -> pd.DataFrame:
    """
    Get a dataset of sessions from a CDP infrastructure.

    Parameters
    ----------
    infrastructure_slug: str
        The CDP infrastructure to connect to and pull sessions for.
    start_datetime: Optional[Union[str, datetime]]
        An optional datetime that the session dataset will start at.
        Default: None (no datetime beginning bound on the dataset)
    end_datetime: Optional[Union[str, datetime]]
        An optional datetime that the session dataset will end at.
        Default: None (no datetime end bound on the dataset)
    sample: Optional[Union[int, float]]
        An optional sample of the dataset to return.
        If an int, the number of rows to return.
        If a float, the fraction of rows to return.
        Default: None (return all rows)
    replace_py_objects: bool
        Replace any non-standard Python type with standard ones to allow the
        returned data to be ready for storage.
        See 'See Also' for more details.
        Default: False (keep Python objects in the DataFrame)
    store_full_metadata: bool
        Should a JSON file of the full event metadata be stored to disk and a
        path to the stored JSON file be added to the returned DataFrame.
        Default: False (do not request extra data and store to disk)
        **Currently not implemented**
    store_transcript: bool
        Should a session transcript be requested and stored to disk and a path
        to the stored transcript JSON file be added to the returned DataFrame.
        Default: False (do not request extra data and do not store the transcript)
    transcript_selection: str
        How should the single transcript be selected.
        Default: "created" (Return the most recently created transcript per session)
    store_transcript_as_csv: bool
        Additionally convert and store all transcripts as CSVs.
        Does nothing if `store_transcript` is False.
        Default: False (do not convert and store again)
    store_video: bool
        Should the session video be requested and stored to disk and a path to
        the stored video file be added to the returned DataFrame.
        Note: the video is stored without a file extension. However, the video
        will always be either mp4 or webm.
        Default: False (do not request and store the video)
    store_audio: bool
        Should the session audio be requested and stored to disk and a path to
        the stored audio file be added to the returned DataFrame.
        Default: False (do not request and store the audio)
    cache_dir: Optional[Union[str, Path]]
        An optional directory path to cache the dataset. Directory is created
        if it does not exist.
        Default: "./cdp-datasets"
    raise_on_error: bool
        Should any failure to pull files result in an error or be ignored.
        Default: True (raise on any failure)
    tqdm_kws: Dict[str, Any]
        A dictionary with extra keyword arguments to provide to tqdm progress
        bars. Must not include the `desc` keyword argument.

    Returns
    -------
    dataset: pd.DataFrame
        The dataset with all additions requested.

    Notes
    -----
    All file additions (transcript, full event metadata, video, audio, etc.)
    are cached to disk to avoid multiple downloads. If you use the same cache
    directory multiple times over the course of multiple runs, no new data
    will be downloaded, but the existing files will be used. Caching is done
    simply by file existence, not by a content hash comparison.

    Datasets are cached with the following structure::

        {cache-dir}/
        └── {infrastructure_slug}
            ├── event-{event-id-0}
            │   ├── metadata.json
            │   └── session-{session-id-0}
            │       ├── audio.wav
            │       ├── transcript.json
            │       └── video
            ├── event-{event-id-1}
            │   ├── metadata.json
            │   └── session-{session-id-0}
            │       ├── audio.wav
            │       ├── transcript.json
            │       └── video
            ├── event-{event-id-2}
            │   ├── metadata.json
            │   ├── session-{session-id-0}
            │   │   ├── audio.wav
            │   │   ├── transcript.json
            │   │   └── video
            │   └── session-{session-id-1}
            │       ├── audio.wav
            │       ├── transcript.json
            │       └── video

    To clean a whole dataset or specific events or sessions, simply delete the
    associated directory.

    See Also
    --------
    replace_dataframe_cols_with_storage_replacements
        The function used to clean the data of non-standard Python types.
    """
    # Handle default dict
    if not tqdm_kws:
        tqdm_kws = {}

    # Connect to infra
    fs = connect_to_infrastructure(infrastructure_slug)

    # Begin partial query
    query = db_models.Session.collection

    # Add datetime filters
    if start_datetime:
        if isinstance(start_datetime, str):
            start_datetime = datetime.fromisoformat(start_datetime)

        query = query.filter("session_datetime", ">=", start_datetime)
    if end_datetime:
        if isinstance(end_datetime, str):
            end_datetime = datetime.fromisoformat(end_datetime)

        query = query.filter("session_datetime", "<=", end_datetime)

    # Query for events and cast to pandas
    sessions = pd.DataFrame([e.to_dict() for e in query.fetch()])

    # If no sessions found, return empty dataset
    if len(sessions) == 0:
        return pd.DataFrame(
            columns=[
                "session_datetime",
                "session_index",
                "session_content_hash",
                "video_uri",
                "caption_uri",
                "external_source_id",
                "id",
                "key",
                "event",
            ]
        )

    # Handle sample
    if sample:
        if isinstance(sample, int):
            sessions = sessions.sample(sample)
        elif isinstance(sample, float):
            sessions = sessions.sample(frac=sample)
        else:
            raise ValueError(
                f"Unrecognized sample type: '{type(sample)}'. "
                f"Must be either an int or a float."
            )

    # Handle basic event metadata attachment
    log.info("Attaching event metadata to each session datum")
    sessions = db_utils.load_model_from_pd_columns(
        sessions,
        join_id_col="id",
        model_ref_col="event_ref",
        tqdm_kws=tqdm_kws,
    )

    # Add body_ref from event to col to then unpack
    sessions["body_ref"] = sessions["event"].apply(lambda e: e.body_ref)
    sessions = db_utils.load_model_from_pd_columns(
        sessions,
        join_id_col="id",
        model_ref_col="body_ref",
        tqdm_kws=tqdm_kws,
    )

    # Store minutes item uri
    sessions["minutes_pdf_url"] = sessions["event"].apply(lambda e: e.minutes_uri)

    # We only need to handle cache dir and more if any extras are True
    if not any(
        [
            store_full_metadata,
            store_transcript,
            store_video,
            store_audio,
        ]
    ):
        return sessions

    # Handle cache dir
    if not cache_dir:
        cache_dir = DEFAULT_DATASET_STORAGE_DIR
    if isinstance(cache_dir, str):
        cache_dir = Path(cache_dir).resolve()

    # Make cache dir
    cache_dir = cache_dir / infrastructure_slug
    cache_dir.mkdir(parents=True, exist_ok=True)

    # TODO:
    # Handle metadata reversal to ingestion model
    if store_full_metadata:
        log.warning("`store_full_metadata` not implemented")

    # Handle video
    if store_video:
        log.info("Fetching video")
        fetched_video_infos = thread_map(
            _get_matching_video,
            [
                _VideoFetchParams(
                    session_id=row.id,
                    session_key=row.key,
                    event_id=row.event.id,
                    video_uri=row.video_uri,
                    parent_cache_dir=cache_dir,
                    fs=fs,
                    raise_on_error=raise_on_error,
                )
                for _, row in sessions.iterrows()
            ],
            desc="Fetching videos",
            **tqdm_kws,
        )

        # Merge fetched data back to session df
        sessions = _merge_dataclasses_to_df(
            data_objs=fetched_video_infos,
            df=sessions,
            data_objs_key="session_key",
            df_key="key",
        ).dropna(subset=["video_path"])

    # Handle audio
    if store_audio:
        log.info("Fetching audio")
        fetched_audio_infos = thread_map(
            _get_matching_audio,
            [
                _AudioFetchParams(
                    session_id=row.id,
                    session_key=row.key,
                    event_id=row.event.id,
                    parent_cache_dir=cache_dir,
                    fs=fs,
                    raise_on_error=raise_on_error,
                )
                for _, row in sessions.iterrows()
            ],
            desc="Fetching audios",
            **tqdm_kws,
        )

        # Merge fetched data back to session df
        sessions = _merge_dataclasses_to_df(
            data_objs=fetched_audio_infos,
            df=sessions,
            data_objs_key="session_key",
            df_key="key",
        ).dropna(subset=["audio_path"])

    # Pull transcript info
    if store_transcript:
        log.info("Fetching transcripts")

        # Threaded get of transcript info
        fetched_transcript_infos = thread_map(
            _get_matching_db_transcript,
            [
                _TranscriptFetchParams(
                    session_id=row.id,
                    session_key=row.key,
                    event_id=row.event.id,
                    transcript_selection=transcript_selection,
                    parent_cache_dir=cache_dir,
                    fs=fs,
                    raise_on_error=raise_on_error,
                )
                for _, row in sessions.iterrows()
            ],
            desc="Fetching transcripts",
            **tqdm_kws,
        )

        # Merge fetched data back to session df
        sessions = _merge_dataclasses_to_df(
            data_objs=fetched_transcript_infos,
            df=sessions,
            data_objs_key="session_key",
            df_key="key",
        ).dropna(subset=["transcript_path"])

        # Handle conversion of transcripts to CSVs
        if store_transcript_as_csv:
            log.info("Converting and storing transcripts as CSVs")

            # Threaded processing of transcript conversion
            converted_transcript_infos = process_map(
                _convert_transcript_to_csv,
                [
                    _TranscriptConversionParams(
                        session_id=row.id,
                        session_key=row.key,
                        fs=fs,
                        transcript_path=row.transcript_path,
                        raise_on_error=raise_on_error,
                    )
                    for _, row in sessions.iterrows()
                ],
                desc="Converting transcripts",
                **tqdm_kws,
            )

            # Merge fetched data back to session df
            sessions = _merge_dataclasses_to_df(
                data_objs=converted_transcript_infos,
                df=sessions,
                data_objs_key="session_key",
                df_key="key",
            ).dropna(subset=["transcript_as_csv_path"])

    # Replace col values with storage ready replacements
    if replace_py_objects:
        sessions = replace_dataframe_cols_with_storage_replacements(sessions)

    return sessions


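# Illustrative usage sketch (an assumption, not part of the original module): pull a
# month of sessions with transcripts and their CSV conversions. The infrastructure
# slug below is a placeholder; substitute the slug of a real CDP deployment.
#
#   sessions = get_session_dataset(
#       infrastructure_slug="cdp-example-municipality",  # hypothetical slug
#       start_datetime="2022-01-01",
#       end_datetime="2022-02-01",
#       store_transcript=True,
#       store_transcript_as_csv=True,
#       cache_dir="./cdp-datasets",
#   )
#   print(sessions[["session_datetime", "transcript_path"]].head())

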
def save_dataset(
    df: pd.DataFrame,
    dest: Union[str, Path],
) -> Path:
    """
    Helper function to store a dataset to disk by replacing non-standard
    Python types with storage ready replacements.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to store.
    dest: Union[str, Path]
        The path to store the data. Must end in ".csv" or ".parquet".

    Returns
    -------
    Path:
        The path to the stored data.

    See Also
    --------
    replace_dataframe_cols_with_storage_replacements
        The function used to replace column values.
    """
    # Replace col values with storage ready replacements
    df = replace_dataframe_cols_with_storage_replacements(df)

    # Convert dest to Path
    if isinstance(dest, str):
        dest = Path(dest)

    # Check suffix
    if dest.suffix == ".csv":
        df.to_csv(dest, index=False)
        return dest
    if dest.suffix == ".parquet":
        df.to_parquet(dest)
        return dest

    raise ValueError(
        f"Unrecognized filepath suffix: '{dest.suffix}'. "
        f"Supported storage types are 'csv' and 'parquet'."
    )


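# Illustrative usage sketch (an assumption, not part of the original module): persist
# a dataset produced above. The filename is arbitrary; only the ".csv" or ".parquet"
# suffix matters for the storage format.
#
#   save_dataset(sessions, "sessions.parquet")

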
def _get_votes_for_event(key: str) -> pd.DataFrame:
    return pd.DataFrame(
        [
            v.to_dict()
            for v in db_models.Vote.collection.filter(
                "event_ref",
                "==",
                key,
            ).fetch()
        ]
    )


def get_vote_dataset(
    infrastructure_slug: str,
    start_datetime: Optional[Union[str, datetime]] = None,
    end_datetime: Optional[Union[str, datetime]] = None,
    replace_py_objects: bool = False,
    tqdm_kws: Union[Dict[str, Any], None] = None,
) -> pd.DataFrame:
    """
    Get a dataset of votes from a CDP infrastructure.

    Parameters
    ----------
    infrastructure_slug: str
        The CDP infrastructure to connect to and pull votes for.
    start_datetime: Optional[Union[str, datetime]]
        An optional datetime that the vote dataset will start at.
        Default: None (no datetime beginning bound on the dataset)
    end_datetime: Optional[Union[str, datetime]]
        An optional datetime that the vote dataset will end at.
        Default: None (no datetime end bound on the dataset)
    replace_py_objects: bool
        Replace any non-standard Python type with standard ones to allow the
        returned data to be ready for storage.
        See 'See Also' for more details.
        Default: False (keep Python objects in the DataFrame)
    tqdm_kws: Dict[str, Any]
        A dictionary with extra keyword arguments to provide to tqdm progress
        bars. Must not include the `desc` keyword argument.

    Returns
    -------
    pd.DataFrame
        The dataset requested.

    See Also
    --------
    replace_dataframe_cols_with_storage_replacements
        The function used to clean the data of non-standard Python types.
    """
    if not tqdm_kws:
        tqdm_kws = {}

    # Connect to infra
    connect_to_infrastructure(infrastructure_slug)

    # Begin partial query
    query = db_models.Event.collection

    # Add datetime filters
    if start_datetime:
        if isinstance(start_datetime, str):
            start_datetime = datetime.fromisoformat(start_datetime)

        query = query.filter("event_datetime", ">=", start_datetime)
    if end_datetime:
        if isinstance(end_datetime, str):
            end_datetime = datetime.fromisoformat(end_datetime)

        query = query.filter("event_datetime", "<=", end_datetime)

    # Query for events
    events = list(query.fetch())

    # Thread fetch votes for each event
    log.info("Fetching votes for each event")
    fetched_votes_frames = thread_map(
        _get_votes_for_event,
        [e.key for e in events],
        desc="Fetching votes for each event",
        **tqdm_kws,
    )
    votes = pd.concat(fetched_votes_frames)

    # If no votes are found, return empty dataset
    if len(votes) == 0:
        return pd.DataFrame(
            columns=[
                "decision",
                "in_majority",
                "external_source_id",
                "id",
                "key",
                "event_id",
                "event_key",
                "event_datetime",
                "agenda_uri",
                "minutes_uri",
                "matter_id",
                "matter_key",
                "matter_name",
                "matter_type",
                "matter_title",
                "event_minutes_item_id",
                "event_minutes_item_key",
                "event_minutes_item_index_in_meeting",
                "event_minutes_item_overall_decision",
                "person_id",
                "person_key",
                "person_name",
                "body_id",
                "body_key",
                "body_name",
            ],
        )

    # Thread fetch events for each vote
    log.info("Attaching event metadata to each vote datum")
    votes = db_utils.load_model_from_pd_columns(
        votes,
        join_id_col="id",
        model_ref_col="event_ref",
        tqdm_kws=tqdm_kws,
    )

    # Thread fetch matters for each vote
    log.info("Attaching matter metadata to each vote datum")
    votes = db_utils.load_model_from_pd_columns(
        votes,
        join_id_col="id",
        model_ref_col="matter_ref",
        tqdm_kws=tqdm_kws,
    )

    # Thread fetch event minutes items for each vote
    log.info("Attaching event minutes item metadata to each vote datum")
    votes = db_utils.load_model_from_pd_columns(
        votes,
        join_id_col="id",
        model_ref_col="event_minutes_item_ref",
        tqdm_kws=tqdm_kws,
    )

    # Thread fetch people for each vote
    log.info("Attaching person metadata to each vote datum")
    votes = db_utils.load_model_from_pd_columns(
        votes,
        join_id_col="id",
        model_ref_col="person_ref",
        tqdm_kws=tqdm_kws,
    )

    # Expand event models
    votes = db_utils.expand_models_from_pd_column(
        votes,
        model_col="event",
        model_attr_rename_lut={
            "id": "event_id",
            "key": "event_key",
            "body_ref": "body_ref",
            "event_datetime": "event_datetime",
            "agenda_uri": "agenda_uri",
            "minutes_uri": "minutes_uri",
        },
        tqdm_kws=tqdm_kws,
    )

    # Expand matter models
    votes = db_utils.expand_models_from_pd_column(
        votes,
        model_col="matter",
        model_attr_rename_lut={
            "id": "matter_id",
            "key": "matter_key",
            "name": "matter_name",
            "matter_type": "matter_type",
            "title": "matter_title",
        },
        tqdm_kws=tqdm_kws,
    )

    # Expand event minutes item models
    votes = db_utils.expand_models_from_pd_column(
        votes,
        model_col="event_minutes_item",
        model_attr_rename_lut={
            "id": "event_minutes_item_id",
            "key": "event_minutes_item_key",
            "index": "event_minutes_item_index_in_meeting",
            "decision": "event_minutes_item_overall_decision",
        },
        tqdm_kws=tqdm_kws,
    )

    # Expand person models
    votes = db_utils.expand_models_from_pd_column(
        votes,
        model_col="person",
        model_attr_rename_lut={
            "id": "person_id",
            "key": "person_key",
            "name": "person_name",
        },
        tqdm_kws=tqdm_kws,
    )

    # Thread fetch body for each vote
    log.info("Attaching body metadata to each vote datum")
    votes = db_utils.load_model_from_pd_columns(
        votes,
        join_id_col="id",
        model_ref_col="body_ref",
        tqdm_kws=tqdm_kws,
    )

    # Expand body models
    votes = db_utils.expand_models_from_pd_column(
        votes,
        model_col="body",
        model_attr_rename_lut={
            "id": "body_id",
            "key": "body_key",
            "name": "body_name",
        },
        tqdm_kws=tqdm_kws,
    )

    # Replace col values with storage ready replacements
    if replace_py_objects:
        votes = replace_dataframe_cols_with_storage_replacements(votes)

    return votes


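# Illustrative usage sketch (an assumption, not part of the original module): pull all
# votes recorded during 2021 and write them out ready for analysis. The slug is again
# a placeholder for a real CDP deployment.
#
#   votes = get_vote_dataset(
#       infrastructure_slug="cdp-example-municipality",  # hypothetical slug
#       start_datetime="2021-01-01",
#       end_datetime="2022-01-01",
#       replace_py_objects=True,
#   )
#   votes.to_csv("votes-2021.csv", index=False)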