Source code for cdp_backend.bin.add_content_hash_to_sessions
#!/usr/bin/env python
import argparse
import logging
import sys
import traceback
from pathlib import Path
import fireo
from cdp_backend.database.models import Session, Transcript
###############################################################################
logging.basicConfig(
level=logging.INFO,
format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s",
)
log = logging.getLogger(__name__)
###############################################################################
[docs]
class Args(argparse.Namespace):
def __init__(self) -> None:
self.__parse()
def __parse(self) -> None:
p = argparse.ArgumentParser(
prog="add_content_hash_to_sessions",
description=(
"Add content hash to all existing session rows in the database."
),
)
p.add_argument(
"--google_credentials_file",
type=Path,
help="Path to Google service account JSON key.",
)
p.parse_args(namespace=self)
###############################################################################
[docs]
def add_content_hash_to_sessions(google_creds_path: Path) -> None:
# Connect to database
fireo.connection(from_file=google_creds_path)
# Fetch all sessions
sessions = Session.collection.fetch()
# Sessions without a content hash
unfixed_sessions = {s.id for s in sessions if not s.session_content_hash}
# Fetch all transcripts
transcripts = Transcript.collection.fetch()
# Unfortunately firestore doesn't have built in distinct querying
# Create list where each transcripts is for a unique session
transcripts_unique_sessions = list({t.session_ref: t for t in transcripts}.values())
# Fetch all sessions
for transcript in transcripts_unique_sessions:
# Get file db ref
_file = transcript.file_ref.get()
# Get session
session = transcript.session_ref.get()
# If no session_content_hash found
if not session.session_content_hash:
# Extract hash from file
session_content_hash = _file.name.split("-")[0]
# Need to set event reference since autoload is disabled,
# and FireO throws an error if the model has a ReferenceDocLoader
# on a property during any db write action
session.event_ref = session.event_ref.get()
# Give GCSFilSystem permissions to read GCS resources
session.set_validator_kwargs(
kwargs={"google_credentials_file": str(google_creds_path)}
)
# Add content hash to session db model
session.session_content_hash = session_content_hash
# Upsert existing session
session.upsert()
log.info(f"Updated session {session.id} with content hash")
# Mark session as fixed
if session.id in unfixed_sessions:
unfixed_sessions.remove(session.id)
# Log any sessions that still don't have a content hash
# Could happen if there are db inconsistencies
# (like a session is orphaned w/o a transcript)
if unfixed_sessions:
log.error(
"The following sessions were not fixed with a "
f"content hash: {unfixed_sessions}"
)
[docs]
def main() -> None:
try:
args = Args()
add_content_hash_to_sessions(
google_creds_path=args.google_credentials_file,
)
except Exception as e:
log.error("=============================================")
log.error("\n\n" + traceback.format_exc())
log.error("=============================================")
log.error("\n\n" + str(e) + "\n")
log.error("=============================================")
sys.exit(1)