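"""Streamlit page for discovering, previewing, and reading RSS feeds.

Feeds are stored per user in an ArangoDB "rss_feeds" collection and mirrored
in a shared base database; parsed feed data is cached and refreshed when it
is more than an hour old.
"""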

import os

import feedparser
import requests
import streamlit as st
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse

from _base_class import StreamlitBaseClass
from colorprinter.print_color import *  # used below for print_blue and print_rainbow
from utils import fix_key


class RSSFeedsPage(StreamlitBaseClass):
    def __init__(self, username: str):
        super().__init__(username=username)
        self.page_name = "RSS Feeds"
        # Initialize attributes from session state if available
        for k, v in st.session_state.get(self.page_name, {}).items():
            setattr(self, k, v)

    def run(self):
        if "selected_feed" not in st.session_state:
            st.session_state["selected_feed"] = None
        self.update_current_page(self.page_name)
        self.display_feed()
        self.sidebar_actions()
        # Persist state to session_state
        self.update_session_state(page_name=self.page_name)

    def select_rss_feeds(self):
        # Fetch RSS feeds from the user's ArangoDB collection
        rss_feeds = self.get_rss_feeds()
        if rss_feeds:
            feed_options = [feed["title"] for feed in rss_feeds]
            with st.sidebar:
                st.subheader("Show your feeds")
                selected_feed_title = st.selectbox(
                    "Select a feed", options=feed_options, index=None
                )
                if selected_feed_title:
                    st.session_state["selected_feed"] = [
                        feed["_key"]
                        for feed in rss_feeds
                        if feed["title"] == selected_feed_title
                    ][0]
                    st.rerun()
        else:
            st.write("You have no RSS feeds added.")

    def get_rss_feeds(self):
        return list(self.user_arango.db.collection("rss_feeds").all())

    def sidebar_actions(self):
        with st.sidebar:
            # Select a feed to show
            self.select_rss_feeds()

            st.subheader("Add a New RSS Feed")
            rss_url = st.text_input("Website URL or RSS Feed URL")
            if st.button("Discover Feeds"):
                if rss_url:
                    with st.spinner("Discovering feeds..."):
                        feeds = self.discover_feeds(rss_url)
                    if feeds:
                        st.session_state["discovered_feeds"] = feeds
                        st.rerun()
                    else:
                        st.error("No RSS feeds found at the provided URL.")

            if "discovered_feeds" in st.session_state:
                st.subheader("Select a Feed to Add")
                feeds = st.session_state["discovered_feeds"]
                feed_options = [f"{feed['title']} ({feed['href']})" for feed in feeds]
                selected_feed = st.selectbox("Available Feeds", options=feed_options)
                selected_feed_url = feeds[feed_options.index(selected_feed)]["href"]
                if st.button("Preview Feed"):
                    feed_data = feedparser.parse(selected_feed_url)
                    st.write(feed_data.feed.get("title", "No title"))
                    description = html_to_markdown(
                        feed_data.feed.get("description", "No description")
                    )
                    st.write(f"_{description}_")
                    for entry in feed_data.entries[:5]:
                        with st.expander(entry.title):
                            summary = (
                                entry.summary
                                if "summary" in entry
                                else "No summary available"
                            )
                            markdown_summary = html_to_markdown(summary)
                            st.markdown(markdown_summary)
                    # The add button is rendered inside the preview block so that
                    # feed_data and description are defined for the callback
                    if st.button(
                        "Add RSS Feed",
                        on_click=self.add_rss_feed,
                        args=(selected_feed_url, feed_data, description),
                    ):
                        del st.session_state["discovered_feeds"]
                        st.success("RSS Feed added.")
                        st.rerun()

    def discover_feeds(self, url):
        try:
            if not url.startswith("http"):
                url = "https://" + url
            # Check if the input URL is already an RSS feed
            f = feedparser.parse(url)
            if len(f.entries) > 0:
                return [
                    {
                        "href": url,
                        "title": f.feed.get("title", "No title"),
                        "icon": self.get_site_icon(url),
                    }
                ]
            # If not, proceed to discover feeds from the webpage
            raw = requests.get(url).text
            result = []
            possible_feeds = []
            html = BeautifulSoup(raw, "html.parser")
            # Find the site icon
            icon_url = self.get_site_icon(url, html)
            # Find all <link> tags with rel="alternate" and a type containing "rss" or "xml"
            feed_urls = html.find_all("link", rel="alternate")
            for f in feed_urls:
                t = f.get("type", None)
                if t and ("rss" in t or "xml" in t):
                    href = f.get("href", None)
                    if href:
                        possible_feeds.append(urljoin(url, href))
            # Find all <a> tags with an href containing "rss", "xml", or "feed"
            parsed_url = urlparse(url)
            base = parsed_url.scheme + "://" + parsed_url.hostname
            atags = html.find_all("a")
            for a in atags:
                href = a.get("href", None)
                if href and ("rss" in href or "xml" in href or "feed" in href):
                    possible_feeds.append(urljoin(base, href))
            # Validate the candidate URLs with feedparser, keeping only real feeds
            for feed_url in list(set(possible_feeds)):
                f = feedparser.parse(feed_url)
                if len(f.entries) > 0:
                    result.append(
                        {
                            "href": feed_url,
                            "title": f.feed.get("title", "No title"),
                            "icon": icon_url,
                        }
                    )
            return result
        except Exception as e:
            print(f"Error discovering feeds: {e}")
            return []

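    # Illustrative shape of a discover_feeds() result; the URL and titles below
    # are placeholders, not real data:
    #   [{"href": "https://example.com/feed.xml",
    #     "title": "Example Blog",
    #     "icon": "https://example.com/favicon.ico"}]
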
    def add_rss_feed(self, url, feed_data, description):
        try:
            icon_url = feed_data["feed"]["image"]["href"]
        except Exception:
            icon_url = self.get_site_icon(url)
        title = feed_data["feed"].get("title", "No title")
        print_blue(title)
        icon_path = download_icon(icon_url) if icon_url else None
        _key = fix_key(url)
        now_timestamp = datetime.now().isoformat()  # Store the timestamp as an ISO format string
        # Save the feed in the user's own database
        self.user_arango.db.collection("rss_feeds").insert(
            {
                "_key": _key,
                "url": url,
                "title": title,
                "icon_path": icon_path,
                "description": description,
                "fetched_timestamp": now_timestamp,
                "feed_data": feed_data,
            },
            overwrite=True,
        )
        # If the feed already exists in the shared base database, refresh it;
        # otherwise insert it there as well
        feed = self.get_feed_from_arango(_key)
        if feed:
            self.update_feed(_key, feed)
        else:
            self.base_arango.db.collection("rss_feeds").insert(
                {
                    "_key": _key,
                    "url": url,
                    "title": title,
                    "icon_path": icon_path,
                    "description": description,
                    "fetched_timestamp": now_timestamp,
                    "feed_data": feed_data,
                },
                overwrite=True,
                overwrite_mode="update",
            )

    def update_feed(self, feed_key, feed=None):
        """
        Update an RSS feed that already exists in the ArangoDB base database.

        Args:
            feed_key (str): The key identifying the feed in the database.
            feed (dict, optional): The feed document, if it has already been fetched.

        Returns:
            dict: The parsed feed data.

        Raises:
            Exception: If there is an error updating the feed in the database.
        """
        if not feed:
            feed = self.get_feed_from_arango(feed_key)
        feed_data = feedparser.parse(feed["url"])
        print_rainbow(feed_data["feed"])
        feed["feed_data"] = feed_data
        # Track which users subscribe to this shared feed
        if self.username not in feed.get("users", []):
            feed["users"] = feed.get("users", []) + [self.username]
        fetched_timestamp = datetime.now().isoformat()  # Store the timestamp as an ISO format string
        # Update the cached feed data, fetch time, and subscriber list in the database
        self.base_arango.db.collection("rss_feeds").update(
            {
                "_key": feed["_key"],
                "fetched_timestamp": fetched_timestamp,
                "feed_data": feed_data,
                "users": feed.get("users", []),
            }
        )
        return feed_data

    def update_session_state(self, page_name=None):
        # Persist this page's attributes to Streamlit's session state
        if page_name:
            st.session_state[page_name] = self.__dict__

    def get_site_icon(self, url, html=None):
        try:
            if not html:
                raw = requests.get(url).text
                html = BeautifulSoup(raw, "html.parser")
            icon_link = html.find("link", rel="icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)
            # Fall back to other common icon links
            icon_link = html.find("link", rel="shortcut icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)
            return None
        except Exception as e:
            print(f"Error getting site icon: {e}")
            return None

    def get_feed_from_arango(self, feed_key):
        """
        Retrieve an RSS feed from the ArangoDB base database.

        Args:
            feed_key (str): The key of the RSS feed to retrieve.

        Returns:
            dict: The RSS feed document retrieved from the ArangoDB base database.
        """
        return self.base_arango.db.collection("rss_feeds").get(feed_key)

    def get_feed(self, feed_key):
        feed = self.get_feed_from_arango(feed_key)
        feed_data = feed["feed_data"]
        fetched_time = datetime.fromisoformat(feed["fetched_timestamp"])  # Parse the timestamp string
        # Serve the cached feed data if it is less than an hour old;
        # otherwise re-fetch it and update the database
        if datetime.now() - fetched_time < timedelta(hours=1):
            return feed_data
        else:
            return self.update_feed(feed_key)

    def display_feed(self):
        if st.session_state["selected_feed"]:
            feed_data = self.get_feed(st.session_state["selected_feed"])
            st.title(feed_data["feed"].get("title", "No title"))
            st.write(feed_data["feed"].get("description", "No description"))
            st.write("**Recent Entries:**")
            for entry in feed_data["entries"][:5]:
                with st.expander(entry["title"]):
                    summary = (
                        entry["summary"] if "summary" in entry else "No summary available"
                    )
                    markdown_summary = html_to_markdown(summary)
                    st.markdown(markdown_summary)
                    st.markdown(f"[Read more]({entry['link']})")


def html_to_markdown(html):
    """Convert a small subset of HTML (<br>, <strong>, <em>, <p>) to Markdown-style text."""
    soup = BeautifulSoup(html, "html.parser")
    for br in soup.find_all("br"):
        br.replace_with("\n")
    for strong in soup.find_all("strong"):
        strong.replace_with(f"**{strong.text}**")
    for em in soup.find_all("em"):
        em.replace_with(f"*{em.text}*")
    for p in soup.find_all("p"):
        p.replace_with(f"{p.text}\n\n")
    return soup.get_text()
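# Example (illustrative input, not from the codebase):
#   html_to_markdown("<p>Hello <strong>world</strong></p>") -> "Hello **world**\n\n"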


def download_icon(icon_url, save_folder="external_icons"):
    """Download the icon at icon_url into save_folder and return its local path."""
    try:
        if not os.path.exists(save_folder):
            os.makedirs(save_folder)
        response = requests.get(icon_url, stream=True)
        if response.status_code == 200:
            icon_name = os.path.basename(icon_url)
            icon_path = os.path.join(save_folder, icon_name)
            with open(icon_path, "wb") as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            return icon_path
        else:
            print(f"Failed to download icon: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error downloading icon: {e}")
        return None
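

# Minimal usage sketch (illustrative): in the real app the username presumably
# comes from the surrounding multi-page setup; "demo_user" is a placeholder.
#
#     page = RSSFeedsPage(username="demo_user")
#     page.run()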