Source code for cdp_backend.bin.process_special_event
#!/usr/bin/env python
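"""Process prefetched events (with remote or local files) into the event pipeline."""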
import argparse
import logging
import sys
import traceback
from pathlib import Path

from fsspec.core import url_to_fs
from fsspec.implementations.local import LocalFileSystem

from cdp_backend.file_store.functions import upload_file
from cdp_backend.pipeline import event_gather_pipeline as pipeline
from cdp_backend.pipeline.ingestion_models import EventIngestionModel
from cdp_backend.pipeline.pipeline_config import EventGatherPipelineConfig
###############################################################################
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s",
)
log = logging.getLogger(__name__)
###############################################################################
class Args(argparse.Namespace):
    def __init__(self) -> None:
        self.__parse()

    def __parse(self) -> None:
        p = argparse.ArgumentParser(
            prog="process_special_event",
            description="Process prefetched events (with remote or local files) "
            + "into the event pipeline.",
        )
        p.add_argument(
            "--event_details_file",
            type=Path,
            help="Path to the JSON file with event details.",
        )
        p.add_argument(
            "--event_gather_config_file",
            type=Path,
            help="Path to the JSON file with event gather pipeline configuration.",
        )
        p.parse_args(namespace=self)
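# main() below: parse CLI args, load the pipeline config and the event details
# JSON, upload any local session videos to the configured GCS bucket, then build
# and run the event gather flow(s), exiting non-zero if any flow fails.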
def main() -> None:
    try:
        args = Args()

        # Read pipeline config
        with open(args.event_gather_config_file) as open_resource:
            config = EventGatherPipelineConfig.from_json(open_resource.read())

        log.info("Parsing event details...")

        # Convert event details file to EventIngestionModel
        with open(args.event_details_file) as open_resource:
            ingestion_model = EventIngestionModel.from_json(open_resource.read())

        for session in ingestion_model.sessions:
            # Upload local video files to the file store; remote URIs are used as-is
            fs, _ = url_to_fs(session.video_uri)
            if isinstance(fs, LocalFileSystem):
                # Upload video file to file store
                log.info(f"Uploading {session.video_uri}...")
                video_uri = upload_file(
                    credentials_file=config.google_credentials_file,
                    bucket=config.validated_gcs_bucket_name,
                    filepath=session.video_uri,
                )

                # Replace video_uri of session
                session.video_uri = video_uri

        # Create event gather pipeline flow
        log.info("Beginning processing...")

        # Get all flow definitions
        flows = pipeline.create_event_gather_flow(
            config=config,
            prefetched_events=[ingestion_model],
        )

        # Run each pipeline
        states = []
        for flow in flows:
            states.append(flow.run())

        # Track errored states
        errored_states = []
        for state in states:
            if state.is_failed():
                errored_states.append(state)

        # Handle errors
        if len(errored_states) > 0:
            log.error(f"{len(errored_states)} / {len(flows)} flows failed.")
            raise ValueError("Flow run failed.")
        else:
            log.info(f"{len(states)} flows ran successfully.")

    except Exception as e:
        log.error("=============================================")
        log.error("\n\n" + traceback.format_exc())
        log.error("=============================================")
        log.error("\n\n" + str(e) + "\n")
        log.error("=============================================")
        sys.exit(1)
###############################################################################
if __name__ == "__main__":
    main()
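# Example invocation (a sketch, not taken from the cdp_backend docs; the JSON
# file names below are placeholders to replace with your own files):
#
#   python -m cdp_backend.bin.process_special_event \
#       --event_details_file event-details.json \
#       --event_gather_config_file event-gather-config.json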