import os
import streamlit as st
from _base_class import StreamlitBaseClass
import feedparser
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from utils import fix_key
from colorprinter.print_color import *
from datetime import datetime, timedelta


class RSSFeedsPage(StreamlitBaseClass):
    def __init__(self, username: str):
        super().__init__(username=username)
        self.page_name = "RSS Feeds"
        # Initialize attributes from session state if available
        for k, v in st.session_state.get(self.page_name, {}).items():
            setattr(self, k, v)

    def run(self):
        if "selected_feed" not in st.session_state:
            st.session_state["selected_feed"] = None
        self.update_current_page(self.page_name)
        self.display_feed()
        self.sidebar_actions()
        # Persist state to session_state
        self.update_session_state(page_name=self.page_name)

    def select_rss_feeds(self):
        # Fetch RSS feeds from the user's ArangoDB collection
        rss_feeds = self.get_rss_feeds()
        if rss_feeds:
            feed_options = [feed["title"] for feed in rss_feeds]
            with st.sidebar:
                st.subheader("Show your feeds")
                selected_feed_title = st.selectbox(
                    "Select a feed", options=feed_options, index=None
                )
                if selected_feed_title:
                    st.session_state["selected_feed"] = [
                        feed["_key"]
                        for feed in rss_feeds
                        if feed["title"] == selected_feed_title
                    ][0]
                    st.rerun()
        else:
            st.write("You have no RSS feeds added.")

    def get_rss_feeds(self):
        return list(self.user_arango.db.collection("rss_feeds").all())

    def sidebar_actions(self):
        with st.sidebar:
            # Select a feed to show
            self.select_rss_feeds()

            st.subheader("Add a New RSS Feed")
            rss_url = st.text_input("Website URL or RSS Feed URL")
            if st.button("Discover Feeds"):
                if rss_url:
                    with st.spinner("Discovering feeds..."):
                        feeds = self.discover_feeds(rss_url)
                    if feeds:
                        st.session_state["discovered_feeds"] = feeds
                        st.rerun()
                    else:
                        st.error("No RSS feeds found at the provided URL.")

            if "discovered_feeds" in st.session_state:
                st.subheader("Select a Feed to Add")
                feeds = st.session_state["discovered_feeds"]
                feed_options = [f"{feed['title']} ({feed['href']})" for feed in feeds]
                selected_feed = st.selectbox("Available Feeds", options=feed_options)
                selected_feed_url = feeds[feed_options.index(selected_feed)]["href"]

                if st.button("Preview Feed"):
                    feed_data = feedparser.parse(selected_feed_url)
                    st.write(f"{feed_data.feed.get('title', 'No title')}")
                    description = html_to_markdown(
                        feed_data.feed.get("description", "No description")
                    )
                    st.write(f"_{description}_")
                    for entry in feed_data.entries[:5]:
                        with st.expander(entry.title):
                            summary = (
                                entry.summary
                                if "summary" in entry
                                else "No summary available"
                            )
                            markdown_summary = html_to_markdown(summary)
                            st.markdown(markdown_summary)

                    if st.button(
                        "Add RSS Feed",
                        on_click=self.add_rss_feed,
                        args=(selected_feed_url, feed_data, description),
                    ):
                        del st.session_state["discovered_feeds"]
                        st.success("RSS Feed added.")
                        st.rerun()

    def discover_feeds(self, url):
        try:
            if not url.startswith("http"):
                url = "https://" + url

            # Check if the input URL is already an RSS feed
            f = feedparser.parse(url)
            if len(f.entries) > 0:
                return [
                    {
                        "href": url,
                        "title": f.feed.get("title", "No title"),
                        "icon": self.get_site_icon(url),
                    }
                ]

            # If not, proceed to discover feeds from the webpage
            raw = requests.get(url).text
            result = []
            possible_feeds = []
            html = BeautifulSoup(raw, "html.parser")

            # Find the site icon
            icon_url = self.get_site_icon(url, html)

            # Find all tags with rel="alternate" and type containing "rss" or "xml"
            feed_urls = html.find_all("link", rel="alternate")
            for f in feed_urls:
                t = f.get("type", None)
                if t and ("rss" in t or "xml" in t):
                    href = f.get("href", None)
                    if href:
                        possible_feeds.append(urljoin(url, href))

            # Find all tags with href containing "rss", "xml", or "feed"
            parsed_url = urlparse(url)
            base = parsed_url.scheme + "://" + parsed_url.hostname
            atags = html.find_all("a")
            for a in atags:
                href = a.get("href", None)
                if href and ("rss" in href or "xml" in href or "feed" in href):
                    possible_feeds.append(urljoin(base, href))

            # Validate the possible feeds using feedparser
            for feed_url in list(set(possible_feeds)):
                f = feedparser.parse(feed_url)
                if len(f.entries) > 0:
                    result.append(
                        {
                            "href": feed_url,
                            "title": f.feed.get("title", "No title"),
                            "icon": icon_url,
                        }
                    )
            return result
        except Exception as e:
            print(f"Error discovering feeds: {e}")
            return []

    def add_rss_feed(self, url, feed_data, description):
        try:
            icon_url = feed_data["feed"]["image"]["href"]
        except Exception:
            icon_url = self.get_site_icon(url)
        title = feed_data["feed"].get("title", "No title")
        print_blue(title)
        icon_path = download_icon(icon_url) if icon_url else None
        _key = fix_key(url)
        now_timestamp = datetime.now().isoformat()  # Convert datetime to ISO format string
        self.user_arango.db.collection("rss_feeds").insert(
            {
                "_key": _key,
                "url": url,
                "title": title,
                "icon_path": icon_path,
                "description": description,
                "fetched_timestamp": now_timestamp,  # Add the timestamp field
                "feed_data": feed_data,
            },
            overwrite=True,
        )
        feed = self.get_feed_from_arango(_key)
        if feed:
            self.update_feed(_key, feed)
        else:
            self.base_arango.db.collection("rss_feeds").insert(
                {
                    "_key": _key,
                    "url": url,
                    "title": title,
                    "icon_path": icon_path,
                    "description": description,
                    "fetched_timestamp": now_timestamp,  # Add the timestamp field
                    "feed_data": feed_data,
                },
                overwrite=True,
                overwrite_mode="update",
            )

    def update_feed(self, feed_key, feed=None):
        """
        Update an RSS feed that already exists in the ArangoDB base database.

        Args:
            feed_key (str): The key identifying the feed in the database.

        Returns:
            dict: The parsed feed data.

        Raises:
            Exception: If there is an error updating the feed in the database.
        """
        if not feed:
            feed = self.get_feed_from_arango(feed_key)
        feed_data = feedparser.parse(feed["url"])
        print_rainbow(feed_data["feed"])
        feed["feed_data"] = feed_data
        if self.username not in feed.get("users", []):
            feed["users"] = feed.get("users", []) + [self.username]
        fetched_timestamp = datetime.now().isoformat()  # Convert datetime to ISO format string
        # Update the fetched_timestamp, refreshed feed data, and user list in the database
        self.base_arango.db.collection("rss_feeds").update(
            {
                "_key": feed["_key"],
                "fetched_timestamp": fetched_timestamp,
                "feed_data": feed_data,
                "users": feed.get("users", []),
            }
        )
        return feed_data

    def update_session_state(self, page_name=None):
        # Update session state
        if page_name:
            st.session_state[page_name] = self.__dict__

    def get_site_icon(self, url, html=None):
        try:
            if not html:
                raw = requests.get(url).text
                html = BeautifulSoup(raw, "html.parser")
            icon_link = html.find("link", rel="icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)
            # Fallback to finding other common icon links
            icon_link = html.find("link", rel="shortcut icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)
            return None
        except Exception as e:
            print(f"Error getting site icon: {e}")
            return None

    def get_feed_from_arango(self, feed_key):
        """
        Retrieve an RSS feed from the ArangoDB base database.
        Args:
            feed_key (str): The key of the RSS feed to retrieve from the ArangoDB base database.

        Returns:
            dict: The RSS feed document retrieved from the ArangoDB base database.
        """
        return self.base_arango.db.collection("rss_feeds").get(feed_key)

    def get_feed(self, feed_key):
        feed = self.get_feed_from_arango(feed_key)
        feed_data = feed["feed_data"]
        fetched_time = datetime.fromisoformat(feed["fetched_timestamp"])  # Parse the timestamp string
        if datetime.now() - fetched_time < timedelta(hours=1):
            return feed_data
        else:
            return self.update_feed(feed_key)

    def display_feed(self):
        if st.session_state["selected_feed"]:
            feed_data = self.get_feed(st.session_state["selected_feed"])
            st.title(feed_data["feed"].get("title", "No title"))
            st.write(feed_data["feed"].get("description", "No description"))
            st.write("**Recent Entries:**")
            for entry in feed_data["entries"][:5]:
                with st.expander(entry["title"]):
                    summary = (
                        entry["summary"]
                        if "summary" in entry
                        else "No summary available"
                    )
                    markdown_summary = html_to_markdown(summary)
                    st.markdown(markdown_summary)
                    st.markdown(f"[Read more]({entry['link']})")


def html_to_markdown(html):
    soup = BeautifulSoup(html, "html.parser")
    for br in soup.find_all("br"):
        br.replace_with("\n")
    for strong in soup.find_all("strong"):
        strong.replace_with(f"**{strong.text}**")
    for em in soup.find_all("em"):
        em.replace_with(f"*{em.text}*")
    for p in soup.find_all("p"):
        p.replace_with(f"{p.text}\n\n")
    return soup.get_text()


def download_icon(icon_url, save_folder="external_icons"):
    try:
        if not os.path.exists(save_folder):
            os.makedirs(save_folder)
        response = requests.get(icon_url, stream=True)
        if response.status_code == 200:
            icon_name = os.path.basename(icon_url)
            icon_path = os.path.join(save_folder, icon_name)
            with open(icon_path, "wb") as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            return icon_path
        else:
            print(f"Failed to download icon: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error downloading icon: {e}")
        return None
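

# Usage sketch (assumption, not part of the original module): one way this page
# might be mounted from a Streamlit entry script. The module path
# "rss_feeds_page" and the username handling below are hypothetical;
# StreamlitBaseClass is assumed to provide the ArangoDB handles
# (user_arango / base_arango) and update_current_page() used above.
#
#     import streamlit as st
#     from rss_feeds_page import RSSFeedsPage
#
#     username = st.session_state.get("username")
#     if username:
#         RSSFeedsPage(username=username).run()
#     else:
#         st.warning("Log in to manage your RSS feeds.")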