Source code for cdp_backend.bin.run_cdp_event_gather

#!/usr/bin/env python

import argparse
import logging
import sys
import traceback
from pathlib import Path

from cdp_backend.pipeline import event_gather_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventGatherPipelineConfig

###############################################################################

logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s",
)
log = logging.getLogger(__name__)

###############################################################################


[docs] class Args(argparse.Namespace): def __init__(self) -> None: self.__parse() def __parse(self) -> None: p = argparse.ArgumentParser( prog="run_cdp_event_gather", description="Gather, process, and store event data to CDP infrastructure.", ) p.add_argument( "config_file", type=Path, help=( "Path to the pipeline configuration file. " "See cdp_backend.pipeline.pipeline_config.EventGatherPipelineConfig " "for more details." ), ) p.add_argument( "-f", "--from", type=str, default=None, help=( "Optional ISO formatted string to pass to the get_event function to act" "as the start point for event gathering." ), dest="from_dt", ) p.add_argument( "-t", "--to", type=str, default=None, help=( "Optional ISO formatted string to pass to the get_event function to act" "as the end point for event gathering." ), dest="to_dt", ) p.parse_args(namespace=self)
[docs] def main() -> None: try: args = Args() with open(args.config_file) as open_resource: config = EventGatherPipelineConfig.from_json(open_resource.read()) # Get all flow definitions flows = pipeline.create_event_gather_flow( config=config, from_dt=args.from_dt, to_dt=args.to_dt, ) # Run each pipeline states = [] for flow in flows: states.append(flow.run()) # Track errored states errored_states = [] for state in states: if state.is_failed(): errored_states.append(state) # Handle errors if len(errored_states) > 0: log.error(f"{len(errored_states)} / {len(flows)} flows failed.") raise ValueError("Flow run failed.") else: log.info(f"{len(states)} flows ran successfully.") except Exception as e: log.error("=============================================") log.error("\n\n" + traceback.format_exc()) log.error("=============================================") log.error("\n\n" + str(e) + "\n") log.error("=============================================") sys.exit(1)
############################################################################### # Allow caller to directly run this module (usually in development scenarios) if __name__ == "__main__": main()