import re
import json
import asyncio
import os
import random
from datetime import datetime

from pyppeteer import launch
from bs4 import BeautifulSoup

from _arango import ArangoDB
from colorprinter.print_color import *

start = datetime.now()

USER_AGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
              '(KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36')


def sanitize_filename(filename):
    """Replace characters that are invalid in file names with underscores."""
    return re.sub(r'[\\/*?:"<>|]', "_", filename)


async def get_info(browser, doc, download_path):
    """Visit an initiative's detail page, download its PDF, and scrape metadata."""
    os.makedirs(download_path, exist_ok=True)
    page = await browser.newPage()
    # pyppeteer exposes no public API for download handling, so send the
    # DevTools command directly through the page's private CDP client.
    await page._client.send('Page.setDownloadBehavior', {
        'behavior': 'allow',
        'downloadPath': download_path
    })
    await page.setUserAgent(USER_AGENT)
    await page.goto(doc['link'], {'waitUntil': 'networkidle2'})
    await asyncio.sleep(2)
    content = await page.content()

    # Locate and click the download button
    try:
        await page.click('a.ecl-file__download')  # Adjust the selector as needed
        await asyncio.sleep(5)  # Wait for the download to complete
    except Exception as e:
        print(f"Error clicking download button: {e}")

    await page.close()

    # Rename the downloaded file. Files already renamed to '{_key}.pdf'
    # contain underscores and are therefore skipped.
    for filename in os.listdir(download_path):
        if filename.endswith(".pdf") and '_' not in filename:  # Adjust the file extension as needed
            old_file = os.path.join(download_path, filename)
            new_file = os.path.join(download_path, f"{doc['_key']}.pdf")
            try:
                # Overwrite any earlier download for this document
                if os.path.exists(new_file):
                    os.remove(new_file)
                os.rename(old_file, new_file)
            except OSError:
                try:
                    # Fall back to a truncated key if the file name is too long
                    new_file = os.path.join(download_path, f"{doc['_key'][:50]}.pdf")
                    os.rename(old_file, new_file)
                except OSError:
                    os.remove(old_file)
                    new_file = None
            doc['meta']['file_path_doc'] = new_file
            break

    soup = BeautifulSoup(content, 'html.parser')

    consultation_button = soup.find('span', {'class': 'ecl-ecl-button__container'})
    if consultation_button:
        # doc['link'] is already an absolute URL
        doc['consultation_link'] = f"{doc['link']}/public-consultation_en"
    else:
        doc['consultation_link'] = None

    summary_div = soup.find('div', {'class': 'initiative-detail-summary'})
    doc['summary'] = summary_div.get_text(strip=True) if summary_div else ''

    doc['feedback_url'] = None
    for link in soup.find_all('a'):
        if 'All feedback and statistics' in link.get_text():
            doc['feedback_url'] = f"https://ec.europa.eu{link['href']}"
            break

    return doc


async def get_all_initiatives(browser, page_number: int, status):
    """Scrape one results page of the 'Have your say' initiative listing."""
    page = await browser.newPage()
    await page.setUserAgent(USER_AGENT)
    url = (f'https://ec.europa.eu/info/law/better-regulation/have-your-say/'
           f'initiatives_en?feedbackStatus={status}&page={page_number}')
    await page.goto(url, {'waitUntil': 'networkidle2'})
    await asyncio.sleep(2)
    content = await page.content()
    await page.close()

    soup = BeautifulSoup(content, 'html.parser')
    html = soup.find_all('div', {'class': 'ux-block-content'})[0]
    articles = html.find_all('article', {'class': 'search-result-item'})
    if not articles:
        return None

    initiatives = []
    for article in articles:
        try:
            link_tag = article.find('a', {'class': 'ecl-u-pt-xs ecl-u-type-none'})
            link = f"https://ec.europa.eu{link_tag['href']}"
            headline = link_tag.find('div', {'class': 'search-result-title'}).get_text(strip=True)
            topic_div = article.find('div', string='Topic')
            topic = topic_div.find_next_sibling('div').get_text(strip=True) if topic_div else None
            type_of_act_div = article.find('div', string='Type of act')
            type_of_act = type_of_act_div.find_next_sibling('div').get_text(strip=True) if type_of_act_div else None
            feedback_period_div = article.find('div', string='Feedback period')
            feedback_period = feedback_period_div.find_next_sibling('div').get_text(strip=True) if feedback_period_div else None
            initiatives.append({
                'headline': headline,
                'link': link,
                'topic': topic,
                'type_of_act': type_of_act,
                'feedback_period': feedback_period
            })
        except Exception as e:
            # Dump the offending markup before bailing out so the selectors
            # can be adjusted.
            print(f"Error fetching initiative: {e}")
            print(article.prettify())
            print(html.prettify())
            exit()
    return initiatives


async def main():
    arango = ArangoDB()
    if not arango.db.has_collection('eu_initiatives'):
        arango.db.create_collection('eu_initiatives')
    arango.db.collection('eu_initiatives').truncate()

    browser = await launch(headless=True, args=['--no-sandbox', '--disable-setuid-sandbox'])
    for status in ['OPEN', 'CLOSED', 'UPCOMING', 'DISABLED']:
        print(status)
        for page_number in range(0, 335):  # hard upper bound on result pages
            initiatives = await get_all_initiatives(browser, page_number, status)
            if not initiatives:
                break
            for initiative in initiatives:
                print(initiative['headline'])
                initiative['_key'] = arango.fix_key(initiative['headline'].split('_')[0])
                initiative['meta'] = {
                    'status': status,
                    'date': datetime.now().strftime('%Y-%m-%d'),
                    'page': page_number
                }
                initiative = await get_info(browser, initiative, 'initiatives_downloads')
                arango.db.collection('eu_initiatives').insert(initiative, overwrite=True)
            print(f'Page {page_number} done')
    await browser.close()


if __name__ == '__main__':
    # arango = ArangoDB()
    # initiatives = [i for i in arango.db.collection('eu_initiatives').all()]
    # ordered_by_headline = sorted(initiatives, key=lambda x: x['headline'])
    # s = set()
    # for i in ordered_by_headline:
    #     s.add(i['headline'])
    # print(len(s))
    asyncio.get_event_loop().run_until_complete(main())