From 4147b7185dbf1fe62e921567e43d16adffeb0076 Mon Sep 17 00:00:00 2001
From: lasseedfast <>
Date: Tue, 15 Oct 2024 15:20:22 +0200
Subject: [PATCH] Refactor scrape_initiatives.py to improve code structure and readability

---
 scrape_initiatives.py | 163 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 163 insertions(+)
 create mode 100644 scrape_initiatives.py

diff --git a/scrape_initiatives.py b/scrape_initiatives.py
new file mode 100644
index 0000000..c1e06ce
--- /dev/null
+++ b/scrape_initiatives.py
@@ -0,0 +1,163 @@
+import asyncio
+import os
+import re
+from datetime import datetime
+
+from bs4 import BeautifulSoup
+from pyppeteer import launch
+
+from _arango import ArangoDB
+from colorprinter.print_color import *
+
+USER_AGENT = (
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
+    '(KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36'
+)
+
+
+def sanitize_filename(filename):
+    """Replace characters that are invalid in filenames with underscores."""
+    return re.sub(r'[\\/*?:"<>|]', "_", filename)
+
+
+async def get_info(browser, doc, download_path):
+    """Open an initiative's detail page, download its PDF and add metadata to doc."""
+    page = await browser.newPage()
+    # Pyppeteer has no public download API, so set the behaviour via the CDP session
+    await page._client.send('Page.setDownloadBehavior', {
+        'behavior': 'allow',
+        'downloadPath': download_path
+    })
+    await page.setUserAgent(USER_AGENT)
+
+    await page.goto(doc['link'], {'waitUntil': 'networkidle2'})
+    await asyncio.sleep(2)
+    content = await page.content()
+
+    # Locate and click the download button
+    try:
+        await page.click('a.ecl-file__download')  # Adjust the selector as needed
+        await asyncio.sleep(5)  # Wait for the download to complete
+    except Exception as e:
+        print(f"Error clicking download button: {e}")
+
+    await page.close()
+
+    # Rename the downloaded file to <_key>.pdf so it can be matched to the document
+    for filename in os.listdir(download_path):
+        if filename.endswith(".pdf") and '_' not in filename:  # Adjust the file extension as needed
+            old_file = os.path.join(download_path, filename)
+            new_file = os.path.join(download_path, f"{doc['_key']}.pdf")
+            try:
+                # Overwrite any stale file from a previous run
+                if os.path.exists(new_file):
+                    os.remove(new_file)
+                os.rename(old_file, new_file)
+            except OSError:
+                try:
+                    # Fall back to a truncated name, keeping the .pdf extension
+                    os.rename(old_file, new_file[:46] + '.pdf')
+                except OSError:
+                    os.remove(old_file)
+            doc['meta']['file_path_doc'] = new_file
+            break
+
+    soup = BeautifulSoup(content, 'html.parser')
+
+    # A consultation button on the page means a public consultation exists.
+    # Note: doc['link'] is already an absolute URL, so don't prefix the domain again.
+    consultation_button = soup.find('span', {'class': 'ecl-ecl-button__container'})
+    if consultation_button:
+        doc['consultation_link'] = f"{doc['link']}/public-consultation_en"
+    else:
+        doc['consultation_link'] = None
+
+    summary_div = soup.find('div', {'class': 'initiative-detail-summary'})
+    doc['summary'] = summary_div.get_text(strip=True) if summary_div else ''
+
+    for link in soup.find_all('a'):
+        if 'All feedback and statistics' in link.get_text():
+            doc['feedback_url'] = f"https://ec.europa.eu{link['href']}"
+            break
+
+    return doc
+
+
+async def get_all_initiatives(browser, page_number: int, status):
+    """Scrape one listing page of initiatives with the given feedback status."""
+    page = await browser.newPage()
+    await page.setUserAgent(USER_AGENT)
+
+    url = (
+        'https://ec.europa.eu/info/law/better-regulation/have-your-say/'
+        f'initiatives_en?feedbackStatus={status}&page={page_number}'
+    )
+
+    await page.goto(url, {'waitUntil': 'networkidle2'})
+    await asyncio.sleep(2)
+    content = await page.content()
+    await page.close()
+
+    soup = BeautifulSoup(content, 'html.parser')
+
+    results_block = soup.find('div', {'class': 'ux-block-content'})
+    if results_block is None:
+        return None
+
+    articles = results_block.find_all('article', {'class': 'search-result-item'})
+    if not articles:
+        return None
+
+    initiatives = []
+    for article in articles:
+        try:
+            link_tag = article.find('a', {'class': 'ecl-u-pt-xs ecl-u-type-none'})
+            link = f"https://ec.europa.eu{link_tag['href']}"
+            headline = link_tag.find('div', {'class': 'search-result-title'}).get_text(strip=True)
+
+            topic_div = article.find('div', string='Topic')
+            topic = topic_div.find_next_sibling('div').get_text(strip=True) if topic_div else None
+
+            type_of_act_div = article.find('div', string='Type of act')
+            type_of_act = type_of_act_div.find_next_sibling('div').get_text(strip=True) if type_of_act_div else None
+
+            feedback_period_div = article.find('div', string='Feedback period')
+            feedback_period = feedback_period_div.find_next_sibling('div').get_text(strip=True) if feedback_period_div else None
+
+            initiative = {
+                'headline': headline,
+                'link': link,
+                'topic': topic,
+                'type_of_act': type_of_act,
+                'feedback_period': feedback_period
+            }
+            initiatives.append(initiative)
+        except Exception as e:
+            # Dump the offending markup before aborting so the selectors can be fixed
+            print(f"Error fetching initiative: {e}")
+            print(article.prettify())
+            raise
+    return initiatives
+
+
+async def main():
+    arango = ArangoDB()
+    if not arango.db.has_collection('eu_initiatives'):
+        arango.db.create_collection('eu_initiatives')
+    arango.db.collection('eu_initiatives').truncate()
+
+    # Make sure the download directory exists before Chrome tries to write to it
+    os.makedirs('initiatives_downloads', exist_ok=True)
+
+    browser = await launch(headless=True, args=['--no-sandbox', '--disable-setuid-sandbox'])
+
+    for status in ['OPEN', 'CLOSED', 'UPCOMING', 'DISABLED']:
+        print(status)
+        for page_number in range(0, 335):
+            initiatives = await get_all_initiatives(browser, page_number, status)
+            if not initiatives:
+                break  # No more result pages for this status
+            for initiative in initiatives:
+                print(initiative['headline'])
+                initiative['_key'] = arango.fix_key(initiative['headline'].split('_')[0])
+                initiative['meta'] = {
+                    'status': status,
+                    'date': datetime.now().strftime('%Y-%m-%d'),
+                    'page': page_number
+                }
+                initiative = await get_info(browser, initiative, 'initiatives_downloads')
+                arango.db.collection('eu_initiatives').insert(initiative, overwrite=True)
+            print(f'Page {page_number} done')
+
+    await browser.close()
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
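
A quick post-run sanity check is to count how many distinct headlines ended up in the
collection (duplicate headlines collapse into one document because the headline is used
as the ArangoDB key). This is a minimal sketch, assuming the _arango.ArangoDB wrapper
exposes a python-arango database handle as .db, exactly as the script above assumes:

    from _arango import ArangoDB  # local wrapper, same assumption as in the script

    arango = ArangoDB()
    initiatives = list(arango.db.collection('eu_initiatives').all())
    headlines = {i['headline'] for i in initiatives}
    print(f'{len(initiatives)} documents, {len(headlines)} distinct headlines')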