Source code for cdp_backend.bin.run_cdp_event_index_generation

#!/usr/bin/env python

import argparse
import logging
import sys
import traceback
from pathlib import Path

from distributed import LocalCluster
from prefect import executors

from cdp_backend.pipeline import generate_event_index_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

###############################################################################

logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s",
)
log = logging.getLogger(__name__)

###############################################################################


class Args(argparse.Namespace):
    def __init__(self) -> None:
        self.__parse()

    def __parse(self) -> None:
        p = argparse.ArgumentParser(
            prog="run_cdp_event_index_generation",
            description=(
                "Index all event (session) transcripts from a CDP infrastructure."
            ),
        )
        p.add_argument(
            "config_file",
            type=Path,
            help=(
                "Path to the pipeline configuration file. "
                "See cdp_backend.pipeline.pipeline_config.EventIndexPipelineConfig "
                "for more details."
            ),
        )
        p.add_argument(
            "-n",
            "--n_grams",
            type=int,
            default=1,
            help="N number of terms to act as a unique entity.",
        )
        p.add_argument(
            "--ngrams_per_chunk",
            type=int,
            default=50_000,
            help="Number of ngrams to store in a single chunk file.",
        )
        p.add_argument(
            "-s",
            "--store_remote",
            action="store_true",
            dest="store_remote",
            help=(
                "Store chunks to remote cloud storage. "
                "Required to add a search index."
            ),
        )
        p.add_argument(
            "-p",
            "--parallel",
            action="store_true",
            dest="parallel",
            help=(
                "Boolean option to spin up a local multi-threaded "
                "Dask Distributed cluster for event processing."
            ),
        )
        p.parse_args(namespace=self)

def main() -> None:
    try:
        args = Args()
        with open(args.config_file) as open_resource:
            config = EventIndexPipelineConfig.from_json(open_resource.read())

        # Get flow definition
        flow = pipeline.create_event_index_generation_pipeline(
            config=config,
            n_grams=args.n_grams,
            ngrams_per_chunk=args.ngrams_per_chunk,
            store_remote=args.store_remote,
        )

        # Determine executor
        if args.parallel:
            # Create local cluster
            log.info("Creating LocalCluster")
            cluster = LocalCluster(processes=False)
            log.info("Created LocalCluster")

            # Set distributed_executor_address
            distributed_executor_address = cluster.scheduler_address

            # Log dashboard URI
            log.info(f"Dask dashboard available at: {cluster.dashboard_link}")

            # Use dask cluster
            state = flow.run(
                executor=executors.DaskExecutor(
                    address=distributed_executor_address
                ),
            )

            # Shutdown cluster after run
            cluster.close()
        else:
            state = flow.run()

        if state.is_failed():
            raise ValueError("Flow run failed.")

    except Exception as e:
        log.error("=============================================")
        log.error("\n\n" + traceback.format_exc())
        log.error("=============================================")
        log.error("\n\n" + str(e) + "\n")
        log.error("=============================================")
        sys.exit(1)

###############################################################################
# Allow caller to directly run this module (usually in development scenarios)

if __name__ == "__main__":
    main()
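
The module above is normally installed as a command line script, but the same flow can be exercised programmatically by mirroring the body of main(). The snippet below is a minimal sketch under assumptions: the configuration path "config.json" and the argument values (bigrams, local-only storage, parallel execution) are illustrative and not taken from the source.

from distributed import LocalCluster
from prefect import executors

from cdp_backend.pipeline import generate_event_index_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

# Hypothetical configuration file path; replace with a real
# EventIndexPipelineConfig JSON file for your deployment.
with open("config.json") as open_resource:
    config = EventIndexPipelineConfig.from_json(open_resource.read())

# Build the Prefect flow the same way main() does (illustrative values).
flow = pipeline.create_event_index_generation_pipeline(
    config=config,
    n_grams=2,
    ngrams_per_chunk=50_000,
    store_remote=False,
)

# Run against a local multi-threaded Dask cluster, matching the
# --parallel branch of main().
cluster = LocalCluster(processes=False)
state = flow.run(
    executor=executors.DaskExecutor(address=cluster.scheduler_address)
)
cluster.close()

if state.is_failed():
    raise ValueError("Flow run failed.")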