Source code for cdp_backend.pipeline.event_gather_pipeline

#!/usr/bin/env python

from __future__ import annotations

import http.client
import logging
from datetime import datetime, timedelta
from importlib import import_module
from operator import attrgetter
from pathlib import Path
from typing import Callable, NamedTuple
from uuid import uuid4

import backoff
from aiohttp.client_exceptions import ClientResponseError
from fireo.fields.errors import FieldValidationFailed, InvalidFieldType, RequiredField
from gcsfs import GCSFileSystem
from prefect import Flow, task
from prefect.tasks.control_flow import case, merge
from requests import ConnectionError

from .. import __version__
from ..database import constants as db_constants
from ..database import functions as db_functions
from ..database import models as db_models
from ..database.validators import is_secure_uri, resource_exists, try_url
from ..file_store import functions as fs_functions
from ..sr_models import WhisperModel
from ..utils import constants_utils, file_utils
from . import ingestion_models
from .ingestion_models import EventIngestionModel, Session
from .pipeline_config import EventGatherPipelineConfig
from .transcript_model import Transcript

###############################################################################

log = logging.getLogger(__name__)

###############################################################################


class SessionProcessingResult(NamedTuple):
    session: Session
    session_video_hosted_url: str
    session_content_hash: str
    audio_uri: str
    transcript: Transcript
    transcript_uri: str
    static_thumbnail_uri: str
    hover_thumbnail_uri: str


def import_get_events_func(func_path: str) -> Callable:
    path, func_name = str(func_path).rsplit(".", 1)
    mod = import_module(path)
    return getattr(mod, func_name)


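# Usage sketch (illustrative only; the dotted path below is hypothetical). The
# configured scraper path is split into a module path and an attribute name, the
# module is imported, and the callable is returned:
#
#   get_events = import_get_events_func("my_city_scraper.events.get_events")
#   events = get_events(from_dt=from_datetime, to_dt=to_datetime)

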
@backoff.on_exception(
    backoff.expo,
    http.client.HTTPException,
    max_tries=3,
)
def get_events_with_backoff(
    func: Callable,
    start_dt: datetime,
    end_dt: datetime,
) -> list[ingestion_models.EventIngestionModel]:
    return func(from_dt=start_dt, to_dt=end_dt)


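# Note: backoff.expo retries the wrapped call with exponentially growing delays when
# an http.client.HTTPException is raised, giving up after three total attempts. A
# minimal sketch of the same pattern applied to a hypothetical helper:
#
#   @backoff.on_exception(backoff.expo, ConnectionError, max_tries=3)
#   def fetch_agenda_html(url: str) -> str:
#       ...

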
def create_event_gather_flow(
    config: EventGatherPipelineConfig,
    from_dt: str | datetime | None = None,
    to_dt: str | datetime | None = None,
    prefetched_events: list[EventIngestionModel] | None = None,
) -> list[Flow]:
    """
    Create the Prefect Flow objects to preview, run, or visualize.

    Parameters
    ----------
    config: EventGatherPipelineConfig
        Configuration options for the pipeline.
    from_dt: Optional[Union[str, datetime]]
        Optional ISO formatted string or datetime object to pass to the get_events
        function to act as the start point for event gathering.
        Default: None (two days ago)
    to_dt: Optional[Union[str, datetime]]
        Optional ISO formatted string or datetime object to pass to the get_events
        function to act as the end point for event gathering.
        Default: None (now)
    prefetched_events: Optional[List[EventIngestionModel]]
        A list of events to process instead of running the scraper found in the
        config.
        Default: None (use the scraper from the config)

    Returns
    -------
    flows: list[Flow]
        The constructed CDP Event Gather Pipelines as Prefect Flows.
    """
    # Load get_events_func
    get_events_func = import_get_events_func(config.get_events_function_path)

    # Handle from datetime
    if isinstance(from_dt, str) and len(from_dt) != 0:
        from_datetime = datetime.fromisoformat(from_dt)
    elif isinstance(from_dt, datetime):
        from_datetime = from_dt
    else:
        from_datetime = datetime.utcnow() - timedelta(
            days=config.default_event_gather_from_days_timedelta,
        )

    # Handle to datetime
    if isinstance(to_dt, str) and len(to_dt) != 0:
        to_datetime = datetime.fromisoformat(to_dt)
    elif isinstance(to_dt, datetime):
        to_datetime = to_dt
    else:
        to_datetime = datetime.utcnow()

    # Gather events
    log.info(
        f"Gathering events to process. "
        f"({from_datetime.isoformat()} - {to_datetime.isoformat()})"
    )

    # Use prefetched events instead of get_events_func if provided
    if prefetched_events is not None:
        events = prefetched_events
    else:
        # Run the get events function
        events = get_events_with_backoff(
            get_events_func,
            start_dt=from_datetime,
            end_dt=to_datetime,
        )

    # Safety measure: treat a None return as "no events"
    if events is None:
        events = []

    # Create each session flow
    flows = []
    log.info(f"Processing {len(events)} events.")
    for event in events:
        log.info(f"Creating flows for {len(event.sessions)} sessions.")
        for session in event.sessions:
            with Flow("cdp-session-processing") as flow:
                # Download video to a local copy, making the copy unique
                # in case of a shared session video
                resource_copy_filepath = resource_copy_task(
                    uri=session.video_uri,
                    dst=f"{str(uuid4())}_temp",
                    copy_suffix=True,
                )

                # Handle video conversion or non-secure resource hosting
                (
                    tmp_video_filepath,
                    session_video_hosted_url,
                    session_content_hash,
                ) = convert_video_and_handle_host(
                    video_filepath=resource_copy_filepath,
                    session=session,
                    credentials_file=config.google_credentials_file,
                    bucket=config.validated_gcs_bucket_name,
                )

                # Split audio and store
                (
                    audio_uri,
                    local_audio_path,
                ) = split_audio(
                    session_content_hash=session_content_hash,
                    tmp_video_filepath=tmp_video_filepath,
                    bucket=config.validated_gcs_bucket_name,
                    credentials_file=config.google_credentials_file,
                )

                # Check caption uri
                if session.caption_uri is not None:
                    # If the caption doesn't exist, remove the value
                    if not resource_exists(session.caption_uri):
                        log.warning(
                            f"File not found using provided caption URI: "
                            f"'{session.caption_uri}'. "
                            f"Removing the referenced caption URI."
                        )
                        session.caption_uri = None

                # Generate transcript
                transcript_uri, transcript = generate_transcript(
                    session_content_hash=session_content_hash,
                    audio_path=local_audio_path,
                    session=session,
                    bucket=config.validated_gcs_bucket_name,
                    credentials_file=config.google_credentials_file,
                    whisper_model_name=config.whisper_model_name,
                    whisper_model_confidence=config.whisper_model_confidence,
                )

                # Generate thumbnails
                (
                    static_thumbnail_uri,
                    hover_thumbnail_uri,
                ) = generate_thumbnails(
                    session_content_hash=session_content_hash,
                    tmp_video_path=tmp_video_filepath,
                    event=event,
                    bucket=config.validated_gcs_bucket_name,
                    credentials_file=config.google_credentials_file,
                )

                # Remove the temporary video and audio files once the tasks
                # that depend on them have completed
                resource_delete_task(
                    tmp_video_filepath,
                    upstream_tasks=[audio_uri, static_thumbnail_uri],
                )
                resource_delete_task(
                    local_audio_path,
                    upstream_tasks=[transcript],
                )

                # Store all processed and provided data
                session_processing_result = compile_session_processing_result(
                    session=session,
                    session_video_hosted_url=session_video_hosted_url,
                    session_content_hash=session_content_hash,
                    audio_uri=audio_uri,
                    transcript=transcript,
                    transcript_uri=transcript_uri,
                    static_thumbnail_uri=static_thumbnail_uri,
                    hover_thumbnail_uri=hover_thumbnail_uri,
                )

                # Store just this result
                store_event_processing_results(
                    event=event,
                    session_processing_results=[session_processing_result],
                    credentials_file=config.google_credentials_file,
                    bucket=config.validated_gcs_bucket_name,
                )

            flows.append(flow)

    return flows


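# Usage sketch (illustrative only; config construction is elided because it depends on
# the deployment). create_event_gather_flow builds one Flow per session, and each can
# be executed locally with Prefect 1.x's Flow.run():
#
#   config = ...  # an EventGatherPipelineConfig for your CDP instance
#   flows = create_event_gather_flow(config, from_dt="2023-01-01", to_dt="2023-01-02")
#   for flow in flows:
#       state = flow.run()

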
@task(max_retries=3, retry_delay=timedelta(seconds=120))
def resource_copy_task(
    uri: str, dst: str | None = None, copy_suffix: bool = False
) -> str:
    """
    Copy a file to a temporary location for processing.

    Parameters
    ----------
    uri: str
        The URI to the file to copy.
    dst: Optional[str]
        An optional destination path for the file.
        Default: None (place the file in the current directory with the same file name)
    copy_suffix: bool
        A bool for if the file suffix should be copied.
        Default: False

    Returns
    -------
    local_path: str
        The local path to the copied file.

    Notes
    -----
    We sometimes get file downloading failures when running in parallel, so this task
    retries up to three times, with a two minute delay after each failure.
    """
    return file_utils.resource_copy(
        uri=uri,
        dst=dst,
        copy_suffix=copy_suffix,
        overwrite=True,
    )


@task
def resource_delete_task(uri: str) -> None:
    """
    Remove a local file.

    Parameters
    ----------
    uri: str
        The path to the local file to delete.
    """
    fs_functions.remove_local_file(uri)


@task
def get_session_content_hash(
    tmp_video_filepath: str,
) -> str:
    """
    Hash the video file content to get a unique identifier for the session.

    Parameters
    ----------
    tmp_video_filepath: str
        The local path of the video file to generate a hash for.

    Returns
    -------
    session_content_hash: str
        The unique key (SHA256 hash of video content) for this session processing.
    """
    # Hash the video contents
    return file_utils.hash_file_contents(uri=tmp_video_filepath)


@task(nout=3)
def convert_video_and_handle_host(  # noqa: C901
    video_filepath: str,
    session: Session,
    credentials_file: str,
    bucket: str,
) -> tuple[str, str, str]:
    """
    Convert a video to MP4 (if necessary), upload it to the file store, and remove
    the original non-MP4 file that was resource copied.

    Additionally, if the video is hosted from an unsecure resource, host it ourselves.

    Parameters
    ----------
    video_filepath: Union[str, Path]
        The local path of the video file to convert.
    session: Session
        The session to append the new MP4 video uri to.
    credentials_file: str
        Path to Google Service Account Credentials JSON file.
    bucket: str
        The GCS bucket to store the MP4 file to.

    Returns
    -------
    mp4_filepath: str
        The local filepath of the converted MP4 file.
    hosted_video_uri: str
        The URI for the CDP hosted video.
    session_content_hash: str
        The unique key (SHA256 hash of video content) for this session processing.
    """
    # Get file extension
    ext = Path(video_filepath).suffix.lower()

    log.info(f"Original video uri: '{session.video_uri}'")
    log.info(f"Handling video conversion and hosting for video: '{video_filepath}'")
    trim_video = bool(session.video_start_time or session.video_end_time)

    # Convert to mp4 if file isn't of approved web format
    cdp_will_host = False
    if ext not in [".mp4", ".webm"]:
        cdp_will_host = True

        # Convert video to mp4
        mp4_filepath = file_utils.convert_video_to_mp4(
            video_filepath=Path(video_filepath),
            start_time=session.video_start_time,
            end_time=session.video_end_time,
        )
        fs_functions.remove_local_file(video_filepath)

        # Update variable name for easier downstream typing
        video_filepath = str(mp4_filepath)

    # Host trimmed videos because it's simpler than setting
    # up transcription and playback ranges
    elif trim_video:
        cdp_will_host = True

        # Trim video
        trimmed_filepath = file_utils.clip_and_reformat_video(
            video_filepath=Path(video_filepath),
            start_time=session.video_start_time,
            end_time=session.video_end_time,
        )
        fs_functions.remove_local_file(video_filepath)

        # Update variable name for easier downstream typing
        video_filepath = str(trimmed_filepath)

    # Check if the original session video uri is an m3u8.
    # We can't follow the normal convert video process from above
    # because the m3u8 looks to the URI for all the chunks.
    elif session.video_uri.endswith(".m3u8"):
        cdp_will_host = True

    # Store the video ourselves if the original host isn't https
    elif not is_secure_uri(session.video_uri):
        log.info("Handling non-secure video URI")
        try:
            resource_uri = try_url(session.video_uri)
        except LookupError:
            # The provided URI could still be a GCS or S3 URI, which
            # works for download but not for streaming / hosting
            cdp_will_host = True
        else:
            if is_secure_uri(resource_uri):
                log.info(
                    f"Found secure version of {session.video_uri}, "
                    f"updating stored video URI."
                )
                hosted_video_media_url = resource_uri
            else:
                cdp_will_host = True

    # Try with www
    elif not resource_exists(session.video_uri):
        log.info("Handling www URL problems")
        www_url = session.video_uri.replace("://", "://www.")
        if resource_exists(www_url):
            hosted_video_media_url = www_url

    # All good
    else:
        log.info(f"Video URI '{session.video_uri}' passed all checks.")
        hosted_video_media_url = session.video_uri

    # Get unique session identifier
    session_content_hash = file_utils.hash_file_contents(uri=video_filepath)

    # Upload and swap if cdp is hosting
    if cdp_will_host:
        # Upload to gcsfs
        log.info("Storing a copy of video to CDP filestore.")
        hosted_video_uri = fs_functions.upload_file(
            credentials_file=credentials_file,
            bucket=bucket,
            filepath=video_filepath,
            save_name=f"{session_content_hash}-video.mp4",
        )

        # Create fs to generate hosted media URL
        hosted_video_media_url = fs_functions.get_open_url_for_gcs_file(
            credentials_file=credentials_file,
            uri=hosted_video_uri,
        )

    log.info(f"Verified video URL: '{hosted_video_media_url}'")
    return video_filepath, hosted_video_media_url, session_content_hash


@task(nout=2)
def split_audio(
    session_content_hash: str,
    tmp_video_filepath: str,
    bucket: str,
    credentials_file: str,
) -> tuple[str, str]:
    """
    Split the audio from a local video file.

    Parameters
    ----------
    session_content_hash: str
        The unique identifier for the session.
    tmp_video_filepath: str
        The local path of the video file to split the audio from.
    bucket: str
        The bucket to store the audio to.
    credentials_file: str
        Path to Google Service Account Credentials JSON file.

    Returns
    -------
    audio_uri: str
        The URI to the uploaded audio file.
    audio_local_path: str
        The local path for the split audio.
    """
    # Check for existing audio
    local_audio_path = f"{session_content_hash}-audio.wav"
    audio_uri = fs_functions.get_file_uri(
        bucket=bucket,
        filename=local_audio_path,
        credentials_file=credentials_file,
    )

    # If no pre-existing audio, split
    if audio_uri is None:
        # Split and store the audio in temporary file prior to upload
        (
            local_audio_path,
            tmp_audio_log_out_filepath,
            tmp_audio_log_err_filepath,
        ) = file_utils.split_audio(
            video_read_path=tmp_video_filepath,
            audio_save_path=local_audio_path,
            overwrite=True,
        )

        # Store audio and logs
        audio_uri = fs_functions.upload_file(
            credentials_file=credentials_file,
            bucket=bucket,
            filepath=local_audio_path,
        )
        fs_functions.upload_file(
            credentials_file=credentials_file,
            bucket=bucket,
            filepath=tmp_audio_log_out_filepath,
            remove_local=True,
        )
        fs_functions.upload_file(
            credentials_file=credentials_file,
            bucket=bucket,
            filepath=tmp_audio_log_err_filepath,
            remove_local=True,
        )

    # Download to ensure we have it for later
    else:
        local_audio_path = fs_functions.download_file(
            credentials_file=credentials_file,
            bucket=bucket,
            remote_filepath=local_audio_path,
            save_path=local_audio_path,
        )

    return audio_uri, local_audio_path


@task(max_retries=2, retry_delay=timedelta(seconds=10))
def use_speech_to_text_and_generate_transcript(
    audio_path: str,
    model_name: str = "medium",
    confidence: float | None = None,
) -> Transcript:
    """
    Pass the audio path to the speech recognition model.

    Parameters
    ----------
    audio_path: str
        The path to the audio file. The audio must be on the local machine.
    model_name: str
        The whisper model to use for transcription.
    confidence: Optional[float]
        The confidence to set the produced transcript to.

    Returns
    -------
    transcript: Transcript
        The generated Transcript object.

    Notes
    -----
    See the sr_models.whisper.WhisperModel code for more details about
    pass through params.
    """
    # Init model
    model = WhisperModel(model_name=model_name, confidence=confidence)
    return model.transcribe(file_uri=audio_path)


@task(nout=2)
def finalize_and_archive_transcript(
    transcript: Transcript,
    transcript_save_path: str,
    bucket: str,
    credentials_file: str,
    session: Session,
) -> tuple[str, Transcript]:
    """
    Finalize and store the transcript to GCP.

    Parameters
    ----------
    transcript: Transcript
        The transcript to finish processing and store.
    transcript_save_path: str
        The path (or filename) to save the transcript at in the bucket.
    bucket: str
        The bucket to store the transcript to.
    credentials_file: str
        Path to Google Service Account Credentials JSON file.
    session: Session
        The event session to pull extra metadata from.

    Returns
    -------
    transcript_uri: str
        The URI of the stored transcript JSON.
    transcript: Transcript
        The finalized in-memory Transcript object.
    """
    # Add session datetime to transcript
    transcript.session_datetime = session.session_datetime.isoformat()

    # Dump to JSON
    with open(transcript_save_path, "w") as open_resource:
        open_resource.write(transcript.to_json())

    # Store to file store
    transcript_file_uri = fs_functions.upload_file(
        credentials_file=credentials_file,
        bucket=bucket,
        filepath=transcript_save_path,
        remove_local=True,
    )

    return transcript_file_uri, transcript


@task(nout=4)
def check_for_existing_transcript(
    session_content_hash: str,
    bucket: str,
    credentials_file: str,
) -> tuple[str, str | None, Transcript | None, bool]:
    """
    Check for and load any existing transcript object.

    Parameters
    ----------
    session_content_hash: str
        The unique key (SHA256 hash of video content) for this session processing.
    bucket: str
        The bucket to store the transcript to.
    credentials_file: str
        Path to Google Service Account Credentials JSON file.

    Returns
    -------
    transcript_filename: str
        The filename of the transcript to create (or found).
    transcript_uri: Optional[str]
        If found, the transcript uri. Else, None.
    transcript: Optional[Transcript]
        If found, the loaded in-memory Transcript object. Else, None.
    transcript_exists: bool
        Boolean value for if the transcript was found or not.
        Required for downstream Prefect usage.
    """
    # Combine to transcript filename
    tmp_transcript_filepath = (
        f"{session_content_hash}-"
        f"cdp_{__version__.replace('.', '_')}-"
        f"transcript.json"
    )

    # Check for existing transcript
    transcript_uri = fs_functions.get_file_uri(
        bucket=bucket,
        filename=tmp_transcript_filepath,
        credentials_file=credentials_file,
    )
    transcript_exists = transcript_uri is not None

    # Load transcript if exists
    if transcript_exists:
        fs = GCSFileSystem(token=credentials_file)
        with fs.open(transcript_uri, "r") as open_resource:
            transcript = Transcript.from_json(open_resource.read())
    else:
        transcript = None

    return (tmp_transcript_filepath, transcript_uri, transcript, transcript_exists)


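# For illustration (hypothetical values): with a session content hash of "abc123" and
# cdp_backend version 4.1.0, check_for_existing_transcript looks for
# "abc123-cdp_4_1_0-transcript.json" in the bucket, so a transcript is only
# regenerated when either the video content or the library version changes.

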
def generate_transcript(
    session_content_hash: str,
    audio_path: str,
    session: Session,
    bucket: str,
    credentials_file: str,
    whisper_model_name: str = "medium",
    whisper_model_confidence: float | None = None,
) -> tuple[str, Transcript]:
    """
    Route transcript generation to the correct processing.

    Parameters
    ----------
    session_content_hash: str
        The unique key (SHA256 hash of video content) for this session processing.
    audio_path: str
        The path to the audio file to generate a transcript from.
    session: Session
        The specific session details to be used in final transcript upload and
        archival.
    bucket: str
        The name of the GCS bucket to upload the produced transcript to.
    credentials_file: str
        Path to the GCS JSON credentials file.
    whisper_model_name: str
        The whisper model to use for transcription.
    whisper_model_confidence: Optional[float]
        The confidence to set the produced transcript to.

    Returns
    -------
    transcript_uri: str
        The URI to the uploaded transcript file.
    transcript: Transcript
        The in-memory Transcript object.
    """
    # Get unique transcript name from parameters and current lib version
    (
        tmp_transcript_filepath,
        transcript_uri,
        transcript,
        transcript_exists,
    ) = check_for_existing_transcript(
        session_content_hash=session_content_hash,
        bucket=bucket,
        credentials_file=credentials_file,
    )

    # If no pre-existing transcript with the same parameters, generate
    with case(transcript_exists, False):
        generated_transcript = use_speech_to_text_and_generate_transcript(
            audio_path=audio_path,
            model_name=whisper_model_name,
            confidence=whisper_model_confidence,
        )

        # Add extra metadata and upload
        (
            generated_transcript_uri,
            generated_transcript,
        ) = finalize_and_archive_transcript(
            transcript=generated_transcript,
            transcript_save_path=tmp_transcript_filepath,
            bucket=bucket,
            credentials_file=credentials_file,
            session=session,
        )

    # Existing transcript
    with case(transcript_exists, True):
        found_transcript_uri = transcript_uri
        found_transcript = transcript

    # Merge the two paths and results.
    # Set the names of the merges for visualization and testing purposes.
    result_transcript_uri = merge(
        generated_transcript_uri,
        found_transcript_uri,
    )
    result_transcript_uri.name = "merge_transcript_uri"
    result_transcript = merge(
        generated_transcript,
        found_transcript,
    )
    result_transcript.name = "merge_in_memory_transcript"

    return (result_transcript_uri, result_transcript)


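# The case/merge calls above are Prefect 1.x conditional branching: each `case` block
# only runs when the condition task matches its value, and `merge` resolves to
# whichever branch actually produced a result. A minimal standalone sketch of the same
# pattern (task names are illustrative, not part of this module):
#
#   with Flow("conditional-example") as example_flow:
#       exists = check_cache_task()
#       with case(exists, False):
#           fresh = generate_task()
#       with case(exists, True):
#           cached = load_cached_task()
#       result = merge(fresh, cached)

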
@task(nout=2)
def generate_thumbnails(
    session_content_hash: str,
    tmp_video_path: str,
    event: EventIngestionModel,
    bucket: str,
    credentials_file: str,
) -> tuple[str, str]:
    """
    Create static and hover thumbnails.

    Parameters
    ----------
    session_content_hash: str
        The unique key (SHA256 hash of video content) for this session processing.
    tmp_video_path: str
        The local path to the video file to generate thumbnails from.
    event: EventIngestionModel
        The parent event of the session.
    bucket: str
        The name of the GCS bucket to upload the produced thumbnails to.
    credentials_file: str
        Path to Google Service Account Credentials JSON file.

    Returns
    -------
    static_thumbnail_url: str
        The URL of the static thumbnail, stored on GCS.
    hover_thumbnail_url: str
        The URL of the hover thumbnail, stored on GCS.
    """
    if event.static_thumbnail_uri is None:
        # Generate new
        static_thumbnail_file = file_utils.get_static_thumbnail(
            tmp_video_path, session_content_hash
        )
    else:
        static_thumbnail_file = file_utils.resource_copy(
            event.static_thumbnail_uri, session_content_hash
        )

    static_thumbnail_url = fs_functions.upload_file(
        credentials_file=credentials_file,
        bucket=bucket,
        filepath=static_thumbnail_file,
        remove_local=True,
    )

    if event.hover_thumbnail_uri is None:
        # Generate new
        hover_thumbnail_file = file_utils.get_hover_thumbnail(
            tmp_video_path, session_content_hash
        )
    else:
        hover_thumbnail_file = file_utils.resource_copy(
            event.hover_thumbnail_uri, session_content_hash
        )

    hover_thumbnail_url = fs_functions.upload_file(
        credentials_file=credentials_file,
        bucket=bucket,
        filepath=hover_thumbnail_file,
        remove_local=True,
    )

    return (
        static_thumbnail_url,
        hover_thumbnail_url,
    )


@task
def compile_session_processing_result(
    session: Session,
    session_video_hosted_url: str,
    session_content_hash: str,
    audio_uri: str,
    transcript: Transcript,
    transcript_uri: str,
    static_thumbnail_uri: str,
    hover_thumbnail_uri: str,
) -> SessionProcessingResult:
    return SessionProcessingResult(
        session=session,
        session_video_hosted_url=session_video_hosted_url,
        session_content_hash=session_content_hash,
        audio_uri=audio_uri,
        transcript=transcript,
        transcript_uri=transcript_uri,
        static_thumbnail_uri=static_thumbnail_uri,
        hover_thumbnail_uri=hover_thumbnail_uri,
    )


def _process_person_ingestion(  # noqa: C901
    person: ingestion_models.Person,
    default_session: Session,
    credentials_file: str,
    bucket: str,
    upload_cache: dict[str, db_models.Person] | None = None,
) -> db_models.Person:
    # The JSON string of the whole person tree turns out to be a great cache key
    # because:
    # 1. we can hash strings (which means we can use them as dictionary keys)
    # 2. the JSON string stores all of the attached seat and role information
    # So, if the same person is referenced multiple times in the ingestion model,
    # but most of those references have the same data and only a few have different
    # data, the produced JSON string will note the differences and the upload will
    # run only when it needs to.
    # Create upload cache
    if upload_cache is None:
        upload_cache = {}

    person_cache_key = person.to_json()
    if person_cache_key not in upload_cache:
        # Store person picture file
        person_picture_db_model: db_models.File | None
        if person.picture_uri is not None:
            try:
                tmp_person_picture_path = file_utils.resource_copy(
                    uri=person.picture_uri,
                    dst=f"{person.name}--person_picture",
                    overwrite=True,
                )
                destination_path = file_utils.generate_file_storage_name(
                    tmp_person_picture_path,
                    "person-picture",
                )
                person_picture_uri = fs_functions.upload_file(
                    credentials_file=credentials_file,
                    bucket=bucket,
                    filepath=tmp_person_picture_path,
                    save_name=destination_path,
                    remove_local=True,
                )
                person_picture_db_model = db_functions.create_file(
                    uri=person_picture_uri,
                    credentials_file=credentials_file,
                )
                person_picture_db_model = db_functions.upload_db_model(
                    db_model=person_picture_db_model,
                    credentials_file=credentials_file,
                )
            except (FileNotFoundError, ClientResponseError, ConnectionError) as e:
                person_picture_db_model = None
                log.error(
                    f"Person ('{person.name}'), picture URI could not be archived. "
                    f"({person.picture_uri}). Error: {e}"
                )
        else:
            person_picture_db_model = None

        # Create person
        try:
            person_db_model = db_functions.create_person(
                person=person,
                picture_ref=person_picture_db_model,
                credentials_file=credentials_file,
            )
            person_db_model = db_functions.upload_db_model(
                db_model=person_db_model,
                credentials_file=credentials_file,
            )
        except (
            FieldValidationFailed,
            RequiredField,
            InvalidFieldType,
            ConnectionError,
        ):
            log.error(
                f"Person ({person_db_model.to_dict()}), "
                f"was missing required information. Generating minimum person details."
            )
            person_db_model = db_functions.create_minimal_person(person=person)
            # No ingestion model provided here so that we don't try to
            # re-validate the already failed model upload
            person_db_model = db_functions.upload_db_model(
                db_model=person_db_model,
                credentials_file=credentials_file,
            )

        # Update upload cache with person
        upload_cache[person_cache_key] = person_db_model

        # Create seat
        if person.seat is not None:
            # Store seat picture file
            person_seat_image_db_model: db_models.File | None
            try:
                if person.seat.image_uri is not None:
                    tmp_person_seat_image_path = file_utils.resource_copy(
                        uri=person.seat.image_uri,
                        dst=f"{person.name}--{person.seat.name}--seat_image",
                        overwrite=True,
                    )
                    destination_path = file_utils.generate_file_storage_name(
                        tmp_person_seat_image_path,
                        "seat-image",
                    )
                    person_seat_image_uri = fs_functions.upload_file(
                        credentials_file=credentials_file,
                        bucket=bucket,
                        filepath=tmp_person_seat_image_path,
                        save_name=destination_path,
                        remove_local=True,
                    )
                    person_seat_image_db_model = db_functions.create_file(
                        uri=person_seat_image_uri,
                        credentials_file=credentials_file,
                    )
                    person_seat_image_db_model = db_functions.upload_db_model(
                        db_model=person_seat_image_db_model,
                        credentials_file=credentials_file,
                    )
                else:
                    person_seat_image_db_model = None
            except (FileNotFoundError, ClientResponseError, ConnectionError) as e:
                person_seat_image_db_model = None
                log.error(
                    f"Person ('{person.name}'), seat image URI could not be archived. "
                    f"({person.seat.image_uri}). Error: {e}"
                )

            # Actual seat creation
            person_seat_db_model = db_functions.create_seat(
                seat=person.seat,
                image_ref=person_seat_image_db_model,
            )
            person_seat_db_model = db_functions.upload_db_model(
                db_model=person_seat_db_model,
                credentials_file=credentials_file,
            )

            # Create roles
            if person.seat.roles is not None:
                for person_role in person.seat.roles:
                    # Create any bodies for roles
                    person_role_body_db_model: db_models.Body | None
                    if person_role.body is not None:
                        # Use or default role body start_datetime
                        if person_role.body.start_datetime is None:
                            person_role_body_start_datetime = (
                                default_session.session_datetime
                            )
                        else:
                            person_role_body_start_datetime = (
                                person_role.body.start_datetime
                            )

                        person_role_body_db_model = db_functions.create_body(
                            body=person_role.body,
                            start_datetime=person_role_body_start_datetime,
                        )
                        person_role_body_db_model = db_functions.upload_db_model(
                            db_model=person_role_body_db_model,
                            credentials_file=credentials_file,
                        )
                    else:
                        person_role_body_db_model = None

                    # Use or default role start_datetime
                    if person_role.start_datetime is None:
                        person_role_start_datetime = default_session.session_datetime
                    else:
                        person_role_start_datetime = person_role.start_datetime

                    # Actual role creation
                    person_role_db_model = db_functions.create_role(
                        role=person_role,
                        person_ref=person_db_model,
                        seat_ref=person_seat_db_model,
                        start_datetime=person_role_start_datetime,
                        body_ref=person_role_body_db_model,
                    )
                    person_role_db_model = db_functions.upload_db_model(
                        db_model=person_role_db_model,
                        credentials_file=credentials_file,
                    )
    else:
        person_db_model = upload_cache[person_cache_key]

    log.info(f"Completed metadata processing for '{person.name}'.")
    return person_db_model


def _calculate_in_majority(
    vote: ingestion_models.Vote,
    event_minutes_item: ingestion_models.EventMinutesItem,
) -> bool | None:
    # Voted to Approve or Approve-by-abstention-or-absence
    if vote.decision in [
        db_constants.VoteDecision.APPROVE,
        db_constants.VoteDecision.ABSTAIN_APPROVE,
        db_constants.VoteDecision.ABSENT_APPROVE,
    ]:
        return (
            event_minutes_item.decision
            == db_constants.EventMinutesItemDecision.PASSED
        )

    # Voted to Reject or Reject-by-abstention-or-absence
    elif vote.decision in [
        db_constants.VoteDecision.REJECT,
        db_constants.VoteDecision.ABSTAIN_REJECT,
        db_constants.VoteDecision.ABSENT_REJECT,
    ]:
        return (
            event_minutes_item.decision
            == db_constants.EventMinutesItemDecision.FAILED
        )

    # Explicit return None for votes that were a plain absence or abstention
    return None


@task(max_retries=2, retry_delay=timedelta(seconds=60))
def store_event_processing_results(  # noqa: C901
    event: EventIngestionModel,
    session_processing_results: list[SessionProcessingResult],
    credentials_file: str,
    bucket: str,
) -> None:
    # TODO: check metadata before pipeline runs to avoid the many try excepts

    # Get first session
    first_session = min(event.sessions, key=attrgetter("session_index"))

    # Get high level event metadata and db models

    # Use or default body start_datetime
    if event.body.start_datetime is None:
        body_start_datetime = first_session.session_datetime
    else:
        body_start_datetime = event.body.start_datetime

    # Upload body
    body_db_model = db_functions.create_body(
        body=event.body,
        start_datetime=body_start_datetime,
    )
    body_db_model = db_functions.upload_db_model(
        db_model=body_db_model,
        credentials_file=credentials_file,
    )

    event_static_thumbnail_file_db_model = None
    event_hover_thumbnail_file_db_model = None

    for session_result in session_processing_results:
        # Upload static thumbnail
        static_thumbnail_file_db_model = db_functions.create_file(
            uri=session_result.static_thumbnail_uri,
            credentials_file=credentials_file,
        )
        static_thumbnail_file_db_model = db_functions.upload_db_model(
            db_model=static_thumbnail_file_db_model,
            credentials_file=credentials_file,
        )
        if event_static_thumbnail_file_db_model is None:
            event_static_thumbnail_file_db_model = static_thumbnail_file_db_model

        # Upload hover thumbnail
        hover_thumbnail_file_db_model = db_functions.create_file(
            uri=session_result.hover_thumbnail_uri,
            credentials_file=credentials_file,
        )
        hover_thumbnail_file_db_model = db_functions.upload_db_model(
            db_model=hover_thumbnail_file_db_model,
            credentials_file=credentials_file,
        )
        if event_hover_thumbnail_file_db_model is None:
            event_hover_thumbnail_file_db_model = hover_thumbnail_file_db_model

    # Upload event
    try:
        event_db_model = db_functions.create_event(
            body_ref=body_db_model,
            event_datetime=first_session.session_datetime,
            static_thumbnail_ref=event_static_thumbnail_file_db_model,
            hover_thumbnail_ref=event_hover_thumbnail_file_db_model,
            agenda_uri=event.agenda_uri,
            minutes_uri=event.minutes_uri,
            external_source_id=event.external_source_id,
            credentials_file=credentials_file,
        )
        event_db_model = db_functions.upload_db_model(
            db_model=event_db_model,
            credentials_file=credentials_file,
        )
    except (FieldValidationFailed, ConnectionError):
        log.error(
            f"Agenda and/or minutes docs could not be found. "
            f"Adding event without agenda and minutes URIs. "
            f"({event.agenda_uri} AND/OR {event.minutes_uri} do not exist)"
        )

        event_db_model = db_functions.create_event(
            body_ref=body_db_model,
            event_datetime=first_session.session_datetime,
            static_thumbnail_ref=event_static_thumbnail_file_db_model,
            hover_thumbnail_ref=event_hover_thumbnail_file_db_model,
            external_source_id=event.external_source_id,
            credentials_file=credentials_file,
        )
        event_db_model = db_functions.upload_db_model(
            db_model=event_db_model,
            credentials_file=credentials_file,
        )

    # Iter sessions
    for session_result in session_processing_results:
        # Upload audio file
        audio_file_db_model = db_functions.create_file(
            uri=session_result.audio_uri,
            credentials_file=credentials_file,
        )
        audio_file_db_model = db_functions.upload_db_model(
            db_model=audio_file_db_model,
            credentials_file=credentials_file,
        )

        # Upload transcript file
        transcript_file_db_model = db_functions.create_file(
            uri=session_result.transcript_uri,
            credentials_file=credentials_file,
        )
        transcript_file_db_model = db_functions.upload_db_model(
            db_model=transcript_file_db_model,
            credentials_file=credentials_file,
        )

        # Create session
        session_db_model = db_functions.create_session(
            session=session_result.session,
            session_video_hosted_url=session_result.session_video_hosted_url,
            session_content_hash=session_result.session_content_hash,
            event_ref=event_db_model,
            credentials_file=credentials_file,
        )
        session_db_model = db_functions.upload_db_model(
            db_model=session_db_model,
            credentials_file=credentials_file,
        )

        # Create transcript
        transcript_db_model = db_functions.create_transcript(
            transcript_file_ref=transcript_file_db_model,
            session_ref=session_db_model,
            transcript=session_result.transcript,
        )
        transcript_db_model = db_functions.upload_db_model(
            db_model=transcript_db_model,
            credentials_file=credentials_file,
        )

    # Add event metadata
    processed_person_upload_cache: dict[str, db_models.Person] = {}
    if event.event_minutes_items is not None:
        for emi_index, event_minutes_item in enumerate(event.event_minutes_items):
            if event_minutes_item.matter is not None:
                # Create matter
                matter_db_model = db_functions.create_matter(
                    matter=event_minutes_item.matter,
                )
                matter_db_model = db_functions.upload_db_model(
                    db_model=matter_db_model,
                    credentials_file=credentials_file,
                )

                # Add people from matter sponsors
                if event_minutes_item.matter.sponsors is not None:
                    for sponsor_person in event_minutes_item.matter.sponsors:
                        sponsor_person_db_model = _process_person_ingestion(
                            person=sponsor_person,
                            default_session=first_session,
                            credentials_file=credentials_file,
                            bucket=bucket,
                            upload_cache=processed_person_upload_cache,
                        )

                        # Create matter sponsor association
                        matter_sponsor_db_model = db_functions.create_matter_sponsor(
                            matter_ref=matter_db_model,
                            person_ref=sponsor_person_db_model,
                        )
                        matter_sponsor_db_model = db_functions.upload_db_model(
                            db_model=matter_sponsor_db_model,
                            credentials_file=credentials_file,
                        )

            else:
                matter_db_model = None

            # Create minutes item
            minutes_item_db_model = db_functions.create_minutes_item(
                minutes_item=event_minutes_item.minutes_item,
                matter_ref=matter_db_model,
            )
            minutes_item_db_model = db_functions.upload_db_model(
                db_model=minutes_item_db_model,
                credentials_file=credentials_file,
            )

            # Handle event minutes item index
            if event_minutes_item.index is None:
                event_minutes_item_index = emi_index
            else:
                event_minutes_item_index = event_minutes_item.index

            # Create event minutes item
            try:
                event_minutes_item_db_model = db_functions.create_event_minutes_item(
                    event_minutes_item=event_minutes_item,
                    event_ref=event_db_model,
                    minutes_item_ref=minutes_item_db_model,
                    index=event_minutes_item_index,
                )
                event_minutes_item_db_model = db_functions.upload_db_model(
                    db_model=event_minutes_item_db_model,
                    credentials_file=credentials_file,
                )
            except (FieldValidationFailed, InvalidFieldType):
                allowed_emi_decisions = constants_utils.get_all_class_attr_values(
                    db_constants.EventMinutesItemDecision
                )
                log.warning(
                    f"Provided 'decision' is not an approved constant. "
                    f"Provided: '{event_minutes_item.decision}' "
                    f"Should be one of: {allowed_emi_decisions} "
                    f"See: "
                    f"cdp_backend.database.constants.EventMinutesItemDecision. "
                    f"Creating EventMinutesItem without decision value."
                )
                event_minutes_item_db_model = (
                    db_functions.create_minimal_event_minutes_item(
                        event_ref=event_db_model,
                        minutes_item_ref=minutes_item_db_model,
                        index=event_minutes_item_index,
                    )
                )
                event_minutes_item_db_model = db_functions.upload_db_model(
                    db_model=event_minutes_item_db_model,
                    credentials_file=credentials_file,
                )

            # Create matter status
            if matter_db_model is not None and event_minutes_item.matter is not None:
                if event_minutes_item.matter.result_status is not None:
                    matter_status_db_model = db_functions.create_matter_status(
                        matter_ref=matter_db_model,
                        event_minutes_item_ref=event_minutes_item_db_model,
                        status=event_minutes_item.matter.result_status,
                        update_datetime=first_session.session_datetime,
                    )
                    try:
                        matter_status_db_model = db_functions.upload_db_model(
                            db_model=matter_status_db_model,
                            credentials_file=credentials_file,
                        )
                    except (FieldValidationFailed, RequiredField):
                        allowed_matter_decisions = (
                            constants_utils.get_all_class_attr_values(
                                db_constants.MatterStatusDecision
                            )
                        )
                        log.error(
                            f"Provided 'status' is not an approved constant. "
                            f"Provided: '{event_minutes_item.matter.result_status}' "
                            f"Should be one of: {allowed_matter_decisions} "
                            f"See: "
                            f"cdp_backend.database.constants.MatterStatusDecision. "
                            f"Skipping matter status database upload."
                        )

            # Add supporting files for matter and event minutes item
            if event_minutes_item.supporting_files is not None:
                for supporting_file in event_minutes_item.supporting_files:
                    try:
                        file_uri = try_url(supporting_file.uri)
                        supporting_file.uri = file_uri
                    except LookupError as e:
                        log.error(
                            f"SupportingFile ('{supporting_file.uri}') "
                            f"uri does not exist. Skipping. Error: {e}"
                        )
                        continue

                    # Archive as matter file
                    if event_minutes_item.matter is not None:
                        try:
                            matter_file_db_model = db_functions.create_matter_file(
                                matter_ref=matter_db_model,
                                supporting_file=supporting_file,
                                credentials_file=credentials_file,
                            )
                            matter_file_db_model = db_functions.upload_db_model(
                                db_model=matter_file_db_model,
                                credentials_file=credentials_file,
                            )
                        except (
                            FieldValidationFailed,
                            ConnectionError,
                        ) as e:
                            log.error(
                                f"MatterFile ('{supporting_file.uri}') "
                                f"could not be archived. Skipping. Error: {e}"
                            )

                    # Archive as event minutes item file
                    try:
                        event_minutes_item_file_db_model = (
                            db_functions.create_event_minutes_item_file(
                                event_minutes_item_ref=event_minutes_item_db_model,
                                supporting_file=supporting_file,
                                credentials_file=credentials_file,
                            )
                        )
                        event_minutes_item_file_db_model = db_functions.upload_db_model(
                            db_model=event_minutes_item_file_db_model,
                            credentials_file=credentials_file,
                        )
                    except (FieldValidationFailed, ConnectionError) as e:
                        log.error(
                            f"EventMinutesItemFile ('{supporting_file.uri}') "
                            f"could not be archived. Error: {e}"
                        )

            # Add vote information
            if event_minutes_item.votes is not None:
                # Protect against corrupted data
                if (
                    event_minutes_item.decision is not None
                    and event_minutes_item.matter is not None
                ):
                    for vote in event_minutes_item.votes:
                        # Add people from voters
                        vote_person_db_model = _process_person_ingestion(
                            person=vote.person,
                            default_session=first_session,
                            credentials_file=credentials_file,
                            bucket=bucket,
                            upload_cache=processed_person_upload_cache,
                        )

                        # Create vote
                        try:
                            vote_db_model = db_functions.create_vote(
                                matter_ref=matter_db_model,
                                event_ref=event_db_model,
                                event_minutes_item_ref=event_minutes_item_db_model,
                                person_ref=vote_person_db_model,
                                decision=vote.decision,
                                in_majority=_calculate_in_majority(
                                    vote=vote,
                                    event_minutes_item=event_minutes_item,
                                ),
                                external_source_id=vote.external_source_id,
                            )
                            vote_db_model = db_functions.upload_db_model(
                                db_model=vote_db_model,
                                credentials_file=credentials_file,
                            )
                        except (
                            FieldValidationFailed,
                            RequiredField,
                            InvalidFieldType,
                        ):
                            allowed_vote_decisions = (
                                constants_utils.get_all_class_attr_values(
                                    db_constants.VoteDecision
                                )
                            )
                            log.error(
                                f"Provided 'decision' is not an approved constant. "
                                f"Provided: '{vote.decision}' "
                                f"Should be one of: {allowed_vote_decisions} "
                                f"See: cdp_backend.database.constants.VoteDecision. "
                                f"Skipping vote database upload."
                            )
                else:
                    log.error(
                        f"Unable to process voting information for event minutes item: "
                        f"'{event_minutes_item.minutes_item.name}'. "
                        f"Votes were present but overall decision for the "
                        f"event minutes item was 'None'."
                    )
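
# Worked example of the in-majority logic in _calculate_in_majority above
# (hypothetical values): a VoteDecision.APPROVE vote on an event minutes item whose
# decision is EventMinutesItemDecision.PASSED yields in_majority=True, a
# VoteDecision.REJECT vote on the same item yields False, and any other vote decision
# falls through to the explicit `return None`.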