Source code for cdp_scrapers.instances.atlanta

import logging
import re
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Tuple

from cdp_backend.database import constants as db_constants
from cdp_backend.pipeline import ingestion_models

if TYPE_CHECKING:
    from selenium.webdriver.chrome.webdriver import WebDriver

log = logging.getLogger(__name__)


[docs] def get_single_person(driver: "WebDriver", member_name: str) -> ingestion_models.Person: """ Get all the information fot one person Includes: role, seat, picture, phone, email. Parameters ---------- driver: webdriver calling the people's dictionary page member_name: person's name Returns ------- ingestion_models the ingestion model for the person's part """ import selenium from selenium.webdriver.common.by import By log.info("start get active person ingestion model") seat_role = driver.find_element(By.CLASS_NAME, "titlewidget-subtitle").text member_role = db_constants.RoleTitle.MEMBER member_seat_area = None member_seat_name = db_constants.RoleTitle.COUNCILPRESIDENT if "President" in seat_role: member_role = db_constants.RoleTitle.COUNCILPRESIDENT member_seat_area = "Citywide" elif "Post" in seat_role: name_list = seat_role.split(" ") member_seat_name = "Post " + name_list[1] member_seat_area = "Citywide" else: member_seat_name = seat_role member_pic = driver.find_element( By.CSS_SELECTOR, ".image_widget img" ).get_attribute("src") temp_email = ( driver.find_element(By.XPATH, "// a[contains(text(),'Click Here')]") .get_attribute("href") .split(":") ) member_email = temp_email[1] try: member_details = driver.find_element( By.XPATH, "//*[contains(@id, 'widget_340_')]" ).text except selenium.common.exceptions.NoSuchElementException: member_details = driver.find_element( By.XPATH, "//*[contains(@id, 'widget_437_')]" ).text detail_str = member_details.split("\n") phone_list = [s for s in detail_str if "P" in s] member_phone = phone_list[0].split(": ")[1] return ingestion_models.Person( name=member_name, is_active=True, email=member_email, phone=member_phone, picture_uri=member_pic, seat=ingestion_models.Seat( name=member_seat_name, electoral_area=member_seat_area, roles=[ingestion_models.Role(title=member_role)], ), )
[docs] def get_person() -> dict: """ Put the informtion get by get_single_person() to dictionary. Returns ------- dictionary key: person's name value: person's ingestion model """ from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager chrome_options = Options() chrome_options.add_argument("--headless") driver = webdriver.Chrome( options=chrome_options, service=Service(ChromeDriverManager().install()) ) log.info("start get all the person ingestion model") driver.get("https://citycouncil.atlantaga.gov/council-members") members = driver.find_elements(By.XPATH, '//*[@id="leftNav_2_0_12"]/ul/li') person_dict = {} for member in members: link = member.find_element(By.TAG_NAME, "a").get_attribute("href") driver = webdriver.Chrome( options=chrome_options, service=Service(ChromeDriverManager().install()) ) driver.get(link) member_name = driver.find_element(By.CLASS_NAME, "titlewidget-title").text if "President" in member_name: member_name = member_name.split("President ")[1] else: current_name = re.match( r"([a-zA-Z]+)((\ {0,1}[a-zA-Z]+\.{0,1}\ )|(\ ))([a-zA-Z]+)", member_name ) if current_name is not None: member_name = f"{current_name.group(1)} {current_name.group(5)}" else: raise ValueError("Person name could not be constructed.") member_model = get_single_person(driver, member_name) driver.quit() person_dict[member_name] = member_model driver.quit() return person_dict
[docs] def get_new_person(name: str) -> ingestion_models.Person: """ Creates the person ingestion model for the people that are not recored. Parameters ---------- name:str the name of the person Returns ------- ingestion model the person ingestion model for the newly appeared person """ log.info("start get inactive person ingestion model") return ingestion_models.Person(name=name, is_active=False)
[docs] def convert_status_constant(decision: str) -> str: """ Converts the matter result status to the exsiting constants. Parameters ---------- decision: str decision of the matter Returns ------- db_constants result status constants """ log.info("start convert result status for vote ingestion model") d_constant = decision if ("FAVORABLE" in decision) or ("ADOPTED" in decision) or ("ACCEPTED" in decision): d_constant = db_constants.MatterStatusDecision.ADOPTED elif ( ("REFERRED" in decision) or ("RETURNED" in decision) or ("FILED" in decision) or ("Refer") or ("/" in decision) ): d_constant = db_constants.MatterStatusDecision.IN_PROGRESS else: raise ValueError("New Type") return d_constant
[docs] def assign_constant( driver: "WebDriver", i: int, j: int, vote_decision: str, voting_list: list, body_name: str, persons: dict, ): """ Assign constants and add Vote to the ingestion models based on the vote decision. Parameters ---------- driver:webdriver webdriver of the matter page i: int tr[i] is the matter we are looking at j: int the row number of the information in a matter that we are looking at vote_decision: str the vote decision constant of the vote decision voting_list: list the list that contains vote ingestion models body_name: str the body name of the current meeting persons: dict Dict[str, ingestion_models.Person] """ from selenium.webdriver.common.by import By log.info("start get vote ingestion model for one type of decision") v_res = driver.find_element( By.XPATH, '//*[@id="ContentPlaceHolder1_divHistory"]/div/table/tbody/tr[' + str(i + 1) + "]/td/table/tbody/tr[" + str(j) + "]/td[2]", ).text res_list = v_res.split(", ") n: str = "" for p in res_list: if "President" in p: n = p.split("President ")[1] else: n_temp = re.match( r"([a-zA-Z]+)((\ {0,1}[a-zA-Z]+\.{0,1}\ )|(\ ))([a-zA-Z]+)", p ) if n_temp is not None: n = f"{n_temp.group(1)} {n_temp.group(5)}" else: raise ValueError("Person name could not be constructed.") person = get_new_person(n) if n in persons: person = persons[n] if person.seat is not None: if person.seat.roles is not None: person.seat.roles[0].body = ingestion_models.Body( body_name, is_active=True ) if body_name == "City Council": if ( person.seat.roles[0].title != db_constants.RoleTitle.COUNCILPRESIDENT ): person.seat.roles[ 0 ].title = db_constants.RoleTitle.COUNCILMEMBER voting_list.append( ingestion_models.Vote( person=person, decision=vote_decision, ) ) return voting_list
[docs] def get_voting_result( driver: "WebDriver", sub_sections_len: int, i: int, body_name: str, persons: dict, ) -> list: """ Scrapes and converts the voting decisions to the exsiting constants. Parameters ---------- driver:webdriver webdriver of the matter page sub_sections_len: int the row number in the block under the matter for the current date i: int tr[i] is the matter we are looking at body_name: str the body name of the current meeting persons: dict Dict[str, ingestion_models.Person] Returns ------- list contains the Vote ingestion model for each person """ from selenium.webdriver.common.by import By log.info("start get the vote ingestion model for a matter") voting_list: list[ingestion_models.Vote] = [] for j in range(1, sub_sections_len + 1): sub_content = driver.find_element( By.XPATH, '//*[@id="ContentPlaceHolder1_divHistory"]/div/table/tbody/tr[' + str(i + 1) + "]/td/table/tbody/tr[" + str(j) + "]", ) sub_content_role = sub_content.find_element(By.CLASS_NAME, "Role").text if "AYES" in sub_content_role: vote_decision = db_constants.VoteDecision.APPROVE assign_constant( driver, i, j, vote_decision, voting_list, body_name, persons ) if "NAYS" in sub_content_role: vote_decision = db_constants.VoteDecision.REJECT assign_constant( driver, i, j, vote_decision, voting_list, body_name, persons ) if ( "ABSENT" in sub_content_role or "AWAY" in sub_content_role or "EXCUSED" in sub_content_role ): vote_decision = db_constants.VoteDecision.ABSENT_NON_VOTING assign_constant( driver, i, j, vote_decision, voting_list, body_name, persons ) if "ABSTAIN" in sub_content_role: vote_decision = db_constants.VoteDecision.ABSTAIN_NON_VOTING assign_constant( driver, i, j, vote_decision, voting_list, body_name, persons ) return voting_list
[docs] def get_matter_status(driver: "WebDriver", i: int) -> Tuple[list, str]: """ Find the matter result status. Parameters ---------- driver:webdriver webdriver of the matter page i: int tracker used to loop the rows in the matter page Returns ------- sub_sections: element the block under the matter for the current date decision_constant: element the matter decision constant """ from selenium.webdriver.common.by import By log.info("start get reslut status for a matter") result = driver.find_element( By.XPATH, '//*[@id="ContentPlaceHolder1_divHistory"]/div/table/tbody/tr[' + str(i + 1) + "]/td/table", ) decision = result.find_element(By.CLASS_NAME, "Result").text sub_sections = result.find_elements( By.XPATH, '//*[@id="ContentPlaceHolder1_divHistory"]/div/table/tbody/tr[' + str(i + 1) + "]/td/table/tbody/tr", ) status_constant = convert_status_constant(decision) return sub_sections, status_constant
[docs] def parse_single_matter( # noqa: C901 D417 driver: "WebDriver", test: str, item: str, body_name: str, s_word_formated: datetime, persons: dict, ) -> ingestion_models.EventMinutesItem: """ Get the minute items that contains a matter. Parameters ---------- driver:webdriver webdriver of the matter page matter:element the matter we are looking at body_name: str the body name of the current meeting s_word_formated: datetime the date of the current meeting persons: dict Dict[str, ingestion_models.Person] Returns ------- ingestion model minutes ingestion model with the matters information """ from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.ui import WebDriverWait log.info("start get ingestion model for a matter") voting_list = [] matter_name = item[0:9] # name of the matter eg. "22-C-5024", "22-R-3404" matter_title = item[ 12: ] # the paragraph the describes the matter eg. "A COMMUNICATION FROM ..." matter_type = ( " " # the type of the matter eg. "COMMUNICATION", "SUBSTITUTE ORDINANCE" ) matter_type_temp = re.split(" BY| FROM", matter_title)[0] matter_type_list = re.split("A |AN ", matter_type_temp) if len(matter_type_list) > 1: matter_type = matter_type_list[1] link = driver.find_element("link text", item) link.click() # get to the specific page for each matter s_matter = WebDriverWait(driver, 10).until( ec.presence_of_all_elements_located( ( By.XPATH, '//*[@id="ContentPlaceHolder1_divHistory"]/div/table/tbody/tr', ) ) ) sponsor_raw = driver.find_element( By.XPATH, '//*[@id="tblLegiFileInfo"]/tbody/tr[1]/td[2]' ).text sponsor_list = sponsor_raw.split(", ") sponsors: Optional[list[ingestion_models.Person]] = [] status = None decision = None if sponsors is not None: for s in sponsor_list: if "District" in s: current_temp = s.split(" ")[2:] current_temp2 = " ".join(current_temp) current_name = re.match( r"([a-zA-Z]+)((\ {0,1}[a-zA-Z]+\.{0,1}\ )|(\ ))([a-zA-Z]+)", current_temp2, ) if current_name is not None: current = f"{current_name.group(1)} {current_name.group(5)}" else: raise ValueError("Person name could not be constructed.") if current in persons: sponsors.append(persons[current]) else: sponsors.append(get_new_person(current)) elif "Post" in s: current_temp3 = s.split("Large ")[1] current_name = re.match( r"([a-zA-Z]+)((\ {0,1}[a-zA-Z]+\.{0,1}\ )|(\ ))([a-zA-Z]+)", current_temp3, ) if current_name is not None: current = f"{current_name.group(1)} {current_name.group(5)}" else: raise ValueError("Person name could not be constructed.") if current in persons: sponsors.append(persons[current]) else: sponsors.append(get_new_person(current)) elif "President" in s: current_temp4 = s.split("President ")[1] current_name = re.match( r"([a-zA-Z]+)((\ {0,1}[a-zA-Z]+\.{0,1}\ )|(\ ))([a-zA-Z]+)", current_temp4, ) if current_name is not None: current = f"{current_name.group(1)} {current_name.group(5)}" else: raise ValueError("Person name could not be constructed.") if current in persons: sponsors.append(persons[current]) else: sponsors.append(get_new_person(current)) s_rows = len(s_matter) for i in range(1, s_rows + 1, 2): header = driver.find_element( By.XPATH, '//*[@id="ContentPlaceHolder1_divHistory"]/div/table/tbody/tr[' + str(i) + "]", ) date = header.find_element(By.CLASS_NAME, "Date").text date_formated = datetime.strptime(date[:-6], "%b %d, %Y %I:%M %p") if s_word_formated == date_formated: # match the current meeting date sub_sections, status = get_matter_status( driver, i ) # get the decision and result_status of the matter if status in ( db_constants.MatterStatusDecision.IN_PROGRESS, db_constants.MatterStatusDecision.ADOPTED, ): decision = db_constants.EventMinutesItemDecision.PASSED else: decision = db_constants.EventMinutesItemDecision.FAILED if "[" in test: voting_list = get_voting_result( driver, len(sub_sections), i, body_name, persons ) if len(sponsors) != 0: return ingestion_models.EventMinutesItem( minutes_item=ingestion_models.MinutesItem( name=matter_name.title(), description=matter_title ), matter=ingestion_models.Matter( matter_name, matter_type=matter_type, title=matter_title, result_status=status, sponsors=sponsors, ), decision=decision, votes=voting_list, ) return ingestion_models.EventMinutesItem( minutes_item=ingestion_models.MinutesItem( name=matter_name.title(), description=matter_title ), matter=ingestion_models.Matter( matter_name, matter_type=matter_type, title=matter_title, result_status=status, ), decision=decision, votes=voting_list, )
[docs] def parse_event( # noqa: C901 url: str, ) -> ingestion_models.EventIngestionModel: """ Scrapes all the information for a meeting. Parameters ---------- url:str the url of the meeting that we want to scrape Returns ------- ingestion model the ingestion model for the meeting """ import selenium from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.ui import WebDriverWait from webdriver_manager.chrome import ChromeDriverManager log.info("start get ingestion model for a event") minute_index = [chr(i) for i in range(ord("A"), ord("Z") + 1)] persons = get_person() chrome_options = Options() chrome_options.add_argument("--headless") driver = webdriver.Chrome( options=chrome_options, service=Service(ChromeDriverManager().install()) ) driver.get(url) WebDriverWait(driver, 10).until( ec.presence_of_all_elements_located( (By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr') ) ) body_name = driver.find_element( By.ID, "ContentPlaceHolder1_lblMeetingGroup" ).text # body name if body_name == "Atlanta City Council": body_name = "City Council" s_word = driver.find_element(By.ID, "ContentPlaceHolder1_lblMeetingDate").text s_word_formated = datetime.strptime(s_word, "%m/%d/%Y %I:%M %p") video_link = driver.find_element(By.ID, "MediaPlayer1_html5_api").get_attribute( "src" ) # video link (mp4) event_minutes_items = [] i = 1 while ( len( driver.find_elements( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]" ) ) != 0 ): try: if ( len( driver.find_elements( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]/td[1]/strong", ) ) ) != 0 and ( len( driver.find_element( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]/td[1]/strong", ).text ) ) != 0: if ( driver.find_element( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]/td[1]/strong", ).text )[0] in minute_index: if ( len( driver.find_elements( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i + 1) + "]/td[3]/span", ) ) == 0 ): minute_title = driver.find_element( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]/td[2]", ).text minute_model = ingestion_models.EventMinutesItem( minutes_item=ingestion_models.MinutesItem( minute_title.title() ) ) event_minutes_items.append(minute_model) elif ( len( driver.find_elements( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]/td[3]/span", ) ) ) != 0: matter = driver.find_element( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]/td[3]" ) test = matter.find_element(By.CLASS_NAME, "ItemVoteResult").text item = matter.find_element(By.CLASS_NAME, "AgendaOutlineLink").text if len(item) != 0: matter_model = parse_single_matter( driver, test, item, body_name, s_word_formated, persons ) event_minutes_items.append(matter_model) elif ( len( driver.find_elements( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]/td[6]/span", ) ) ) != 0: matter = driver.find_element( By.XPATH, '//*[@id="MeetingDetail"]/tbody/tr[' + str(i) + "]/td[6]" ) test = matter.find_element(By.CLASS_NAME, "ItemVoteResult").text item = matter.find_element(By.CLASS_NAME, "AgendaOutlineLink").text if len(item) != 0: matter_model = parse_single_matter( driver, test, item, body_name, s_word_formated, persons ) event_minutes_items.append(matter_model) i += 1 except ( selenium.common.exceptions.NoSuchElementException, selenium.common.exceptions.TimeoutException, ): i += 1 continue try: agenda_link = driver.find_element( By.ID, "ContentPlaceHolder1_hlPublicAgendaFile" ).get_attribute("oldhref") agenda_uri = "https://atlantacityga.iqm2.com/Citizens/" + agenda_link except selenium.common.exceptions.NoSuchElementException: agenda_uri = None try: minutes_link = driver.find_element( By.ID, "ContentPlaceHolder1_hlPublicMinutesFile" ).get_attribute("oldhref") minutes_uri = "https://atlantacityga.iqm2.com/Citizens/" + minutes_link except selenium.common.exceptions.NoSuchElementException: minutes_uri = None return ingestion_models.EventIngestionModel( body=ingestion_models.Body(body_name, is_active=True), sessions=[ ingestion_models.Session( video_uri=video_link, session_index=0, session_datetime=s_word_formated, ) ], event_minutes_items=event_minutes_items, agenda_uri=agenda_uri, minutes_uri=minutes_uri, )
[docs] def get_year(driver: "WebDriver", url: str, from_dt: datetime) -> str: """ Navigate to the year that we are looking for. Parameters ---------- driver:webdriver empty webdriver url:str the url of the calender page from_dt:datetime the datetime object for the search target year Returns ------- link:str the link to the calender of the year that we are looking for """ from selenium.webdriver.common.by import By log.info("start get the current year's calender page") driver.get(url) dates = driver.find_element(By.ID, "ContentPlaceHolder1_lblCalendarRange") link_temp = dates.find_element( By.XPATH, ("//*[text()='" + str(from_dt.year) + "']") ).get_attribute("href") link = "https://atlantacityga.iqm2.com" + link_temp return link
[docs] def get_date( driver: "WebDriver", url: str, from_dt: datetime, to_dt: datetime, ) -> list: """ Get a list of ingestion models for the meetings hold during the selected time range. Parameters ---------- driver:webdriver empty webdriver url:str the url of the calender page from_dt: the begin date to_dt: the end date Returns ------- list all the ingestion models for the selected date range """ from selenium.webdriver.common.by import By log.info("start calling parse_event for a signle meeting") driver.get(url) dates = driver.find_elements(By.CLASS_NAME, "RowTop") events = [] for current_date in dates: current_meeting_date = current_date.find_element(By.CLASS_NAME, "RowLink") current_meeting_time = datetime.strptime( current_meeting_date.text, "%b %d, %Y %I:%M %p" ) if from_dt <= current_meeting_time <= to_dt: link_temp = current_date.find_element( By.CSS_SELECTOR, ".WithoutSeparator a" ).get_attribute("onclick") link = "https://atlantacityga.iqm2.com" + link_temp[23:-3] event = parse_event(link) events.append(event) else: continue driver.quit() return events
[docs] def get_events(from_dt: datetime, to_dt: datetime) -> list: """ gets the right calender link feed it to the function that get a list of ingestion models. Parameters ---------- from_dt: the begin date to_dt: the end date Returns ------- list all the ingestion models for the selected date range """ from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager chrome_options = Options() chrome_options.add_argument("--headless") log.info("start a date range and run all the functions") driver = webdriver.Chrome( options=chrome_options, service=Service(ChromeDriverManager().install()) ) web_url = "https://atlantacityga.iqm2.com/Citizens/Calendar.aspx?Frame=Yes" driver.get(web_url) if from_dt.year != datetime.today().year: web_url = get_year(driver, web_url, from_dt) events = get_date(driver, web_url, from_dt, to_dt) return events