cdp_backend.pipeline package#

Submodules#

cdp_backend.pipeline.event_gather_pipeline module#

class cdp_backend.pipeline.event_gather_pipeline.SessionProcessingResult(session, session_video_hosted_url, session_content_hash, audio_uri, transcript, transcript_uri, static_thumbnail_uri, hover_thumbnail_uri)[source]#

Bases: NamedTuple

Create new instance of SessionProcessingResult(session, session_video_hosted_url, session_content_hash, audio_uri, transcript, transcript_uri, static_thumbnail_uri, hover_thumbnail_uri)

audio_uri: str#

Alias for field number 3

hover_thumbnail_uri: str#

Alias for field number 7

session: Session#

Alias for field number 0

session_content_hash: str#

Alias for field number 2

session_video_hosted_url: str#

Alias for field number 1

static_thumbnail_uri: str#

Alias for field number 6

transcript: Transcript#

Alias for field number 4

transcript_uri: str#

Alias for field number 5

cdp_backend.pipeline.event_gather_pipeline.create_event_gather_flow(config: EventGatherPipelineConfig, from_dt: str | datetime | None = None, to_dt: str | datetime | None = None, prefetched_events: list[EventIngestionModel] | None = None) list[Flow][source]#

Create the Prefect Flow object to preview, run, or visualize.

Parameters:
config: EventGatherPipelineConfig

Configuration options for the pipeline.

from_dt: Optional[Union[str, datetime]]

Optional ISO formatted string or datetime object to pass to the get_events function to act as the start point for event gathering. Default: None (two days ago)

to_dt: Optional[Union[str, datetime]]

Optional ISO formatted string or datetime object to pass to the get_events function to act as the end point for event gathering. Default: None (now)

prefetched_events: Optional[List[EventIngestionModel]]

A list of events to process instead of running the scraper found in the config. Default: None (use the scraper from the config)

Returns:
flows: list[Flow]

The constructed CDP Event Gather Pipelines as Prefect Flows.

cdp_backend.pipeline.event_gather_pipeline.generate_transcript(session_content_hash: str, audio_path: str, session: Session, bucket: str, credentials_file: str, whisper_model_name: str = 'medium', whisper_model_confidence: float | None = None) tuple[str, Transcript][source]#

Route transcript generation to the correct processing.

Parameters:
session_content_hash: str

The unique key (SHA256 hash of video content) for this session processing.

audio_path: str

The path to the audio file to generate a transcript from.

session: Session

The specific session details to be used in final transcript upload and archival.

bucket: str

The name of the GCS bucket to upload the produced audio to.

credentials_file: str

Path to the GCS JSON credentials file.

whisper_model_name: str

The whisper model to use for transcription.

whisper_model_confidence: Optional[float]

The confidence to set the produced transcript to.

Returns:
transcript_uri: str

The URI to the uploaded transcript file.

transcript: Transcript

The in-memory Transcript object.

cdp_backend.pipeline.event_gather_pipeline.get_events_with_backoff(func: Callable, start_dt: datetime, end_dt: datetime) list[EventIngestionModel][source]#
cdp_backend.pipeline.event_gather_pipeline.import_get_events_func(func_path: str) Callable[source]#

cdp_backend.pipeline.generate_event_index_pipeline module#

class cdp_backend.pipeline.generate_event_index_pipeline.ContextualizedGram(event_id: 'str', event_datetime: 'datetime', unstemmed_gram: 'str', stemmed_gram: 'str', context_span: 'str')[source]#

Bases: DataClassJsonMixin

context_span: str#
event_datetime: datetime#
event_id: str#
stemmed_gram: str#
unstemmed_gram: str#
class cdp_backend.pipeline.generate_event_index_pipeline.EventTranscripts(event_id, event_datetime, transcript_db_files)[source]#

Bases: NamedTuple

Create new instance of EventTranscripts(event_id, event_datetime, transcript_db_files)

event_datetime: datetime#

Alias for field number 1

event_id: str#

Alias for field number 0

transcript_db_files: list[File]#

Alias for field number 2

class cdp_backend.pipeline.generate_event_index_pipeline.SentenceManager(original_details: 'Sentence', cleaned_text: 'str', n_grams: 'list[tuple[str]]')[source]#

Bases: DataClassJsonMixin

cleaned_text: str#
n_grams: list[tuple[str]]#
original_details: Sentence#
cdp_backend.pipeline.generate_event_index_pipeline.create_event_index_generation_pipeline(config: EventIndexPipelineConfig, n_grams: int = 1, ngrams_per_chunk: int = 50000, store_remote: bool = False) Flow[source]#

Create the Prefect Flow object to preview, run, or visualize for indexing all events in the database.

Parameters:
config: EventIndexPipelineConfig

Configuration options for the pipeline.

n_grams: int

N number of terms to act as a unique entity. Default: 1

ngrams_per_chunk: int

The number of ngrams to store in a single chunk file. Default: 50_000

store_remote: bool

Should the generated index chunks be sent to cloud storage. Default: False (only store locally)

Returns:
flow: Flow

The constructed CDP Event Index Pipeline as a Prefect Flow.

cdp_backend.pipeline.ingestion_models module#

class cdp_backend.pipeline.ingestion_models.Body(name: str, is_active: bool = True, start_datetime: datetime | None = None, description: str | None = None, end_datetime: datetime | None = None, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

A meeting body. This can be full council, a subcommittee, or “off-council” matters such as election debates.

Notes

If start_datetime is not provided, and the Body did not exist prior to ingestion, the session datetime associated with this ingestion will be used as start_datetime during storage.

description: str | None = None#
end_datetime: datetime | None = None#
external_source_id: str | None = None#
is_active: bool = True#
name: str#
start_datetime: datetime | None = None#
class cdp_backend.pipeline.ingestion_models.EventIngestionModel(body: Body, sessions: list[Session], event_minutes_items: list[EventMinutesItem] | None = None, agenda_uri: str | None = None, minutes_uri: str | None = None, static_thumbnail_uri: str | None = None, hover_thumbnail_uri: str | None = None, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

An event can be a normally scheduled meeting, a special event such as a press conference or election debate, and, can be upcoming or historical.

Notes

If static_thumbnail_uri and/or hover_thumbnail_uri is not provided, it will be generated during pipeline processing.

The earliest session_datetime will be used for the overall event_datetime.

agenda_uri: str | None = None#
body: Body#
event_minutes_items: list[EventMinutesItem] | None = None#
external_source_id: str | None = None#
hover_thumbnail_uri: str | None = None#
minutes_uri: str | None = None#
sessions: list[Session]#
static_thumbnail_uri: str | None = None#
class cdp_backend.pipeline.ingestion_models.EventMinutesItem(minutes_item: MinutesItem, index: int | None = None, matter: Matter | None = None, supporting_files: list[SupportingFile] | None = None, decision: str | None = None, votes: list[Vote] | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

Details about a specific item during an event.

Notes

If index is not provided, the index will be set to the index of the item in the whole EventMinutesItem list on Event.

If matter is provided, the supporting_files will additionally be stored as MatterFile.

decision: str | None = None#
index: int | None = None#
matter: Matter | None = None#
minutes_item: MinutesItem#
supporting_files: list[SupportingFile] | None = None#
votes: list[Vote] | None = None#
class cdp_backend.pipeline.ingestion_models.IngestionModel[source]#

Bases: object

Base class for IngestionModel type.

class cdp_backend.pipeline.ingestion_models.Matter(name: str, matter_type: str, title: str, result_status: str | None = None, sponsors: list[Person] | None = None, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

A matter is a specific legislative document. e.g. A bill, resolution, initiative, etc.

external_source_id: str | None = None#
matter_type: str#
name: str#
result_status: str | None = None#
sponsors: list[Person] | None = None#
title: str#
class cdp_backend.pipeline.ingestion_models.MinutesItem(name: str, description: str | None = None, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

An item referenced during a meeting. This can be a matter but it can be a presentation or budget file, etc.

description: str | None = None#
external_source_id: str | None = None#
name: str#
class cdp_backend.pipeline.ingestion_models.Person(name: str, is_active: bool = True, router_string: str | None = None, email: str | None = None, phone: str | None = None, website: str | None = None, picture_uri: str | None = None, seat: Seat | None = None, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

Primarily the council members, this could technically include the mayor or city manager, or any other “normal” presenters and attendees of meetings.

Notes

If router_string is not provided, and the Person did not exist prior to ingestion, router_string will be generated from name.

The email, phone, website will be updated if changed from prior values. The picture will be uploaded or updated in the CDP file storage system.

If person is operating under new roles or new seat, new Role and Seat documents will be stored.

email: str | None = None#
external_source_id: str | None = None#
is_active: bool = True#
name: str#
phone: str | None = None#
picture_uri: str | None = None#
router_string: str | None = None#
seat: Seat | None = None#
website: str | None = None#
class cdp_backend.pipeline.ingestion_models.Role(title: str, body: Body | None = None, start_datetime: datetime | None = None, end_datetime: datetime | None = None, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

A role is a person’s job for a period of time in the city council. A person can (and should) have multiple roles. For example: a person has two terms as city council member for district four then a term as city council member for a citywide seat. Roles can also be tied to committee chairs. For example: a council member spends a term on the transportation committee and then spends a term on the finance committee.

Notes

If start_datetime is not provided, and the Role did not exist prior to ingestion, the session datetime associated with this ingestion will be used as start_datetime during storage.

body: Body | None = None#
end_datetime: datetime | None = None#
external_source_id: str | None = None#
start_datetime: datetime | None = None#
title: str#
class cdp_backend.pipeline.ingestion_models.Seat(name: str, electoral_area: str | None = None, electoral_type: str | None = None, image_uri: str | None = None, external_source_id: str | None = None, roles: list[Role] | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

An electable office on the City Council. I.E. “Position 9”.

Notes

The electoral_area and electoral_type will be updated if changed from prior values. The image will be uploaded or updated in the CDP file storage system.

electoral_area: str | None = None#
electoral_type: str | None = None#
external_source_id: str | None = None#
image_uri: str | None = None#
name: str#
roles: list[Role] | None = None#
class cdp_backend.pipeline.ingestion_models.Session(session_datetime: datetime, video_uri: str, session_index: int, video_start_time: str | None = None, video_end_time: str | None = None, caption_uri: str | None = None, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

A session is a working period for an event. For example, an event could have a morning and afternoon session.

Notes

video_start_time is a duration relative to the beginning of the video in HH:MM:SS format. It does not affect nor is relative to session_datetime or any other datetime. If the portion of the video relevant to the session begins 37m50s into the full video, video_start_time will be “37:50”. An absent start time is equivalent to the beginning of the video, and an absent end time is equivalent to the end of the video, so either can be omitted.

caption_uri: str | None = None#
external_source_id: str | None = None#
session_datetime: datetime#
session_index: int#
video_end_time: str | None = None#
video_start_time: str | None = None#
video_uri: str#
class cdp_backend.pipeline.ingestion_models.SupportingFile(name: str, uri: str, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

A file tied to a matter or minutes item.

Notes

This file is not stored in the CDP file storage system.

external_source_id: str | None = None#
name: str#
uri: str#
class cdp_backend.pipeline.ingestion_models.Vote(person: Person, decision: str, external_source_id: str | None = None)[source]#

Bases: IngestionModel, DataClassJsonMixin

A reference tying a specific person and an event minutes item together.

Notes

The in_majority field stored in the database will be calculated from the provided list of votes.

decision: str#
external_source_id: str | None = None#
person: Person#

cdp_backend.pipeline.mock_get_events module#

cdp_backend.pipeline.mock_get_events.filled_get_events(**kwargs: Any) list[EventIngestionModel][source]#
cdp_backend.pipeline.mock_get_events.get_events(**kwargs: Any) list[EventIngestionModel][source]#

A mock get_events function that will generate entirely random events based on a set of permutation settings.

Useful when running a pipeline test against a dev infrastructure.

Also imported and used in the example repo and integration tests.

cdp_backend.pipeline.mock_get_events.many_get_events(**kwargs: Any) list[EventIngestionModel][source]#
cdp_backend.pipeline.mock_get_events.min_get_events(**kwargs: Any) list[EventIngestionModel][source]#

cdp_backend.pipeline.pipeline_config module#

class cdp_backend.pipeline.pipeline_config.EventGatherPipelineConfig(google_credentials_file: str, get_events_function_path: str, gcs_bucket_name: str | None = None, whisper_model_name: str = 'medium', whisper_model_confidence: float | None = None, default_event_gather_from_days_timedelta: int = 2)[source]#

Bases: DataClassJsonMixin

Configuration options for the CDP event gather pipeline.

Parameters:
google_credentials_file: str

Path to the Google Service Account Credentials JSON file.

get_events_function_path: str

Path to the function (including function name) that supplies event data to the CDP event gather pipeline.

gcs_bucket_name: Optional[str]

The name of the Google Storage bucket for CDP generated files. Default: None (parse from the Google Service Account Credentials JSON file)

whisper_model_name: str

Passthrough to sr_models.whisper.WhisperModel.

whisper_model_confidence: Optional[float]

Passthrough to sr_models.whisper.WhisperModel.

default_event_gather_from_days_timedelta: int

Default number of days to subtract from current time to then pass to the provided get_events function as the from_dt datetime. Default: 2 (from_dt will be set to current datetime - 2 days)

default_event_gather_from_days_timedelta: int = 2#
gcs_bucket_name: str | None = None#
get_events_function_path: str#
google_credentials_file: str#
property validated_gcs_bucket_name: str#

GCS bucket name after it has been validated to exist.

whisper_model_confidence: float | None = None#
whisper_model_name: str = 'medium'#
class cdp_backend.pipeline.pipeline_config.EventIndexPipelineConfig(google_credentials_file: str, gcs_bucket_name: str | None = None, datetime_weighting_days_decay: int = 30, local_storage_dir: str | Path = 'index/')[source]#

Bases: DataClassJsonMixin

Configuration options for the CDP event index pipeline.

Parameters:
google_credentials_file: str

Path to the Google Service Account Credentials JSON file.

gcs_bucket_name: Optional[str]

The name of the Google Storage bucket for CDP generated files. Default: None (parse from the Google Service Account Credentials JSON file)

datetime_weighting_days_decay: int

The number of days that grams from an event should be labeled as more relevant. Default: 30 (grams from events less than 30 days old will generally be valued higher than their pure relevance score)

local_storage_dir: Union[str, Path]

The local storage directory to store the chunked index prior to upload. Default: “index/” (current working directory / index)

datetime_weighting_days_decay: int = 30#
gcs_bucket_name: str | None = None#
google_credentials_file: str#
local_storage_dir: str | Path = 'index/'#
property validated_gcs_bucket_name: str#

GCS bucket name after it has been validated to exist.

cdp_backend.pipeline.process_event_index_chunk_pipeline module#

class cdp_backend.pipeline.process_event_index_chunk_pipeline.AlmostCompleteIndexedEventGram(event_id, unstemmed_gram, stemmed_gram, context_span, value, datetime_weighted_value)[source]#

Bases: NamedTuple

Create new instance of AlmostCompleteIndexedEventGram(event_id, unstemmed_gram, stemmed_gram, context_span, value, datetime_weighted_value)

context_span: str#

Alias for field number 3

datetime_weighted_value: float#

Alias for field number 5

event_id: str#

Alias for field number 0

stemmed_gram: str#

Alias for field number 2

unstemmed_gram: str#

Alias for field number 1

value: float#

Alias for field number 4

cdp_backend.pipeline.process_event_index_chunk_pipeline.create_event_index_upload_pipeline(config: EventIndexPipelineConfig, index_chunk: str | Path, upload_batch_size: int = 500) Flow[source]#

Create the Prefect Flow object to preview, run, or visualize for uploading a generated index for all events in the database.

Parameters:
config: EventIndexPipelineConfig

Configuration options for the pipeline.

index_chunk: Union[str, Path]

Path to the index chunk file to process.

upload_batch_size: int

Number of ngrams to upload to database in a single batch. Default: 500 (max)

Returns:
flow: Flow

The constructed CDP Event Index Pipeline as a Prefect Flow.

cdp_backend.pipeline.transcript_model module#

class cdp_backend.pipeline.transcript_model.SectionAnnotation(name: str, start_sentence_index: int, stop_sentence_index: int | None, generator: str, description: str | None = None)[source]#

Bases: DataClassJsonMixin

A section annotation used for topic segmentation and minutes item alignment.

Parameters:
name: str

The name of the sections.

start_sentence_index: int

The sentence index that acts as the starting point for the section.

stop_sentence_index: Optional[int]

The sentence index that acts as the stopping point for the section.

generator: str

A description of the algorithm or annotator that provided this annotation.

description: Optional[str]

An optional description of what the section is about. Default: None

Notes

The attributes start_sentence_index and stop_sentence_index should be treated as inclusive and exclusive respectively, exactly like how the Python slice function works.

I.e. given a transcript of ordered sentences, the sentence indices will work as the parameters for a slice against the list of sentences: sentences[start_sentence_index:stop_sentence_index]

Examples

Usage pattern for annotation attachment.

>>> transcript.annotations.sections = [
...     SectionAnnotation(
...         name="Public Comment",
...         start_sentence_index=12,
...         stop_sentence_index=87,
...         generator="Eva Maxfield Brown",
...     ),
...     SectionAnnotation(
...         name="CB 120121",
...         start_sentence_index=243,
...         stop_sentence_index=419,
...         description="AN ORDINANCE relating to land use and zoning ...",
...         generator="queue-cue--v1.0.0",
...     ),
... ]
description: str | None = None#
generator: str#
name: str#
start_sentence_index: int#
stop_sentence_index: int | None#
class cdp_backend.pipeline.transcript_model.Sentence(index: int, confidence: float, start_time: float, end_time: float, words: List[Word], text: str, speaker_index: int | None = None, speaker_name: str | None = None, annotations: SentenceAnnotations | None = None)[source]#

Bases: DataClassJsonMixin

Data for a sentence in a transcript.

Parameters:
index: int

The index of the sentence in its respective transcript.

confidence: float

A number between 0 and 1 for the confidence of the sentence accuracy.

start_time: float

Time in seconds for when this sentence begins.

end_time: float

Time in seconds for when this sentence ends.

speaker_index: Optional[int]

The optional speaker index for the sentence.

speaker_name: Optional[str]

The optional speaker name for the sentence.

annotations: Optional[SentenceAnnotations]

Any annotations specific to this sentence. Default: None (no annotations)

words: List[Word]

The list of words for the sentence.

text: str

The text of the sentence including all formatting and non-deliminating chars.

annotations: SentenceAnnotations | None = None#
confidence: float#
end_time: float#
index: int#
speaker_index: int | None = None#
speaker_name: str | None = None#
start_time: float#
text: str#
words: List[Word]#
class cdp_backend.pipeline.transcript_model.SentenceAnnotations[source]#

Bases: DataClassJsonMixin

Annotations that can appear on an individual sentence level.

class cdp_backend.pipeline.transcript_model.Transcript(generator: str, confidence: float, session_datetime: str | None, created_datetime: str, sentences: List[Sentence], annotations: TranscriptAnnotations | None = None)[source]#

Bases: DataClassJsonMixin

Transcript model for all transcripts in CDP databases / filestores.

Parameters:
generator: str

A descriptive name of the generative process that produced this transcript. Example: “Google Speech-to-Text – Lib Version: 2.0.1”

confidence: float

A number between 0 and 1. If available, use the average of all confidence annotations reported for each text block in the transcript. Otherwise, make an estimation for (or manually calculate): n-correct-tokens / n-total-tokens for the whole transcript.

session_datetime: Optional[str]

ISO formatted datetime for the session that this document transcribes.

created_datetime: str

ISO formatted datetime for when this transcript was created.

sentences: List[Sentence]

A list of sentences.

annotations: Optional[TranscriptAnnotations]

Any annotations that can be applied to the whole transcript. Default: None (no annotations)

Examples

Dumping transcript to JSON file.

>>> # transcript = Transcript(...)
... with open("transcript.json", "w") as open_resource:
...     open_resource.write(transcript.to_json())

Reading transcript from JSON file.

>>> with open("transcript.json", "r") as open_resource:
...     transcript = Transcript.from_json(open_resource.read())
annotations: TranscriptAnnotations | None = None#
confidence: float#
created_datetime: str#
generator: str#
sentences: List[Sentence]#
session_datetime: str | None#
class cdp_backend.pipeline.transcript_model.TranscriptAnnotations(sections: List[SectionAnnotation] | None = None)[source]#

Bases: DataClassJsonMixin

Annotations that can appear (but are not guaranteed) for the whole transcript.

sections: List[SectionAnnotation] | None = None#
class cdp_backend.pipeline.transcript_model.Word(index: int, start_time: float, end_time: float, text: str, annotations: WordAnnotations | None = None)[source]#

Bases: DataClassJsonMixin

Data for a word in a transcript.

Parameters:
index: int

The index of the word in its respective sentence.

start_time: float

Time in seconds for when this word begins.

end_time: float

Time in seconds for when this word ends.

text: str

The raw text of the word, lowercased and cleaned of all non-deliminating chars.

annotations: Optional[WordAnnotations]

Any annotations specific to this word. Default: None (no annotations)

annotations: WordAnnotations | None = None#
end_time: float#
index: int#
start_time: float#
text: str#
class cdp_backend.pipeline.transcript_model.WordAnnotations[source]#

Bases: DataClassJsonMixin

Annotations that can appear on an individual word level.

Module contents#

Pipeline package for cdp_backend.