# rss_reader.py
import os
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse

import feedparser
import requests
from bs4 import BeautifulSoup

from _base_class import BaseClass
from _llm import LLM
from colorprinter.print_color import *
from utils import fix_key


class RSSFeed:
    def __init__(self):
        self.url = None
        self.title = None
        self.icon_path = None
        self.description = None
        self.feed_data = None
        self.fetched_timestamp = None
        self.entries = []


class RSSReader(BaseClass):
    def __init__(self, username):
        super().__init__(username=username)
        self.username = username
        self.user_arango = self.get_arango(username)
        self.feed: RSSFeed = None
        self.arango_feed = None

    def discover_feeds(self, url):
        try:
            if not url.startswith("http"):
                url = "https://" + url
            # Check whether the input URL is already an RSS feed
            f = feedparser.parse(url)
            if len(f.entries) > 0:
                return [
                    {
                        "href": url,
                        "title": f.feed.get("title", "No title"),
                        "icon": self.get_site_icon(url),
                    }
                ]
            # If not, discover feeds from the webpage itself
            raw = requests.get(url).text
            result = []
            possible_feeds = []
            html = BeautifulSoup(raw, "html.parser")
            # Find the site icon
            icon_url = self.get_site_icon(url, html)
            # Find all <link> tags with rel="alternate" and a type containing "rss" or "xml"
            feed_urls = html.find_all("link", rel="alternate")
            for f in feed_urls:
                t = f.get("type", None)
                if t and ("rss" in t or "xml" in t):
                    href = f.get("href", None)
                    if href:
                        possible_feeds.append(urljoin(url, href))
            # Find all <a> tags whose href contains "rss", "xml", or "feed"
            parsed_url = urlparse(url)
            base = parsed_url.scheme + "://" + parsed_url.hostname
            atags = html.find_all("a")
            for a in atags:
                href = a.get("href", None)
                if href and ("rss" in href or "xml" in href or "feed" in href):
                    possible_feeds.append(urljoin(base, href))
            # Validate the candidate feeds with feedparser
            for feed_url in set(possible_feeds):
                f = feedparser.parse(feed_url)
                if len(f.entries) > 0:
                    result.append(
                        {
                            "href": feed_url,
                            "title": f.feed.get("title", "No title"),
                            "icon": icon_url,
                        }
                    )
            return result
        except Exception as e:
            print(f"Error discovering feeds: {e}")
            return []

    def add_rss_feed(self, url):
        self.get_feed(url=url)
        self.load_feed_from_url(url=url)
        self.feed._key = fix_key(self.feed.url)
        # Store feed data in base_arango's rss_feeds collection
        self.base_arango.db.collection("rss_feeds").insert(
            self.feed.__dict__, overwrite=True
        )
        # Store a reference to the feed in user_arango's user_feeds collection
        self.user_arango.db.collection("user_feeds").insert(
            {
                "_key": self.feed._key,  # Use the same key to reference the feed
                "feed_key": self.feed._key,
                "subscribed_on": datetime.now().isoformat(),
                # Add additional user-specific fields here
            },
            overwrite=True,
        )

    def load_feed_from_url(self, url=None, data=None):
        if url:
            self.feed = RSSFeed()
            self.feed.url = url
            full_feed_data = feedparser.parse(url)
        elif data:
            self.feed = RSSFeed()
            self.feed.url = data.get("url", None)
            full_feed_data = data
        else:
            full_feed_data = feedparser.parse(self.feed.url)
        self.feed.title = full_feed_data["feed"].get("title", "No title")
        self.feed.description = full_feed_data["feed"].get(
            "description", "No description"
        )
        self.feed.icon_path = self.get_site_icon(self.feed.url)
        self.feed.entries = []
        for entry in full_feed_data["entries"]:
            self.feed.entries.append(
                {
                    "title": entry.get("title", "No title"),
                    "link": entry.get("link"),
                    "published": entry.get("published"),
                    "summary": self.html_to_markdown(
                        entry.get("summary", "No summary")
                    ),
                    "id": entry.get("id"),
                    "author": entry.get("author"),
                }
            )
        self.feed.fetched_timestamp = datetime.now().isoformat()

    def feed_data2feed(self, data):
        self.load_feed_from_url(data=data)

    def parse_feed(self, url):
        self.load_feed_from_url(url=url)
        return self.feed

    def update_feed(self):
        self.load_feed_from_url()
        # Update the stored feed in base_arango's rss_feeds collection
        self.base_arango.db.collection("rss_feeds").update(
            {
                "_key": self.feed._key,
                "fetched_timestamp": self.feed.fetched_timestamp,
                "entries": self.feed.entries,
            }
        )
        return self.feed.entries

    def get_feed(self, feed_key=None, url=None, _id=None):
        arango_doc = None
        if feed_key:
            arango_doc = self.base_arango.db.collection("rss_feeds").get(feed_key)
        elif url:
            cursor = self.base_arango.db.aql.execute(
                "FOR doc IN rss_feeds FILTER doc.url == @url LIMIT 1 RETURN doc",
                bind_vars={"url": url},
            )
            arango_doc = next(cursor, None)
        elif _id:
            cursor = self.base_arango.db.aql.execute(
                "FOR doc IN rss_feeds FILTER doc.id == @id LIMIT 1 RETURN doc",
                bind_vars={"id": _id},
            )
            arango_doc = next(cursor, None)
        if arango_doc:
            self.feed = RSSFeed()
            for attr in arango_doc:
                setattr(self.feed, attr, arango_doc[attr])
            fetched_time = datetime.fromisoformat(self.feed.fetched_timestamp)
            # Serve cached entries if the feed was fetched within the last hour
            if datetime.now() - fetched_time < timedelta(hours=1):
                return self.feed.entries
            return self.update_feed()
        return None

    def get_site_icon(self, url, html=None):
        try:
            if not html:
                raw = requests.get(url).text
                html = BeautifulSoup(raw, "html.parser")
            icon_link = html.find("link", rel="icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)
            # Fall back to other common icon link variants
            icon_link = html.find("link", rel="shortcut icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)
            return None
        except Exception as e:
            print(f"Error getting site icon: {e}")
            return None

    def get_rss_feeds(self):
        return list(self.user_arango.db.collection("rss_feeds").all())

    def download_icon(self, icon_url, save_folder="external_icons"):
        try:
            if not os.path.exists(save_folder):
                os.makedirs(save_folder)
            response = requests.get(icon_url, stream=True)
            if response.status_code == 200:
                icon_name = os.path.basename(icon_url)
                icon_path = os.path.join(save_folder, icon_name)
                with open(icon_path, "wb") as f:
                    for chunk in response.iter_content(1024):
                        f.write(chunk)
                return icon_path
            else:
                print(f"Failed to download icon: {response.status_code}")
                return None
        except Exception as e:
            print(f"Error downloading icon: {e}")
            return None

    def html_to_markdown(self, html):
        soup = BeautifulSoup(html, "html.parser")
        for br in soup.find_all("br"):
            br.replace_with("\n")
        for strong in soup.find_all("strong"):
            strong.replace_with(f"**{strong.text}**")
        for em in soup.find_all("em"):
            em.replace_with(f"*{em.text}*")
        for p in soup.find_all("p"):
            p.replace_with(f"{p.text}\n\n")
        return soup.get_text()

    def get_full_content(self, url):
        result = requests.get(url)
        soup = BeautifulSoup(result.content, "html.parser")
        # Return the page's readable text; callers can post-process as needed
        return soup.get_text(separator="\n", strip=True)


class RSSAnalyzer(BaseClass):
    def __init__(self, username):
        super().__init__(username=username)
        self.llm = LLM(system_message="You are reading RSS Feeds to analyze them.")
        self.user_arango = self.get_arango(username)
        self.rss_reader = RSSReader(username)
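

if __name__ == "__main__":
    # Illustrative usage sketch only: it assumes BaseClass can build a working
    # ArangoDB connection for the given username; "demo_user" and the URL below
    # are placeholders, not values taken from this project.
    reader = RSSReader("demo_user")
    candidates = reader.discover_feeds("https://example.com")
    print(f"Discovered {len(candidates)} candidate feed(s)")
    if candidates:
        # Subscribe to the first discovered feed and show its latest entry titles.
        reader.add_rss_feed(candidates[0]["href"])
        for entry in reader.feed.entries[:5]:
            print(entry["title"])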