# rss_reader.py
#
# RSS feed discovery, parsing and storage backed by ArangoDB (via BaseClass).

import os

import feedparser
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from datetime import datetime, timedelta

from utils import fix_key
from _base_class import BaseClass
from _llm import LLM
from colorprinter.print_color import *


class RSSFeed:
    def __init__(self):
        self.url = None
        self.title = None
        self.icon_path = None
        self.description = None
        self.feed_data = None
        self.fetched_timestamp = None
        self.entries = []
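

# Storage layout (as used below): RSSFeed instances are persisted as plain dicts
# (RSSFeed.__dict__) in the shared base_arango "rss_feeds" collection, keyed by
# fix_key(feed.url); each user's subscriptions are stored as small reference
# documents in that user's "user_feeds" collection.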


class RSSReader(BaseClass):
    """Discovers, parses and caches RSS feeds for a single user."""

    def __init__(self, username):
        super().__init__(username=username)
        self.username = username
        self.user_arango = self.get_arango(username)
        self.feed: RSSFeed = None
        self.arango_feed = None

    def discover_feeds(self, url):
        """Return a list of candidate RSS/Atom feeds discovered at `url`."""
        try:
            if not url.startswith("http"):
                url = "https://" + url

            # Check if the input URL is already an RSS feed
            f = feedparser.parse(url)
            if len(f.entries) > 0:
                return [
                    {
                        "href": url,
                        "title": f.feed.get("title", "No title"),
                        "icon": self.get_site_icon(url),
                    }
                ]

            # If not, proceed to discover feeds from the webpage
            raw = requests.get(url).text
            result = []
            possible_feeds = []
            html = BeautifulSoup(raw, "html.parser")

            # Find the site icon
            icon_url = self.get_site_icon(url, html)

            # Find all <link> tags with rel="alternate" and a type containing "rss" or "xml"
            feed_links = html.find_all("link", rel="alternate")
            for link in feed_links:
                link_type = link.get("type", None)
                if link_type and ("rss" in link_type or "xml" in link_type):
                    href = link.get("href", None)
                    if href:
                        possible_feeds.append(urljoin(url, href))

            # Find all <a> tags whose href contains "rss", "xml" or "feed"
            parsed_url = urlparse(url)
            base = parsed_url.scheme + "://" + parsed_url.hostname
            for a in html.find_all("a"):
                href = a.get("href", None)
                if href and ("rss" in href or "xml" in href or "feed" in href):
                    possible_feeds.append(urljoin(base, href))

            # Validate the candidates with feedparser; keep only real feeds
            for feed_url in set(possible_feeds):
                f = feedparser.parse(feed_url)
                if len(f.entries) > 0:
                    result.append(
                        {
                            "href": feed_url,
                            "title": f.feed.get("title", "No title"),
                            "icon": icon_url,
                        }
                    )

            return result
        except Exception as e:
            print(f"Error discovering feeds: {e}")
            return []
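
    # Illustrative example (not executed): discover_feeds("example.com") would return
    # something like
    #     [{"href": "https://example.com/feed.xml",
    #       "title": "Example Feed",
    #       "icon": "https://example.com/favicon.ico"}]
    # or [] when no valid feed is found. The URLs here are hypothetical.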

    def add_rss_feed(self, url):
        # Load any existing copy of this feed, then (re-)parse it from the URL
        self.get_feed(url=url)

        self.load_feed_from_url(url=url)
        self.feed._key = fix_key(self.feed.url)

        # Store feed data in base_arango's rss_feeds collection
        self.base_arango.db.collection("rss_feeds").insert(self.feed.__dict__)

        # Store a reference to the feed in user_arango's user_feeds collection
        self.user_arango.db.collection("user_feeds").insert(
            {
                "_key": self.feed._key,  # Use the same key to reference the feed
                "feed_key": self.feed._key,
                "subscribed_on": datetime.now().isoformat(),
                # Add additional user-specific fields here
            },
            overwrite=True,
        )

    def load_feed_from_url(self, url=None, data=None):
        if url:
            self.feed = RSSFeed()
            self.feed.url = url
            full_feed_data = feedparser.parse(url)
        elif data:
            self.feed = RSSFeed()
            self.feed.url = data.get("url", None)
            full_feed_data = data
        else:
            full_feed_data = feedparser.parse(self.feed.url)

        self.feed.title = full_feed_data["feed"].get("title", "No title")
        self.feed.description = full_feed_data["feed"].get(
            "description", "No description"
        )
        self.feed.icon_path = self.get_site_icon(self.feed.url)
        self.feed.entries = []

        for entry in full_feed_data["entries"]:
            self.feed.entries.append(
                {
                    "title": entry.get("title", "No title"),
                    "link": entry.get("link"),
                    "published": entry.get("published"),
                    "summary": self.html_to_markdown(
                        entry.get("summary", "No summary")
                    ),
                    "id": entry.get("id"),
                    "author": entry.get("author"),
                }
            )
        self.feed.fetched_timestamp = datetime.now().isoformat()

    def feed_data2feed(self, data):
        self.load_feed_from_url(data=data)

    def parse_feed(self, url):
        self.load_feed_from_url(url=url)
        return self.feed

    def update_feed(self):
        self.load_feed_from_url()
        # Update the stored feed document (it lives in base_arango's rss_feeds collection)
        self.base_arango.db.collection("rss_feeds").update(
            {
                "_key": self.feed._key,
                "fetched_timestamp": self.feed.fetched_timestamp,
                "entries": self.feed.entries,
            }
        )
        return self.feed.entries

    def get_feed(self, feed_key=None, url=None, _id=None):
        arango_doc = None
        if feed_key:
            arango_doc = self.base_arango.db.collection("rss_feeds").get(feed_key)
        elif url:
            cursor = self.base_arango.db.aql.execute(
                "FOR doc IN rss_feeds FILTER doc.url == @url LIMIT 1 RETURN doc",
                bind_vars={"url": url},
                count=True,
            )
            arango_doc = next(cursor, None)
        elif _id:
            cursor = self.base_arango.db.aql.execute(
                "FOR doc IN rss_feeds FILTER doc.id == @id LIMIT 1 RETURN doc",
                bind_vars={"id": _id},
                count=True,
            )
            arango_doc = next(cursor, None)

        if not arango_doc:
            return None

        self.feed = RSSFeed()
        for attr in arango_doc:
            setattr(self.feed, attr, arango_doc[attr])

        fetched_time = datetime.fromisoformat(self.feed.fetched_timestamp)

        # Serve the cached entries if they are less than an hour old, otherwise refresh
        if datetime.now() - fetched_time < timedelta(hours=1):
            return self.feed.entries
        return self.update_feed()

    def get_site_icon(self, url, html=None):
        try:
            if not html:
                raw = requests.get(url).text
                html = BeautifulSoup(raw, "html.parser")

            icon_link = html.find("link", rel="icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)

            # Fallback to finding other common icon links
            icon_link = html.find("link", rel="shortcut icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)

            return None
        except Exception as e:
            print(f"Error getting site icon: {e}")
            return None

    def get_rss_feeds(self):
        return list(self.user_arango.db.collection("rss_feeds").all())

    def download_icon(self, icon_url, save_folder="external_icons"):
        try:
            if not os.path.exists(save_folder):
                os.makedirs(save_folder)

            response = requests.get(icon_url, stream=True)
            if response.status_code == 200:
                icon_name = os.path.basename(icon_url)
                icon_path = os.path.join(save_folder, icon_name)
                with open(icon_path, "wb") as f:
                    for chunk in response.iter_content(1024):
                        f.write(chunk)
                return icon_path
            else:
                print(f"Failed to download icon: {response.status_code}")
                return None
        except Exception as e:
            print(f"Error downloading icon: {e}")
            return None

    def html_to_markdown(self, html):
        soup = BeautifulSoup(html, "html.parser")
        for br in soup.find_all("br"):
            br.replace_with("\n")
        for strong in soup.find_all("strong"):
            strong.replace_with(f"**{strong.text}**")
        for em in soup.find_all("em"):
            em.replace_with(f"*{em.text}*")
        for p in soup.find_all("p"):
            p.replace_with(f"{p.text}\n\n")
        return soup.get_text()
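
    # Illustrative example: html_to_markdown("<p>Hello <strong>world</strong></p>")
    # returns "Hello **world**\n\n" (tags are replaced in place, then the remaining
    # text is extracted).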

    def get_full_content(self, url):
        # Fetch the full article page and return its plain-text content
        result = requests.get(url)
        soup = BeautifulSoup(result.content, "html.parser")
        return soup.get_text()


class RSSAnalyzer(BaseClass):
    def __init__(self, username):
        super().__init__(username=username)
        self.llm = LLM(system_message="You are reading RSS Feeds to analyze them.")
        self.user_arango = self.get_arango_db(username)
        self.rss_reader = RSSReader(username)
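

# Minimal usage sketch (illustrative, not part of the original module): it assumes
# BaseClass can connect to ArangoDB for the given user and that the "rss_feeds" and
# "user_feeds" collections exist. The username and URL below are placeholders.
if __name__ == "__main__":
    reader = RSSReader("alice")

    # Find candidate feeds on a site and subscribe to the first one found
    candidates = reader.discover_feeds("https://example.com")
    print(candidates)

    if candidates:
        reader.add_rss_feed(candidates[0]["href"])
        # Entries are cached for an hour; get_feed() re-fetches when they are stale
        entries = reader.get_feed(url=candidates[0]["href"])
        print(entries[:3] if entries else "No entries")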