parent
01df43bba2
commit
00fd42b32d
19 changed files with 1474 additions and 249 deletions
@@ -0,0 +1,21 @@
# Chroma
CHROMA_CLIENT_AUTH_CREDENTIALS="overpass-alms-porker-file-seigneur-kiln"
CHROMA_SERVER_AUTHN_PROVIDER="chromadb.auth.basic_authn.BasicAuthenticationServerProvider"
CHROMA_AUTH_TOKEN_TRANSPORT_HEADER="X-Chroma-Token"
CHROMA_HOST="http://192.168.1.10:8007"

CHROMA_CLIENT_AUTH_CREDENTIALS="overpass-alms-porker-file-seigneur-kiln"
CHROMA_SERVER_AUTHN_PROVIDER="chromadb.auth.basic_authn.BasicAuthenticationServerProvider"
CHROMA_AUTH_TOKEN_TRANSPORT_HEADER="X-Chroma-Token"
_CHROMA_HOST="https://lasseedfast.se/chroma_ev_cars/"

# Arango
ARANGO_HOST="http://192.168.1.10:8531"
ARANGO_USER="admin"
ARANGO_PASSWORD="raHzaw-5vyjqo-xisfec"
ARANGO_DB="base"
ARANGO_PWD_ENV_MANAGER="jagskoterenv(Y)"
ARANGO_ROOT_USER="root"
ARANGO_ROOT_PASSWORD="gyhqed-kiwNac-9buhme"

MAILERSEND_API_KEY="mlsn.71de3eb2dbcb733bd4ee509d1c95ccfc8939fd647cba9e3a0f631f60f900bd85"
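A minimal sketch of how the Chroma settings above might be consumed from Python (illustrative only; assumes python-dotenv and chromadb are installed and that the server reads the token from the configured header):

import os
import chromadb
from dotenv import load_dotenv

load_dotenv()
# Send the client credentials in the header named by CHROMA_AUTH_TOKEN_TRANSPORT_HEADER
client = chromadb.HttpClient(
    host="192.168.1.10",
    port=8007,
    headers={
        os.environ["CHROMA_AUTH_TOKEN_TRANSPORT_HEADER"]: os.environ[
            "CHROMA_CLIENT_AUTH_CREDENTIALS"
        ]
    },
)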
@@ -0,0 +1 @@
from pdf_highlighter import Highlighter
@@ -0,0 +1,260 @@
# rss_reader.py
import feedparser
import requests
import urllib
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from datetime import datetime, timedelta
from utils import fix_key
import os
from _base_class import BaseClass
from _llm import LLM
from colorprinter.print_color import *


class RSSFeed:
    def __init__(self):
        self.url = None
        self.title = None
        self.icon_path = None
        self.description = None
        self.feed_data = None
        self.fetched_timestamp = None
        self.entries = []


class RSSReader(BaseClass):
    def __init__(self, username):
        super().__init__(username=username)
        self.username = username
        self.user_arango = self.get_arango(username)
        self.feed: RSSFeed = None
        self.arango_feed = None

    def discover_feeds(self, url):
        try:
            if not url.startswith("http"):
                url = "https://" + url

            # Check if the input URL is already an RSS feed
            f = feedparser.parse(url)
            if len(f.entries) > 0:
                return [
                    {
                        "href": url,
                        "title": f.feed.get("title", "No title"),
                        "icon": self.get_site_icon(url),
                    }
                ]

            # If not, proceed to discover feeds from the webpage
            raw = requests.get(url).text
            result = []
            possible_feeds = []
            html = BeautifulSoup(raw, "html.parser")

            # Find the site icon
            icon_url = self.get_site_icon(url, html)

            # Find all <link> tags with rel="alternate" and type containing "rss" or "xml"
            feed_urls = html.find_all("link", rel="alternate")
            for f in feed_urls:
                t = f.get("type", None)
                if t and ("rss" in t or "xml" in t):
                    href = f.get("href", None)
                    if href:
                        possible_feeds.append(urljoin(url, href))

            # Find all <a> tags with href containing "rss", "xml", or "feed"
            parsed_url = urllib.parse.urlparse(url)
            base = parsed_url.scheme + "://" + parsed_url.hostname
            atags = html.find_all("a")
            for a in atags:
                href = a.get("href", None)
                if href and ("rss" in href or "xml" in href or "feed" in href):
                    possible_feeds.append(urljoin(base, href))

            # Validate the possible feeds using feedparser
            for feed_url in list(set(possible_feeds)):
                f = feedparser.parse(feed_url)
                if len(f.entries) > 0:
                    result.append(
                        {
                            "href": feed_url,
                            "title": f.feed.get("title", "No title"),
                            "icon": icon_url,
                        }
                    )

            return result
        except Exception as e:
            print(f"Error discovering feeds: {e}")
            return []
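    # A minimal usage sketch (illustrative; assumes a user named "alice"
    # already has an Arango database):
    #
    #     reader = RSSReader("alice")
    #     for feed in reader.discover_feeds("nytimes.com"):
    #         print(feed["title"], feed["href"])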

    def add_rss_feed(self, url):
        self.get_feed(url=url)

        self.load_feed_from_url(url=url)
        self.feed._key = fix_key(self.feed.url)

        # Store feed data in base_arango's rss_feeds collection
        self.base_arango.db.collection("rss_feeds").insert(self.feed.__dict__)

        # Store a reference to the feed in user_arango's user_feeds collection
        self.user_arango.db.collection("user_feeds").insert(
            {
                "_key": self.feed._key,  # Use the same key to reference the feed
                "feed_key": self.feed._key,
                "subscribed_on": datetime.now().isoformat(),
                # Add additional user-specific fields here
            },
            overwrite=True,
        )

    def load_feed_from_url(self, url=None, data=None):
        if url:
            self.feed = RSSFeed()
            self.feed.url = url
            full_feed_data = feedparser.parse(url)
        elif data:
            self.feed = RSSFeed()
            self.feed.url = data.get("url", None)
            full_feed_data = data
        else:
            full_feed_data = feedparser.parse(self.feed.url)

        self.feed.title = full_feed_data["feed"].get("title", "No title")
        self.feed.description = full_feed_data["feed"].get(
            "description", "No description"
        )
        self.feed.icon_path = self.get_site_icon(self.feed.url)
        self.feed.entries = []

        for entry in full_feed_data["entries"]:
            self.feed.entries.append(
                {
                    "title": entry.get("title", "No title"),
                    "link": entry.get("link"),
                    "published": entry.get("published"),
                    "summary": self.html_to_markdown(
                        entry.get("summary", "No summary")
                    ),
                    "id": entry.get("id"),
                    "author": entry.get("author"),
                }
            )
        self.feed.fetched_timestamp = datetime.now().isoformat()

    def feed_data2feed(self, data):
        self.load_feed_from_url(data=data)

    def parse_feed(self, url):
        self.load_feed_from_url(url=url)
        return self.feed

    def update_feed(self):
        self.load_feed_from_url()
        # Update the feed in the base database, where feeds are stored
        self.base_arango.db.collection("rss_feeds").update(
            {
                "_key": self.feed._key,
                "fetched_timestamp": self.feed.fetched_timestamp,
                "entries": self.feed.entries,
            }
        )
        return self.feed.entries

    def get_feed(self, feed_key=None, url=None, _id=None):
        arango_doc = None
        if feed_key:
            arango_doc = self.base_arango.db.collection("rss_feeds").get(feed_key)
        elif url:
            # Use bind variables instead of string interpolation to avoid AQL injection
            cursor = self.base_arango.db.aql.execute(
                "FOR doc IN rss_feeds FILTER doc.url == @url LIMIT 1 RETURN doc",
                bind_vars={"url": url},
            )
            arango_doc = next(cursor, None)
        elif _id:
            cursor = self.base_arango.db.aql.execute(
                "FOR doc IN rss_feeds FILTER doc.id == @id LIMIT 1 RETURN doc",
                bind_vars={"id": _id},
            )
            arango_doc = next(cursor, None)

        if arango_doc:
            self.feed = RSSFeed()
            for attr in arango_doc:
                setattr(self.feed, attr, arango_doc[attr])

            fetched_time = datetime.fromisoformat(self.feed.fetched_timestamp)

            if datetime.now() - fetched_time < timedelta(hours=1):
                return self.feed.entries
            else:
                return self.update_feed()

    def get_site_icon(self, url, html=None):
        try:
            if not html:
                raw = requests.get(url).text
                html = BeautifulSoup(raw, "html.parser")

            icon_link = html.find("link", rel="icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)

            # Fallback to finding other common icon links
            icon_link = html.find("link", rel="shortcut icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)

            return None
        except Exception as e:
            print(f"Error getting site icon: {e}")
            return None

    def get_rss_feeds(self):
        return list(self.user_arango.db.collection("rss_feeds").all())

    def download_icon(self, icon_url, save_folder="external_icons"):
        try:
            if not os.path.exists(save_folder):
                os.makedirs(save_folder)

            response = requests.get(icon_url, stream=True)
            if response.status_code == 200:
                icon_name = os.path.basename(icon_url)
                icon_path = os.path.join(save_folder, icon_name)
                with open(icon_path, "wb") as f:
                    for chunk in response.iter_content(1024):
                        f.write(chunk)
                return icon_path
            else:
                print(f"Failed to download icon: {response.status_code}")
                return None
        except Exception as e:
            print(f"Error downloading icon: {e}")
            return None

    def html_to_markdown(self, html):
        soup = BeautifulSoup(html, "html.parser")
        for br in soup.find_all("br"):
            br.replace_with("\n")
        for strong in soup.find_all("strong"):
            strong.replace_with(f"**{strong.text}**")
        for em in soup.find_all("em"):
            em.replace_with(f"*{em.text}*")
        for p in soup.find_all("p"):
            p.replace_with(f"{p.text}\n\n")
        return soup.get_text()
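    # Example (illustrative): self.html_to_markdown("<p>Hi <strong>there</strong></p>")
    # returns "Hi **there**\n\n"; <br>, <strong>, <em> and <p> are the only tags
    # converted, everything else is flattened to plain text.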

    def get_full_content(self, url):
        result = requests.get(url)
        soup = BeautifulSoup(result.content, "html.parser")
        # Return the visible text of the fetched page
        return soup.get_text()


class RSSAnalyzer(BaseClass):
    def __init__(self, username):
        super().__init__(username=username)
        self.llm = LLM(system_message="You are reading RSS Feeds to analyze them.")
        self.user_arango = self.get_arango(username)
        self.rss_reader = RSSReader(username)
@@ -0,0 +1,6 @@
from _arango import ArangoDB


for db in ['lasse', 'nisse', 'torill', 'irma']:
    arango = ArangoDB(db_name=db)
    # Guard so the script can be re-run without raising on existing collections
    if not arango.db.has_collection('rss_feeds'):
        arango.db.create_collection('rss_feeds')
@@ -0,0 +1,192 @@
import yaml
import sys
import bcrypt
from _arango import ArangoDB
import os
import dotenv
import getpass
import argparse
import string
import secrets
from utils import fix_key
from colorprinter.print_color import *

dotenv.load_dotenv()


def read_yaml(file_path):
    with open(file_path, "r") as file:
        return yaml.safe_load(file)


def write_yaml(file_path, data):
    with open(file_path, "w") as file:
        yaml.safe_dump(data, file)


def add_user(data, username, email, name, password):
    # Check for existing username
    if username in data["credentials"]["usernames"]:
        print(f"Error: Username '{username}' already exists.")
        sys.exit(1)

    # Check for existing email
    for user in data["credentials"]["usernames"].values():
        if user["email"] == email:
            print(f"Error: Email '{email}' already exists.")
            sys.exit(1)

    # Hash the password using bcrypt
    hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode(
        "utf-8"
    )

    # Add the new user
    data["credentials"]["usernames"][username] = {
        "email": email,
        "name": name,
        "password": hashed_password,
    }


def make_arango(username):
    root_user = os.getenv("ARANGO_ROOT_USER")
    root_password = os.getenv("ARANGO_ROOT_PASSWORD")
    arango = ArangoDB(user=root_user, password=root_password, db_name="_system")

    if not arango.db.has_database(username):
        arango.db.create_database(
            username,
            users=[
                {
                    "username": os.getenv("ARANGO_USER"),
                    "password": os.getenv("ARANGO_PASSWORD"),
                    "active": True,
                    "extra": {},
                }
            ],
        )
    arango = ArangoDB(user=root_user, password=root_password, db_name=username)
    for collection in [
        "projects",
        "favorite_articles",
        "article_collections",
        "settings",
        "chats",
        "notes",
        "other_documents",
        "rss_feeds",
    ]:
        if not arango.db.has_collection(collection):
            arango.db.create_collection(collection)
    user_arango = ArangoDB(db_name=username)
    user_arango.db.collection("settings").insert(
        {"current_page": "Bot Chat", "current_project": None}
    )


def generate_random_password():
    characters = string.ascii_letters + string.digits
    password = "-".join(
        "".join(secrets.choice(characters) for _ in range(6)) for _ in range(3)
    )
    return password
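# Example output (illustrative): "x3Fk9a-Qw7Lm2-pR8sT1"; three six-character
# alphanumeric groups joined by dashes.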


def delete_user(data, username):
    # Check if the user exists
    if username not in data["credentials"]["usernames"]:
        print(f"Error: Username '{username}' does not exist.")
        sys.exit(1)

    # Remove the user from the YAML data
    del data["credentials"]["usernames"][username]

    # Remove the user's database in ArangoDB
    root_user = os.getenv("ARANGO_ROOT_USER")
    root_password = os.getenv("ARANGO_ROOT_PASSWORD")
    base_arango = ArangoDB(user=root_user, password=root_password, db_name="base")
    arango = ArangoDB(user=root_user, password=root_password, db_name="_system")
    if arango.db.has_database(username):
        arango.db.delete_database(username)

    # Remove user access from documents in relevant collections
    collections = ["sci_articles", "other_documents"]
    for collection_name in collections:
        documents = base_arango.db.aql.execute(
            """
            FOR doc IN @@collection_name
                FILTER @username IN doc.user_access
                RETURN {'_id': doc._id, 'user_access': doc.user_access}
            """,
            bind_vars={"username": username, "@collection_name": collection_name},
        )
        for document in documents:
            if 'user_access' in document:
                # Remove username from the user_access list
                document['user_access'].remove(username)
                base_arango.db.collection(collection_name).update(document)

    print_green(f"User {username} deleted successfully.")


def main():
    parser = argparse.ArgumentParser(description="Add or delete a user.")
    parser.add_argument("--user", help="Username")
    parser.add_argument("--email", help="Email address")
    parser.add_argument("--name", help="Full name")
    parser.add_argument("--password", help="Password")
    parser.add_argument("--delete", action="store_true", help="Delete user")

    args = parser.parse_args()

    yaml_file = "streamlit_users.yaml"
    data = read_yaml(yaml_file)

    if args.delete:
        if args.user:
            username = args.user
            delete_user(data, username)
            write_yaml(yaml_file, data)
        else:
            print("Error: Username is required to delete a user.")
            sys.exit(1)
    else:
        if args.user and args.email and args.name:
            username = args.user
            email = args.email
            name = args.name
            if args.password and len(args.password) >= 8:
                password = args.password
            else:
                password = generate_random_password()
                print_yellow("Generated password:", password)
        else:
            username = input("Enter username: ")
            email = input("Enter email: ")
            name = input("Enter name: ")
            password = getpass.getpass("Enter password: ")
            if not password:
                password = generate_random_password()
                print_yellow("Generated password:", password)

        if username == 'test':
            delete_user(data, username)

        email = email.lower().strip()
        checked_username = fix_key(username)
        if checked_username != username:
            print_red(f"Username '{username}' contains invalid characters.")
            print_yellow(f"Using '{checked_username}' instead.")
            username = checked_username

        add_user(data, username, email, name, password)
        make_arango(username)
        write_yaml(yaml_file, data)
        print_green(f"User {username} added successfully.")


if __name__ == "__main__":
    main()
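# Usage sketch (illustrative; the script's file name is an assumption):
#   python add_user.py --user alice --email alice@example.com --name "Alice A"
#   python add_user.py --user alice --delete
# Run with no flags to be prompted interactively; omitting --password (or
# giving one shorter than 8 characters) generates a random password.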
@@ -1,97 +0,0 @@
import yaml
import sys
import bcrypt
from _arango import ArangoDB
import os
import dotenv
import getpass

dotenv.load_dotenv()


def read_yaml(file_path):
    with open(file_path, "r") as file:
        return yaml.safe_load(file)


def write_yaml(file_path, data):
    with open(file_path, "w") as file:
        yaml.safe_dump(data, file)


def add_user(data, username, email, name, password):
    # Check for existing username
    if username in data["credentials"]["usernames"]:
        print(f"Error: Username '{username}' already exists.")
        sys.exit(1)

    # Check for existing email
    for user in data["credentials"]["usernames"].values():
        if user["email"] == email:
            print(f"Error: Email '{email}' already exists.")
            sys.exit(1)

    # Hash the password using bcrypt
    hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode(
        "utf-8"
    )

    # Add the new user
    data["credentials"]["usernames"][username] = {
        "email": email,
        "name": name,
        "password": hashed_password,
    }


def make_arango(username):
    root_user = os.getenv("ARANGO_ROOT_USER")
    root_password = os.getenv("ARANGO_ROOT_PASSWORD")
    arango = ArangoDB(user=root_user, password=root_password, db_name="_system")

    if not arango.db.has_database(username):
        arango.db.create_database(
            username,
            users=[
                {
                    "username": os.getenv("ARANGO_USER"),
                    "password": os.getenv("ARANGO_PASSWORD"),
                    "active": True,
                    "extra": {},
                }
            ]
        )
    arango = ArangoDB(user=root_user, password=root_password, db_name=username)
    for collection in ["projects", "favorite_articles", "article_collections", "settings", 'chats', 'notes', 'other_documents']:
        if not arango.db.has_collection(collection):
            arango.db.create_collection(collection)
    user_arango = ArangoDB(db_name=username)
    user_arango.db.collection("settings").insert(
        {"current_page": 'Bot Chat', "current_project": None}
    )


def main():
    yaml_file = "streamlit_users.yaml"
    if len(sys.argv) == 5:
        username = sys.argv[1]
        email = sys.argv[2]
        name = sys.argv[3]
        password = sys.argv[4]
    else:
        username = input("Enter username: ")
        email = input("Enter email: ")
        name = input("Enter name: ")
        password = getpass.getpass("Enter password: ")

    data = read_yaml(yaml_file)
    add_user(data, username, email, name, password)
    make_arango(username)
    write_yaml(yaml_file, data)
    print(f"User {username} added successfully.")


if __name__ == "__main__":
    main()
@@ -0,0 +1,345 @@
import os
import urllib
import streamlit as st
from _base_class import BaseClass
import feedparser
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from utils import fix_key
from colorprinter.print_color import *
from datetime import datetime, timedelta


class RSSFeedsPage(BaseClass):
    def __init__(self, username: str):
        super().__init__(username=username)
        self.page_name = "RSS Feeds"

        # Initialize attributes from session state if available
        for k, v in st.session_state.get(self.page_name, {}).items():
            setattr(self, k, v)

    def run(self):
        if "selected_feed" not in st.session_state:
            st.session_state["selected_feed"] = None
        self.update_current_page(self.page_name)
        self.display_feed()

        self.sidebar_actions()

        # Persist state to session_state
        self.update_session_state(page_name=self.page_name)

    def select_rss_feeds(self):
        # Fetch RSS feeds from the user's ArangoDB collection
        rss_feeds = self.get_rss_feeds()
        if rss_feeds:
            feed_options = [feed["title"] for feed in rss_feeds]
            with st.sidebar:
                st.subheader("Show your feeds")
                selected_feed_title = st.selectbox(
                    "Select a feed", options=feed_options, index=None
                )
                if selected_feed_title:
                    st.session_state["selected_feed"] = [
                        feed["_key"]
                        for feed in rss_feeds
                        if feed["title"] == selected_feed_title
                    ][0]
                    st.rerun()
        else:
            st.write("You have no RSS feeds added.")

    def get_rss_feeds(self):
        return list(self.user_arango.db.collection("rss_feeds").all())

    def sidebar_actions(self):
        with st.sidebar:
            # Select a feed to show
            self.select_rss_feeds()
            st.subheader("Add a New RSS Feed")
            rss_url = st.text_input("Website URL or RSS Feed URL")
            if st.button("Discover Feeds"):
                if rss_url:
                    with st.spinner("Discovering feeds..."):
                        feeds = self.discover_feeds(rss_url)
                    if feeds:
                        st.session_state["discovered_feeds"] = feeds
                        st.rerun()
                    else:
                        st.error("No RSS feeds found at the provided URL.")
            if "discovered_feeds" in st.session_state:
                st.subheader("Select a Feed to Add")
                feeds = st.session_state["discovered_feeds"]
                feed_options = [f"{feed['title']} ({feed['href']})" for feed in feeds]
                selected_feed = st.selectbox("Available Feeds", options=feed_options)
                selected_feed_url = feeds[feed_options.index(selected_feed)]["href"]
                if st.button("Preview Feed"):
                    feed_data = feedparser.parse(selected_feed_url)
                    st.write(f"{feed_data.feed.get('title', 'No title')}")
                    description = html_to_markdown(
                        feed_data.feed.get("description", "No description")
                    )
                    st.write(f"_{description}_")
                    for entry in feed_data.entries[:5]:
                        print("ENTRY:")
                        with st.expander(entry.title):
                            summary = (
                                entry.summary
                                if "summary" in entry
                                else "No summary available"
                            )
                            markdown_summary = html_to_markdown(summary)
                            st.markdown(markdown_summary)
                    if st.button(
                        "Add RSS Feed",
                        on_click=self.add_rss_feed,
                        args=(selected_feed_url, feed_data, description),
                    ):
                        del st.session_state["discovered_feeds"]
                        st.success("RSS Feed added.")
                        st.rerun()

    def discover_feeds(self, url):
        try:
            if not url.startswith("http"):
                url = "https://" + url

            # Check if the input URL is already an RSS feed
            f = feedparser.parse(url)
            if len(f.entries) > 0:
                return [
                    {
                        "href": url,
                        "title": f.feed.get("title", "No title"),
                        "icon": self.get_site_icon(url),
                    }
                ]

            # If not, proceed to discover feeds from the webpage
            raw = requests.get(url).text
            result = []
            possible_feeds = []
            html = BeautifulSoup(raw, "html.parser")

            # Find the site icon
            icon_url = self.get_site_icon(url, html)

            # Find all <link> tags with rel="alternate" and type containing "rss" or "xml"
            feed_urls = html.find_all("link", rel="alternate")
            for f in feed_urls:
                t = f.get("type", None)
                if t and ("rss" in t or "xml" in t):
                    href = f.get("href", None)
                    if href:
                        possible_feeds.append(urljoin(url, href))

            # Find all <a> tags with href containing "rss", "xml", or "feed"
            parsed_url = urllib.parse.urlparse(url)
            base = parsed_url.scheme + "://" + parsed_url.hostname
            atags = html.find_all("a")
            for a in atags:
                href = a.get("href", None)
                if href and ("rss" in href or "xml" in href or "feed" in href):
                    possible_feeds.append(urljoin(base, href))

            # Validate the possible feeds using feedparser
            for feed_url in list(set(possible_feeds)):
                f = feedparser.parse(feed_url)
                if len(f.entries) > 0:
                    result.append(
                        {
                            "href": feed_url,
                            "title": f.feed.get("title", "No title"),
                            "icon": icon_url,
                        }
                    )

            return result
        except Exception as e:
            print(f"Error discovering feeds: {e}")
            return []

    def add_rss_feed(self, url, feed_data, description):
        try:
            icon_url = feed_data["feed"]["image"]["href"]
        except Exception:
            icon_url = self.get_site_icon(url)

        title = feed_data["feed"].get("title", "No title")
        print_blue(title)
        icon_path = download_icon(icon_url) if icon_url else None
        _key = fix_key(url)
        now_timestamp = datetime.now().isoformat()  # Convert datetime to ISO format string

        self.user_arango.db.collection("rss_feeds").insert(
            {
                "_key": _key,
                "url": url,
                "title": title,
                "icon_path": icon_path,
                "description": description,
                "fetched_timestamp": now_timestamp,  # Add the timestamp field
                "feed_data": feed_data,
            },
            overwrite=True,
        )

        feed = self.get_feed_from_arango(_key)
        if feed:
            self.update_feed(_key, feed)
        else:
            self.base_arango.db.collection("rss_feeds").insert(
                {
                    "_key": _key,
                    "url": url,
                    "title": title,
                    "icon_path": icon_path,
                    "description": description,
                    "fetched_timestamp": now_timestamp,  # Add the timestamp field
                    "feed_data": feed_data,
                },
                overwrite=True,
                overwrite_mode="update",
            )

    def update_feed(self, feed_key, feed=None):
        """
        Updates an RSS feed that already exists in the ArangoDB base database.

        Args:
            feed_key (str): The key identifying the feed in the database.
            feed (dict, optional): The feed document; fetched by key when omitted.

        Returns:
            dict: The parsed feed data.

        Raises:
            Exception: If there is an error updating the feed in the database.
        """
        if not feed:
            feed = self.get_feed_from_arango(feed_key)

        feed_data = feedparser.parse(feed["url"])
        print_rainbow(feed_data['feed'])
        feed["feed_data"] = feed_data
        if self.username not in feed.get("users", []):
            feed["users"] = feed.get("users", []) + [self.username]
        fetched_timestamp = datetime.now().isoformat()  # Convert datetime to ISO format string

        # Update the fetched_timestamp in the database
        self.base_arango.db.collection("rss_feeds").update(
            {
                "_key": feed["_key"],
                "fetched_timestamp": fetched_timestamp,
                "feed_data": feed_data,
            }
        )
        return feed_data

    def update_session_state(self, page_name=None):
        # Update session state
        if page_name:
            st.session_state[page_name] = self.__dict__

    def get_site_icon(self, url, html=None):
        try:
            if not html:
                raw = requests.get(url).text
                html = BeautifulSoup(raw, "html.parser")

            icon_link = html.find("link", rel="icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)

            # Fallback to finding other common icon links
            icon_link = html.find("link", rel="shortcut icon")
            if icon_link:
                icon_url = icon_link.get("href", None)
                if icon_url:
                    return urljoin(url, icon_url)

            return None
        except Exception as e:
            print(f"Error getting site icon: {e}")
            return None

    def get_feed_from_arango(self, feed_key):
        """
        Retrieve an RSS feed from the ArangoDB base database.

        Args:
            feed_key (str): The key of the RSS feed to retrieve from the ArangoDB base database.

        Returns:
            dict: The RSS feed document retrieved from the ArangoDB base database.
        """
        return self.base_arango.db.collection("rss_feeds").get(feed_key)

    def get_feed(self, feed_key):
        feed = self.get_feed_from_arango(feed_key)
        feed_data = feed["feed_data"]
        fetched_time = datetime.fromisoformat(feed['fetched_timestamp'])  # Parse the timestamp string

        if datetime.now() - fetched_time < timedelta(hours=1):
            return feed_data
        else:
            return self.update_feed(feed_key)

    def display_feed(self):
        if st.session_state["selected_feed"]:
            feed_data = self.get_feed(st.session_state["selected_feed"])

            st.title(feed_data['feed'].get("title", "No title"))
            st.write(feed_data['feed'].get("description", "No description"))
            st.write("**Recent Entries:**")
            for entry in feed_data['entries'][:5]:
                with st.expander(entry['title']):
                    summary = (
                        entry['summary'] if "summary" in entry else "No summary available"
                    )
                    markdown_summary = html_to_markdown(summary)
                    st.markdown(markdown_summary)
                    st.markdown(f"[Read more]({entry['link']})")


def html_to_markdown(html):
    soup = BeautifulSoup(html, "html.parser")
    for br in soup.find_all("br"):
        br.replace_with("\n")
    for strong in soup.find_all("strong"):
        strong.replace_with(f"**{strong.text}**")
    for em in soup.find_all("em"):
        em.replace_with(f"*{em.text}*")
    for p in soup.find_all("p"):
        p.replace_with(f"{p.text}\n\n")
    return soup.get_text()


def download_icon(icon_url, save_folder="external_icons"):
    try:
        if not os.path.exists(save_folder):
            os.makedirs(save_folder)

        response = requests.get(icon_url, stream=True)
        if response.status_code == 200:
            icon_name = os.path.basename(icon_url)
            icon_path = os.path.join(save_folder, icon_name)
            with open(icon_path, "wb") as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            return icon_path
        else:
            print(f"Failed to download icon: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error downloading icon: {e}")
        return None
@@ -0,0 +1,91 @@
import asyncio
import re
from pdf_highlighter import Highlighter
from _chromadb import ChromaDB
from _llm import LLM
import ollama
from colorprinter.print_color import *
from concurrent.futures import ThreadPoolExecutor


# Wrap the synchronous generate method so it can be awaited
async def async_generate(llm, prompt):
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as pool:
        return await loop.run_in_executor(pool, llm.generate, prompt)
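# Usage sketch (illustrative): from inside an async function,
#     answer = await async_generate(llm, "Summarize the findings.")
# runs the blocking llm.generate call in a worker thread so the event loop
# is not stalled.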


# Define the main asynchronous function to highlight the PDFs
async def highlight_pdf(data):
    # Use the highlight method to highlight the relevant sentences in the PDFs
    highlighted_pdf_buffer = await highlighter.highlight(
        data=data, zero_indexed_pages=True  # Pages are zero-based (e.g., 0, 1, 2, ...)
    )

    # Save the highlighted PDF to a new file
    with open("highlighted_combined_documents.pdf", "wb") as f:
        f.write(highlighted_pdf_buffer.getbuffer())
    print_green("PDF highlighting completed successfully!")


# Initialize ChromaDB client
chromadb = ChromaDB()

# Define the query to fetch relevant text snippets and metadata from ChromaDB
query = "How are climate researchers advocating for change in the society?"

# Perform the query on ChromaDB
result = chromadb.query(query, collection="sci_articles", n_results=5)
# Use zip to combine the parallel result lists into a list of dictionaries
results = [
    {"id": id_, "metadata": metadata, "document": document, "distance": distance}
    for id_, metadata, document, distance in zip(
        result["ids"][0],
        result["metadatas"][0],
        result["documents"][0],
        result["distances"][0],
    )
]

for r in results:
    print_rainbow(r["metadata"])
    print_yellow(type(r["metadata"]["pages"]))

# Ask an LLM a question about the text snippets
llm = LLM(model="small")
documents_string = "\n\n---\n\n".join(result["documents"][0])
answer = llm.generate(
    f'''{query} Write your answer from the information below.\n\n"""{documents_string}"""\n\n{query}'''
)
print_green(answer)

# Now highlight the relevant information in the PDFs to see what the LLM is using.

# Each result from ChromaDB contains the PDF filename and the pages where the text is found
data = []
for result in results:
    pages = result["metadata"].get("pages")
    try:
        pages = [int(pages)]
    except (TypeError, ValueError):
        # Use re to extract the page numbers separated by commas,
        # e.g. "3, 5" -> [3, 5]
        pages = list(map(int, re.findall(r"\d+", pages)))

    data.append(
        {
            "user_input": query,
            "pdf_filename": result["metadata"]["_id"],
            "pages": pages,
            "chunk": result["document"],
        }
    )

# Initialize the Highlighter
highlighter = Highlighter(
    llm=llm,  # Pass the LLM to the Highlighter
    comment=False,  # Set to True to add explanatory comments to the highlights
    use_llm=False,
)
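# Note: with use_llm=False the highlighter is assumed to match each result's
# 'chunk' text against the page text directly; use_llm=True would let the
# passed LLM choose the sentences to highlight (behavior inferred from the
# parameter names, not from pdf_highlighter's documentation).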

# Run the main function using asyncio
asyncio.run(highlight_pdf(data))
@@ -0,0 +1,32 @@
import os
import base64
from ollama import Client
import env_manager
from colorprinter.print_color import *

env_manager.set_env()

# Encode the credentials
credentials = f"{os.getenv('LLM_API_USER')}:{os.getenv('LLM_API_PWD_LASSE')}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()

# Set up the headers with authentication details
headers = {
    'Authorization': f'Basic {encoded_credentials}'
}

# Get the base host URL; removesuffix drops the "/api/chat/" path if present
# (str.rstrip would strip individual characters, not the suffix)
host_url = os.getenv("LLM_API_URL").removesuffix('/api/chat/').rstrip('/')

# Initialize the client with the host and headers
client = Client(
    host=host_url,
    headers=headers
)

# Example usage of the client
try:
    response = client.chat(model=os.getenv('LLM_MODEL'), messages=[{'role': 'user', 'content': 'Why is the sky blue?'}])
    print_rainbow(response)
except Exception as e:
    print(f"Error: {e}")