Source code for cdp_backend.database.functions

#!/usr/bin/env python

from __future__ import annotations

import logging
import pickle
from datetime import datetime
from hashlib import sha256
from typing import Any

import fireo
from fireo.models import Model
from fireo.queries.query_wrapper import ReferenceDocLoader
from google.cloud.firestore_v1.batch import WriteBatch
from google.cloud.firestore_v1.transaction import Transaction

from ..database import models as db_models
from ..pipeline import ingestion_models, transcript_model

###############################################################################

log = logging.getLogger(__name__)

###############################################################################


def generate_and_attach_doc_hash_as_id(db_model: Model) -> Model:
    """
    Derive a document id from a model's primary key values and attach it.

    Every attribute named in ``db_model._PRIMARY_KEYS`` is pickled
    (protocol 4) and fed, in order, into a single SHA256 hasher. The first
    twelve characters of the resulting hex digest become ``db_model.id``.

    Parameters
    ----------
    db_model: Model
        The initialized database model.

    Returns
    -------
    db_model: Model
        The same model instance, with ``id`` set.

    Notes
    -----
    The model is modified in place. A primary key that is itself a ``Model``
    is first given its own id by a recursive call and is stored back on
    ``db_model`` before its id is hashed.
    """
    digest = sha256()

    for key_name in db_model._PRIMARY_KEYS:
        value = getattr(db_model, key_name)

        if isinstance(value, ReferenceDocLoader):
            # Reference loader: fetch the referenced document and use its id
            hashable = value.get().id
        elif isinstance(value, Model):
            # Nested model: make sure it carries an id of its own, write the
            # updated model back onto the parent, then use that id
            setattr(db_model, key_name, generate_and_attach_doc_hash_as_id(value))
            hashable = getattr(db_model, key_name).id
        elif isinstance(value, datetime):
            # Hash the POSIX timestamp (float seconds) instead of the
            # datetime object itself to avoid timezone issues
            hashable = value.timestamp()
        else:
            # Any other primary key value is hashed as-is
            hashable = value

        digest.update(pickle.dumps(hashable, protocol=4))

    # The document id is the first twelve characters of the hex digest
    db_model.id = digest.hexdigest()[:12]
    return db_model
def upload_db_model(
    db_model: Model,
    credentials_file: str,
    transaction: Transaction | None = None,
    batch: WriteBatch | None = None,
) -> Model:
    """
    Upload a database model, or update it if it already exists.

    Parameters
    ----------
    db_model: Model
        The database model to upload.
    credentials_file: str
        Path to Google Service Account Credentials JSON file.
    transaction: Optional[Transaction]
        The transaction to write this model during.
    batch: Optional[WriteBatch]
        The write batch to use during uploading this model.

    Returns
    -------
    db_model: Model
        The value returned by the model's ``upsert`` call.
    """
    # Authenticate the fireo connection before touching the database
    fireo.connection(from_file=credentials_file)

    # The document id is derived from the primary keys, then the model is upserted
    hashed_model = generate_and_attach_doc_hash_as_id(db_model)
    return hashed_model.upsert(transaction=transaction, batch=batch)
def get_all_of_collection(
    db_model: Model, credentials_file: str, batch_size: int = 1000
) -> list[Model]:
    """
    Collect every document of a collection into one list, fetching in batches.

    Parameters
    ----------
    db_model: Model
        The CDP database model to get all documents for.
    credentials_file: str
        Path to Google Service Account Credentials JSON file.
    batch_size: int
        How many documents to request at a single time.
        Default: 1000

    Returns
    -------
    documents: List[Model]
        All documents in the model's collection.
    """
    fireo.connection(from_file=credentials_file)

    documents: list[Model] = []
    pager = db_model.collection.fetch(batch_size)

    # Drain the paginator one page at a time; an empty page ends the loop
    while True:
        page = list(pager)
        documents.extend(page)
        if not page:
            break
        pager.next_fetch()

    return documents
def _strip_field(field: str | None) -> str | None:
    """
    Return ``field`` with surrounding whitespace removed when it is a string.

    Any non-string value (including ``None``) is returned unchanged.
    """
    return field.strip() if isinstance(field, str) else field


def _ensure_string_or_optional(field: Any | None) -> str | None:
    """
    Convert ``field`` to ``str``, passing ``None`` through untouched.
    """
    return None if field is None else str(field)
def create_body(
    body: ingestion_models.Body,
    start_datetime: datetime,
) -> db_models.Body:
    """
    Build a database Body model from an ingestion Body.

    Parameters
    ----------
    body: ingestion_models.Body
        The ingestion model to copy values from.
    start_datetime: datetime
        The value stored as the body's start datetime.

    Returns
    -------
    db_body: db_models.Body
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.Body()

    # Required fields
    record.name = _strip_field(body.name)
    record.is_active = body.is_active
    record.start_datetime = start_datetime

    # Optional fields
    record.end_datetime = body.end_datetime
    record.description = _strip_field(body.description)
    source_id = _ensure_string_or_optional(body.external_source_id)
    record.external_source_id = source_id

    return record
def create_event(
    body_ref: db_models.Body,
    event_datetime: datetime,
    static_thumbnail_ref: db_models.File | None = None,
    hover_thumbnail_ref: db_models.File | None = None,
    agenda_uri: str | None = None,
    minutes_uri: str | None = None,
    external_source_id: str | None = None,
    credentials_file: str | None = None,
) -> db_models.Event:
    """
    Build a database Event model from its parts.

    Parameters
    ----------
    body_ref: db_models.Body
        The body the event belongs to.
    event_datetime: datetime
        When the event took place.
    static_thumbnail_ref: Optional[db_models.File]
        File model for the static thumbnail.
    hover_thumbnail_ref: Optional[db_models.File]
        File model for the hover thumbnail.
    agenda_uri: Optional[str]
        URI of the agenda; surrounding whitespace is stripped.
    minutes_uri: Optional[str]
        URI of the minutes; surrounding whitespace is stripped.
    external_source_id: Optional[str]
        Identifier from the external source; stored as a string.
    credentials_file: Optional[str]
        When truthy, handed to the model's validators under the
        ``google_credentials_file`` key.

    Returns
    -------
    db_event: db_models.Event
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.Event()

    if credentials_file:
        validator_kwargs = {"google_credentials_file": credentials_file}
        record.set_validator_kwargs(kwargs=validator_kwargs)

    # Required fields
    record.body_ref = body_ref
    record.event_datetime = event_datetime

    # Optional fields
    record.static_thumbnail_ref = static_thumbnail_ref
    record.hover_thumbnail_ref = hover_thumbnail_ref
    record.agenda_uri = _strip_field(agenda_uri)
    record.minutes_uri = _strip_field(minutes_uri)
    record.external_source_id = _ensure_string_or_optional(external_source_id)

    return record
def create_session(
    session: ingestion_models.Session,
    session_video_hosted_url: str,
    session_content_hash: str,
    event_ref: db_models.Event,
    credentials_file: str | None = None,
) -> db_models.Session:
    """
    Build a database Session model from an ingestion Session.

    Parameters
    ----------
    session: ingestion_models.Session
        The ingestion model supplying datetime, index, caption URI and
        external source id.
    session_video_hosted_url: str
        The value stored as the session's ``video_uri``.
    session_content_hash: str
        The value stored as the session's content hash.
    event_ref: db_models.Event
        The event the session belongs to.
    credentials_file: Optional[str]
        When truthy, handed to the model's validators under the
        ``google_credentials_file`` key.

    Returns
    -------
    db_session: db_models.Session
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.Session()

    if credentials_file:
        validator_kwargs = {"google_credentials_file": credentials_file}
        record.set_validator_kwargs(kwargs=validator_kwargs)

    # Required fields
    record.event_ref = event_ref
    record.session_datetime = session.session_datetime
    record.video_uri = session_video_hosted_url
    record.session_index = session.session_index
    record.session_content_hash = session_content_hash

    # Optional fields
    record.caption_uri = session.caption_uri
    source_id = _ensure_string_or_optional(session.external_source_id)
    record.external_source_id = source_id

    return record
def create_file(
    uri: str,
    credentials_file: str | None = None,
) -> db_models.File:
    """
    Build a database File model for a URI.

    The file name is whatever follows the last ``/`` in ``uri`` (the whole
    string when it contains no ``/``).

    Parameters
    ----------
    uri: str
        The URI of the file.
    credentials_file: Optional[str]
        When truthy, handed to the model's validators under the
        ``google_credentials_file`` key.

    Returns
    -------
    db_file: db_models.File
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.File()

    # Final path segment of the URI serves as the display name
    _, _, final_segment = uri.rpartition("/")
    record.name = final_segment
    record.uri = uri

    if credentials_file:
        validator_kwargs = {"google_credentials_file": credentials_file}
        record.set_validator_kwargs(kwargs=validator_kwargs)

    return record
def create_transcript(
    transcript_file_ref: db_models.File,
    session_ref: db_models.Session,
    transcript: transcript_model.Transcript,
) -> db_models.Transcript:
    """
    Build a database Transcript model from a transcript and its references.

    Parameters
    ----------
    transcript_file_ref: db_models.File
        File model for the stored transcript.
    session_ref: db_models.Session
        The session the transcript belongs to.
    transcript: transcript_model.Transcript
        Source of the generator, confidence and creation datetime. Its
        ``created_datetime`` is parsed with ``datetime.fromisoformat``.

    Returns
    -------
    db_transcript: db_models.Transcript
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.Transcript()

    # References
    record.session_ref = session_ref
    record.file_ref = transcript_file_ref

    # Values copied from the transcript itself
    record.generator = transcript.generator
    record.confidence = transcript.confidence
    created_at = datetime.fromisoformat(transcript.created_datetime)
    record.created = created_at

    return record
def create_matter(
    matter: ingestion_models.Matter,
) -> db_models.Matter:
    """
    Build a database Matter model from an ingestion Matter.

    Parameters
    ----------
    matter: ingestion_models.Matter
        The ingestion model to copy values from. Name, matter type and
        title have surrounding whitespace stripped.

    Returns
    -------
    db_matter: db_models.Matter
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.Matter()

    record.name = _strip_field(matter.name)
    record.matter_type = _strip_field(matter.matter_type)
    record.title = _strip_field(matter.title)
    source_id = _ensure_string_or_optional(matter.external_source_id)
    record.external_source_id = source_id

    return record
def create_matter_status(
    matter_ref: db_models.Matter,
    status: str,
    update_datetime: datetime,
    event_minutes_item_ref: db_models.EventMinutesItem | None = None,
    external_source_id: str | None = None,
) -> db_models.MatterStatus:
    """
    Build a database MatterStatus model.

    Parameters
    ----------
    matter_ref: db_models.Matter
        The matter this status belongs to.
    status: str
        The status value; surrounding whitespace is stripped.
    update_datetime: datetime
        When the status was updated.
    event_minutes_item_ref: Optional[db_models.EventMinutesItem]
        The event minutes item associated with this status.
    external_source_id: Optional[str]
        Identifier from the external source; stored as a string.

    Returns
    -------
    db_matter_status: db_models.MatterStatus
        The populated database model. Nothing is uploaded here.
    """
    # NOTE: the return annotation previously read ``db_models.Matter`` even
    # though a MatterStatus is constructed and returned.
    db_matter_status = db_models.MatterStatus()

    db_matter_status.matter_ref = matter_ref
    db_matter_status.event_minutes_item_ref = event_minutes_item_ref
    db_matter_status.status = _strip_field(status)
    db_matter_status.update_datetime = update_datetime
    db_matter_status.external_source_id = _ensure_string_or_optional(
        external_source_id
    )

    return db_matter_status
def create_matter_file(
    matter_ref: db_models.Matter,
    supporting_file: ingestion_models.SupportingFile,
    credentials_file: str | None = None,
) -> db_models.MatterFile:
    """
    Build a database MatterFile model from a supporting file.

    Parameters
    ----------
    matter_ref: db_models.Matter
        The matter the file is attached to.
    supporting_file: ingestion_models.SupportingFile
        Source of the name, URI and external source id. Name and URI have
        surrounding whitespace stripped.
    credentials_file: Optional[str]
        When truthy, handed to the model's validators under the
        ``google_credentials_file`` key.

    Returns
    -------
    db_matter_file: db_models.MatterFile
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.MatterFile()

    if credentials_file:
        validator_kwargs = {"google_credentials_file": credentials_file}
        record.set_validator_kwargs(kwargs=validator_kwargs)

    record.matter_ref = matter_ref
    record.name = _strip_field(supporting_file.name)
    record.uri = _strip_field(supporting_file.uri)
    source_id = _ensure_string_or_optional(supporting_file.external_source_id)
    record.external_source_id = source_id

    return record
def create_minimal_person(
    person: ingestion_models.Person,
) -> db_models.Person:
    """
    Build a database Person model holding only name, active flag and router string.

    Parameters
    ----------
    person: ingestion_models.Person
        The ingestion model supplying the name and active flag.

    Returns
    -------
    db_person: db_models.Person
        The populated database model. Nothing is uploaded here.

    Raises
    ------
    ValueError
        When the person's name is empty or ``None`` after stripping.
    """
    record = db_models.Person()

    cleaned_name = _strip_field(person.name)
    record.name = cleaned_name
    record.is_active = person.is_active

    # A router string can only be generated from a non-empty name
    if not cleaned_name:
        raise ValueError(
            f"Something went very wrong. Person doesn't have a name: {person}"
        )

    record.router_string = db_models.Person.generate_router_string(cleaned_name)
    return record
def create_matter_sponsor(
    matter_ref: db_models.Matter,
    person_ref: db_models.Person,
    external_source_id: str | None = None,
) -> db_models.MatterSponsor:
    """
    Build a database MatterSponsor model linking a matter to a person.

    Parameters
    ----------
    matter_ref: db_models.Matter
        The sponsored matter.
    person_ref: db_models.Person
        The sponsoring person.
    external_source_id: Optional[str]
        Identifier from the external source; stored as a string.

    Returns
    -------
    db_matter_sponsor: db_models.MatterSponsor
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.MatterSponsor()

    record.matter_ref = matter_ref
    record.person_ref = person_ref
    source_id = _ensure_string_or_optional(external_source_id)
    record.external_source_id = source_id

    return record
def create_person(
    person: ingestion_models.Person,
    picture_ref: db_models.File | None = None,
    credentials_file: str | None = None,
) -> db_models.Person:
    """
    Build a full database Person model from an ingestion Person.

    Starts from ``create_minimal_person`` and then fills in the optional
    contact, picture and external source fields.

    Parameters
    ----------
    person: ingestion_models.Person
        The ingestion model to copy values from. A non-``None``
        ``router_string`` replaces the generated one.
    picture_ref: Optional[db_models.File]
        File model for the person's picture.
    credentials_file: Optional[str]
        When truthy, handed to the model's validators under the
        ``google_credentials_file`` key.

    Returns
    -------
    db_person: db_models.Person
        The populated database model. Nothing is uploaded here.
    """
    # Name, active flag and generated router string
    record = create_minimal_person(person=person)

    if credentials_file:
        validator_kwargs = {"google_credentials_file": credentials_file}
        record.set_validator_kwargs(kwargs=validator_kwargs)

    # An explicitly provided router string wins over the generated one
    if person.router_string is not None:
        record.router_string = _strip_field(person.router_string)

    # Optional fields
    record.email = _strip_field(person.email)
    record.phone = _strip_field(person.phone)
    record.website = _strip_field(person.website)
    record.picture_ref = picture_ref
    source_id = _ensure_string_or_optional(person.external_source_id)
    record.external_source_id = source_id

    return record
def create_seat(
    seat: ingestion_models.Seat,
    image_ref: db_models.File | None,
) -> db_models.Seat:
    """
    Build a database Seat model from an ingestion Seat.

    Parameters
    ----------
    seat: ingestion_models.Seat
        The ingestion model to copy values from. Name, electoral area and
        electoral type have surrounding whitespace stripped.
    image_ref: Optional[db_models.File]
        File model for the seat's image.

    Returns
    -------
    db_seat: db_models.Seat
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.Seat()

    record.name = _strip_field(seat.name)
    record.electoral_area = _strip_field(seat.electoral_area)
    record.electoral_type = _strip_field(seat.electoral_type)
    record.image_ref = image_ref
    source_id = _ensure_string_or_optional(seat.external_source_id)
    record.external_source_id = source_id

    return record
def create_role(
    role: ingestion_models.Role,
    person_ref: db_models.Person,
    seat_ref: db_models.Seat,
    start_datetime: datetime,
    body_ref: db_models.Body | None = None,
) -> db_models.Role:
    """
    Build a database Role model from an ingestion Role.

    Parameters
    ----------
    role: ingestion_models.Role
        Source of the title, end datetime and external source id.
    person_ref: db_models.Person
        The person holding the role.
    seat_ref: db_models.Seat
        The seat the role is attached to.
    start_datetime: datetime
        The value stored as the role's start datetime.
    body_ref: Optional[db_models.Body]
        The body the role is associated with.

    Returns
    -------
    db_role: db_models.Role
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.Role()

    # Required fields
    record.title = _strip_field(role.title)
    record.person_ref = person_ref
    record.seat_ref = seat_ref
    record.start_datetime = start_datetime

    # Optional fields
    record.body_ref = body_ref
    record.end_datetime = role.end_datetime
    source_id = _ensure_string_or_optional(role.external_source_id)
    record.external_source_id = source_id

    return record
def create_minutes_item(
    minutes_item: ingestion_models.MinutesItem,
    matter_ref: db_models.Matter | None = None,
) -> db_models.MinutesItem:
    """
    Build a database MinutesItem model from an ingestion MinutesItem.

    Parameters
    ----------
    minutes_item: ingestion_models.MinutesItem
        Source of the name, description and external source id. Name and
        description have surrounding whitespace stripped.
    matter_ref: Optional[db_models.Matter]
        The matter the minutes item refers to.

    Returns
    -------
    db_minutes_item: db_models.MinutesItem
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.MinutesItem()

    record.name = _strip_field(minutes_item.name)
    record.description = _strip_field(minutes_item.description)
    record.matter_ref = matter_ref
    source_id = _ensure_string_or_optional(minutes_item.external_source_id)
    record.external_source_id = source_id

    return record
def create_minimal_event_minutes_item(
    event_ref: db_models.Event,
    minutes_item_ref: db_models.MinutesItem,
    index: int,
) -> db_models.EventMinutesItem:
    """
    Build a database EventMinutesItem model with only its references and index.

    Parameters
    ----------
    event_ref: db_models.Event
        The event the item belongs to.
    minutes_item_ref: db_models.MinutesItem
        The minutes item being linked to the event.
    index: int
        The value stored as the item's ``index``.

    Returns
    -------
    db_event_minutes_item: db_models.EventMinutesItem
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.EventMinutesItem()

    record.event_ref = event_ref
    record.minutes_item_ref = minutes_item_ref
    record.index = index

    return record
def create_event_minutes_item(
    event_minutes_item: ingestion_models.EventMinutesItem,
    event_ref: db_models.Event,
    minutes_item_ref: db_models.MinutesItem,
    index: int,
) -> db_models.EventMinutesItem:
    """
    Build a database EventMinutesItem model including its decision.

    Starts from ``create_minimal_event_minutes_item`` and then copies the
    decision from the ingestion model.

    Parameters
    ----------
    event_minutes_item: ingestion_models.EventMinutesItem
        Source of the decision.
    event_ref: db_models.Event
        The event the item belongs to.
    minutes_item_ref: db_models.MinutesItem
        The minutes item being linked to the event.
    index: int
        The value stored as the item's ``index``.

    Returns
    -------
    db_event_minutes_item: db_models.EventMinutesItem
        The populated database model. Nothing is uploaded here.
    """
    record = create_minimal_event_minutes_item(
        event_ref=event_ref,
        minutes_item_ref=minutes_item_ref,
        index=index,
    )

    record.decision = event_minutes_item.decision

    return record
def create_event_minutes_item_file(
    event_minutes_item_ref: db_models.EventMinutesItem,
    supporting_file: ingestion_models.SupportingFile,
    credentials_file: str | None = None,
) -> db_models.EventMinutesItemFile:
    """
    Build a database EventMinutesItemFile model from a supporting file.

    Parameters
    ----------
    event_minutes_item_ref: db_models.EventMinutesItem
        The event minutes item the file is attached to.
    supporting_file: ingestion_models.SupportingFile
        Source of the name, URI and external source id. Name and URI have
        surrounding whitespace stripped.
    credentials_file: Optional[str]
        When truthy, handed to the model's validators under the
        ``google_credentials_file`` key.

    Returns
    -------
    db_event_minutes_item_file: db_models.EventMinutesItemFile
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.EventMinutesItemFile()

    if credentials_file:
        validator_kwargs = {"google_credentials_file": credentials_file}
        record.set_validator_kwargs(kwargs=validator_kwargs)

    record.event_minutes_item_ref = event_minutes_item_ref
    record.name = _strip_field(supporting_file.name)
    record.uri = _strip_field(supporting_file.uri)
    source_id = _ensure_string_or_optional(supporting_file.external_source_id)
    record.external_source_id = source_id

    return record
def create_vote(
    matter_ref: db_models.Matter,
    event_ref: db_models.Event,
    event_minutes_item_ref: db_models.EventMinutesItem,
    person_ref: db_models.Person,
    decision: str,
    in_majority: bool | None,
    external_source_id: str | None = None,
) -> db_models.Vote:
    """
    Build a database Vote model.

    Parameters
    ----------
    matter_ref: db_models.Matter
        The matter that was voted on.
    event_ref: db_models.Event
        The event during which the vote happened.
    event_minutes_item_ref: db_models.EventMinutesItem
        The event minutes item the vote is attached to.
    person_ref: db_models.Person
        The person who cast the vote.
    decision: str
        The vote decision; surrounding whitespace is stripped.
    in_majority: Optional[bool]
        The value stored as the vote's ``in_majority`` flag.
    external_source_id: Optional[str]
        Identifier from the external source; stored as a string.

    Returns
    -------
    db_vote: db_models.Vote
        The populated database model. Nothing is uploaded here.
    """
    record = db_models.Vote()

    # References
    record.matter_ref = matter_ref
    record.event_ref = event_ref
    record.event_minutes_item_ref = event_minutes_item_ref
    record.person_ref = person_ref

    # Vote details
    record.decision = _strip_field(decision)
    record.in_majority = in_majority
    record.external_source_id = _ensure_string_or_optional(external_source_id)

    return record