# rss_reader.py
import os
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse

import feedparser
import requests
from bs4 import BeautifulSoup

from utils import fix_key
from _base_class import BaseClass
from _llm import LLM
from colorprinter.print_color import *


class RSSFeed:
    def __init__(self):
        self.url = None
        self.title = None
        self.icon_path = None
        self.description = None
        self.feed_data = None
        self.fetched_timestamp = None
        self.entries = []


class RSSReader(BaseClass):
    def __init__(self, username):
        super().__init__(username=username)
        self.username = username
        self.user_arango = self.get_arango(username)
        self.feed: RSSFeed = None
        self.arango_feed = None

    def discover_feeds(self, url):
        try:
            if not url.startswith("http"):
                url = "https://" + url

            # Check if the input URL is already an RSS feed
            f = feedparser.parse(url)
            if len(f.entries) > 0:
                return [
                    {
                        "href": url,
                        "title": f.feed.get("title", "No title"),
                        "icon": self.get_site_icon(url),
                    }
                ]

            # If not, proceed to discover feeds from the webpage
            raw = requests.get(url).text
            result = []
            possible_feeds = []
            html = BeautifulSoup(raw, "html.parser")

            # Find the site icon
            icon_url = self.get_site_icon(url, html)

            # Find all <link rel="alternate"> tags whose type contains "rss" or "xml"
            feed_links = html.find_all("link", rel="alternate")
            for link in feed_links:
                t = link.get("type", None)
                if t and ("rss" in t or "xml" in t):
                    href = link.get("href", None)
                    if href:
                        possible_feeds.append(urljoin(url, href))

            # Find all <a> tags whose href contains "rss", "xml", or "feed"
            parsed_url = urlparse(url)
            base = parsed_url.scheme + "://" + parsed_url.hostname
            atags = html.find_all("a")
            for a in atags:
                href = a.get("href", None)
                if href and ("rss" in href or "xml" in href or "feed" in href):
                    possible_feeds.append(urljoin(base, href))

            # Validate the candidate feeds using feedparser
            for feed_url in list(set(possible_feeds)):
                f = feedparser.parse(feed_url)
                if len(f.entries) > 0:
                    result.append(
                        {
                            "href": feed_url,
                            "title": f.feed.get("title", "No title"),
                            "icon": icon_url,
                        }
                    )
            return result
        except Exception as e:
            print(f"Error discovering feeds: {e}")
            return []

    def add_rss_feed(self, url):
        self.get_feed(url=url)
        self.load_feed_from_url(url=url)
        self.feed._key = fix_key(self.feed.url)

        # Store feed data in base_arango's rss_feeds collection
        self.base_arango.db.collection("rss_feeds").insert(
            self.feed.__dict__, overwrite=True
        )

        # Store a reference to the feed in user_arango's user_feeds collection
        self.user_arango.db.collection("user_feeds").insert(
            {
                "_key": self.feed._key,  # Use the same key to reference the feed
                "feed_key": self.feed._key,
                "subscribed_on": datetime.now().isoformat(),
                # Add additional user-specific fields here
            },
            overwrite=True,
        )

    def load_feed_from_url(self, url=None, data=None):
        if url:
            self.feed = RSSFeed()
            self.feed.url = url
            full_feed_data = feedparser.parse(url)
        elif data:
            self.feed = RSSFeed()
            self.feed.url = data.get("url", None)
            full_feed_data = data
        else:
            full_feed_data = feedparser.parse(self.feed.url)

        self.feed.title = full_feed_data["feed"].get("title", "No title")
        self.feed.description = full_feed_data["feed"].get(
            "description", "No description"
        )
        self.feed.icon_path = self.get_site_icon(self.feed.url)
        self.feed.entries = []
        for entry in full_feed_data["entries"]:
            self.feed.entries.append(
                {
                    "title": entry.get("title", "No title"),
                    "link": entry.get("link"),
                    "published": entry.get("published"),
                    "summary": self.html_to_markdown(
                        entry.get("summary", "No summary")
                    ),
                    "id": entry.get("id"),
                    "author": entry.get("author"),
                }
            )
        self.feed.fetched_timestamp = datetime.now().isoformat()

    def feed_data2feed(self, data):
        self.load_feed_from_url(data=data)

    def parse_feed(self, url):
        self.load_feed_from_url(url=url)
        return self.feed

    def update_feed(self):
        self.load_feed_from_url()
        # Update the feed document in base_arango, where feed data is stored
        self.base_arango.db.collection("rss_feeds").update(
            {
                "_key": self.feed._key,
                "fetched_timestamp": self.feed.fetched_timestamp,
                "entries": self.feed.entries,
            }
        )
        return self.feed.entries

    def get_feed(self, feed_key=None, url=None, _id=None):
        arango_doc = None
        if feed_key:
            arango_doc = self.base_arango.db.collection("rss_feeds").get(feed_key)
        elif url:
            cursor = self.base_arango.db.aql.execute(
                "FOR doc IN rss_feeds FILTER doc.url == @url LIMIT 1 RETURN doc",
                bind_vars={"url": url},
            )
            arango_doc = next(cursor, None)
        elif _id:
            cursor = self.base_arango.db.aql.execute(
                "FOR doc IN rss_feeds FILTER doc.id == @id LIMIT 1 RETURN doc",
                bind_vars={"id": _id},
            )
            arango_doc = next(cursor, None)

        if arango_doc:
            self.feed = RSSFeed()
            for attr in arango_doc:
                setattr(self.feed, attr, arango_doc[attr])
            # Re-fetch the feed if the cached copy is older than an hour
            fetched_time = datetime.fromisoformat(self.feed.fetched_timestamp)
            if datetime.now() - fetched_time < timedelta(hours=1):
                return self.feed.entries
            else:
                return self.update_feed()
        return None

    def get_site_icon(self, url, html=None):
        try:
            if not html:
                raw = requests.get(url).text
                html = BeautifulSoup(raw, "html.parser")
            icon_link = html.find("link", rel="icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)
            # Fall back to other common icon link variants
            icon_link = html.find("link", rel="shortcut icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)
            return None
        except Exception as e:
            print(f"Error getting site icon: {e}")
            return None

    def get_rss_feeds(self):
        # Resolve the user's subscriptions in user_feeds against the shared
        # rss_feeds collection in base_arango
        user_feeds = self.user_arango.db.collection("user_feeds").all()
        return [
            self.base_arango.db.collection("rss_feeds").get(doc["feed_key"])
            for doc in user_feeds
        ]

    def download_icon(self, icon_url, save_folder="external_icons"):
        try:
            if not os.path.exists(save_folder):
                os.makedirs(save_folder)
            response = requests.get(icon_url, stream=True)
            if response.status_code == 200:
                icon_name = os.path.basename(icon_url)
                icon_path = os.path.join(save_folder, icon_name)
                with open(icon_path, "wb") as f:
                    for chunk in response.iter_content(1024):
                        f.write(chunk)
                return icon_path
            else:
                print(f"Failed to download icon: {response.status_code}")
                return None
        except Exception as e:
            print(f"Error downloading icon: {e}")
            return None

    def html_to_markdown(self, html):
        soup = BeautifulSoup(html, "html.parser")
        for br in soup.find_all("br"):
            br.replace_with("\n")
        for strong in soup.find_all("strong"):
            strong.replace_with(f"**{strong.text}**")
        for em in soup.find_all("em"):
            em.replace_with(f"*{em.text}*")
        for p in soup.find_all("p"):
            p.replace_with(f"{p.text}\n\n")
        return soup.get_text()

    def get_full_content(self, url):
        result = requests.get(url)
        soup = BeautifulSoup(result.content, "html.parser")
        # Return the page's visible text (assumed intent; the method previously
        # had no return statement)
        return soup.get_text()


class RSSAnalyzer(BaseClass):
    def __init__(self, username):
        super().__init__(username=username)
        self.llm = LLM(system_message="You are reading RSS Feeds to analyze them.")
        self.user_arango = self.get_arango(username)
        self.rss_reader = RSSReader(username)
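

# Illustrative usage sketch: shows how the classes above might be driven,
# assuming a working BaseClass/ArangoDB setup. The username and site URL
# below are placeholders, not values from the original module.
if __name__ == "__main__":
    reader = RSSReader("example_user")

    # Discover candidate feeds on a site, then subscribe to the first one found
    candidates = reader.discover_feeds("https://example.com")
    if candidates:
        reader.add_rss_feed(candidates[0]["href"])
        print(f"Subscribed to {reader.feed.title} ({len(reader.feed.entries)} entries)")
    else:
        print("No feeds discovered")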