Source code for cdp_scrapers.legistar_utils

#!/usr/bin/env python

from __future__ import annotations

import enum
import logging
import re
from copy import deepcopy
from datetime import datetime, timedelta
from json import JSONDecodeError
from typing import Any, NamedTuple
from urllib.error import HTTPError, URLError
from urllib.parse import quote_plus, urlsplit
from urllib.request import urlopen

import requests
from bs4 import BeautifulSoup
from cdp_backend.database.constants import (
    EventMinutesItemDecision,
    MatterStatusDecision,
    VoteDecision,
)
from cdp_backend.pipeline.ingestion_models import (
    Body,
    EventIngestionModel,
    EventMinutesItem,
    Matter,
    MinutesItem,
    Person,
    Role,
    Session,
    SupportingFile,
    Vote,
)

from .legistar_content_parsers import all_parsers
from .scraper_utils import (
    IngestionModelScraper,
    reduced_list,
    sanitize_roles,
    str_simplified,
)
from .types import ContentURIs, LegistarContentParser, ScraperStaticData

###############################################################################

log = logging.getLogger(__name__)

###############################################################################

LEGISTAR_BASE = "http://webapi.legistar.com/v1/{client}"
LEGISTAR_VOTE_BASE = LEGISTAR_BASE + "/EventItems"
LEGISTAR_EVENT_BASE = LEGISTAR_BASE + "/Events"
LEGISTAR_MATTER_BASE = LEGISTAR_BASE + "/Matters"
LEGISTAR_PERSON_BASE = LEGISTAR_BASE + "/Persons"
LEGISTAR_BODY_BASE = LEGISTAR_BASE + "/Bodies"

# e.g. Session.video_uri =  EventVideoPath from legistar api
LEGISTAR_SESSION_VIDEO_URI = "EventVideoPath"
LEGISTAR_EV_MINUTE_DECISION = "EventItemPassedFlagName"
# NOTE: EventItemAgendaSequence is also a candidate for this
LEGISTAR_EV_INDEX = "EventItemMinutesSequence"
LEGISTAR_PERSON_EMAIL = "PersonEmail"
LEGISTAR_PERSON_EXT_ID = "PersonId"
LEGISTAR_PERSON_NAME = "PersonFullName"
LEGISTAR_PERSON_PHONE = "PersonPhone"
LEGISTAR_PERSON_WEBSITE = "PersonWWW"
LEGISTAR_PERSON_ACTIVE = "PersonActiveFlag"
LEGISTAR_PERSON_ROLES = "OfficeRecordInfo"
LEGISTAR_BODY_NAME = "BodyName"
LEGISTAR_BODY_EXT_ID = "BodyId"
LEGISTAR_BODY_ACTIVE = "BodyActiveFlag"
LEGISTAR_VOTE_DECISION = "VoteResult"
LEGISTAR_VOTE_EXT_ID = "VoteId"
LEGISTAR_FILE_EXT_ID = "MatterAttachmentId"
LEGISTAR_FILE_NAME = "MatterAttachmentName"
LEGISTAR_FILE_URI = "MatterAttachmentHyperlink"
LEGISTAR_MATTER_EXT_ID = "EventItemMatterId"
LEGISTAR_MATTER_TITLE = "EventItemMatterFile"
LEGISTAR_MATTER_NAME = "EventItemMatterName"
LEGISTAR_MATTER_TYPE = "EventItemMatterType"
LEGISTAR_MATTER_STATUS = "EventItemMatterStatus"
LEGISTAR_MATTER_SPONSORS = "MatterSponsorInfo"
LEGISTAR_SPONSOR_PERSON = "SponsorPersonInfo"
# Session.session_datetime is a combo of EventDate and EventTime
# TODO: this means same time for all Sessions in a EventIngestionModel.
#       some other legistar api data that can be used instead
LEGISTAR_SESSION_DATE = "EventDate"
LEGISTAR_SESSION_TIME = "EventTime"
LEGISTAR_AGENDA_URI = "EventAgendaFile"
LEGISTAR_MINUTES_URI = "EventMinutesFile"
LEGISTAR_MINUTE_EXT_ID = "EventItemId"
LEGISTAR_MINUTE_NAME = "EventItemTitle"
LEGISTAR_VOTE_VAL_ID = "VoteValueId"
LEGISTAR_VOTE_VAL_NAME = "VoteValueName"
LEGISTAR_ROLE_BODY = "OfficeRecordBodyInfo"
LEGISTAR_ROLE_BODY_ALT = "OfficeRecordBodyName"
LEGISTAR_ROLE_START = "OfficeRecordStartDate"
LEGISTAR_ROLE_END = "OfficeRecordEndDate"
LEGISTAR_ROLE_EXT_ID = "OfficeRecordId"
LEGISTAR_ROLE_TITLE = "OfficeRecordTitle"
LEGISTAR_ROLE_TITLE_ALT = "OfficeRecordMemberType"

LEGISTAR_EV_ITEMS = "EventItems"
LEGISTAR_EV_ATTACHMENTS = "EventItemMatterAttachments"
LEGISTAR_EV_VOTES = "EventItemVoteInfo"
LEGISTAR_VOTE_PERSONS = "PersonInfo"
LEGISTAR_EV_SITE_URL = "EventInSiteURL"
LEGISTAR_EV_EXT_ID = "EventId"
LEGISTAR_EV_BODY = "EventBodyInfo"

LEGISTAR_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
###############################################################################


known_legistar_persons: dict[int, dict[str, Any]] = {}
known_legistar_bodies: dict[int, dict[str, Any]] = {}
# video web page parser type per municipality
video_page_parser: dict[str, LegistarContentParser] = {}


[docs] def get_legistar_body( client: str, body_id: int, use_cache: bool = False, ) -> dict[str, Any] | None: """ Return information for a single legistar body in JSON. Parameters ---------- client: str Which legistar client to target. Ex: "seattle" body_id: int Unique ID for this body in the legistar municipality use_cache: bool True: Store result to prevent querying repeatedly for same body_id Returns ------- body: Dict[str, Any] legistar API body Notes ----- known_legistar_bodies cache is cleared for every LegistarScraper.get_events() call """ global known_legistar_bodies if use_cache: try: return known_legistar_bodies[body_id] except KeyError: # new body pass body_request_format = LEGISTAR_BODY_BASE + "/{body_id}" response = requests.get( body_request_format.format( client=client, body_id=body_id, ) ) if response.status_code == 200: body = response.json() else: body = None if use_cache: known_legistar_bodies[body_id] = body return body
[docs] def get_legistar_person( client: str, person_id: int, use_cache: bool = False, ) -> dict[str, Any] | None: """ Return information for a single legistar person in JSON. Parameters ---------- client: str Which legistar client to target. Ex: "seattle" person_id: int Unique ID for this person in the legistar municipality use_cache: bool True: Store result to prevent querying repeatedly for same person_id Returns ------- person: Dict[str, Any] legistar API person Notes ----- known_legistar_persons cache is cleared for every LegistarScraper.get_events() call """ global known_legistar_persons if use_cache: try: return known_legistar_persons[person_id] except KeyError: # new person pass person_request_format = LEGISTAR_PERSON_BASE + "/{person_id}" response = requests.get( person_request_format.format( client=client, person_id=person_id, ) ) if response.status_code != 200: if use_cache: known_legistar_persons[person_id] = None return None person = response.json() # all known OfficeRecords (roles) for this person response = requests.get( (person_request_format + "/OfficeRecords").format( client=client, person_id=person_id, ) ) if response.status_code != 200: person[LEGISTAR_PERSON_ROLES] = None if use_cache: known_legistar_persons[person_id] = person return person office_records: list[dict[str, Any]] = response.json() for record in office_records: # body for this role record[LEGISTAR_ROLE_BODY] = get_legistar_body( client=client, body_id=record["OfficeRecordBodyId"], use_cache=use_cache ) person[LEGISTAR_PERSON_ROLES] = office_records if use_cache: known_legistar_persons[person_id] = person return person
[docs] def get_legistar_events_for_timespan( client: str, begin: datetime | None = None, end: datetime | None = None, ) -> list[dict]: """ Get all legistar events and each events minutes items, people, and votes, for a client for a given timespan. Parameters ---------- client: str Which legistar client to target. Ex: "seattle" begin: Optional[datetime] The timespan beginning datetime to query for events after. Default: UTC now - 1 day end: Optional[datetime] The timespan end datetime to query for events before. Default: UTC now Returns ------- events: List[Dict] All legistar events that occur between the datetimes provided for the client provided. Additionally, requests and attaches agenda items, minutes items, any attachments, called "EventItems", requests votes for any of these "EventItems", and requests person information for any vote. """ # Set defaults if begin is None: begin = datetime.utcnow() - timedelta(days=1) if end is None: end = datetime.utcnow() # The unformatted request parts filter_datetime_format = "EventDate+{op}+datetime%27{dt}%27" request_format = LEGISTAR_EVENT_BASE + "?$filter={begin}+and+{end}" # a given person and/or body's information being updated # during the lifetime of this single call is miniscule. # use a cache to prevent 10s-100s of web requests # for the same person/body global known_legistar_persons, known_legistar_bodies # See Also # get_legistar_person() known_legistar_persons.clear() # See Also # get_legistar_body() known_legistar_bodies.clear() # Get response from formatted request log.debug(f"Querying Legistar for events between: {begin} - {end}") response = requests.get( request_format.format( client=client, begin=filter_datetime_format.format( op="ge", dt=str(begin).replace(" ", "T"), ), end=filter_datetime_format.format( op="lt", dt=str(end).replace(" ", "T"), ), ) ).json() # Get all event items for each event item_request_format = ( LEGISTAR_EVENT_BASE + "/{event_id}/EventItems?AgendaNote=1&MinutesNote=1&Attachments=1" ) for event in response: # Attach the Event Items to the event event["EventItems"] = requests.get( item_request_format.format(client=client, event_id=event["EventId"]) ).json() # Attach info for the body responsible for this event event[LEGISTAR_EV_BODY] = get_legistar_body( client=client, body_id=event["EventBodyId"], use_cache=True ) # Get vote information for event_item in event["EventItems"]: vote_request_format = LEGISTAR_VOTE_BASE + "/{event_item_id}/Votes" event_item["EventItemVoteInfo"] = requests.get( vote_request_format.format( client=client, event_item_id=event_item["EventItemId"], ) ).json() # Get person information for vote_info in event_item["EventItemVoteInfo"]: vote_info["PersonInfo"] = get_legistar_person( client=client, person_id=vote_info["VotePersonId"], use_cache=True, ) if ( not isinstance(event_item["EventItemMatterId"], int) or event_item["EventItemMatterId"] < 0 ): event_item[LEGISTAR_MATTER_SPONSORS] = None else: # this matter's sponsors sponsor_request_format = ( LEGISTAR_MATTER_BASE + "/{event_item_matter_id}/Sponsors" ) sponsors = requests.get( sponsor_request_format.format( client=client, event_item_matter_id=event_item["EventItemMatterId"], ) ).json() # legistar MatterSponsor just has a reference to a Person # so further obtain the actual Person information for sponsor in sponsors: sponsor[LEGISTAR_SPONSOR_PERSON] = get_legistar_person( client=client, person_id=sponsor["MatterSponsorNameId"], use_cache=True, ) event_item[LEGISTAR_MATTER_SPONSORS] = sponsors log.debug(f"Collected {len(response)} Legistar events") return response
[docs] class ContentUriScrapeResult(NamedTuple):
[docs] class Status(enum.IntEnum): """Status of content parsing.""" # Web page(s) are in unrecognized structure UnrecognizedPatternError = -1 # Error in accessing some resource ResourceAccessError = -2 # Video was not provided for the event ContentNotProvidedError = -3 # Found URIs to video and optional caption Ok = 0
status: Status uris: list[ContentURIs] | None = None
[docs] def parse_video_page_url(video_page_url: str, client: str) -> list[ContentURIs]: """ Return URLs for videos and captions from a Legistar/Granicus-hosted video web page. Parameters ---------- video_page_url: str The URL for the page of the legistar video client: str Which legistar client to target. Ex: "seattle" Returns ------- uris: Optional[List[ContentURIs]] URIs for video and optional caption """ with urlopen(video_page_url) as resp: # now load the page to get the actual video url soup = BeautifulSoup(resp.read(), "html.parser") if client in video_page_parser: # we already know which format parser to call uris = video_page_parser[client](client, soup) else: for parser in all_parsers: uris = parser(client, soup) if uris is not None: # remember so we just call this from here on video_page_parser[client] = parser log.debug(f"{parser} for {client}") break else: uris = None return uris
[docs] def get_legistar_content_uris(client: str, legistar_ev: dict) -> ContentUriScrapeResult: """ Return URLs for videos and captions from a Legistar/Granicus-hosted video web page. Parameters ---------- client: str Which legistar client to target. Ex: "seattle" legistar_ev: Dict Data for one Legistar Event. Returns ------- ContentUriScrapeResult status: ContentUriScrapeResult.Status Status code describing the scraping process. Use uris only if status is Ok uris: Optional[List[ContentURIs]] URIs for video and optional caption Raises ------ NotImplementedError Means the content structure of the web page hosting session video has changed. We need explicit review and update the scraping code. ConnectionError When the Legistar site (e.g. *.legistar.com) itself may be down. See Also -------- LegistarScraper.get_content_uris cdp_scrapers.legistar_content_parsers """ global video_page_parser # prefer video file path in legistar Event.EventVideoPath if legistar_ev[LEGISTAR_SESSION_VIDEO_URI]: return ( ContentUriScrapeResult.Status.Ok, [ ContentURIs( video_uri=str_simplified(legistar_ev[LEGISTAR_SESSION_VIDEO_URI]), caption_uri=None, ) ], ) if not legistar_ev[LEGISTAR_EV_SITE_URL]: return (ContentUriScrapeResult.Status.UnrecognizedPatternError, None) try: # a td tag with a certain id pattern. # this is usually something like # https://somewhere.legistar.com/MeetingDetail.aspx... # that is a summary-like page for a meeting with urlopen(legistar_ev[LEGISTAR_EV_SITE_URL]) as resp: soup = BeautifulSoup(resp.read(), "html.parser") if "server error" in soup.text.lower(): try: url_attrs = urlsplit(legistar_ev[LEGISTAR_EV_SITE_URL]) netloc = url_attrs.netloc except ValueError: netloc = legistar_ev[LEGISTAR_EV_SITE_URL] raise ConnectionError( f"{netloc} appears to be down: {str_simplified(soup.text)}" ) except (URLError, HTTPError) as e: log.debug(f"{legistar_ev[LEGISTAR_EV_SITE_URL]}: {str(e)}") return (ContentUriScrapeResult.Status.ResourceAccessError, None) # this gets us the url for the web PAGE containing the video # video link is provided in the window.open()command inside onclick event # <a id="ctl00_ContentPlaceHolder1_hypVideo" # data-event-id="75f1e143-6756-496f-911b-d3abe61d64a5" # data-running-text="In&amp;nbsp;progress" class="videolink" # onclick="window.open('Video.aspx? # Mode=Granicus&amp;ID1=8844&amp;G=D64&amp;Mode2=Video','video'); # return false;" # href="#" style="color:Blue;font-family:Tahoma;font-size:10pt;">Video</a> extract_url = soup.find( "a", id=re.compile(r"ct\S*_ContentPlaceHolder\S*_hypVideo"), class_="videolink", ) if extract_url is None: return (ContentUriScrapeResult.Status.UnrecognizedPatternError, None) # the <a> tag will not have this attribute if there is no video if "onclick" not in extract_url.attrs: return (ContentUriScrapeResult.Status.ContentNotProvidedError, None) # NOTE: after this point, failing to scrape video url should raise an exception. # we need to be alerted that we probabaly have a new web page structure. extract_url = extract_url["onclick"] start = extract_url.find("'") + len("'") end = extract_url.find("',") video_page_url = f"https://{client}.legistar.com/{extract_url[start:end]}" log.debug(f"{legistar_ev[LEGISTAR_EV_SITE_URL]} -> {video_page_url}") try: uris = parse_video_page_url(video_page_url, client) except HTTPError as e: log.debug(f"Error opening {video_page_url}:\n{str(e)}") return (ContentUriScrapeResult.Status.ResourceAccessError, None) if uris is None: raise NotImplementedError( "get_legistar_content_uris() needs attention. " f"Unrecognized video web page HTML structure: {video_page_url}" ) return (ContentUriScrapeResult.Status.Ok, uris)
[docs] class LegistarScraper(IngestionModelScraper): r""" Base class for transforming Legistar API data to CDP IngestionModel. If get_events() naively fails and raises an error, a given installation must define a derived class and implement the get_content_uris() function. Parameters ---------- client: str Legistar client name, e.g. "seattle" for Seattle, "kingcounty" for King County. timezone: str The timezone for the target client. i.e. "America/Los_Angeles" or "America/New_York" See https://en.wikipedia.org/wiki/List_of_tz_database_time_zones for canonical timezones. ignore_minutes_item_patterns: List[str] A list of string patterns or substrings to act as a minutes item filter. Any item in the provided list will be compiled as a regex string and any minute's item that contains the compiled pattern will be filtered out of the produced CDP minutes item list. Default: [] (do not filter any minutes items) vote_approve_pattern: str Regex pattern used to convert Legistar instance's votes in approval value to CDP constant value. Default: "approve|favor|yes" vote_abstain_pattern: str Regex pattern used to convert Legistar instance's abstension value to CDP constant value. Note, this is a pure abstension, not an "approval by abstention" or "rejection by abstension" value. Those should be places in vote_approve_pattern and vote_reject_pattern respectively. Default: "abstain|refuse|refrain" vote_reject_pattern: str Regex pattern used to convert Legistar instance's votes in rejection value to CDP constant value. Default: "reject|oppose|no" vote_absent_pattern: str Regex pattern used to convert Legistar instance's excused absense value to CDP constant value. Default: "absent" vote_nonvoting_pattern: str Regex pattern used to convert Legistar instance's non-voting value to CDP constant value. Default: "nv|(?:non.*voting)" matter_adopted_pattern: str Regex pattern used to convert Legistar instance's matter was adopted to CDP constant value. Default: "approved|confirmed|passed|adopted" matter_in_progess_pattern: str Regex pattern used to convert Legistar instance's matter is in-progress to CDP constant value. Default: "heard|ready|filed|held|(?:in\s*committee)" matter_rejected_pattern: str Regex pattern used to convert Legistar instance's matter was rejected to CDP constant value. Default: "rejected|dropped" minutes_item_decision_passed_pattern: str Regex pattern used to convert Legistar instance's minutes item passage to CDP constant value. Default: "pass" minutes_item_decision_failed_pattern: str Regex pattern used to convert Legistar instance's minutes item failure to CDP constant value. Default: "not|fail" static_data: Optional[ScraperStaticData] predefined Seats, Bodies and Persons used to provide more accurate Person.seat. person_aliases: Optional[Dict[str, Set[str]]] Dictionary used to catch name aliases and resolve improperly unique Persons to the one correct Person. Default: None role_replacements: Optional[Dict[str, str]] Dictionary used to replace role titles with CDP standard role titles. The keys should be titles you want to replace and the values should be a CDP standard role. Default: None See Also -------- cdp_scrapers.legistar_utils.LegistarScraper.get_content_uris cdp_scrapers.instances.seattle.SeattleScraper """
[docs] def __init__( self, client: str, timezone: str, ignore_minutes_item_patterns: list[str] | None = None, vote_approve_pattern: str = r"approve|favor|yes", vote_abstain_pattern: str = r"abstain|refuse|refrain", vote_reject_pattern: str = r"reject|oppose|no", vote_absent_pattern: str = r"absent", vote_nonvoting_pattern: str = r"nv|(?:non.*voting)", matter_adopted_pattern: str = ( r"approved|confirmed|passed|adopted|consent|(?:voted.*com+it+ee)" ), matter_in_progress_pattern: str = r"heard|read|filed|held|(?:in.*com+it+ee)", matter_rejected_pattern: str = r"rejected|dropped", minutes_item_decision_passed_pattern: str = r"pass", minutes_item_decision_failed_pattern: str = r"not|fail", static_data: ScraperStaticData | None = None, person_aliases: dict[str, set[str]] | None = None, role_replacements: dict[str, str] | None = None, ): super().__init__(timezone=timezone, person_aliases=person_aliases) if ignore_minutes_item_patterns is None: ignore_minutes_item_patterns = [] self.client_name: str = client self.ignore_minutes_item_patterns: list[str] = ignore_minutes_item_patterns # regex patterns used to infer cdp_backend.database.constants # from Legistar string fields self.vote_approve_pattern: str = vote_approve_pattern self.vote_abstain_pattern: str = vote_abstain_pattern self.vote_reject_pattern: str = vote_reject_pattern # TODO: need to debug these using real examples self.vote_absent_pattern: str = vote_absent_pattern self.vote_nonvoting_pattern: str = vote_nonvoting_pattern self.matter_adopted_pattern: str = matter_adopted_pattern self.matter_in_progress_pattern: str = matter_in_progress_pattern self.matter_rejected_patten: str = matter_rejected_pattern self.minutes_item_decision_passed_pattern: str = ( minutes_item_decision_passed_pattern ) self.minutes_item_decision_failed_pattern: str = ( minutes_item_decision_failed_pattern ) self.static_data = static_data self.role_replacements = role_replacements or {}
[docs] def get_matter_status(self, legistar_matter_status: str) -> str | None: """ Return appropriate MatterStatusDecision constant from EventItemMatterStatus. Parameters ---------- legistar_matter_status: str Legistar API EventItemMatterStatus. Returns ------- matter_status: Optional[str] A constant from CDP allowed matter status decisions. None if missing information or if matter status decision parameter patterns are not inclusive to the Legistar matter status value. See Also -------- cdp_backend.database.constants.MatterStatusDecision """ if not legistar_matter_status: return None if ( re.search( self.matter_adopted_pattern, legistar_matter_status, re.IGNORECASE ) is not None ): return MatterStatusDecision.ADOPTED if ( re.search( self.matter_in_progress_pattern, legistar_matter_status, re.IGNORECASE, ) is not None ): return MatterStatusDecision.IN_PROGRESS if ( re.search( self.matter_rejected_patten, legistar_matter_status, re.IGNORECASE ) is not None ): return MatterStatusDecision.REJECTED log.debug( "not able to decide MatterStatusDecision from " f"{legistar_matter_status}. consider updating matter_*_pattern" ) log.debug(f"no MatterStatusDecision filter for {legistar_matter_status}") return None
[docs] def get_minutes_item_decision( self, legistar_item_passed_name: str, ) -> str | None: """ Return appropriate EventMinutesItemDecision constant from EventItemPassedFlagName. Parameters ---------- legistar_item_passed_name: str Legistar API EventItemPassedFlagName Returns ------- emi_decision: Optional[str] A constant from CDP allowed minutes item decisions. None if missing information or if minutes item decision parameter patterns are no inclusive of the Legistar minutes item decision value. See Also -------- cdp_backend.database.constants.EventMinutesItemDecision """ if not legistar_item_passed_name: return None if ( re.search( self.minutes_item_decision_passed_pattern, legistar_item_passed_name, re.IGNORECASE, ) is not None ): return EventMinutesItemDecision.PASSED if ( re.search( self.minutes_item_decision_failed_pattern, legistar_item_passed_name, re.IGNORECASE, ) is not None ): return EventMinutesItemDecision.FAILED log.debug(f"no EventMinutesItemDecision filter for {legistar_item_passed_name}") return None
[docs] def get_vote_decision(self, legistar_vote: dict) -> str | None: # noqa: C901 """ Return appropriate VoteDecision constant based on Legistar Vote. Parameters ---------- legistar_vote: Dict Legistar API Vote Returns ------- vote_decision: Optional[str] A constant from CDP allowed vote decisions. None if missing vote information or if vote decision parameter patterns are not inclusive of the Legistar vote value. See Also -------- cdp_backend.database.constants.VoteDecision """ vote_value = legistar_vote[LEGISTAR_VOTE_VAL_NAME] # don't want to make assumption about VoteValueId = 0 meaning here # so treating VoteValueId as null only when None if not vote_value and legistar_vote[LEGISTAR_VOTE_VAL_ID] is None: return None # NOTE: The required integer VoteValueId = 16 seems to be "in favor". # But don't know what other values would be e.g. "opposed to", etc. # Therefore deciding VoteDecision based on the string VoteValueName. decision = None if ( re.search( self.vote_approve_pattern, vote_value, re.IGNORECASE, ) is not None ): decision = VoteDecision.APPROVE elif ( re.search( self.vote_reject_pattern, vote_value, re.IGNORECASE, ) is not None ): decision = VoteDecision.REJECT nonvoting = ( re.search( self.vote_nonvoting_pattern, vote_value, re.IGNORECASE, ) is not None ) # determine qualifer like absent, abstain if ( re.search( self.vote_absent_pattern, vote_value, re.IGNORECASE, ) is not None ): if decision == VoteDecision.APPROVE: return VoteDecision.ABSENT_APPROVE elif decision == VoteDecision.REJECT: return VoteDecision.ABSENT_REJECT elif nonvoting: return VoteDecision.ABSENT_NON_VOTING elif ( re.search( self.vote_abstain_pattern, vote_value, re.IGNORECASE, ) is not None ): if decision == VoteDecision.APPROVE: return VoteDecision.ABSTAIN_APPROVE elif decision == VoteDecision.REJECT: return VoteDecision.ABSTAIN_REJECT elif nonvoting: return VoteDecision.ABSTAIN_NON_VOTING if not decision: log.debug(f"no VoteDecision filter for {vote_value}") return decision
[docs] def get_body(self, legistar_body: dict[str, Any]) -> Body | None: """ Return CDP Body for Legistar body. Parameters ---------- legistar_body: Dict Legistar API body Returns ------- body: Optional[body] The Legistar body converted to a CDP body ingestion model. None if missing required information. See Also -------- get_legistar_body """ if not legistar_body: return None return self.get_none_if_empty( Body( external_source_id=str(legistar_body[LEGISTAR_BODY_EXT_ID]), is_active=bool(legistar_body[LEGISTAR_BODY_ACTIVE]), name=str_simplified(legistar_body[LEGISTAR_BODY_NAME]), ) )
[docs] def use_or_replace_role(self, role_title: str) -> str: """ Lookup if the provided role title should be replaced with a CDP standard value. If the provided role title should be replaced, then return the proper replacement title, otherwise if the title wasn't found in the role replacement lookup table, return the provided role_title unchanged. Parameters ---------- role_title: str The role title to check and potentially replace with a CDP standard. Returns ------- role_title: str The original role title if no replacement was found in the role replacements lookup-table, or the CDP standard title swapped from the lookup-table. """ if role_title in self.role_replacements: return self.role_replacements[role_title] return role_title
[docs] def get_roles( self, legistar_office_records: list[dict[str, Any]] ) -> list[Role] | None: """ Return list of CDP Role from list of legistar OfficeRecord. Parameters ---------- legistar_office_records: List[Dict] Legistar API OfficeRecords Returns ------- roles: Optional[List[Role]] From Legistar OfficeRecords. None if missing information. """ if not legistar_office_records: legistar_office_records = [] return reduced_list( [ self.get_none_if_empty( Role( body=( self.get_body(record[LEGISTAR_ROLE_BODY]) or self.get_none_if_empty( Body( name=str_simplified(record[LEGISTAR_ROLE_BODY_ALT]), ) ) ), # e.g. 2017-11-30T00:00:00 start_datetime=self.localize_datetime( datetime.strptime( record[LEGISTAR_ROLE_START], LEGISTAR_DATETIME_FORMAT, ) ), end_datetime=self.localize_datetime( datetime.strptime( record[LEGISTAR_ROLE_END], LEGISTAR_DATETIME_FORMAT ) ), external_source_id=str(record[LEGISTAR_ROLE_EXT_ID]), title=self.use_or_replace_role( str_simplified(record[LEGISTAR_ROLE_TITLE]) or str_simplified(record[LEGISTAR_ROLE_TITLE_ALT]) ), ) ) for record in legistar_office_records ] )
[docs] def resolve_person_alias(self, person: Person) -> Person | None: """ If input person is in fact an alias of a reference known person, return the reference person instead. Else return person as-is. Parameters ---------- person: Person Person to check whether is an alias or a real unique Person Returns ------- Person input person, or the correct reference Person if input person is an alias. See Also -------- instances.seattle.person_aliases """ # nothing to do if the input person is a reference person itself if not self.person_aliases or person.name in self.person_aliases: return person request_format = ( LEGISTAR_PERSON_BASE + "?$filter=PersonFullName+eq+%27{name}%27" ) for name, aliases in self.person_aliases.items(): if person.name in aliases: # found the reference person with input person.name as an alias try: # query to get PersonId for the reference person we want to use # in place of the input person response: list[dict[str, Any]] = requests.get( request_format.format( client=self.client_name, name=quote_plus(name) ), ).json() except JSONDecodeError: response: list[dict[str, Any]] = [] if len(response) == 0 or LEGISTAR_PERSON_EXT_ID not in response[0]: log.error( f"Found {person.name}, an alias of {name} " f"but failed get valid JSON for {name} from Legistar API. " f"Keeping this alias {person.name} without resolving." ) return person return self.get_person( get_legistar_person( self.client_name, response[0][LEGISTAR_PERSON_EXT_ID], use_cache=True, ) ) # input person is not an alias of a reference Person return person
[docs] def get_person(self, legistar_person: dict) -> Person | None: """ Return CDP Person for Legistar Person. Parameters ---------- legistar_person: Dict Legistar API Person Returns ------- person: Optional[Person] The Legistar Person converted to a CDP person ingestion model. None if missing information. See Also -------- get_legistar_person """ if ( not legistar_person or not legistar_person[LEGISTAR_PERSON_NAME] # have seen PersonFullName with something like "no sponsor required" or re.search("no.*required", legistar_person[LEGISTAR_PERSON_NAME], re.I) ): return None phone = str_simplified(legistar_person[LEGISTAR_PERSON_PHONE]) if phone: # (123)456... -> 123-456... phone = phone.replace("(", "").replace(")", "-") return self.get_none_if_empty( # If applicable, catch [mistakenly] entered duplicate persons. # i.e. Don't create unique Person objects for the same real person. self.resolve_person_alias( Person( email=str_simplified(legistar_person[LEGISTAR_PERSON_EMAIL]), external_source_id=str(legistar_person[LEGISTAR_PERSON_EXT_ID]), name=str_simplified(legistar_person[LEGISTAR_PERSON_NAME]), phone=phone, website=str_simplified(legistar_person[LEGISTAR_PERSON_WEBSITE]), is_active=bool(legistar_person[LEGISTAR_PERSON_ACTIVE]), ) ) )
[docs] def get_votes(self, legistar_votes: list[dict]) -> list[Vote] | None: """ Return List[Vote] for Legistar API Votes. Parameters ---------- legistar_votes: List[Dict] Legistar votes as CDP Vote ingestion models. Returns ------- votes: Optional[List[Vote]] List of votes if any were provided. None if empty list or missing information. """ return reduced_list( [ self.get_none_if_empty( Vote( decision=self.get_vote_decision(vote), external_source_id=str(vote[LEGISTAR_VOTE_EXT_ID]), person=self.get_person(vote[LEGISTAR_VOTE_PERSONS]), ) ) for vote in legistar_votes ] )
[docs] def get_event_supporting_files( self, legistar_ev_attachments: list[dict], ) -> list[SupportingFile] | None: """ Return List[SupportingFile] for Legistar API MatterAttachments. Parameters ---------- legistar_ev_attachments: List[Dict] Legistar API MatterAttachments Returns ------- files: Optional[List[SupportingFile]] List of supporting files if provided. None if empty list or missing information. """ return reduced_list( [ self.get_none_if_empty( SupportingFile( external_source_id=str(attachment[LEGISTAR_FILE_EXT_ID]), name=str_simplified(attachment[LEGISTAR_FILE_NAME]), uri=str_simplified(attachment[LEGISTAR_FILE_URI]), ) ) for attachment in legistar_ev_attachments ] )
[docs] def get_sponsors(self, legistar_sponsors: list[dict]) -> list[Person] | None: """Get legislation sponsors.""" if not legistar_sponsors: return None return reduced_list( [ self.get_person(sponsor["SponsorPersonInfo"]) for sponsor in legistar_sponsors ] )
[docs] def get_matter(self, legistar_ev: dict) -> Matter | None: """ Return Matter from Legistar API EventItem. Parameters ---------- legistar_ev: Dict Legistar API EventItem Returns ------- matter: Optional[Matter] List of converted Legistar matter details to CDP matter objects. None if missing information. """ return self.get_none_if_empty( Matter( external_source_id=str(legistar_ev[LEGISTAR_MATTER_EXT_ID]), # Too often EventItemMatterName is not filled # but EventItemMatterFile is name=str_simplified(legistar_ev[LEGISTAR_MATTER_NAME]) or str_simplified(legistar_ev[LEGISTAR_MATTER_TITLE]), matter_type=str_simplified(legistar_ev[LEGISTAR_MATTER_TYPE]), sponsors=self.get_sponsors(legistar_ev[LEGISTAR_MATTER_SPONSORS]), title=str_simplified(legistar_ev[LEGISTAR_MATTER_TITLE]), result_status=self.get_matter_status( legistar_ev[LEGISTAR_MATTER_STATUS] ), ) )
[docs] def get_minutes_item(self, legistar_ev_item: dict) -> MinutesItem | None: """ Return MinutesItem from parts of Legistar API EventItem. Parameters ---------- legistar_ev_item: Dict Legistar API EventItem Returns ------- minutes_item: Optional[MinutesItem] None if could not get nonempty MinutesItem.name from EventItem. """ return self.get_none_if_empty( MinutesItem( external_source_id=str(legistar_ev_item[LEGISTAR_MINUTE_EXT_ID]), name=str_simplified(legistar_ev_item[LEGISTAR_MINUTE_NAME]), ) )
[docs] def fix_event_minutes( self, ev_minutes_item: EventMinutesItem | None, legistar_ev_item: dict ) -> EventMinutesItem | None: """ Inspect the MinutesItem and Matter in ev_minutes_item. - Move some fields between them to make the information more meaningful. - Enforce matter.result_status when appropriate. Parameters ---------- ev_minutes_item: Optional[EventMinutesItem] The specific event minutes item to clean. Or None if running this function in a loop with multiple event minutes items and you don't want to clean / the emi was filtered out. legistar_ev_item: Dict The original Legistar EventItem. Returns ------- cleaned_emi: Optional[EventMinutesItem] The cleaned event minutes item. This can clean both the event minutes item and the attached matter information. """ if not ev_minutes_item: return ev_minutes_item if ev_minutes_item.minutes_item and ev_minutes_item.matter: # we have both matter and minutes_item # - make minutes_item.name the more concise text e.g. "CB 11111" # - make minutes_item.description the more descriptive lengthy text # e.g. "AN ORDINANCE related to the..." # - make matter.title the same descriptive lengthy text ev_minutes_item.minutes_item.description = ev_minutes_item.minutes_item.name ev_minutes_item.minutes_item.name = ev_minutes_item.matter.name ev_minutes_item.matter.title = ev_minutes_item.minutes_item.description # matter.result_status is allowed to be null # only when no votes or Legistar EventItemMatterStatus is null if ev_minutes_item.matter and not ev_minutes_item.matter.result_status: if ev_minutes_item.votes and legistar_ev_item[LEGISTAR_MATTER_STATUS]: # means did not find matter_*_pattern in Legistar EventItemMatterStatus. # default to in progress (as opposed to adopted or rejected) # NOTE: if our matter_*_patterns ARE "complete", # this clause would hit only because the info from Legistar # is incomplete or malformed ev_minutes_item.matter.result_status = MatterStatusDecision.IN_PROGRESS return ev_minutes_item
[docs] def filter_event_minutes( self, ev_minutes_item: EventMinutesItem ) -> EventMinutesItem | None: """ Return None if minutes_item.name contains unimportant text that we want to ignore. Parameters ---------- ev_minutes_item: EventMinutesItem The minutes item to filter. Returns ------- filtered_event_minutes_items: Optional[EventMinutesItem] The allowed minutes item or None is filtered out. """ if not ev_minutes_item.minutes_item or not ev_minutes_item.minutes_item.name: return ev_minutes_item for filter_ in self.ignore_minutes_item_patterns: if re.search(filter_, ev_minutes_item.minutes_item.name, re.IGNORECASE): return None return ev_minutes_item
[docs] def get_event_minutes( self, legistar_ev_items: list[dict] ) -> list[EventMinutesItem] | None: """ Return List[EventMinutesItem] for Legistar API EventItems. Parameters ---------- legistar_ev_items: List[Dict] Legistar API EventItems Returns ------- event_minutes_items: Optional[List[EventMinutesItem]] Filtered set of event minutes items. """ return reduced_list( [ self.get_none_if_empty( self.fix_event_minutes( # if minutes_item contains unimportant data, # just make the entire EventMinutesItem = None self.filter_event_minutes( EventMinutesItem( index=item[LEGISTAR_EV_INDEX], minutes_item=self.get_minutes_item(item), votes=self.get_votes(item[LEGISTAR_EV_VOTES]), matter=self.get_matter(item), decision=self.get_minutes_item_decision( item[LEGISTAR_EV_MINUTE_DECISION] ), supporting_files=self.get_event_supporting_files( item[LEGISTAR_EV_ATTACHMENTS] ), ) ), item, ) ) # EventMinutesItem object per member in EventItems for item in legistar_ev_items ] )
[docs] @staticmethod def date_and_time_to_datetime(ev_date: str, ev_time: str | None) -> datetime: """ Return datetime from ev_date and ev_time. Parameters ---------- ev_date: str Formatted as "%Y-%m-%dT%H:%M:%S" ev_time: Optional[str] Formatted as "%I:%M %p" Or None and do not attach time to date. Returns ------- datetime date using ev_date and time using ev_time """ # 2021-07-09T00:00:00 d = datetime.fromisoformat(ev_date) # 9:30 AM # some events may have ev_time =None if ev_time is not None: t = datetime.strptime(ev_time, "%I:%M %p") return datetime( year=d.year, month=d.month, day=d.day, hour=t.hour, minute=t.minute, second=t.second, ) else: return datetime( year=d.year, month=d.month, day=d.day, hour=0, minute=0, second=0, )
[docs] def get_content_uris(self, legistar_ev: dict) -> list[ContentURIs]: """ Must implement in class derived from LegistarScraper. If Legistar Event.EventVideoPath is used, return an empty list in the override. Parameters ---------- legistar_ev: Dict Data for one Legistar Event. Returns ------- event_content_uris: List[ContentURIs] List of ContentURIs objects for each session found. Raises ------ NotImplementedError This base implementation does nothing See Also -------- cdp_scrapers.legistar_utils.get_legistar_events_for_timespan """ # see if our base legistar/granicus video parsing routine will work result, uris = get_legistar_content_uris(self.client_name, legistar_ev) if result != ContentUriScrapeResult.Status.UnrecognizedPatternError: return uris or [] raise NotImplementedError( f"Please provide get_content_uris() for {self.client_name}" )
[docs] def inject_known_person(self, person: Person) -> Person: """ Inject information if person exists in static_data.persons. Parameters ---------- person: Person Person into which to inject data from static_data Returns ------- Person Input person updated with information from static_data, and seat.roles sanitized. See Also -------- scraper_utils.sanitize_roles """ try: known_person = self.static_data.persons[person.name] except (AttributeError, KeyError): return person for attr in person.__dataclass_fields__.keys(): static_info = getattr(known_person, attr) if static_info is not None: # have long-term information provided in "static*.json" setattr(person, attr, deepcopy(static_info)) # now that we have seat from static hard-coded data # we can bring in seat.roles (OfficeRecords from Legistar API) if person.seat is not None: person.seat.roles = sanitize_roles( person_name=person.name, roles=self.get_roles( legistar_office_records=get_legistar_person( client=self.client_name, person_id=person.external_source_id, use_cache=True, )[LEGISTAR_PERSON_ROLES] ), static_data=self.static_data, ) return person
[docs] def inject_known_data( self, events: list[EventIngestionModel] ) -> list[EventIngestionModel]: """ Augment with long-term static data that changes very infrequently. e.e. self.static_data which includes Person.picture_uri, Person.seat. Parameters ---------- events: Returned events from get_events() Returns ------- events: List[EventIngestionModel] Input events with static information possibly injected """ # don't waste time if we don't have any info at all if not self.static_data: return events for event in events: if not event.event_minutes_items: continue # 2 places for Person: # EventMinutesItem.matter.sponsors # EventMinutesItem.votes.person for minute_item in event.event_minutes_items: if minute_item.matter and minute_item.matter.sponsors: for sponsor in minute_item.matter.sponsors: sponsor = self.inject_known_person(sponsor) if minute_item.votes: for vote in minute_item.votes: vote.person = self.inject_known_person(vote.person) return events
[docs] def post_process_ingestion_models( self, events: list[EventIngestionModel] ) -> list[EventIngestionModel]: """ Called at the end of get_events() for fully custom site-specific prcessing. inject_known_data() already operated on input events. Parameters ---------- events: Returned events from get_events() Returns ------- events: List[EventIngestionModel] Base implementation simply returns input events as-is """ return events
[docs] def get_events( self, begin: datetime | None = None, end: datetime | None = None, ) -> list[EventIngestionModel]: """ Calls get_legistar_events_for_timespan to retrieve Legistar API data and return as List[EventIngestionModel]. Parameters ---------- begin: datetime, optional The timespan beginning datetime to query for events after. Default is 2 days from UTC now end: datetime, optional The timespan end datetime to query for events before. Default is UTC now Returns ------- events: List[EventIngestionModel] One instance of EventIngestionModel per Legistar Event See Also -------- cdp_scrapers.legistar_utils.get_legistar_events_for_timespan """ if begin is None: begin = datetime.utcnow() - timedelta(days=2) if end is None: end = datetime.utcnow() ingestion_models = [] for legistar_ev in get_legistar_events_for_timespan( self.client_name, begin=begin, end=end ): # better to return time as local time with time zone info, # rather than as utc time. # this way the calling pipeline can find out what is the local zone. session_time = self.localize_datetime( self.date_and_time_to_datetime( legistar_ev[LEGISTAR_SESSION_DATE], legistar_ev[LEGISTAR_SESSION_TIME], ) ) list_uri = self.get_content_uris(legistar_ev) or [ ContentURIs(video_uri=None, caption_uri=None) ] ingestion_models.append( self.get_none_if_empty( EventIngestionModel( external_source_id=str(legistar_ev[LEGISTAR_EV_EXT_ID]), agenda_uri=str_simplified(legistar_ev[LEGISTAR_AGENDA_URI]), minutes_uri=str_simplified(legistar_ev[LEGISTAR_MINUTES_URI]), body=self.get_body(legistar_ev[LEGISTAR_EV_BODY]), sessions=reduced_list( [ self.get_none_if_empty( Session( session_datetime=session_time, session_index=list_uri.index(content_uris), video_uri=content_uris.video_uri, caption_uri=content_uris.caption_uri, ) ) # Session per video for content_uris in list_uri ] ), event_minutes_items=self.get_event_minutes( legistar_ev[LEGISTAR_EV_ITEMS] ), ) ) ) # easier for calling pipeline to handle an empty list rather than None # so request reduced_list() to give me [], not None events = reduced_list(ingestion_models, collapse=False) events = self.inject_known_data(events) events = self.post_process_ingestion_models(events) return events
@property def is_legistar_compatible(self) -> bool: """ Check that Legistar API recognizes client name. Returns ------- compatible: bool True if client_name is a valid Legistar client name """ # simplest check, if the GET request works, it is a legistar municipality try: resp = urlopen(f"http://webapi.legistar.com/v1/{self.client_name}/bodies") return resp.status == 200 except URLError or HTTPError: return False
[docs] def check_for_cdp_min_ingestion(self, check_days: int = 7) -> bool: """ Test if can obtain at least one minimally defined EventIngestionModel. Parameters ---------- check_days: int, default=7 Test duration is the past check_days days from now Returns ------- minimum_ingestion_data_available: bool True if got at least one minimally defined EventIngestionModel """ # no point wasting time if the client isn't on legistar at all if not self.is_legistar_compatible: return False now = datetime.utcnow() days = range(check_days) for d in days: begin = now - timedelta(days=d + 1) end = now - timedelta(days=d) log.debug( "Testing for minimal information " f"from {begin.isoformat()} to {end.isoformat()}" ) # ev: EventIngestionModel for cdp_ev in self.get_events(begin=begin, end=end): try: if ( len(cdp_ev.body.name) > 0 and cdp_ev.sessions[0].session_datetime is not None and len(cdp_ev.sessions[0].video_uri) > 0 ): session_time = cdp_ev.sessions[0].session_datetime log.debug( f"Got minimal EventIngestionModel for {self.client_name}: " f"body={cdp_ev.body.name}, " f"session={session_time.isoformat()}, " f"video={cdp_ev.sessions[0].video_uri}" ) return True # catch None or empty list except TypeError or IndexError: pass log.debug( f"Failed to get minimal EventIngestionModel for {self.client_name} " f"in the past {check_days} days from {now.isoformat()}" ) # no event in check_days had enough for minimal ingestion model item return False